Comments (6)

cybersam commented on May 26, 2024

+1

I also need the ability to run concurrent poll requests, for reasonable scalability. In my case, I would ideally want to have all the poll requests handled by a single Consumer instance, since the logic and SQS configuration would be identical for all requests.

In fact, it would be great if the Consumer could accept a batchSize option greater than 10, and automatically make multiple concurrent poll requests.
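A batchSize above 10 could be handled by splitting it into several concurrent receive calls, because SQS ReceiveMessage returns at most 10 messages per call (MaxNumberOfMessages is capped at 10). The sketch below assumes a hypothetical promise-returning `receive(n)` wrapper around ReceiveMessage. `splitBatchSize` and `receiveBatch` are illustrative names and are not sqs-consumer options.

```javascript
// Sketch only: not part of sqs-consumer.
// SQS ReceiveMessage returns at most 10 messages per call.
const SQS_MAX_PER_RECEIVE = 10;

// Split a logical batch size into per-call sizes of at most 10.
function splitBatchSize(batchSize) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const chunks = [];
  for (let remaining = batchSize; remaining > 0; remaining -= SQS_MAX_PER_RECEIVE) {
    chunks.push(Math.min(remaining, SQS_MAX_PER_RECEIVE));
  }
  return chunks;
}

// Issue all receive calls concurrently and merge the results.
// `receive` is a hypothetical (n) => Promise<message[]> function.
async function receiveBatch(batchSize, receive) {
  const results = await Promise.all(splitBatchSize(batchSize).map((n) => receive(n)));
  return results.flat();
}
```

For example, `splitBatchSize(25)` yields three calls of 10, 10 and 5 messages.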

Going even further, the package could support a configurable polling policy to determine when it would actually create polling requests. This is because it would not be cost effective to create a lot of polling requests when the queue is (almost) empty. So, thinking off the top of my head, the default polling policy might be something like this:

  • Start out by making a single long poll request.
  • Every time a response comes back full, make 2 new requests (but if that would violate the appropriate maximum number of outstanding requests, make 1 new request).
  • Otherwise, do not create a new request (unless that would leave us with no outstanding requests).
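The default policy in the list above could be written as a small decision function that runs each time a long-poll response arrives. The sketch below makes two assumptions. `inFlight` counts the receive requests still outstanding after the returning one. `maxInFlight` is the configured cap. Both names are hypothetical.

```javascript
// Sketch of the proposed polling policy; not an sqs-consumer API.
// inFlight: requests still outstanding AFTER the current response returned.
// maxInFlight: maximum number of outstanding requests allowed.
// responseWasFull: whether the response contained a full batch.
function requestsToIssue({ inFlight, maxInFlight, responseWasFull }) {
  if (responseWasFull) {
    // Queue looks busy: prefer 2 new requests, but never exceed the cap.
    return Math.max(0, Math.min(2, maxInFlight - inFlight));
  }
  // Queue looks (almost) empty: only keep one poll alive.
  return inFlight === 0 ? 1 : 0;
}
```

A poller would start with one request, then call this on every response and start the returned number of new requests.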

from sqs-consumer.

robinjmurphy commented on May 26, 2024

@shinzui you should be able to run multiple consumers in one process. In terms of running consumers for different queues, this is something we do in production already. We haven't tried multiple consumers for the same queue in the same process. If it doesn't work, let me know. As for priority, that is probably something beyond the scope of this library.

@cybersam - open to PRs that make the polling logic more sophisticated.

Closing this issue as I don't think we're looking to pick up any prioritisation logic, but if there's a specific bug with running multiple consumers please open it separately.

edeliu2000 commented on May 26, 2024

Actually, I just wanted to open an issue about running multiple consumers per process. There is currently a problem with that, and it looks like it's not well supported by this library.

Below is a test I am running, which gives me very inconsistent results, in particular with regard to the effect of the done() call. With multiple consumers, a done() call like the one below appears to have no effect and does not delete the message. As a result, queue items are processed ad infinitum, every time the visibility timeout expires. This looks like a major obstacle to running at proper scale. A batchSize of 10 on its own is way too low: it means you can only process 10 messages in parallel per instance, even if the logic is mostly async (it should be close to 500+ per instance). I'm looking into how done() is called in the code and may open a pull request if I find the bug.


var SQSWorkers = function(config){
    // initialization-related setup (elided in the original)
    // ...
    this.numberWorkers = 40;
    this.consumerWorkers = [];
    this._setup(config);
};

SQSWorkers.prototype._setup = function(config){
    var self = this;
    for(var i=0; i<this.numberWorkers; i++){

        this.consumerWorkers[i] = Consumer.create({
            queueUrl: config.producerUrl,
            messageAttributeNames: ["retried"],
            handleMessage: function (message, done) {

                var notificationStart = Date.now();
                var messageBody = JSON.parse(message.Body);
                var retried = message.MessageAttributes && message.MessageAttributes.retried ? Number(message.MessageAttributes.retried.StringValue) : 0;
                var worker = this;

                logger.info("ConvertStoreConsumer start handling event from SQS worker:%s start:%s msgId:%s msg: %j", worker.workerName, notificationStart, message.MessageId, message, {});

                //processing the message happens on onEvent() 
                return self.onEvent(messageBody, function(err){

                    //on error message will be enqueued again for later retry 
                    //TODO add some logs on error and elaborate more 
                    if(err){
                        logger.error("ConvertStoreConsumer error on handling event from SQS. worker:%s error:%j msgId:%s retrying...", worker.workerName, err, message.MessageId, {});
                        return self._handleRetry(message.MessageId, message.Body, retried, function(){
                            return done();
                        });
                    }

                    //otherwise remove item from queue with done()
                    logger.info("ConvertStoreConsumer finished handling event from SQS worker:%s docid:%s tenant:%s v:%s dur:%s", worker.workerName, messageBody.id, messageBody.tenant, messageBody.v, Date.now() - notificationStart, {});
                    done();
                });
            }
        });


        this.consumerWorkers[i].workerName = "consumer-worker-" + i;        
        this.consumerWorkers[i].on('message_processed', function(message){

            // emitted by the consumer once a message has been processed successfully
            logger.info("ConvertStoreConsumer message processing done - worker:%s msgId:%s ts:%s", this.workerName, message.MessageId, Date.now(), {});

        });


    }
}


SQSWorkers.prototype.start = function(param){
    //start all workers
    if(this.consumerWorkers && this.consumerWorkers.length){
        this.consumerWorkers.forEach(function(worker, pos){
            worker.start();
        });
    }else{
        // Log an error instead of throwing, so local runs without SQS configured don't break.
        logger.error("ConvertStoreConsumer not initialized for SQS. ConvertStoreConsumer cannot start.", {});
    }
}

SQSWorkers.prototype.onEvent = function(message, callback){
    // This is the application logic, i.e. what is supposed to happen with a message.
    // It is mostly REST calls to other services (elided in the original):
    // ...
    var err = null; // would be set by the elided application logic on failure

    if(err){
        return callback(err);
    }

    callback(null);
};
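The comment above wants far more than 10 concurrent handlers per instance. A counting semaphore shared by all handlers is one way to cap concurrent work per process, however many consumers or receive calls feed it. The sketch below assumes promise-returning work functions. `createLimiter` is a hypothetical helper and not an sqs-consumer API.

```javascript
// Sketch of a counting semaphore for async work; not part of sqs-consumer.
function createLimiter(maxConcurrent) {
  let active = 0;
  const waiting = [];

  // Start queued tasks while there is spare capacity.
  function next() {
    if (active >= maxConcurrent || waiting.length === 0) return;
    active += 1;
    const { task, resolve, reject } = waiting.shift();
    Promise.resolve()
      .then(task) // also turns a synchronous throw into a rejection
      .then(resolve, reject)
      .finally(() => {
        active -= 1;
        next();
      });
  }

  // Returns a promise that settles with the task's result.
  return function limit(task) {
    return new Promise((resolve, reject) => {
      waiting.push({ task, resolve, reject });
      next();
    });
  };
}
```

With a callback-style handler, the usage might look like `handleMessage: (message, done) => { limit(() => doWork(message)).then(() => done(), done); }`. Here `doWork` is a hypothetical promise-returning version of `onEvent`.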

shinzui commented on May 26, 2024

@edeliu2000 I have a slightly different use case. I want to have one process with multiple consumers consuming different queues.

edeliu2000 commented on May 26, 2024

Then you should be able to run the same code as above, except that you pass a different queueUrl to each worker. Note that the code does not include any authorization setup or the like. You may also run into the same issue I have with done() not behaving properly with many workers, regardless of whether you use one queue or several.
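The "one consumer per queue" setup might look like the sketch below. The consumer factory is injected so the sketch runs without AWS. With sqs-consumer it would be something like `(opts) => Consumer.create(opts)`, using the `queueUrl` and `handleMessage` options shown in the code above. `startConsumersForQueues` and `handlerFor` are hypothetical names.

```javascript
// Sketch: one consumer per queue URL in a single process.
// createConsumer: injected factory, e.g. (opts) => Consumer.create(opts).
// handlerFor: hypothetical function returning a (message, done) handler per queue.
function startConsumersForQueues(queueUrls, createConsumer, handlerFor) {
  return queueUrls.map((queueUrl) => {
    const consumer = createConsumer({
      queueUrl,
      handleMessage: handlerFor(queueUrl),
    });
    consumer.start();
    return consumer;
  });
}
```

Using `map` with an arrow function gives each consumer its own `queueUrl` binding. This sidesteps the shared-variable pitfalls of `var` inside a `for` loop.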

edeliu2000 commented on May 26, 2024

It looks like the issue with multiple concurrent consumers per process lies in the async.series() call in _processMessage. I wonder whether it has something to do with the consumers being initialized and started in a plain synchronous for-loop.
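To debug this, it can help to isolate the "handle, then delete" sequence that an async.series() call like this performs. The sketch below is written without the async library. `processMessage`, `handleMessage` and `deleteMessage` are hypothetical stand-ins, not sqs-consumer's actual code. Guarding `done` makes a repeated call visible instead of silently triggering a second delete.

```javascript
// Hypothetical sketch of "handle, then delete" in series; not sqs-consumer's code.
function processMessage(message, handleMessage, deleteMessage, callback) {
  let doneCalled = false;

  function done(err) {
    if (doneCalled) {
      // Surface repeated calls instead of deleting twice.
      console.warn("done() called more than once for", message.MessageId);
      return;
    }
    doneCalled = true;
    if (err) return callback(err); // handler failed: message stays on the queue
    deleteMessage(message, callback); // handler succeeded: delete the message
  }

  handleMessage(message, done);
}
```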
