Giter Site home page Giter Site logo

watermill-redisstream's People

Contributors

dickens7 avatar hlubek avatar jibbolo avatar minghsu0107 avatar sysradium avatar wk8 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

watermill-redisstream's Issues

didn't compatible with the github.com/redis/[email protected]

get errors below:

\github.com\!three!dots!labs\[email protected]\pkg\redisstream\publisher.go:93:23: p.client.XAdd undefined (type redis.UniversalClient has no field or method XAdd)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:203:25: s.client.XGroupCreateMkStream undefined (type redis.UniversalClient has no field or method XGroupCreateMkStream)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:297:25: s.client.XReadGroup undefined (type redis.UniversalClient has no field or method XReadGroup)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:307:25: s.client.XRead undefined (type redis.UniversalClient has no field or method XRead)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:372:23: s.client.XPendingExt undefined (type redis.UniversalClient has no field or method XPendingExt)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:396:24: s.client.XClaim undefined (type redis.UniversalClient has no field or method XClaim)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:448:25: s.client.XInfoConsumers undefined (type redis.UniversalClient has no field or method XInfoConsumers)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:461:23: s.client.XGroupDelConsumer undefined (type redis.UniversalClient has no field or method XGroupDelConsumer)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:558:7: p.XAck undefined (type redis.Pipeliner has no field or method XAck)

Subscribers can pick up up to 2 tasks while busy with another task

The current implementation for subscribers essentially starts 3 different routines:

  1. one reads messages from the stream, and pushes it to a channel
  2. another one monitors pending messages, and claims them when relevant, pushing them to that same channel
  3. the 3rd one listens to that channel, and actually processes them

The only thing that prevents the first 2 from reading too fast is that writes to the channel block while messages are being processed by the 3rd routine. However, nothing prevents each of the first 2 routines from claiming task while the 3rd routine is already busy processing an earlier message.

This might not be a problem for applications where at least one of the following is true:

  1. all tasks are processed very fast
  2. It doesn't really matter if some tasks wait a little longer than others to be picked up
    However, if one has tasks that can be rather slow to be processed, what ends up happening is that workers that are already busy pick up more work even though some other workers are sitting idle.

It's not too easy to solve this problem without also somewhat affecting performance for applications that have a lot of very quick tasks, since it would require to stop claiming tasks "ahead of time" while the worker is still processing an ongoing task. I have completely re-written subscribers in my fork to better fit my use case, but not sure how to make that a patch that you'd be okay merging here. Ideas/feedback welcome.

(on the plus side, not having all the extra routines does make the new implementation quite cleaner ;) less internal communication channels, everything is controlled simply with contexts)

Claim fails for consumers with multiple pending messages

Scenario:

  • Consuming messages in a consumer group -> consumer (i.e. subscriber) shuts down or breaks without ACK
  • Multiple messages are pending for the consumer (especially if handling a message takes longer)
  • Subscriber is restarted (gets a new consumer id) -> first message is claimed after DefaultMaxIdleTime
  • There are lost messages, since the consumer is deleted via XGroupDelConsumer

This is trivial to reproduce by e.g. producing numbers from 0-9, handling them in a subscriber and shutting it down while handling a message. After restarting the subscriber (with the same consumer group), one pending message is claimed and processed (after some time), but in most runs there will be at least one lost message due to the consumer being deleted before all pending messages were claimed.

See https://gist.github.com/hlubek/1a667ec6050bea703b58ba0036d26cc9 for an example program.

Solution:

There needs to be a check if the consumer has any pending message before deleting it. But this is not free from race conditions, since the consumer could come back (if an explicit consumer id is used) and get a message before being deleted.

It would be best to not delete consumers while claiming messages, but to have a max idle duration for consumers and check regularly via XInfoConsumers if a consumer has an idle duration longer than that threshold and doesn't have pending messages (I'd say that should usually be around multiple hours). There should be a way to opt out of deleting consumers automatically for special use-cases.

Feature request: add default Maxlens to Publisher config

Hello, thank you for developing this project.

I will try to make clear the reason for this feature request and give a description of what should look like.

Context:

  • I have an application that uses watermill to publish/subscribe to Redis streams;
  • The application will subscribe on one specific key;
  • Based on the received messages it will choose to publish into different keys;
  • Before receiving a message it is not clear yet what are the possible keys to publish onto;
  • The key where the application should publish is embedded in the message coming from subscribe.

Another important information is that my Redis installation is filling up the memory pretty quick with the stream messages. That's why I need to set a default Maxlen when publishing to any stream.

Feature:

  • Add a DefaultMaxlen that will be used to publish to any stream key
type PublisherConfig struct {
	Client               redis.UniversalClient
	Marshaller       Marshaller
	Maxlens           map[string]int64
	DefaultMaxlen int64
}
  • Then instead of choosing 0 directly as argument to MAXLEN, first check what the DefaultMaxlen should be
		maxlen, ok := p.config.Maxlens[topic]
		if !ok {
			maxlen = p.config.DefaultMaxlen
		}

Let me know if that is clear. I can put a PR in place ๐Ÿ‘

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.