goqite's Introduction

goqite

goqite (pronounced Go-queue-ite) is a persistent message queue Go library built on SQLite and inspired by AWS SQS (but much simpler).

Made in 🇩🇰 by maragu, maker of online Go courses.

Features

  • Messages are persisted in a SQLite table.
  • Messages are sent to and received from the queue, and are guaranteed to not be redelivered before a timeout occurs.
  • Support for multiple queues in one table.
  • Message timeouts can be extended, to support e.g. long-running tasks.
  • A job runner abstraction is provided on top of the queue, for your background tasks.
  • A simple HTTP handler is provided for your convenience.
  • No non-test dependencies. Bring your own SQLite driver.

Examples

Queue

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/mattn/go-sqlite3"

	"github.com/maragudk/goqite"
)

func main() {
	// Bring your own database connection, since you probably already have it,
	// as well as some sort of schema migration system.
	// The schema is in the schema.sql file.
	// Alternatively, use the goqite.Setup function to create the schema.
	db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
	if err != nil {
		log.Fatalln(err)
	}
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	if err := goqite.Setup(context.Background(), db); err != nil {
		log.Fatalln(err)
	}

	// Create a new queue named "jobs".
	// You can also customize the message redelivery timeout and maximum receive count,
	// but here, we use the defaults.
	q := goqite.New(goqite.NewOpts{
		DB:   db,
		Name: "jobs",
	})

	// Send a message to the queue.
	// Note that the body is an arbitrary byte slice, so you can decide
	// what kind of payload you have. You can also set a message delay.
	err = q.Send(context.Background(), goqite.Message{
		Body: []byte("yo"),
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Receive a message from the queue, during which time it's not available to
	// other consumers (until the message timeout has passed).
	m, err := q.Receive(context.Background())
	if err != nil {
		log.Fatalln(err)
	}

	fmt.Println(string(m.Body))

	// If you need more time for processing the message, you can extend
	// the message timeout as many times as you want.
	if err := q.Extend(context.Background(), m.ID, time.Second); err != nil {
		log.Fatalln(err)
	}

	// Make sure to delete the message, so it doesn't get redelivered.
	if err := q.Delete(context.Background(), m.ID); err != nil {
		log.Fatalln(err)
	}
}

Jobs

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"time"

	_ "github.com/mattn/go-sqlite3"

	"github.com/maragudk/goqite"
	"github.com/maragudk/goqite/jobs"
)

func main() {
	log := slog.Default()

	// Setup the db and goqite schema.
	db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
	if err != nil {
		// Without a database handle nothing below can work, so stop here.
		log.Error("Error opening db", "error", err)
		return
	}
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	if err := goqite.Setup(context.Background(), db); err != nil {
		// Without the schema the queue cannot be used, so stop here.
		log.Error("Error in setup", "error", err)
		return
	}

	// Make a new queue for the jobs. You can have as many of these as you like, just name them differently.
	q := goqite.New(goqite.NewOpts{
		DB:   db,
		Name: "jobs",
	})

	// Make a job runner with a job limit of 1 and a short message poll interval.
	r := jobs.NewRunner(jobs.NewRunnerOpts{
		Limit:        1,
		Log:          slog.Default(),
		PollInterval: 10 * time.Millisecond,
		Queue:        q,
	})

	// Register our "print" job.
	r.Register("print", func(ctx context.Context, m []byte) error {
		fmt.Println(string(m))
		return nil
	})

	// Create a "print" job with a message.
	if err := jobs.Create(context.Background(), q, "print", []byte("Yo")); err != nil {
		// There is no job to run if creation failed, so stop here.
		log.Error("Error creating job", "error", err)
		return
	}

	// Stop the job runner after a timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()

	// Start the job runner and see the job run.
	r.Start(ctx)
}

Benchmarks

Just for fun, some benchmarks. 🤓

On a MacBook Pro with M3 Ultra chip and SSD, sequentially sending, receiving, and deleting a message:

$ make benchmark
go test -cpu 1,2,4,8,16 -bench=.
goos: darwin
goarch: arm64
pkg: github.com/maragudk/goqite
BenchmarkQueue/send,_receive,_delete            	   21444	     54262 ns/op
BenchmarkQueue/send,_receive,_delete-2          	   17278	     68615 ns/op
BenchmarkQueue/send,_receive,_delete-4          	   16092	     73888 ns/op
BenchmarkQueue/send,_receive,_delete-8          	   15346	     78255 ns/op
BenchmarkQueue/send,_receive,_delete-16         	   15106	     79517 ns/op

Note that the slowest result above is around 12,500 messages / second with 16 parallel producers/consumers. The fastest result is around 18,500 messages / second with just one producer/consumer. (SQLite only allows one writer at a time, so the parallelism just creates write contention.)
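
The messages-per-second figures follow directly from the ns/op column: one second is 1e9 nanoseconds, so throughput is 1e9 divided by ns/op. A minimal sketch of that arithmetic, using the numbers from the output above (the helper name `messagesPerSecond` is made up for this illustration):

```go
package main

import "fmt"

// messagesPerSecond converts a benchmark result in ns/op into
// operations (here: send, receive, delete cycles) per second.
func messagesPerSecond(nsPerOp float64) float64 {
	return 1e9 / nsPerOp
}

func main() {
	// ns/op figures copied from the benchmark output above.
	results := []struct {
		cpus    int
		nsPerOp float64
	}{
		{1, 54262},
		{2, 68615},
		{4, 73888},
		{8, 78255},
		{16, 79517},
	}
	for _, r := range results {
		fmt.Printf("%2d CPUs: %.0f messages/second\n", r.cpus, messagesPerSecond(r.nsPerOp))
	}
}
```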

goqite's People

Contributors

markuswustenberg

goqite's Issues

Report id of database row inserted by Send.

Hi there. With reference to PR #41, it would be great if goqite could handle a particular use case I have.

I'm building a REST-ish API. Users submit jobs which are long-running (minutes/hours). Upon submission the API immediately responds with a '202 Accepted', and jobs are added to a goqite queue. According to received wisdom, the response sent to the user should include a 'Location' header with a URL where the user can check job progress, e.g. Location: /task/123 where 123 is the ID of the job.

However, since the Send method of goqite.Queue does not return anything, I cannot reliably discern the ID of the item the user submitted. Therefore I can't build the /task/123 URL, and the user can't check progress.

I guess I could embed some identifying information into the job body, but this would need to be generated by the client whereas that's clearly a server-side responsibility. I'd also need to parse many large BLOBs in the database to look-up any item, which would be inefficient.

Consequently, I believe it would be useful if Send could return the ID of the row it inserts. This is trivial in SQL with a returning clause, supported in SQLite since version 3.35.0 (2021-03-12).

PR #41 provides this functionality. However, implementing it resulted in a cascade of changes throughout the code-base, which I appreciate isn't ideal. Happy to discuss alternatives! 😄
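
To make the use case concrete, here is a rough sketch of the handler this issue describes. It assumes a hypothetical `enqueue` function that returns the ID of the inserted row; that function, `taskLocation`, and `submitHandler` are names invented for this example and are not part of goqite as shown above.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// enqueueFunc is a hypothetical stand-in for a Send variant that returns
// the ID of the row it inserted. It is an assumption, not goqite's API.
type enqueueFunc func(body []byte) (id string, err error)

// taskLocation builds the status URL for a job ID.
func taskLocation(id string) string {
	return "/task/" + id
}

// submitHandler enqueues the request body and answers 202 Accepted
// with a Location header pointing at the job's status URL.
func submitHandler(enqueue enqueueFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "could not read request body", http.StatusBadRequest)
			return
		}
		id, err := enqueue(body)
		if err != nil {
			http.Error(w, "could not enqueue job", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Location", taskLocation(id))
		w.WriteHeader(http.StatusAccepted)
	}
}

func main() {
	// A fake enqueue function that always reports the ID "123".
	h := submitHandler(func([]byte) (string, error) { return "123", nil })

	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/task", strings.NewReader("payload"))
	h(rec, req)

	fmt.Println(rec.Code, rec.Header().Get("Location"))
}
```

Without an ID coming back from the queue, the `Location` header in this sketch has nothing reliable to point at, which is the gap the issue is about.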

Don't panic when encountering an unregistered job

I don't see why the program should panic when the database contains a job for which no runner has been registered. This seems like an easy state to get into when updating an application, especially since nothing cleans up stale jobs as far as I can tell.
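
One possible shape for this, sketched independently of the actual runner code: look the job up by name and return an error the caller can log or skip, instead of panicking. The `registry` type and `errUnknownJob` are assumptions made for this example; only the job function signature mirrors the Jobs example above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// jobFunc has the same shape as the function registered in the Jobs example.
type jobFunc func(ctx context.Context, m []byte) error

// errUnknownJob is a hypothetical sentinel error for unregistered job names.
var errUnknownJob = errors.New("no job registered under this name")

// registry maps job names to their functions.
type registry map[string]jobFunc

// run executes the named job, or returns errUnknownJob (wrapped) if the
// name is not registered. It never panics on an unknown name.
func (r registry) run(ctx context.Context, name string, m []byte) error {
	job, ok := r[name]
	if !ok {
		return fmt.Errorf("%w: %q", errUnknownJob, name)
	}
	return job(ctx, m)
}

func main() {
	r := registry{
		"print": func(ctx context.Context, m []byte) error {
			fmt.Println(string(m))
			return nil
		},
	}

	ctx := context.Background()
	// "resize" stands for a stale job left over from an older release.
	for _, name := range []string{"print", "resize"} {
		if err := r.run(ctx, name, []byte("Yo")); err != nil {
			if errors.Is(err, errUnknownJob) {
				fmt.Println("skipping:", err)
				continue
			}
			fmt.Println("job failed:", err)
		}
	}
}
```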

Feature request: Add options for job retention

It would be nice if a given queue could be given one of three retention options, rather than automatically deleting the job once it's complete:

  • Retain forever
  • Retain until time.Duration
  • Don't retain (after completed)

While the logs do provide some insight into what's happening, having the ability to retain logs can be great for auditing and monitoring purposes. That could also provide the ability to have a dashboard of some sort, seeing how many jobs ran, how many completed, failed, etc. You could also consider storing the execution time of each job, which can be another helpful thing to audit.
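
The three options could be modelled roughly like this. This is only a sketch of the idea in the request; the type and constant names are invented here, and nothing like it exists in the library as shown above.

```go
package main

import (
	"fmt"
	"time"
)

// retentionMode enumerates the three options from the list above.
type retentionMode int

const (
	retainNone    retentionMode = iota // delete as soon as the job completes
	retainFor                          // keep for a fixed duration after completion
	retainForever                      // never delete
)

// retention is a hypothetical per-queue setting.
type retention struct {
	mode retentionMode
	ttl  time.Duration // only used with retainFor
}

// shouldDelete reports whether a job that completed `age` ago may be removed.
func (r retention) shouldDelete(age time.Duration) bool {
	switch r.mode {
	case retainForever:
		return false
	case retainFor:
		return age >= r.ttl
	default:
		return true
	}
}

func main() {
	daily := retention{mode: retainFor, ttl: 24 * time.Hour}
	fmt.Println(daily.shouldDelete(time.Hour))      // completed an hour ago
	fmt.Println(daily.shouldDelete(48 * time.Hour)) // completed two days ago
}
```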

Delivery delay mechanism

I'm interested in adding a delivery delay mechanism to your library to allow messages to be scheduled for future delivery. It would prevent messages from being delivered to runners until a specific time has been reached. After this time has passed, the message would be available to any runner to be delivered. If no time is set or the time scheduled is in the past, the behavior would be the library's existing behavior.

Please let me know if you'd be interested in receiving a PR for this improvement and if you have any guidelines you'd like me to follow. If so, feel free to assign me to the issue and I will begin work on this.

This improvement would most likely require a database migration.
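
The rule described in this issue can be stated in a few lines. The sketch below is not goqite's implementation; the `message` type and its `notBefore` field are hypothetical and only illustrate "deliver once the scheduled time has passed, and immediately if no time is set or it is in the past".

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical in-memory message with an optional schedule.
type message struct {
	body      []byte
	notBefore time.Time // zero value means "no delay requested"
}

// deliverable reports whether the message may be handed to a runner at `now`.
func (m message) deliverable(now time.Time) bool {
	return m.notBefore.IsZero() || !now.Before(m.notBefore)
}

func main() {
	now := time.Now()

	immediate := message{body: []byte("yo")}
	past := message{body: []byte("yo"), notBefore: now.Add(-time.Minute)}
	future := message{body: []byte("yo"), notBefore: now.Add(time.Minute)}

	fmt.Println(immediate.deliverable(now))
	fmt.Println(past.deliverable(now))
	fmt.Println(future.deliverable(now))
}
```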

Suggestion: Update Create Table to be Create Table If Not Exists

Hi!

I've been using this library all morning and it is fantastic. I just wanted to pose this more as a question or usability suggestion. For most of my sqlite projects I initialize all my tables as CREATE TABLE IF NOT EXISTS and run my ddl on setup for my apps.

Currently, using the library, I have to comment out goqite.Setup() after the first run; otherwise it fails because the table already exists. Would you be open to changing the default schema to CREATE TABLE IF NOT EXISTS, or is there a preferred pattern, such as catching the error, checking whether it is the 'table already exists' error, and moving on?

Thanks!
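
For reference, the "catch the error and move on" workaround mentioned above might look like the sketch below. It matches on the error text, which is brittle: the exact message ("table goqite already exists") is an assumption and may differ between drivers and versions. The helper and the two simulated errors are made up for this example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Simulated errors for the demo. The first mimics what a second setup run
// might return; its text is an assumption, not taken from a real driver.
var (
	errTableExists = errors.New("table goqite already exists")
	errOther       = errors.New("disk I/O error")
)

// ignoreAlreadyExists returns nil for "already exists" errors and passes
// every other error (including nil) through unchanged.
func ignoreAlreadyExists(err error) error {
	if err != nil && strings.Contains(err.Error(), "already exists") {
		return nil
	}
	return err
}

func main() {
	// In real code, the argument would be the error returned by the setup call.
	fmt.Println(ignoreAlreadyExists(errTableExists))
	fmt.Println(ignoreAlreadyExists(errOther))
}
```

A schema that uses CREATE TABLE IF NOT EXISTS would make this string matching unnecessary, which is the point of the suggestion.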

Possible performance issue, with cause and solution included.

According to schema.sql, goqite currently stores the message body and the metadata in the same table, goqite.

create table goqite (
  id text primary key default ('m_' || lower(hex(randomblob(16)))),
  created text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  updated text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  queue text not null,
  body blob not null,
  timeout text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  received integer not null default 0
) strict;

We experienced a performance issue with large BLOBs in a similar SQL table, so I'd like to share our story briefly for your reference.

Our customer imported a mailbox into the software, which generated about 1000 records and 8GB in total (in SQLite, of course); each record is an email message about 8MB large. Our program had a bug and we had to update the SQL records manually for troubleshooting. While performing an SQL command like UPDATE <table> SET received=1; which affects all rows, the SQLite command line tool took a long time (~8 min) to finish. This is not right, so we started checking the SQLite docs to figure it out.

We found the root cause (improper SQL table design) and the solution (separating the large BLOB from the metadata). Hope it helps.

Add Abort?

For when a message is unable to be handled for whatever reason. Kinda like NAck in other queues.
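
One way to approximate this with what the Queue example already shows is to call Extend with a short delay, so the message becomes visible to other consumers again soon. The sketch below assumes that Extend sets the timeout relative to the current time, which the text above does not confirm. The `extender` interface (with plain string IDs), `abort`, and `fakeQueue` are invented for this illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// extender is a hypothetical interface shaped like the q.Extend call in the
// Queue example. Using a plain string ID is a simplification.
type extender interface {
	Extend(ctx context.Context, id string, delay time.Duration) error
}

// abort gives up on a message by shortening its timeout to retryAfter,
// so another consumer can pick it up soon.
func abort(ctx context.Context, q extender, id string, retryAfter time.Duration) error {
	return q.Extend(ctx, id, retryAfter)
}

// fakeQueue records, per in-flight message ID, how long until the message
// is visible again. It stands in for a real queue in this demo.
type fakeQueue struct {
	visibleIn map[string]time.Duration
}

func newFakeQueue() *fakeQueue {
	return &fakeQueue{visibleIn: map[string]time.Duration{}}
}

func (q *fakeQueue) Extend(_ context.Context, id string, delay time.Duration) error {
	if _, ok := q.visibleIn[id]; !ok {
		return fmt.Errorf("message %q is not in flight", id)
	}
	q.visibleIn[id] = delay
	return nil
}

func main() {
	q := newFakeQueue()
	q.visibleIn["m_1"] = 5 * time.Second // received, with 5s of timeout left

	if err := abort(context.Background(), q, "m_1", 100*time.Millisecond); err != nil {
		fmt.Println("abort failed:", err)
		return
	}
	fmt.Println("m_1 visible again in", q.visibleIn["m_1"])
}
```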

Idea: Add type-safe queues

One thing in particular that I've found to be a little annoying to deal with is the lack of type-safety. You can put any byte slice into jobs.Create() or Send(), which means potential run-time errors and no IDE support. You also have to manually deal with serialization on both the caller and receiver side for every single queue. I had the same issue with asynq.

Adding type-safety is possible. I put together a very basic POC that I believe will work. Each queue would have to be declared with the type you intend to receive and process.

Is this something you would be interested in? If not, since it's a rather large refactor, I completely understand.
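
For illustration, a typed wrapper could look something like the sketch below. It is not the POC mentioned above: the `byteQueue` interface, `typedQueue`, and the in-memory `memQueue` are assumptions made for this example, and JSON is just one possible encoding.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// byteQueue is a hypothetical minimal view of a queue that moves byte slices.
type byteQueue interface {
	Send(ctx context.Context, body []byte) error
	Receive(ctx context.Context) ([]byte, error)
}

// typedQueue wraps a byteQueue and handles (de)serialization of T.
type typedQueue[T any] struct {
	q byteQueue
}

// Send encodes v as JSON and passes it to the underlying queue.
func (t typedQueue[T]) Send(ctx context.Context, v T) error {
	b, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("encode: %w", err)
	}
	return t.q.Send(ctx, b)
}

// Receive takes the next body from the underlying queue and decodes it into T.
func (t typedQueue[T]) Receive(ctx context.Context) (T, error) {
	var v T
	b, err := t.q.Receive(ctx)
	if err != nil {
		return v, err
	}
	if err := json.Unmarshal(b, &v); err != nil {
		return v, fmt.Errorf("decode: %w", err)
	}
	return v, nil
}

// memQueue is an in-memory FIFO used only to make the demo runnable.
type memQueue struct {
	items [][]byte
}

func (m *memQueue) Send(_ context.Context, b []byte) error {
	m.items = append(m.items, b)
	return nil
}

func (m *memQueue) Receive(_ context.Context) ([]byte, error) {
	if len(m.items) == 0 {
		return nil, errors.New("queue is empty")
	}
	b := m.items[0]
	m.items = m.items[1:]
	return b, nil
}

// emailJob is an example payload type.
type emailJob struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

func main() {
	ctx := context.Background()
	q := typedQueue[emailJob]{q: &memQueue{}}

	if err := q.Send(ctx, emailJob{To: "a@example.com", Subject: "hi"}); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	job, err := q.Receive(ctx)
	if err != nil {
		fmt.Println("receive failed:", err)
		return
	}
	fmt.Printf("%+v\n", job)
}
```

With a wrapper like this, passing the wrong payload type to Send becomes a compile-time error, and the encoding lives in one place per queue.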
