Giter Site home page Giter Site logo

robinjoseph08 / redisqueue Goto Github PK

View Code? Open in Web Editor NEW
122.0 3.0 54.0 33 KB

redisqueue provides a producer and consumer of a queue that uses Redis streams

Home Page: https://godoc.org/github.com/robinjoseph08/redisqueue

License: MIT License

Makefile 4.25% Go 94.57% Shell 1.18%
go golang redis consumer producer producer-consumer streams

redisqueue's Introduction

redisqueue

Version GoDoc Build Status Coverage Status Go Report Card License

redisqueue provides a producer and consumer of a queue that uses Redis streams.

Features

  • A Producer struct to make enqueuing messages easy.
  • A Consumer struct to make processing messages concurrenly.
  • Claiming and acknowledging messages if there's no error, so that if a consumer dies while processing, the message it was working on isn't lost. This guarantees at least once delivery.
  • A "visibility timeout" so that if a message isn't processed in a designated time frame, it will be be processed by another consumer.
  • A max length on the stream so that it doesn't store the messages indefinitely and run out of memory.
  • Graceful handling of Unix signals (SIGINT and SIGTERM) to let in-flight messages complete.
  • A channel that will surface any errors so you can handle them centrally.
  • Graceful handling of panics to avoid crashing the whole process.
  • A concurrency setting to control how many goroutines are spawned to process messages.
  • A batch size setting to limit the total messages in flight.
  • Support for multiple streams.

Installation

redisqueue requires a Go version with Modules support and uses import versioning. So please make sure to initialize a Go module before installing redisqueue:

go mod init github.com/my/repo
go get github.com/robinjoseph08/redisqueue/v2

Import:

import "github.com/robinjoseph08/redisqueue/v2"

Example

Here's an example of a producer that inserts 1000 messages into a queue:

package main

import (
	"fmt"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
		StreamMaxLength:      10000,
		ApproximateMaxLength: true,
	})
	if err != nil {
		panic(err)
	}

	for i := 0; i < 1000; i++ {
		err := p.Enqueue(&redisqueue.Message{
			Stream: "redisqueue:test",
			Values: map[string]interface{}{
				"index": i,
			},
		})
		if err != nil {
			panic(err)
		}

		if i%100 == 0 {
			fmt.Printf("enqueued %d\n", i)
		}
	}
}

And here's an example of a consumer that reads the messages off of that queue:

package main

import (
	"fmt"
	"time"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
		VisibilityTimeout: 60 * time.Second,
		BlockingTimeout:   5 * time.Second,
		ReclaimInterval:   1 * time.Second,
		BufferSize:        100,
		Concurrency:       10,
	})
	if err != nil {
		panic(err)
	}

	c.Register("redisqueue:test", process)

	go func() {
		for err := range c.Errors {
			// handle errors accordingly
			fmt.Printf("err: %+v\n", err)
		}
	}()

	fmt.Println("starting")
	c.Run()
	fmt.Println("stopped")
}

func process(msg *redisqueue.Message) error {
	fmt.Printf("processing message: %v\n", msg.Values["index"])
	return nil
}

redisqueue's People

Contributors

nyergler avatar robinjoseph08 avatar theckman avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

redisqueue's Issues

When multiple listeners are registered, a xreadgroup splicing error occurs.

// .....

queue.Register(global.OrderEvent, event.OrderHandle)
queue.Register(global.ImMessageEvent, event.ImMessageHandle)
queue.Register(global.PlayListEvent, event.PlaylistHandle)
go func() {
for err := range queue.Errors {
log.Println("Error occurred:", err)
}
}()
go queue.Run()

...................

Code as above, sometimes there will be errors

error reading redis stream: ERR Invalid stream ID specified as stream command argument

Go to the redis client to listen and you can see: xreadgroup group redisqueue DESKTOP-TMASQJ5 count 100 block 5000 streams im_message_event_queue order_event_queue playlist_event_queue order_event_queue > > > im_message_event_queue playlist_event_queue > > >

The correct one should be: xreadgroup group redisqueue DESKTOP-TMASQJ5 count 200 block 5000 streams order_event_queue im_message_event_queue playlist_event_queue > > >

Add ability to set up stream messages TTY with XTRIM MINID

This issue is some kind of a feature request. Redisqueue has a great ability to limit a stream size by evaluating StreamMaxLength field of the ProducerOptions:

// StreamMaxLength sets the MAXLEN option when calling XADD. This creates a
// capped stream to prevent the stream from taking up memory indefinitely.
// It's important to note though that this isn't the maximum number of
// completed messages, but the maximum number of total messages. This
// means that if all consumers are down, but producers are still enqueuing,
// and the maximum is reached, unprocessed message will start to be dropped.
// So ideally, you'll set this number to be as high as you can makee it.
// More info here: https://redis.io/commands/xadd#capped-streams.
StreamMaxLength int64

It works great but does not give proper flexibility in the problem of limiting streams by time which leads to hard stream size limits which could lead to huge stream size if it has big messages and big size in order to offer a huge capacity for lots of consumers or load.

Redis Streams message can not have TTY with expire command https://redis.io/commands/expire but there is a tread with another feature request for the Redis. redis/redis#4450 (comment) So we could emulate a TTY for a Redis Streams messages by this logic:

  • we have a method in this library that get a stream name and time string (ex. "-7 days" to store only messages for the last 7 days in the stream) that define the TTY point for stream messages and callback for error
  • this function has a Redis client inside
  • it makes a call to Redis with XINFO GROUPS command which receive the last delivered ids for each consumer group
  • we compare values from a previous step with TTY time object and check if the stream has undelivered messages
  • if all the messages were delivered to consumers we could run XTRIM MINID command to remove old messages (it will work only for Redis 6.2 https://redis.io/commands/xtrim#history)
  • if the stream has consumers which had not received messages that tend to be deleted, we can run a special error callback function with empty interfaces inside which gives the ability to developer to handle somehow this situation (logging, alerting, etc...)

Soon, we will need to develop such functionality to our services but I think that it would be great to not reinvent the wheel but have this code inside the Redisqueue.

Support creating groups from end of stream, not just beginning

Because the message ID we're specifying when creating the group is always 0, consumers will need to process all messages in the stream before getting to the latest message. For a system I'm building, I'm looking to have the semantics be more lossy when a new consumer comes to life.

Would it be possible to extend the API to include a way to specify whether we want all messages, or only new messages.

Here is the relevant line of code:

err := c.redis.XGroupCreateMkStream(stream, c.options.GroupName, "0").Err()

Add ability to inject *redis.Client instead of *redis.Options

I'd like to continually health check the Redis client by doing a simple SET / GET transaction every n seconds, and if it fails trigger a process restart.

Unfortunately, I have no way to use the same Redis pool as the publisher/consumer which puts extra load on the Redis cluster and doesn't fully test the ability to use Redis.

I was wondering if there'd be a strong objection to adding a *redis.Client to both ConsumerOptions and ProducerOptions. I'm thinking of retaining the *redis.Options for now, and to only use that if *redis.Client is nil.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.