
goqite


goqite (pronounced Go-queue-ite) is a persistent message queue Go library built on SQLite and inspired by AWS SQS (but much simpler).

Made in 🇩🇰 by maragu, maker of online Go courses.

Features

  • Messages are persisted in a SQLite table.
  • Messages are sent to and received from the queue, and are guaranteed to not be redelivered before a timeout occurs.
  • Support for multiple queues in one table.
  • Message timeouts can be extended, to support e.g. long-running tasks.
  • A job runner abstraction is provided on top of the queue, for your background tasks.
  • A simple HTTP handler is provided for your convenience.
  • No non-test dependencies. Bring your own SQLite driver.

Examples

Queue

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/mattn/go-sqlite3"

	"github.com/maragudk/goqite"
)

func main() {
	// Bring your own database connection, since you probably already have it,
	// as well as some sort of schema migration system.
	// The schema is in the schema.sql file.
	// Alternatively, use the goqite.Setup function to create the schema.
	db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
	if err != nil {
		log.Fatalln(err)
	}
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	if err := goqite.Setup(context.Background(), db); err != nil {
		log.Fatalln(err)
	}

	// Create a new queue named "jobs".
	// You can also customize the message redelivery timeout and maximum receive count,
	// but here, we use the defaults.
	q := goqite.New(goqite.NewOpts{
		DB:   db,
		Name: "jobs",
	})

	// Send a message to the queue.
	// Note that the body is an arbitrary byte slice, so you can decide
	// what kind of payload you have. You can also set a message delay.
	err = q.Send(context.Background(), goqite.Message{
		Body: []byte("yo"),
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Receive a message from the queue, during which time it's not available to
	// other consumers (until the message timeout has passed).
	m, err := q.Receive(context.Background())
	if err != nil {
		log.Fatalln(err)
	}

	fmt.Println(string(m.Body))

	// If you need more time for processing the message, you can extend
	// the message timeout as many times as you want.
	if err := q.Extend(context.Background(), m.ID, time.Second); err != nil {
		log.Fatalln(err)
	}

	// Make sure to delete the message, so it doesn't get redelivered.
	if err := q.Delete(context.Background(), m.ID); err != nil {
		log.Fatalln(err)
	}
}

Jobs

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"time"

	_ "github.com/mattn/go-sqlite3"

	"github.com/maragudk/goqite"
	"github.com/maragudk/goqite/jobs"
)

func main() {
	log := slog.Default()

	// Set up the db and goqite schema.
	db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
	if err != nil {
		log.Error("Error opening db", "error", err)
		return
	}
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	if err := goqite.Setup(context.Background(), db); err != nil {
		log.Error("Error in setup", "error", err)
		return
	}

	// Make a new queue for the jobs. You can have as many of these as you like, just name them differently.
	q := goqite.New(goqite.NewOpts{
		DB:   db,
		Name: "jobs",
	})

	// Make a job runner with a job limit of 1 and a short message poll interval.
	r := jobs.NewRunner(jobs.NewRunnerOpts{
		Limit:        1,
		Log:          slog.Default(),
		PollInterval: 10 * time.Millisecond,
		Queue:        q,
	})

	// Register our "print" job.
	r.Register("print", func(ctx context.Context, m []byte) error {
		fmt.Println(string(m))
		return nil
	})

	// Create a "print" job with a message.
	if err := jobs.Create(context.Background(), q, "print", []byte("Yo")); err != nil {
		log.Error("Error creating job", "error", err)
		return
	}

	// Stop the job runner after a timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()

	// Start the job runner and see the job run.
	r.Start(ctx)
}

Benchmarks

Just for fun, some benchmarks. 🤓

On a MacBook Pro with M3 Ultra chip and SSD, sequentially sending, receiving, and deleting a message:

$ make benchmark
go test -cpu 1,2,4,8,16 -bench=.
goos: darwin
goarch: arm64
pkg: github.com/maragudk/goqite
BenchmarkQueue/send,_receive,_delete            	   21444	     54262 ns/op
BenchmarkQueue/send,_receive,_delete-2          	   17278	     68615 ns/op
BenchmarkQueue/send,_receive,_delete-4          	   16092	     73888 ns/op
BenchmarkQueue/send,_receive,_delete-8          	   15346	     78255 ns/op
BenchmarkQueue/send,_receive,_delete-16         	   15106	     79517 ns/op

Note that the slowest result above is around 12,500 messages / second with 16 parallel producers/consumers. The fastest result is around 18,500 messages / second with just one producer/consumer. (SQLite only allows one writer at a time, so the parallelism just creates write contention.)
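For reference, the throughput figures quoted above come straight from the ns/op numbers; a quick sketch of the arithmetic:

```go
package main

import "fmt"

func main() {
	// Throughput is one second divided by the time per operation.
	const nsPerSecond = 1_000_000_000

	// Slowest result above: 79517 ns per send/receive/delete cycle,
	// with 16 parallel producers/consumers.
	fmt.Println(nsPerSecond / 79517) // 12575 messages/second, i.e. around 12,500

	// Fastest result above: 54262 ns per cycle, single producer/consumer.
	fmt.Println(nsPerSecond / 54262) // 18429 messages/second, i.e. around 18,500
}
```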


goqite's Issues

Idea: Add type-safe queues

One thing in particular that I've found a little annoying to deal with is the lack of type safety. You can pass any byte slice into jobs.Create() or Send(), which means potential run-time errors and no IDE support. You also have to deal with serialization manually on both the caller and receiver side for every single queue. I had the same issue with asynq.

Adding type-safety is possible. I put together a very basic POC that I believe will work. Each queue would have to be declared with the type you intend to receive and process.

Is this something you would be interested in? If not, since it's a rather large refactor, I completely understand.

Report id of database row inserted by Send.

Hi there. With reference to PR #41, it would be great if goqite could handle a particular use case I have.

I'm building a REST-ish API. Users submit jobs which are long-running (minutes/hours). Upon submission the API immediately responds with a '202 Accepted', and jobs are added to a goqite queue. According to received wisdom, the response sent to the user should include a 'Location' header with a URL where the user can check job progress, e.g. Location: /task/123, where 123 is the ID of the job.

However, since the Send method of goqite.Queue does not return anything, I cannot reliably discern the ID of the item the user submitted. Therefore I can't build the /task/123 URL, and the user can't check progress.

I guess I could embed some identifying information into the job body, but that would need to be generated by the client, whereas it's clearly a server-side responsibility. I'd also need to parse many large BLOBs in the database to look up any item, which would be inefficient.

Consequently, I believe it would be useful if Send could return the ID of the row it inserts. This is trivial in SQL with a returning clause, supported in SQLite since version 3.35.0 (2021-03-12).

PR #41 provides this functionality. However, implementing it resulted in a cascade of changes throughout the code-base, which I appreciate isn't ideal. Happy to discuss alternatives! 😄

Add Abort?

For when a message cannot be handled for whatever reason. Kinda like NAck in other queues.

Feature request: Add options for job retention

It would be nice if a given queue could be given one of three retention options, rather than automatically deleting the job once it's complete:

  • Retain forever
  • Retain for a time.Duration
  • Don't retain (after completed)

While the logs do provide some insight into what's happening, having the ability to retain completed jobs can be great for auditing and monitoring purposes. That could also provide the ability to have a dashboard of some sort, showing how many jobs ran, how many completed, failed, etc. You could also consider storing the execution time of each job, which can be another helpful thing to audit.

Delivery delay mechanism

I'm interested in adding a delivery delay mechanism to your library, to allow messages to be scheduled for future delivery. It would prevent messages from being delivered to runners until a specific time has been reached; after that time has passed, the message would be available to any runner. If no time is set, or the scheduled time is in the past, the behavior would match the library's existing behavior.

Please let me know if you'd be interested in receiving a PR for this improvement and if you have any guidelines you'd like me to follow. If so, feel free to assign me to the issue and I will begin work on this.

This improvement would most likely require a database migration.

Suggestion: Update Create Table to be Create Table If Not Exists

Hi!

I've been using this library all morning and it is fantastic. I just wanted to pose this more as a question or usability suggestion. For most of my sqlite projects I initialize all my tables as CREATE TABLE IF NOT EXISTS and run my ddl on setup for my apps.

Currently, using the library, I have to comment out goqite.Setup() after the first run, since otherwise it fails because the table already exists. Would you be open to changing the default schema to CREATE TABLE IF NOT EXISTS, or is there a preferred pattern, e.g. catching the error, checking that it's a 'table already exists' error, and moving on?

Thanks!
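For reference, the suggested change is a one-word addition to the schema quoted elsewhere on this page (a sketch of the idempotent variant; whether upstream adopts it is up to the maintainer):

```sql
create table if not exists goqite (
  id text primary key default ('m_' || lower(hex(randomblob(16)))),
  created text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  updated text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  queue text not null,
  body blob not null,
  timeout text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  received integer not null default 0
) strict;
```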

Don't panic when encountering an unregistered job

I don't see why the program should panic when the database contains a job for which no runner has been registered. This seems like an easy state to get into when updating an application, especially since there is nothing cleaning up stale jobs as far as I can tell.
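The alternative behavior being asked for is a registry lookup that surfaces an error instead of panicking, so the caller can log and skip (or requeue) the message. A self-contained sketch of that idea (the `runner` type here is illustrative, not goqite's actual job runner):

```go
package main

import "fmt"

// runner is a simplified stand-in for a job runner's registry
// (illustrative; goqite's actual runner differs).
type runner struct {
	jobs map[string]func([]byte) error
}

// run looks up the handler for a job name. Returning an error for an
// unknown name lets the caller decide what to do with the message,
// instead of crashing the whole process.
func (r *runner) run(name string, m []byte) error {
	f, ok := r.jobs[name]
	if !ok {
		return fmt.Errorf("no job registered with name %q", name)
	}
	return f(m)
}

func main() {
	r := &runner{jobs: map[string]func([]byte) error{
		"print": func(m []byte) error { fmt.Println(string(m)); return nil },
	}}
	fmt.Println(r.run("print", []byte("yo")))   // prints "yo", then <nil>
	fmt.Println(r.run("missing", []byte("yo"))) // prints an error instead of panicking
}
```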

Possible performance issue, with cause and solution included.

According to schema.sql, the message body and metadata are currently stored in the same goqite table.

create table goqite (
  id text primary key default ('m_' || lower(hex(randomblob(16)))),
  created text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  updated text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  queue text not null,
  body blob not null,
  timeout text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  received integer not null default 0
) strict;

We experienced a similar performance issue with large BLOBs in a comparable SQL table, so I'd like to share our story briefly for your reference.

Our customer imported a mailbox into the software, which generated about 1000 records, 8GB in total (in SQLite, of course); each record is an email message about 8MB in size. Our program had a bug and we had to update the SQL records manually for troubleshooting. While performing a SQL command like UPDATE <table> SET received=1; which affects all rows, the SQLite command line tool took a long time (~8 minutes) to finish. This didn't seem right, so we started checking the SQLite docs to figure it out.

We found the root cause (improper SQL table design) and the solution (separate the large BLOBs from the metadata); hope it helps.
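A sketch of the suggested split, keeping the frequently scanned metadata in a small table and moving the body into a side table joined by id (illustrative only; the `goqite_body` table name and the foreign-key wiring are assumptions, not part of the library):

```sql
-- Metadata stays in the hot table; the large body moves out,
-- so full-table scans and updates don't touch the BLOBs.
create table goqite (
  id text primary key default ('m_' || lower(hex(randomblob(16)))),
  created text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  updated text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  queue text not null,
  timeout text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  received integer not null default 0
) strict;

create table goqite_body (
  id text primary key references goqite (id) on delete cascade,
  body blob not null
) strict;
```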
