threedotslabs / watermill-redisstream
Redis Pub/Sub for the Watermill project, leveraging Redis Stream.
License: MIT License
Hello, thank you for developing this project.
I will try to explain the reason for this feature request and describe what it should look like.
Another important piece of information is that my Redis installation fills up memory quickly with stream messages. That's why I need to set a default Maxlen when publishing to any stream.
Add a DefaultMaxlen that will be used to publish to any stream key:

type PublisherConfig struct {
	Client        redis.UniversalClient
	Marshaller    Marshaller
	Maxlens       map[string]int64
	DefaultMaxlen int64
}

For MAXLEN, first check what the DefaultMaxlen should be:

maxlen, ok := p.config.Maxlens[topic]
if !ok {
	maxlen = p.config.DefaultMaxlen
}
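
The fallback described above can be tried in isolation. The following is a minimal sketch, not the library's code: PublisherConfig is reduced to the two fields under discussion, and maxlenFor is a hypothetical helper name introduced only for illustration.

```go
package main

import "fmt"

// PublisherConfig is a reduced stand-in for the proposed config. The real
// struct would also carry Client and Marshaller; they are omitted here so
// the sketch runs without a Redis dependency.
type PublisherConfig struct {
	Maxlens       map[string]int64
	DefaultMaxlen int64
}

// maxlenFor is a hypothetical helper. It returns the per-topic limit when
// the topic has an entry in Maxlens, and DefaultMaxlen otherwise.
func maxlenFor(c PublisherConfig, topic string) int64 {
	if maxlen, ok := c.Maxlens[topic]; ok {
		return maxlen
	}
	return c.DefaultMaxlen
}

func main() {
	cfg := PublisherConfig{
		Maxlens:       map[string]int64{"orders": 1000},
		DefaultMaxlen: 5000,
	}
	fmt.Println(maxlenFor(cfg, "orders"))   // per-topic entry wins
	fmt.Println(maxlenFor(cfg, "payments")) // falls back to the default
}
```

Because the lookup uses the two-value map form, an explicit per-topic entry always takes precedence over the default, even when that entry is 0.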
Let me know if that is clear. I can put a PR in place.
Scenario:
DefaultMaxIdleTime
XGroupDelConsumer
This is trivial to reproduce by e.g. producing numbers from 0-9, handling them in a subscriber and shutting it down while handling a message. After restarting the subscriber (with the same consumer group), one pending message is claimed and processed (after some time), but in most runs there will be at least one lost message due to the consumer being deleted before all pending messages were claimed.
See https://gist.github.com/hlubek/1a667ec6050bea703b58ba0036d26cc9 for an example program.
Solution:
There needs to be a check whether the consumer has any pending messages before deleting it. But this is not free from race conditions, since the consumer could come back (if an explicit consumer id is used) and get a message before being deleted.
It would be best not to delete consumers while claiming messages, but instead to have a max idle duration for consumers (I'd say that should usually be around multiple hours) and to check regularly via XInfoConsumers whether a consumer has been idle longer than that threshold and has no pending messages. There should be a way to opt out of deleting consumers automatically for special use cases.
I get the errors below:
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\publisher.go:93:23: p.client.XAdd undefined (type redis.UniversalClient has no field or method XAdd)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:203:25: s.client.XGroupCreateMkStream undefined (type redis.UniversalClient has no field or method XGroupCreateMkStream)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:297:25: s.client.XReadGroup undefined (type redis.UniversalClient has no field or method XReadGroup)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:307:25: s.client.XRead undefined (type redis.UniversalClient has no field or method XRead)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:372:23: s.client.XPendingExt undefined (type redis.UniversalClient has no field or method XPendingExt)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:396:24: s.client.XClaim undefined (type redis.UniversalClient has no field or method XClaim)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:448:25: s.client.XInfoConsumers undefined (type redis.UniversalClient has no field or method XInfoConsumers)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:461:23: s.client.XGroupDelConsumer undefined (type redis.UniversalClient has no field or method XGroupDelConsumer)
\github.com\!three!dots!labs\[email protected]\pkg\redisstream\subscriber.go:558:7: p.XAck undefined (type redis.Pipeliner has no field or method XAck)
The current implementation for subscribers essentially starts 3 different routines:
The only thing that prevents the first 2 from reading too fast is that writes to the channel block while messages are being processed by the 3rd routine. However, nothing prevents each of the first 2 routines from claiming tasks while the 3rd routine is already busy processing an earlier message.
This might not be a problem for applications where at least one of the following is true:
It's not easy to solve this problem without also affecting performance for applications that have a lot of very quick tasks, since it would require not claiming tasks "ahead of time" while the worker is still processing an ongoing task. I have completely rewritten the subscribers in my fork to better fit my use case, but I'm not sure how to turn that into a patch that you'd be okay merging here. Ideas and feedback welcome.
(On the plus side, not having all the extra routines makes the new implementation quite a bit cleaner ;) fewer internal communication channels, and everything is controlled simply with contexts.)