Comments (4)

nspragg commented on May 23, 2024

We've not been able to reproduce this issue. We tested with the following script, varying the number of consumer instances and the batch size. After each test, all messages had been dequeued and no errors were reported.

Test script:

'use strict';

const Consumer = require('sqs-consumer');
const QUEUE = ''; // removed intentionally

function createConsumer() {
  return Consumer.create({
    queueUrl: QUEUE,
    batchSize: 1, 
    handleMessage: function (message, done) {
      console.log(message);
      done();
    }
  });
}

function startMultipleConsumers(n) {
  for (let i = 0; i < n; i++) {
    const consumer = createConsumer();
    consumer.on('error', function (err) {
      console.log(err.message);
    });
    consumer.start();
    console.log('started consumer...');
  }
}

// startMultipleConsumers(10);
// startMultipleConsumers(100);
startMultipleConsumers(1000);

from sqs-consumer.

nspragg commented on May 23, 2024

Closing the issue. If further discussion is required, we will reopen it.


unDemian commented on May 23, 2024

I am having the same issue. @nspragg, your example works fine, but I am using a collection to hold all created consumers so that I can stop some of them, similar to how @edeliu2000 uses an array. Even with 3 consumers on the same queue and 1k messages, the consumers misbehave:

  • Some messages are not passed to the handler function even though they are consumed from the queue.
  • Polling continues even though not every done callback has been called, so messages overlap.

I am not sure exactly what happens, but my best guess is that it has something to do with the async series. Something seems to mix up the references to done or to handleMessage when multiple consumers are stored in an array or collection.

I was creating the objects in one loop and then starting the consumers in another loop. I have been banging my head against this for several days and still cannot understand what happens. I just used the logic from your example and it works fine, but it means that I have no reference to the created consumers and no control over stopping them.

It's pretty hard to post all my code and logic, but please let me know if I can help you with anything in resolving this issue.

Thanks
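
For readers following along, the pattern described in this comment (create the consumers in one loop, start them in another, and keep references so that some can be stopped later) might look roughly like the sketch below. It is not the commenter's code, which was never posted, and it does not reproduce the reported problem. StubConsumer, createConsumers, startAll and stopSome are hypothetical names introduced only for illustration; the stub merely mimics start(), stop() and a 'stopped' event, and in real use the factory would presumably be the createConsumer() function from the test script earlier in the thread.

```javascript
'use strict';

const { EventEmitter } = require('events');

// Hypothetical stand-in for a consumer. It only mimics start(), stop() and
// a 'stopped' event; it does not talk to SQS.
class StubConsumer extends EventEmitter {
  constructor() {
    super();
    this.running = false;
  }
  start() {
    this.running = true;
  }
  stop() {
    this.running = false;
    this.emit('stopped');
  }
}

// First pass: build all consumers, keeping a reference to each one.
function createConsumers(n, factory) {
  const consumers = [];
  for (let i = 0; i < n; i++) {
    consumers.push(factory());
  }
  return consumers;
}

// Second pass: start every consumer that was created.
function startAll(consumers) {
  consumers.forEach((consumer) => consumer.start());
}

// Stop only the first `count` consumers; the rest keep running.
function stopSome(consumers, count) {
  consumers.slice(0, count).forEach((consumer) => consumer.stop());
}

const consumers = createConsumers(3, () => new StubConsumer());
startAll(consumers);
stopSome(consumers, 1);
console.log(consumers.map((c) => c.running)); // [ false, true, true ]
```

Passing the factory in as an argument keeps the bookkeeping separate from how each consumer is built, so the same three helpers could be tried against real consumers when attempting to reproduce the behaviour described above.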


brandonmp commented on May 23, 2024

@unDemian I had a similar use case. I haven't tested it extensively, but so far this works: I took the snippet with the for loop and added an EventEmitter in the upper scope to signal the consumers and to track which ones have shut down.

Adapting the previous example:

'use strict';

const Consumer = require('sqs-consumer');
const QUEUE = ''; // removed intentionally

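const events = require('events')
const killSignalEmitter = new events.EventEmitter()
// every consumer adds its own 'killSwitch' listener; 0 removes the default
// limit of 10 listeners and avoids a MaxListenersExceededWarning
killSignalEmitter.setMaxListeners(0)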

function createConsumer() {
  return Consumer.create({
    queueUrl: QUEUE,
    batchSize: 1, 
    handleMessage: function (message, done) {
      console.log(message);
      done();
    }
  });
}

const startMultipleConsumers = (n) => {
    for (let i = 0; i < n; i++) {
        const consumer = createConsumer()
        consumer.on('error', (err) => {
              console.error(err)
        })
        consumer.start()
        
        // listen for kill message from upper scope
        killSignalEmitter.on('killSwitch', () => {
            console.log("KILL RECEIVED")
            consumer.stop()
        })

        // notify upper scope that a worker has finished shutting down
        consumer.on('stopped', () => {
            console.log('Finished queue shutdown')
            killSignalEmitter.emit('workerDown')
        })
    }
}

var totalWorkers = 1000
var deadWorkers = 0 

// intercept kill signal and shutdown workers gracefully
process.on('SIGINT', () => {
    console.log('Starting queue shutdown')
    killSignalEmitter.emit('killSwitch')
})

// tally our casualties. when all workers have shuffled off mortal coil, shut her down
killSignalEmitter.on('workerDown', () => {
    deadWorkers++
    if (deadWorkers >= totalWorkers) process.exit(0)
})

startMultipleConsumers(totalWorkers);
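
As a possible variation on the counter-based shutdown above, the same "wait until every worker has stopped" step can be sketched with promises instead of a manual tally. This is an illustration only, not something posted in the thread: StubConsumer and stopAll are hypothetical names, and the stub assumes that a consumer emits 'stopped' asynchronously after stop() is called. events.once is part of Node's standard events module (Node 11.13 and later).

```javascript
'use strict';

const { EventEmitter, once } = require('events');

// Hypothetical stand-in for a consumer. stop() emits 'stopped' on a later
// tick, which is an assumption about how a real consumer would behave.
class StubConsumer extends EventEmitter {
  start() {}
  stop() {
    setImmediate(() => this.emit('stopped'));
  }
}

// Ask every consumer to stop and resolve once all of them have emitted
// 'stopped'. The listeners are registered before stop() is called so that
// no event can be missed.
function stopAll(consumers) {
  const stopped = consumers.map((consumer) => once(consumer, 'stopped'));
  consumers.forEach((consumer) => consumer.stop());
  return Promise.all(stopped).then(() => consumers.length);
}

const consumers = Array.from({ length: 5 }, () => new StubConsumer());
consumers.forEach((consumer) => consumer.start());

const shutdown = stopAll(consumers).then((count) => {
  console.log(`all ${count} consumers stopped`);
  return count;
});
```

In a real process, stopAll(consumers) could be called from the SIGINT handler, with process.exit(0) in the final then. Because each consumer is awaited directly, no shared emitter or deadWorkers counter is needed.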
