Giter Site home page Giter Site logo

bbc / sqs-consumer Goto Github PK

View Code? Open in Web Editor NEW
1.7K 60.0 329.0 1.19 MB

Build Amazon Simple Queue Service (SQS) based applications without the boilerplate

Home Page: https://bbc.github.io/sqs-consumer/

License: Other

TypeScript 89.59% JavaScript 8.25% Gherkin 1.06% Shell 1.09%
sqs javascript polling aws nodejs

sqs-consumer's Introduction

sqs-consumer

NPM downloads Build Status Maintainability Test Coverage

Build SQS-based applications without the boilerplate. Just define an async function that handles the SQS message processing.

Installation

To install this package, simply enter the following command into your terminal (or the variant of whatever package manager you are using):

npm install sqs-consumer

Note This library assumes you are using AWS SDK v3. If you are using v2, please install v5.8.0:

npm install [email protected]

Node version

We will only support Node versions that are actively or security supported by the Node team. If you are still using an Node 14, please use a version of this library before the v7 release, if you are using Node 16, please use a version before the v7.3.0 release.

Documentation

Visit https://bbc.github.io/sqs-consumer/ for the full API documentation.

Usage

import { Consumer } from "sqs-consumer";

const app = Consumer.create({
  queueUrl: "https://sqs.eu-west-1.amazonaws.com/account-id/queue-name",
  handleMessage: async (message) => {
    // do some work with `message`
  },
});

app.on("error", (err) => {
  console.error(err.message);
});

app.on("processing_error", (err) => {
  console.error(err.message);
});

app.start();
  • The queue is polled continuously for messages using long polling.
  • Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An SQS redrive policy can be used to move messages that cannot be processed to a dead letter queue.
  • By default messages are processed one at a time โ€“ a new message won't be received until the first one has been processed. To process messages in parallel, use the batchSize option detailed here.
    • It's also important to await any processing that you are doing to ensure that messages are processed one at a time.
  • By default, messages that are sent to the handleMessage and handleMessageBatch functions will be considered as processed if they return without an error.
    • To acknowledge individual messages, please return the message that you want to acknowledge if you are using handleMessage or the messages for handleMessageBatch.
      • To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. If you would like to change this behaviour, please use the alwaysAcknowledge option detailed here.
      • By default, if an object or an array is not returned, all messages will be acknowledged.
  • Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account).

Credentials

By default the consumer will look for AWS credentials in the places specified by the AWS SDK. The simplest option is to export your credentials as environment variables:

export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...

If you need to specify your credentials manually, you can use a pre-configured instance of the SQS Client client.

import { Consumer } from "sqs-consumer";
import { SQSClient } from "@aws-sdk/client-sqs";

const app = Consumer.create({
  queueUrl: "https://sqs.eu-west-1.amazonaws.com/account-id/queue-name",
  handleMessage: async (message) => {
    // ...
  },
  sqs: new SQSClient({
    region: "my-region",
    credentials: {
      accessKeyId: "yourAccessKey",
      secretAccessKey: "yourSecret",
    },
  }),
});

app.on("error", (err) => {
  console.error(err.message);
});

app.on("processing_error", (err) => {
  console.error(err.message);
});

app.on("timeout_error", (err) => {
  console.error(err.message);
});

app.start();

AWS IAM Permissions

Consumer will receive and delete messages from the SQS queue. Ensure sqs:ReceiveMessage, sqs:DeleteMessage, sqs:DeleteMessageBatch, sqs:ChangeMessageVisibility and sqs:ChangeMessageVisibilityBatch access is granted on the queue being consumed.

API

Consumer.create(options)

Creates a new SQS consumer using the defined options.

consumer.start()

Start polling the queue for messages.

consumer.stop(options)

Stop polling the queue for messages. You can find the options definition here.

By default, the value of abort is set to false which means pre existing requests to AWS SQS will still be made until they have concluded. If you would like to abort these requests instead, pass the abort value as true, like so:

consumer.stop({ abort: true })

consumer.status

Returns the current status of the consumer.

  • isRunning - true if the consumer has been started and not stopped, false if was not started or if it was stopped.
  • isPolling - true if the consumer is actively polling, false if it is not.

Note: This method is not available in versions before v9.0.0 and replaced the method isRunning to supply both running and polling states.

consumer.updateOption(option, value)

Updates the provided option with the provided value.

Please note that any update of the option pollingWaitTimeMs will take effect only on next polling cycle.

You can find out more about this here.

Events

Each consumer is an EventEmitter and emits these events.

Contributing

We welcome and appreciate contributions for anyone who would like to take the time to fix a bug or implement a new feature.

But before you get started, please read the contributing guidelines and code of conduct.

License

SQS Consumer is distributed under the Apache License, Version 2.0, see LICENSE for more information.

sqs-consumer's People

Contributors

adamrensel-artera avatar aklinkert avatar amilajack avatar aradbar avatar astrosam avatar charlescapps avatar deerawan avatar dependabot[bot] avatar dmitry-livchak avatar forbeslindesay avatar frsechet avatar greenkeeperio-bot avatar gregory avatar j-luong avatar jdecoster avatar jeanrauwers avatar jessegranger avatar lucleray avatar mogu4iy avatar nicholasgriffintn avatar nico-ulbricht avatar niklasr avatar nspragg avatar refactorr avatar rkt2spc avatar robertosousa1 avatar robinjmurphy avatar robtpaton avatar tomwinnington avatar yoavmoon avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

sqs-consumer's Issues

Message receiving stops if done() is not invoked

From the documentation of sqs-consumer, I understood that a message from the queue would be deleted only if "done()" is called in handleMessage. I have a usecase that I want the message to be left in the queue after being processed. But when the run the following code, node exits after processing a single message and only starts polling only if I call done()

Is this expected?

var Consumer = require('sqs-consumer');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('../config/aws-config.json');
var app = Consumer.create({
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/myQ',
  waitTimeSeconds: 5,
  handleMessage: function (message, done) {
    console.log(message);
    //done();
  },
});
app.on('error', function(err) {
    console.log(err.message);
});
app.start();

SQSError does not properly inherit from Error and has no stack trace.

SQSError objects apparently are intended to inherit from Error (since they use the Error prototype), but I was surprised to find that they have no stack trace.

To properly inherit from Error, this code:

function SQSError(message) {
  this.name = 'SQSError';
  this.message = (message || '');
}
SQSError.prototype = Error.prototype;

should be:

function SQSError(message) {
  Error.captureStackTrace(this, this.constructor);
  this.name = this.constructor.name;
  this.message = (message || '');
}
util.inherits(SQSError, Error);

Too abstracted from AWS SDK

Hi,

We're using sqs-consumer and it's working well for our nodejs microservice, but I feel that it's too abstracted from the AWS implementation, meaning that if we want to use other parameters in the receive function, we can't. It should be closer to how AWS allows us to do things.

As such, I've forked your code and added a new signature to the consumer function:

function Consumer(params, handler, sqs)

The first two parameters are required, while the last one is optional. The params is an object to be used in the receive function, and the handler is simply a function to process the message, while the sqs argument is to set a custom SQS object if needed but defaults to new AWS.SQS() if none specified.

By using this implementation, the library gets to concentrate on what it does best, which is the polling mechanism, and let's the AWS sdk does what it does best.

I wanted to know if this is something that you would consider pulling back in. If not, I would just create my own project since I don't want to spent the time rewriting the unit tests.

Make deleteMessage configurable?

Hey!

Is there any way to make deleting a message configurable? The scenario is that I want to delete a message when I know for sure I have processed it.

Cheers.

Retrieve the batchSize twice to prevent processing lock

It seems like the consumer only retrieves n messages and doesn't do another SQS request before the n messages have been handled.

Would it be possible to add an option to retrieve 2n messages? This would allow the consumer to have n messages being handled at all time, and not [1-n]. I realize that this would add another abstraction layer to the consumer, but it would potentially save a lot of execution time.

Problem trying to consume on a specific date

On day 14, between 07:00 a.m UTC and 10:00 a.m UTC, there was a problem consuming in a sqs queue.

I wonder if this kind of problem could happen.

Code:

var consumer = require('sqs-consumer');

function createTrip() {
var app = consumer.create({
     queueUrl: config.createTrip,

     handleMessage: function(message, done) {
                   done();
      }
});

app.on('error', function(error) {
           console.log(error);
 });

app.start();
}

The code above is what you use to consume as messages from the queue. If you need more data, please ask!

done(err) not working

hi! i'm use sqs-consumer.

it is great npm for AWS SQS. thank you.

my handlemessage fucntion write promise chain.

    findOrder()
    .then(pushToRider)
    .catch((err) => {
      logger.error('catch error', err)
      done(err) // or i'm try return done(err)
    })

i want left message from queue when occured error with promise chain.
and error message is move dead letter queue by redrive sqs policy.

but occured promise error when occured promise catch error.

10|bsqs    | (node:66103) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 2): Error: Callback was already called.

why this error occured???

thank you.

Support Deleting Messages in Batch

Have you considered supporting the DeleteMessageBatch request for SQS?

FYI - Using this module to replace our own implementation so its nice to remove clunky code from our app as much as possible. Thanks for this.

Callbacks for consumer.stop()

I'm running this as a long running process and using some external process to manage it (upstart/supervisor/etc). I don't want to forcibly kill the process as it could be in the middle of processing an event. I was hoping to pass a callback to consumer.stop() which would be called when the consumer stop and all existing messages are processed. The idea is to do something like this,

consumer.start();

process.on("SIGTERM", function() {
     consumer.stop(function(){
         process.exit(0);
     });
});

memory usage

Anyone facing high memory usage when deploying to production?
I'm using pm2 to keep process alive, but memory usage is almost 500MB. D:

If isAuthenticationError is true then why try to connect again?

isAuthenticationError might return true because of 'CredentialsError'.

Does it even make sense to do - setTimeout(this._poll.bind(this), this.authenticationErrorTimeout);

If the Credentials are wrong it will always fail to connect to the queue. There is no point of trying to connect after every 10 seconds. So, in case of wrong credentials the service should exit instead of keep trying to connect to the queue.

Error at Consumer._handleSqsResponse

I am getting the error below when I call .start();

/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:31
            throw err;
            ^

Error
    at Consumer._handleSqsResponse (/home/me/Documents/Repositories/my-sqs-listener/node_modules/sqs-consumer/index.js:124:24)
    at Request.<anonymous> (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:364:18)
    at Request.callListeners (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/sequential_executor.js:105:20)
    at Request.emit (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/sequential_executor.js:77:10)
    at Request.emit (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:682:14)
    at Request.transition (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:22:10)
    at AcceptorStateMachine.runTo (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/state_machine.js:14:12)
    at /home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/state_machine.js:26:10
    at Request.<anonymous> (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:38:9)
    at Request.<anonymous> (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:684:12)

Here is my code:

import debug from 'debug';
import Consumer from 'sqs-consumer';
import AWS from 'aws-sdk';

export class Queue {
    constructor(eventCallback) {
        this.debug = debug('gsl:lib/queue');
        this.debug('Loading...');

        AWS.config.update({
            region: 'eu-west-1',
            accessKeyId: 'KEY',
            secretAccessKey: 'ACCESS'
        });

        return Consumer.create({
            queueUrl: 'https://...my-sqs-service.fifo',
            handleMessage: eventCallback,
            sqs: new AWS.SQS()
        });
    }
}
import debug from 'debug';
import { Queue } from './lib/';

class App {
    constructor() {
        this.debug = debug('gsl:app');
        this.debug('Starting...');

        this.debug('Loading queue...');

        this.queue = new Queue(this.onMessage);
        this.queue.start();
    }

    onMessage(message, callback) {
        this.debug('Got new message!');
        this.debug(message);

        callback(null);
    }
}

global.app = new App();

does not gracefully handle thrown errors in handle message

We use this library all over the place at work and every handleMessage we write has to Promise.try(() => because sqs-consumer doesnt catch errors causing the consumer to fall over on the first thrown exception.

I think thats a surprising behavior. I suggest adding a try/catch that calls done(error) if a error bubbles up that far.

Can we set a Timeout value to worker?

I am running a worker, which works well generally.
However, if there is a Timeout error, for example download file online, Timeout might happen in http request. and the worker is going to wait for every.

handleMessage seems to kill the process if a promise is rejected in the callback function

const Consumer = require('sqs-consumer');

const app = Consumer.create({
    queueUrl: config.aws.delta.queueUrl,
    handleMessage: (message, done) => {
      logger.debug(message);
      processor.process(message).then((result) => {
        logger.trace(`Message processed.`);
        done();
      }).catch((err) => {
        logger.error("Failed to handle message", message);
      // uncomment next line to get it to work without bailing  
      //done();
      });
    }
  });


... 
app.start();

If I uncomment done() then it will work. But I often want the message to stay on the queue -- maybe I couldn't connect to the database momentarily and that's why it failed. As the code is, if I hit that .catch and don't call done() the process mysteriously exits, even if I have an uncaughtException/unhandledRejection listener on the process.

"sqs-consumer": "^3.6.1",

Node 6

Multiple consumers for one queue

There is a bug with running multiple consumers on one queue within the same process. Below a test I am running which gives very inconsistent results. In particular in regards to the effects of the done() call. Namely in case of multiple consumers a done() call like below looks to not have any effect, and to not delete the message. That means the queue items are processed ad infinitum, every time after the visibilityWindow expires. It looks like a major inhibition to run this library at proper scale. Just working with the batchSize=10 is too low. It means you can only run 10 messages in parallel per instance even if the logic is mostly async (It should be close to 500+/instance). The issue seems to be in the async.series() call in the libraries _processMessage function. It has something to do with the consumers being initialized and started in a normal (sync) for-loop. Its worth to see if it can get fixed by trying to start multiple consumers within an async loop

`
var SQSWorkers = function(config){
//initialization related stuff
...
this.numberWorkers = 40;
this.consumerWorkers = [];
this._setup(config);
}

SQSWorkers.prototype._setup = function(config){
var self = this;
for(var i=0; i<this.numberWorkers; i++){

    this.consumerWorkers[i] = Consumer.create({
        queueUrl: config.producerUrl,
        messageAttributeNames: ["retried"],
        handleMessage: function (message, done) {

            var notificationStart = Date.now();
            var messageBody = JSON.parse(message.Body);
            var retried = message.MessageAttributes && message.MessageAttributes.retried ? Number(message.MessageAttributes.retried.StringValue) : 0;
            var worker = this;

            logger.info("ConvertStoreConsumer start handling event from SQS worker:%s start:%s msgId:%s msg: %j", worker.workerName, notificationStart, message.MessageId, message, {});

            //processing the message happens on onEvent() 
            return self.onEvent(messageBody, function(err){

                //on error message will be enqueued again for later retry 
                //TODO add some logs on error and elaborate more 
                if(err){
                    logger.error("ConvertStoreConsumer error on handling event from SQS. worker:%s error:%j msgId:%s retrying...", worker.workerName, err, message.MessageId, {});
                    return self._handleRetry(message.MessageId, message.Body, retried, function(){
                        return done();
                    });
                }

                //otherwise remove item from queue with done()
                logger.info("ConvertStoreConsumer finished handling event from SQS worker:%s docid:%s tenant:%s v:%s dur:%s", worker.workerName, messageBody.id, messageBody.tenant, messageBody.v, Date.now() - notificationStart, {});
                done();
            });
        }
    });


    this.consumerWorkers[i].workerName = "consumer-worker-" + i;        
    this.consumerWorkers[i].on('message_processed', function(message){

        //otherwise remove item from queue with done()
        logger.info("ConvertStoreConsumer message processing done - worker:%s msgId:%s", this.workerName, message.MessageId, Date.now(), {});

    });


}

}

SQSWorkers.prototype.start = function(param){
//start all workers
if(this.consumerWorkers && this.consumerWorkers.length){
this.consumerWorkers.forEach(function(worker, pos){
worker.start();
});
}else{
// just log an error here and don't throw an error to not break on local
logger.error("ConvertStoreConsumer not initialized for SQS. ConvertStoreConsumer cannot start.", {});
}
}

SQSWorkers.prototype.onEvent = function(message, callback){
//this is the application logic, basically what is supposed to happen with a message
//Its mostly REST calls to other services
....

if(err){
    return callback(err);
}

callback(null)

}
`

Question about Consumer.create with batchSize > 1

Hi,

When using Consumer.create() with batchSize > 1, should the callback be executed once when all the messages have been handled, or once per message in the batch?

I have some messages stuck in flight since I activated the batchSize > 1, and I can't figure out how to avoid that.

Thanks.

Hook before starting processing messages

I need to change some settings before processing received messages from sqs.

What do you think about a interception function before starting processing?

It will be provided as options.beforeHandleMessages @ constructor:

  this.beforeHandleMessages = options.beforeHandleMessages || function(cb) { cb(); }

and it would be called as follow @ _handleSqsResponse method:

  this.beforeHandleMessages(function() { 
    async.each(response.Messages, this._processMessageBound, function () {
      // start polling again once all of the messages have been processed
      consumer._poll();
    });
  })

Do you have a better idea?

lfreneda@d741ab3

once you give me ok, I'll unit test it ๐Ÿ˜€

Differentiate between internal and external events

Is there a way to tell if an error was raised by client code (one of the message processing handlers) or by the library (sqs-consumer) internally without manually checking for all types of events sqs-consumer could possibly raise?

Process more than 10 messages in parallel

By default messages are processed one at a time โ€“ a new message won't be received until the first one has been processed. To process messages in parallel, use the batchSize option detailed below.
Can batchSize runs the different messages simultaneously at the same time and thus fetch remaining messages if any from the AWS SQS and then execute them as well or we have to apply parent and child process for parallel execution of multiple messages from the aws SQS

empty event not fired if no messages are returned

As per documentation empty event is used Fired when the queue is empty (All messages have been consumed). but code does not fire it when there are no messages in the queue

  if (response && response.Messages && response.Messages.length > 0) {
    async.each(response.Messages, this._processMessageBound, function () {
      // start polling again once all of the messages have been processed
      consumer._poll();
    });
  } else if (response && !response.Messages) {
    this.emit('empty');
    this._poll();
  } else if (err && isAuthenticationError(err)) {
    // there was an authentication error, so wait a bit before repolling
    debug('There was an authentication error. Pausing before retrying.');
    setTimeout(this._poll.bind(this), this.authenticationErrorTimeout);
  } else {
    // there were no messages, so start polling again
    this._poll();
  }
};

Stop consuming queue messages

Hi,

I'm doing some test locally with my queue. I'm sending 100 messages in a loop in my queue and using sqs-consumer to consume those messages but it always stop around 50-60 messages consumed. I'm using a batch size of 5 and everything works perfectly for the first 50-60 messages but he rest of the messages stay in Message in flight in AWS SQS and I'm not able to process them.

Thank you very much for any help!

Here is my code

const queueSendMessage = Consumer.create({
        messageAttributeNames:ย ["All"],
        batchSize: 5,
        queueUrl: process.env.AWS_SQS_QUEUE_URL,
        handleMessage: (message, done) => {
            done()
        },
        sqs: new AWS.SQS({
            accessKeyId: process.env.AWS_ACCESS_KEY_ID,
            secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
            region: 'us-east-1'
        })
    });
     
    queueSendMessage.on('error', (err) => {
        console.log("ERROR SQS:", err.message);
    });

queueSendMessage.start();

What if some message take very long time to consume?

I'm using the batchSize option (set to 10), consume times vary for diffierent messages (for example, a user with 5 followers VS one with 10k followers), and now sqs-consumer will wait for the slowest one to complete before next poll

How can I extend the Consumer object without touching the core code?

I want to include additional logic to _processMessage method but I don't want to touch the core code. Is it possible to do that? I was playing around with Consumer.prototype._processMessageBound and Consumer._processMessage but I always get an error "TypeError: Cannot read property 'emit' of undefined". See code below. I actually want to pass additional data in the options, and access it in the handleMessage.

Consumer.prototype._processMessage = (message, cb) => {
  var consumer = this;
  console.log('SOME CODES HERE');
  console.log(this.additionalData.version);

  this.emit('message_received', message);
  async.series([
    function handleMessage(done) {
      consumer.handleMessage(message, done);
    },
    function deleteMessage(done) {
      consumer._deleteMessage(message, done);
    }
  ], function (err) {
    if (err) {
      if (err.name === SQSError.name) {
        consumer.emit('error', err, message);
      } else {
        consumer.emit('processing_error', err, message);
      }
    } else {
      consumer.emit('message_processed', message);
    }
    cb();
  });
};
Consumer._processMessageBound = Consumer.prototype._processMessage.bind(Consumer);

Consumer.create({
  queueUrl: `https://sqs.ap-southeast-1.amazonaws.com/${AWS_SQS_QUEUE}`,
  handleMessage: handleMessage,
  sqs: new AWS.SQS(),
  batchSize: 10,
  additionalData: {version:2}
})

SQS delete message failed

I have a FIFO queue and I'm running into seemingly random occurrences of the following error after calling done();:

SQS delete message failed: Value AQEBWH9ttaf5P9olvqrYyaNik4axKT43ASxVhN+Hty/wT21Ip4cZ2QbaCFy7cKwghHrqxTszz00vtOncnn6NQAYezPNqrFQvnByQB1QhT5MCF7Y6rlkrLgVEVY19Jz0r5sfI8DPNvYo6EuBR2qJzEwUwWfVv9fCH552Ud67tjNmm89Hzm43zo/mZ+QY84XZMHLGfe7NrCVbdbWvSRoiCeSH7Mi1pOCiBCPiAJbBV7fsDkKCRDzjVE7W5LlIfjB1eQ8Rk7CB8WNsqGVXxctiDClKbnE8Gol+G3+d22YTUIe+a8= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired.'

I have only 1 consumer running. Here are the queue configuration details:

Default Visibility Timeout: 30 seconds
Maximum Message Size: 256 KB
Delivery Delay: 0 seconds
Queue Type: FIFO
Content-Based Deduplication: Enabled

I can provide more info if necessary.

Thanks!

Get messages by attributes

I would like to get messages that are more than 10 minutes, and leave as messages that are less this time. Is there any attribute to send for this to happen?

Read region from env vars

Today the region default value is hard coded:

  this.sqs = options.sqs || new AWS.SQS({
    region: options.region || 'eu-west-1'
  });

https://github.com/bbc/sqs-consumer/blob/master/index.js#L66

I suggest to read it from default AWS_REGION environment variable instead.
Like this:

  this.sqs = options.sqs || new AWS.SQS({
    region: options.region || process.env.AWS_REGION
  });

WDYT?
I can open pull request if you agree.

Idea: delayAndReQueue or Expose the deleteMessage method and perhaps a delayMessage

Currently I do:

  • done() which deletes a message when successful.
  • done(err) which does nothing with a message, when it is not successful...

That causes the message to be re-polled and processed, until SQS moves the message to a dead letter queue...

This is a great setup... But some of my messages, I know why and when they should be re-processed, I know they need to be delayed (on a schedule)...

It seems like, as an alternative to done() we could have a delaySeconds(60) which deletes the message and re-sends it with a delaySeconds=60

I know I could do this myself, with sqs-producer - but it seems a common and simple enough thing that building it in made sense.

thoughts?

(also - thanks - this is an excellent set of libraries for getting started with SQS fast)

Error: Callback was already called.

I have created a cluster process in and assigned one process to poll SQS. After processing message the worker process sends a message to Master process.send("Message"). But after that i am getting the following error.

Error: Callback was already called.
    at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:903:32
    at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:3858:13
    at handleMessage (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\sqs-consumer\index.js:157:9)
    at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:3853:24
    at replenish (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:946:17)
    at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:950:9
    at eachOfLimit (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:975:24)
    at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:980:16
    at _parallel (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:3852:5)
    at Object.series (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:4708:5)

Long Polling with Fake SQS

I am using fake_sqs to emulate SQS for development. Unfortunately, fake_sqs does not acknowledge the ReceiveMessageWaitTimeSeconds attribute, thus causing sqs-consumer to rapidly make requests. I know this isn't a sqs-consumer bug but:

  1. I'm curious if there's a way to slow down the requests - specifically for development.

  2. If not, does anyone know of any other (preferably docker-wrapped) fake SQS implementations that would support long polling?

How about differenciate errors

At the moment the event emitter fires a single 'error' type for message errors and queue conection errors. Would be great if those errors are separated in 'error' and 'message_error'. For example if you want to leave the message in the queue because you can't process it at the moment a message_error would be fired.
Also would be nice if a re-poll option is added when polling the queue fires an error.
I can do a pr for that If you are agree.
Thanks

Any tips? "SQSError: SQS delete message failed" / "The receipt handle has expired"

Hi,

I get an exception thrown regularly, and can't find a way to fix the issue.

I'm hoping you have an idea about the cause?

I use FIFO queues, with the following settings:

I then have my code to send/receive the messages:


	var queue	= {
		send:	function(payload, options) {
			options	= _.extend({
				group:	'',
				queue:	'scores'
			}, options);
			
			var sqsParams = {
				MessageBody:			JSON.stringify(payload),
				QueueUrl:				'https://sqs.us-east-2.amazonaws.com/'+ftl.options.aws.client.id+'/'+ftl.options.env+'-'+options.queue+'.fifo'
			};
			sqsParams.MessageDeduplicationId	= options.id;
			sqsParams.MessageGroupId			= options.group;
			sqs.sendMessage(sqsParams, function(err, data) {});
		},
		getHandler:	function(name, callback) {
			return Consumer.create({
				region:				'us-east-2',	// OHIO, to get FIFO queues
				queueUrl:			'https://sqs.us-east-2.amazonaws.com/'+ftl.options.aws.client.id+'/'+ftl.options.env+'-'+name+'.fifo',
				batchSize:			1,
				visibilityTimeout:	20,
				handleMessage:		callback,
				sqs:				new AWS.SQS()
			});
		}
	};

To send data I use:

	queue.send({
		uuid:		'xxxxx',
		session:	'yyyyy',
		race:		'zzzzz'
	}, {
		group:	'yyyyy',
		id:		'some-long-unique-id',
		queue:	'race-start'
	});

To listen I use:

	var scoreQueue			= queue.getHandler('race-start', function(message, done) {
		ftl.data.sessionv2.classic.processScore(JSON.parse(message.Body), done);
	});

It works for a while, then randomly triggers this:

SQSError: SQS delete message failed: Value AQEB3XwPqs1p3WOXYP12JIR33hxePQ/Amls5OXgY2yaj2CKkFM3CuEPXgAPw4KbUWAffCgsHezZJqOEIOyDOlX4JTg+ndrTW4xu4RDEC9PJy8KQvuHU7ALbtIOb9N4U2yIN9TJwWj+GkK9vNKUeC9ZJslgeG5QUS/Xj4x2tWICsbdldlexuuP+V06qixVL/n7Q/8Ws6/IEWD7awwPy2zUehe0eZ16FfpCESoPICehhy+cEn/pm3+a7X2aCV9vqhNpEoGvHnEGyVAdzlAhR82cm2ejQO+SJYtHdpWN4tgI8j5TU4= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired.
    at Response.<anonymous> (C:\xampp\htdocs\node-fw\node_modules\sqs-consumer\index.js:197:24)
    at Request.<anonymous> (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:364:18)
    at Request.callListeners (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\sequential_executor.js:105:20)
    at Request.emit (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\sequential_executor.js:77:10)
    at Request.emit (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:682:14)
    at Request.transition (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:22:10)
    at AcceptorStateMachine.runTo (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\state_machine.js:14:12)
    at C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\state_machine.js:26:10
    at Request.<anonymous> (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:38:9)
    at Request.<anonymous> (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:684:12)
    at Request.callListeners (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\sequential_executor.js:115:18)
    at Request.emit (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\sequential_executor.js:77:10)
    at Request.emit (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:682:14)
    at Request.transition (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:22:10)
    at AcceptorStateMachine.runTo (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\state_machine.js:14:12)
    at C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\state_machine.js:26:10

If you have any tip, it'd be greatly appreciated!

Interested in the design decision for handler and emitter

Hi,

Curious as to why the handleMessage function is mandatory when you have an event emitter for message_received implemented?

If handleMessage is mandatory does it not deem the event emitter redundant as you have to implement both. Any insights much appreciated as it is a great looking bit of code to learn from.

Thanks

Doubt about order of messages consumed

Hi, if you enter 10 messages in sqs. One every millisecond. The order that the sqs-consumer consumes the messages and in the order in which they were inserted?

Calling "done(message)" in order to remove a specific message from the queue

Currently the functionality is tied to the 10 batch queue offered by SQS, but what if I want to have one more layer of queue buffering keeping in mind that 10 is a quite small batch. Let's say i want to process queues in batches of 500

const queue = Consumer.create({
  queueUrl: '...',
  handleMessage: (message, done) => {
    someBuffer.push(message)

    if (someBuffer.length === 500) {
      // process all messages
      process(someBuffer, done)
    }
  }
})
process(buffer, done) {
  buffer.forEach(message => {
    // do something
    done(message.ReceiptHandle)
  })
}

Currently this is not possible because the done() callback is called against the message hitting the handleMessage function, they can't be "stored for later processing".

sqs request error

/xxxx/node_modules/aws-sdk/lib/request.js:30
            throw err;
                  ^
Error: Callback was already called.
    at /xxxx/node_modules/async/lib/async.js:30:31
    at /xxxx/node_modules/sqs-consumer/index.js:127:5
    at /xxxx/node_modules/async/lib/async.js:251:17
    at /xxxx/node_modules/async/lib/async.js:154:25
    at /xxxx/node_modules/async/lib/async.js:248:21
    at /xxxx/node_modules/async/lib/async.js:612:34
    at Response.<anonymous> (/xxxx/node_modules/sqs-consumer/index.js:141:5)
    at Request.<anonymous> (/xxxx/node_modules/aws-sdk/lib/request.js:353:18)
    at Request.callListeners (/xxxx/node_modules/aws-sdk/lib/sequential_executor.js:105:20)
    at Request.emit (/xxxx/node_modules/aws-sdk/lib/sequential_executor.js:77:10)

Hi, does anyone have the same exception ?

Inaccessible host: sqs.eu-central-1.amazonaws.com

Hi,

I got this issue when I perform massive sending / treating messages, have you any idea where it could come from?

screen shot 2015-06-23 at 10 12 50 pm

I also go this error message from aws: This service may not be available in the 'eu-central-1' region but I'm certain that sqs is available in eu-central-1 region..

Workers stop processing messages

Would appreciate some help in figuring out this issue. I'm running 7 workers in parallel using pm2. After a few minutes, only one worker continues to process messages. A few minutes after that, no workers are receiving messages. Message Received and Message Processed are printed by all workers, then just a few workers, then one, and then none.

Here are my settings and relevant code.

Queue Settings

Default Visibility Timeout: 90 seconds
Message Retention Period: 4 days
Maximum Message Size: 256kb
Delivery Delay: 0s
Receive Message Wait Time: 10s

Code

import 'babel-polyfill'

/** Libraries */
import SQSConsumer from 'sqs-consumer'
import serializeError from 'serialize-error'

/** Internal */
import IndexingService from './services/indexing.service'
import logger from './utilities/logger'
import ravenClient from './utilities/raven'

const indexingService = new IndexingService()

const startProcess = async () => {
    const app = SQSConsumer.create({
        queueUrl: 'https://sqs.us-east-1.amazonaws.com/your-queue-here',
        waitTimeSeconds: 5,
        visibilityTimeout: 90,
        handleMessage: async (message, done) => {
            try {
                const parsedBody = JSON.parse(message.Body)
                await indexingService.startProcess(parsedBody)

                done()
            }
            catch (e) {
                logger.error({
                    'Error': serializeError(e)
                })

                if(process.env.NODE_ENV == 'production') {
                    //ravenClient.captureException(e)
                }

                done(e)
            }
        }
    })

    app.on('error', (e) => {
        logger.error({
            'Error': serializeError(e)
        })

        console.log(e);
    });

    app.on('processing_error', (e, message) => {
        logger.error({
            'Error': serializeError(e)
        })

        console.log(message);
    });

    app.on('stopped', () => {
        console.log('Worker Stopped.');
    });

    app.on('message_received', () => {
        console.log('Message Received.');
    });

    app.on('message_processed', () => {
        console.log('Message Processed.');
    });

    app.start()
}

startProcess()

document effects of done(err) in regard to visibilityTimeout

Looks like when calling done(err) on a processing error the message becomes visible immediately again whatever the configured visibilityTimeout is. That means the message becomes available again immediately and will get consumed again by other consumers (potentially from other instances) so unless the application handles an error differently, calling done(err) will cause immediate retrying.

I think it should be documented more clearly what the effects of done(err) are and how it affects retries etc.

FlowTypes

May be good to include some flowtypes

import type { SQS } from 'aws-sdk'
import type { EventEmitter } from 'events'

declare module 'sqs-consumer' {

  declare class SQSConsumer mixins events.EventEmitter {
    start(): void,
    stop(): void,
    on(event: 'error', cb: (e: Error) => void): this,
    on(event: 'processing_error', cb: (e: Error) => void): this,
    on(event: 'message_recieved', cb: (message: string) => void): this,
    on(event: 'stopped', cb: () => void): this,
    on(event: 'empty', cb: () => void): this
  }

  declare type SQSConsumerConfig = {|
    queueUrl: string,
    batchSize?: number,
    sqs?: SQS,
    region?: string,
    messageAttributeNames?: Array<string>,
    attributeNames?: Array<string>,
    terminateVisibilityTimeout?: boolean,
    visibilityTimeout?: number,
    waitTimeSeconds?: number,
    authenticationErrorTimeout?: number,
    handleMessage(message: string, done: () => void): mixed
  |}

  declare function create(config: SQSConsumerConfig): SQSConsumer
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.