Comments (6)
+1
I also need the ability to run concurrent poll requests, for reasonable scalability. In my case, I would ideally want to have all the poll requests handled by a single Consumer instance, since the logic and SQS configuration would be identical for all requests.
In fact, it would be great if the Consumer could accept a batchSize
option greater than 10, and automatically make multiple concurrent poll requests.
Going even further, the package could support a configurable polling policy to determine when it would actually create polling requests. This is because it would not be cost effective to create a lot of polling requests when the queue is (almost) empty. So, thinking off the top of my head, the default polling policy might be something like this:
- Start out by making a single long poll request.
- Every time a response comes back full, make 2 new requests (but if that would violate the appropriate maximum number of outstanding requests, make 1 new request).
- Otherwise, do not create a new request (unless that would leave us with no outstanding requests).
from sqs-consumer.
@shinzui you should be able to run multiple consumers in one process. In terms of running consumers for different queues, this is something we do in production already. We haven't tried multiple consumers for the same queue in the same process. If it doesn't work, let me know. As for priority, that is probably something beyond the scope of this library.
@cybersam - open to PRs that make the polling logic more sophisticated.
Closing this issue as I don't think we're looking to pick up any prioritisation logic, but if there's a specific bug with running multiple consumers please open it separately.
from sqs-consumer.
Actually just wanted to create an issue with running multiple consumers per process. There is a problem currently with that and it looks like its not well supported by this library.
Below a test I am running which gives me very inconsistent results. In particular in regards to the effects of the done() call. Namely in case of multiple consumers a done() call like below looks to not have any effect, and to not delete the message. That means the queue items are processed ad infinitum, every time after the visibilityWindow expires. It looks like a major inhibition to run this at proper scale. Just working with the batchSize=10 is way to low. It means you can only run 10 messages in parallel per instance even if the logic is mostly async (It should be close to 500+/instance). Looking into the code how done() is called and maybe do a pull request if I find the bug.
var SQSWorkers = function(config){
//initialization related stuff
...
this.numberWorkers = 40;
this.consumerWorkers = [];
this._setup(config);
}
SQSWorkers.prototype._setup = function(config){
var self = this;
for(var i=0; i<this.numberWorkers; i++){
this.consumerWorkers[i] = Consumer.create({
queueUrl: config.producerUrl,
messageAttributeNames: ["retried"],
handleMessage: function (message, done) {
var notificationStart = Date.now();
var messageBody = JSON.parse(message.Body);
var retried = message.MessageAttributes && message.MessageAttributes.retried ? Number(message.MessageAttributes.retried.StringValue) : 0;
var worker = this;
logger.info("ConvertStoreConsumer start handling event from SQS worker:%s start:%s msgId:%s msg: %j", worker.workerName, notificationStart, message.MessageId, message, {});
//processing the message happens on onEvent()
return self.onEvent(messageBody, function(err){
//on error message will be enqueued again for later retry
//TODO add some logs on error and elaborate more
if(err){
logger.error("ConvertStoreConsumer error on handling event from SQS. worker:%s error:%j msgId:%s retrying...", worker.workerName, err, message.MessageId, {});
return self._handleRetry(message.MessageId, message.Body, retried, function(){
return done();
});
}
//otherwise remove item from queue with done()
logger.info("ConvertStoreConsumer finished handling event from SQS worker:%s docid:%s tenant:%s v:%s dur:%s", worker.workerName, messageBody.id, messageBody.tenant, messageBody.v, Date.now() - notificationStart, {});
done();
});
}
});
this.consumerWorkers[i].workerName = "consumer-worker-" + i;
this.consumerWorkers[i].on('message_processed', function(message){
//otherwise remove item from queue with done()
logger.info("ConvertStoreConsumer message processing done - worker:%s msgId:%s", this.workerName, message.MessageId, Date.now(), {});
});
}
}
SQSWorkers.prototype.start = function(param){
//start all workers
if(this.consumerWorkers && this.consumerWorkers.length){
this.consumerWorkers.forEach(function(worker, pos){
worker.start();
});
}else{
// just log an error here and don't throw an error to not break on local
logger.error("ConvertStoreConsumer not initialized for SQS. ConvertStoreConsumer cannot start.", {});
}
}
SQSWorkers.prototype.onEvent = function(message, callback){
//this is the application logic, basically what is supposed to happen with a message
//Its mostly REST calls to other services
....
if(err){
return callback(err);
}
callback(null)
}
from sqs-consumer.
@edeliu2000 I have a slightly different use case. I want to have one process with multiple consumers consuming different queues.
from sqs-consumer.
Then you should be able to run the same code as above with the difference that you pass different queueUrls for the workers. Note that the code does not contain any authorization and such. Also you may run into the same issue I have with done() not behaving properly for many workers regardless of whether you are using one queue or multiple queues.
from sqs-consumer.
Looks like the issue with multiple concurrent consumers/process is in the async.series() call in _processMessage. I wonder if it has something to do with the consumers being initialized and started in a normal (synch) for-loop.
from sqs-consumer.
Related Issues (20)
- create a version of consumer with the aws sdk version for json HOT 3
- [Bug]: SQS receive message failed: Could not load credentials from any providers HOT 2
- [Bug]: Undocumented breaking change - acknowledgment to handleMessage HOT 7
- v5.8.0 if send message to sqs queue on other pc HOT 2
- [Bug]: ApproximateReceiveCount is not part of QueueAttributeName HOT 4
- The automated release is failing 🚨 HOT 1
- [Bug]: Version 8.1.5 of this library incompatible with @aws-sdk/client-sqs version 3.507.0 HOT 5
- Add a link to the docs at the top of the readme HOT 2
- Remove AWS SDK V2 Note
- Upgrade p-event to v6 HOT 1
- Upgrade chai to v5 HOT 1
- Remove the `handler_processing` debugger HOT 2
- [Bug]: CredentialsProviderError is not treated as a connection error HOT 2
- [Bug]: SQS receive message failed: connect ETIMEDOUT <ip>:443 every few hours HOT 4
- [Bug]: When consumer fails to react to SQS instant retry occurs HOT 2
- [Bug]: attributeNames does not support MessageSystemAttributeName HOT 2
- The automated release is failing 🚨
- [Bug]: When consumer fails to connect to SQS due to connectivity instant retry occurs HOT 2
- v10 isn't loading the type file correctly HOT 11
- [Feature Request]: Enhance SQSError with AWS error $response object for Better Debugging HOT 4
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from sqs-consumer.