Giter Site home page Giter Site logo

amqp-node / amqplib Goto Github PK

View Code? Open in Web Editor NEW
3.6K 70.0 466.0 1006 KB

AMQP 0-9-1 library and client for Node.JS

Home Page: https://amqp-node.github.io/amqplib/

License: Other

Makefile 0.61% JavaScript 99.39%
amqp rabbitmq-client amqp-0-9-1 nodejs rabbitmq

amqplib's People

Contributors

ben-page avatar caioaugustoo avatar carlhoerberg avatar chkimes avatar cressie176 avatar furstenheim-geoblink avatar hippich avatar jcrugzz avatar jfromaniello avatar jimmywarting avatar johanneswuerbach avatar kibertoad avatar kurtas avatar luddd3 avatar mohd-akram avatar nfantone avatar olegabr avatar oronnadiv avatar paulrbr-fl avatar ravshansbox avatar renarsvilnis avatar revington avatar rexxars avatar squaremo avatar thomasgawlitza avatar timwatsonruffer avatar uzlopak avatar woozyking avatar xamgore avatar zweifisch avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

amqplib's Issues

Illegal Operation

I'm getting Illegal Operation errors quite often under actual usage.

How can I track down who's at fault? It's hard to tell if this is this Rabbit....amqplib or my own code.

Channel can provoke connection error in publish/sendToQueue

As per #10 (comment), it's possible to make a channel send an illegal sequence of frames by sending a broken message inside a message handler. This provokes the server into closing the entire connection.

The sequence is:

  1. receive a message, and in the handler,
  2. send a message with, e.g., a null content
  3. the client sends the basic.deliver frame, but throws an exception while sending properties
  4. the client catches this exception and treats it as an application error, so attempts to close the channel by sending channel.close

Non-futures API

More feedback from an experienced Node.js person: it would be nicer if amqp.node could provide a non-futures-based API. I recall there is a "lower-level API" but not sure if it is callbacks-based or not.

Then people who want to use futures can do that on top of callbacks. I don't see why
anyone would want to do that but have to trust the reviewer on this.

Can we maintain both "lower-level" API and an opinionated futures-based one? I'd certainly offer a hand if that's possible. Bunny 0.9+ does this to some extent.

This is an important choice because we will (I hope) use amqp.node in the rabbitmq.com tutorials.

support for custom username/passwords on connect?

Hi there,

Great project! really enjoying using it. Question: do you have support or plan to support providing custom username/passwords on connect? I've disabled the guest account on my production instance, and I'm not sure if I can supply a custom pair.

Thanks!

Clarify the "Argument must be a string" error?

I'm still pretty new to this, and I keep running in to errors from passing incorrect parameters in to methods. The only real problem is that the error message tells me nothing about where the error happened.

Here's an example output:

'TypeError: Argument must be a string
    at encodeBasicPublish (/Users/derickbailey/dev/projects/seabass/app/node_modules/amqplib/lib/defs.js:2145:31)
    at module.exports.encodeMethod (/Users/derickbailey/dev/projects/seabass/app/node_modules/amqplib/lib/defs.js:3527:12)
    at Connection.C.sendMessage (/Users/derickbailey/dev/projects/seabass/app/node_modules/amqplib/lib/connection.js:511:16)
    at Channel.C.sendMessage (/Users/derickbailey/dev/projects/seabass/app/node_modules/amqplib/lib/channel.js:79:26)
    at Channel.C.publish (/Users/derickbailey/dev/projects/seabass/app/node_modules/amqplib/lib/channel_model.js:343:15)

I like that I get an error telling me the parameter type was wrong... but what parameter, on what method call was that? I have a lot of code in my system, and I have no idea where the error is being thrown.

Can this error message be adjusted to specify what parameter on what method call is wrong? Or is there something I'm doing wrong that is killing my stack trace?

Do not use exceptions for errors

I've shown the client to a very experienced Node.js developer and he suggested
that we don't raise exceptions because
they are virtually impossible to catch in callback-driven environments. This is certainly true.

I understand that that particular branch is for a case that should never happen in practice, but maybe an errback can be used.

Alternate exchange not showing correctly in management plugin

Exchanges created with ch.declareExchange('my-name', 'type', { alternateExchange: 'my-alter'}) don't show up proper "AE" tag when viewing their properties in Management plugin (RabbitMQ 3.0.4 and above, I suppose). The fix is pretty trivial:
file: channel_api.js:
method: C.assertExchange
Replace: args.alternateExchange = options.alternateExchange
To : args['alternate-exchange'] = options.alternateExchange

channel.sendToQueue( ... ) sometimes fail to send some message

I don't know if it's me or my setup, or if it's really a bug, but for some reason, channel.sendToQueue( ... ) randomly fail to send some message. It's usually the first message of a series of message that got "forget", meaning if I'm idle for a while or I just restarted my app, the first message will not be send. But again, it's not all the time. I cannot tell when it will happen. It just seems so random. Also, I do not get any error or exception about this (from what I know).

When I use a ConfirmChannel, I'm always able to get the Ack (Message acked in the log), even for those messages who are not sent. I'm able to tell that the message is not received by using the rabbitmqctl utility.

My code look like this (complete file is available here):

channel.sendToQueue(queue, new Buffer(JSON.stringify(data2)), {}, function(err, ok) {
    if (err !== null) {
        console.warn(' [*] Message nacked');
    } else {
        console.log(' [*] Message acked');
    }
});

Is it a known issue? How can I solve or mitigate this problem?

EDIT: Can it be caused by the fact that I'm using the same channel to send and consume to/from different queues?

Socket closed abruptly during opening handshake

Hi

Im trying to make a simple one-way event proxy. Which means that other parts of our system publishes messages to rabbitmq, and this proxy is just supposed to subscribe to them and pipe them on to socket io. ie, i wont publish to rabbit from this script.

problem is i always get "Socket closed abruptly during opening handshake",
Ive tried with our production rabbitmq, my local, our staging server....

What to do?

2 Questions re: defs.js and heartbeats

I have two questions:

  1. Is there a reason you did not include the file lib/defs.js in your repository?
  2. I noticed you just committed heartbeat code. If an error occurs with a heartbeat, does it affect the connection? With a channel, I noticed that many 'errors', such as checking a queue that does not exist will "bork" the channel. I realize that an error on a heartbeat may indicate a problem with the connection.

Document Public/Private API

It would be nice, for reference, if the code itself contained more documentation. The comments are excellent, but more organization and a clearer specification of whether a method/function is public or private would be nice.

Consider using JavaDoc/JSDoc syntax:

/**
 * Construct a new Connection object.
 *
 * @param {Stream} underlying The stream the Connection should bind to.
 * @constructor
 */
function Connection(underlying) {
  ...
}

/**
 * Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally ignores
 * these; maybe it logs them. Returns a promise that will be resolved when the
 * CloseOk has been received; NB the 'close' event will be emitted once the
 * underlying stream is ended.
 *
 * @param {string} reason
 * @param {number}
 * @public
 */
C.closeBecause = function(reason, code) {
  ...
};

/**
 * A close has been initiated. Repeat: a close has been initiated. This means we
 * should not send more frames, anyway they will be ignored. We also have to
 * shut down all the channels.
 *
 * @private
 */
C.stop = function() {
  ...
};

prefetch default to no limit?

Seems like prefetch defaults to no limit. This seems to me be quite an unsafe default. Should the default rather be something much lower... People can always override if they want higher? This means the default is 'less performance than optimal in some circumstances (rather than 'explodes in some circumstances)...?

Interested in thoughts. Thanks for the great module!

Handling exceptions in callback API

Many operations in the promise and the callback APIs can give rise to errors, either because

  1. the server didn't like what you asked for and signalled an error
  2. the client code broke
  3. the application code (i.e., the callback) threw an exception

In general, 1. is handled by emitting an error from the channel; 2. is a bug, so bets are off.

With 3., the promises library will redirect an exception thrown in a success continuation to the (eventual) error continuation. However, this sort of behaviour isn't possible for the callback API, which essentially invokes callbacks in the top-level context.

The simplest solution is probably to wrap the outermost frame handlers in try-catch blocks, and redirect any exceptions as 'error' events.

about long-lived connection and message to send

Thanks for your great work.
I am a newbie of node & rabbitmq,now i want to use rabbitmq to send message and receive. I got your example code and updated it. I have two questions about it.
1>can i keep a long-lived connection to send message and don't close it?
2>how can i send message with my data?

senddao.js,

var amqp = require('amqplib');
var when = require('when');
var config=require('../config');
//var msgtosend='';

var     url='amqp://'+config.mq.username+':'+config.mq.password+'@'+config.mq.host+':'+config.mq.port;
var connection = amqp.connect(url);
var channelaction=function (sendchannel) {
    var q = 'task_queue';
    var ok = sendchannel.assertQueue(q, {durable: true});
    return ok.then(function () {
        var msg = "Hello World!"
      //  msg=msgtosend;  //can i do this? Isn't there any async issue?
        sendchannel.sendToQueue(q, new Buffer(msg), {deliveryMode: true});
        console.log(" [x] Sent '%s'", msg);
        return sendchannel.close();
    });
};
var channel = function (conn) {
    var trans = when(conn.createChannel().then(channelaction));
    trans.ensure(function () {
       // conn.close();
       // need to close?if i close this connection
       // ,when i need to send another msg then i need to connect the rabbitmq again.
       //So i want to keep the connection long-lived to avoid connecting again.
       //can i do this?
    });
};

exports.sendtomq=function(msg){
//    msgtosend=msg;  //if i do this,isn't there any async issue?
    //how can i send this msg to the queue instead of "Hello World!"?
    connection.then(channel).then(null, console.warn);
};

Not found './defs'

Installed amqplib:
npm install git://github.com/squaremo/amqp.node

Now i'm getting this error message:

Error: Cannot find module './defs'
at Function.Module._resolveFilename (module.js:338:15)
at Function.Module._load (module.js:280:25)
at Module.require (module.js:364:17)
at require (module.js:380:17)
at Object. (/example/node_modules/amqplib/lib/connection.js:7:12)
at Module._compile (module.js:456:26)
at Object.Module._extensions..js (module.js:474:10)
at Module.load (module.js:356:32)
at Function.Module._load (module.js:312:12)
at Module.require (module.js:364:17)

Remove buffer-more-ints requirement

Like MySQL, you need to support 64-bit integers. Node, of course, does not play nice with such numbers, content to only play nice with doubles.

The popular node-mysql library handles this by delegating to the bignumber library if the user specifies the supportBigNumbers config option.

amqp.node handles 64-bit integers by altering the global Buffer object and prototype with the buffer-more-ints module. This may seem like a harmless change, but in doing so you alter the Buffer objects of everyone that uses amqp.node and all modules they may depend on. I shouldn't need to explain why this could become problematic, but take bnoordhuis/node-buffertools#39 as an example--you override a common mechanism and the original mechanism is no longer available. If it changes, your users may wish to use the updated version, but you cannot change your code because that may break some other a user's code that doesn't want or use the updated feature.

Random crashes when publishing

While I am testing my own code I have come across a thrown error from the amqp.node library. The result is that my server crashes with and uncaughtException. This happens randomly. My requests to publish are virtually the same for each cycle of publishing to the Rabbit Queue from a RESTful interface.

I send a publish request from a REST client to my REST API which in turn does the actual publish. As I said, the requests from the client are virtually identical. I can publish dozens of messages without problem before seeing my server crash...or the server could crash on the first attempt to publish....very random behavior.

Below is the stack trace.

{"column":18,"file":"/usr/local/lib/node_modules/amqplib/lib/defs.js","function":"encodeChannelClose","line":991,"method":null,"native":false},
{"column":26,"file":"/usr/local/lib/node_modules/amqplib/lib/defs.js","function":"module.exports.encodeMethod","line":117,"method":"exports.encodeMethod","native":false},
{"column":15,"file":"/usr/local/lib/node_modules/amqplib/lib/frame.js","function":"Connection.F.sendMethod","line":133,"method":"F.sendMethod","native":false},
{"column":26,"file":"/usr/local/lib/node_modules/amqplib/lib/channel.js","function":"Channel.C.sendImmediately","line":47,"method":"C.sendImmediately","native":false},
{"column":8,"file":"/usr/local/lib/node_modules/amqplib/lib/channel.js","function":"Channel.C.closeBecause","line":173,"method":"C.closeBecause","native":false},
{"column":10,"file":"/usr/local/lib/node_modules/amqplib/lib/channel.js","function":"Channel.C.acceptMessageFrame","line":216,"method":"C.acceptMessageFrame","native":false},
{"column":17,"file":"/usr/local/lib/node_modules/amqplib/lib/channel.js","function":"Channel.C.accept","line":305,"method":"C.accept","native":false},
{"column":33,"file":"[as accept](/usr/local/lib/node_modules/amqplib/lib/connection.js","function":"Connection.mainAccept","line":36,"method":"mainAccept","native":false},
{"column":12,"file":"/usr/local/lib/node_modules/amqplib/lib/frame.js","function":"Socket.go","line":76,"method":"go","native":false},
{"column":17,"file":"events.js","function":"Socket.EventEmitter.emit","line":92,"method":"EventEmitter.emit","native":false}],
"stack":[
"TypeError: Argument must be a string","
at encodeChannelClose %28/usr/local/lib/node_modules/amqplib/lib/defs.js:991:18)","
at module.exports.encodeMethod (/usr/local/lib/node_modules/amqplib/lib/defs.js:117:26)","
at Connection.F.sendMethod (/usr/local/lib/node_modules/amqplib/lib/frame.js:133:15)","
at Channel.C.sendImmediately (/usr/local/lib/node_modules/amqplib/lib/channel.js:47:26)","
at Channel.C.closeBecause (/usr/local/lib/node_modules/amqplib/lib/channel.js:173:8)","
at Channel.C.acceptMessageFrame (/usr/local/lib/node_modules/amqplib/lib/channel.js:216:10)","
at Channel.C.accept (/usr/local/lib/node_modules/amqplib/lib/channel.js:305:17)","
at Connection.mainAccept as accept","
at Socket.go (/usr/local/lib/node_modules/amqplib/lib/frame.js:76:12)","
at Socket.EventEmitter.emit (events.js:92:17)"],
"level":"error","message":"uncaughtException: Argument must be a string","timestamp":"2013-10-04T02:39:55.112Z"}

Overlapping channel close / connection close

Following on from #10,

It's still a problem that the connection can call stop on an already closed channel

The question is, are there scenarios in which it will happen? The essence is that a channel breaks and stops itself, while also sending something to provoke the connection being closed; the channel waits for a ChannelClose frame before unregistering, but the connection gets a ConnectionClose frame first.

There's at least one such, legal within the protocol: the client closes a channel and the server closes the connection (as a result of operator intervention through a management tool, say) at the "same time".

See #20 re other channel/connection closing scenarios.

How to close channel and connection

Hi,

I am new enough to Node and amqplib, I am trying to write a simple node app that sends a message to amqp server. I have it working via the examples but as my system is very busy I now have well over 100 connections to the MQ server. I would like to close the channel and then the connection after the message is sent but I cant seem to figure it out. Can somebody show me an example using sendToQueue.
So i am trying to do the following
Create Connection, Create Channel, send msg, close channel, close connection.

My code is as follows.

function sendMessage(oMsgDetails, source, logger){
        //declare queue and server to connect to
        var q = myConf.rabbitQ;
        var server = myConf.rabbitIP;
        logger.info("DepID: " + oMsgDetails.i + " About to attempt Rabbit MQ connection.");
        //open connection to amqp server
        var open = amqp.connect('amqp://guest:guest@'+server);
        //Publisher - publish message to rabbit MQ server
        logger.info("DepID: " + oMsgDetails.i + " Connection established, going to attempt to create a channel.");
        open.then(function(conn) {
          var ok = conn.createChannel();
          logger.info("DepID: " + oMsgDetails.i + " Channel created, going to attempt to send a message.");
          ok = ok.then(function(ch) {
                ch.assertQueue(q);
                ch.sendToQueue(q, new Buffer(oMsgDetails.n+';'+oMsgDetails.i+';'+oMsgDetails.v+';'+oMsgDetails.a));
                logger.info("DepID: " + oMsgDetails.i + " Message sent Payload: "+oMsgDetails.n+';'+oMsgDetails.i+';'+oMsgDetails.v+';'+oMsgDetails.a+" sent to server: "+server$
                ch.close();
                //conn.close();
          });
          //conn.close();
          return ok;
        }).then(null, mailer.error);
}

(edited to pre-ify code example)

Why publish message failure?

Hi everybody,
I run the taskwork example when send message failure๏ผŒ but I delete conn.close(); in new_work.js ensure method it 's ok, who can tell me why? other examples have the same question.

new_task.js

var amqp = require('amqplib');
var when = require('when');

amqp.connect('amqp://guest:guest@localhost/productcfg?frameMax=4096000').then(function(conn) {
  return when(conn.createChannel().then(function(ch) {
    var q = 'task_queue';
    var ok = ch.assertQueue(q, {durable: true});

    return ok.then(function() {
      var msg = "Hello World!"
      ch.sendToQueue(q, new Buffer(msg), {deliveryMode: 2,noAck:false});
      console.log(" [x] Sent '%s'", msg);
    });
  })).ensure(function() {
          //conn.close();   //after commented this code rabbitmq can receive message
      });
}).then(null, console.warn);

Stability issues under load

(This is using master)

Today I put a small logging service into production that uses amqplib. Things chug along happily at around 200 publishes/s for stretches between 10 minutes and an hour (according to the RabbitMQ web interface). Then there will be a flood of "Channel ended, no reply will be forthcoming" errors, then a few "Channel is closed" errors, before the thread throws "Connection closed" from lib/connection.js:324.

It seems like something is killing the connection, though there's nothing in the application's logs that helps, and all RabbitMQ says is:

=WARNING REPORT==== 1-Oct-2013::16:03:53 ===
closing AMQP connection <0.31379.133> (xxx -> yyy):
connection_closed_abruptly

So... it looks like the connection is being closed by amqplib, probably due to an error, though I'm not seeing any errors in the log. The differences in time stretches suggest it might be triggered by consuming bad data... Do you have any advice on where to look for the error, or what information might help debug the issue?

Apologies for the vagueness.

Order of operations on error/close with connections/channels

The docs say that channels implicitly emit close events when a connection closes, but I can't see any code in channel_model.js or channel.js that does this (I assume it's precipitated lower down?)

Either way, say you have a connection with two channels. Is there a defined order that the events will trigger in? If it errors, will I receive three close events and three error events? If it closes, will I receive three close events?

Will the connection emit its event first, or the channels?

Is there a way to tell in the channel close/error event if it was caused by the connection closing? Is there a property to determine the current state of a connection/channel in general?

Sorry if any of these are answered in the docs, I did look first.

Is Nagle's algorithm disabled?

I want Nagle's algorithm disabled on the socket connection. Is it already disabled? If not then how can I access the socket to disable it?

Returned messages

Today I was looking into using the mandatory option when publishing, which the docs say should result in the message being returned if no route exists, however it does not say where they would be returned to?

Unhelpful error messages in defs.js

I've found myself repeatedly browsing defs.js due to errors such as:

TypeError: Argument must be a string

Fortunately, the line number gives enough info, but since the file seems to be autogenerated, perhaps I could persuade you to give a little more context? All the === undefined checks work, but Buffer.byteLength seems to require a string, so perhaps !== 'string' would work better.

Also, I don't know if this is intentional or not, but the script to generate defs.js is not in the npm package.

Violation of SRP

First of all awesome library. Thanks. I just think people from all walks of life could benefit from not having to learn then promise pattern (which is a fail as far as I am concerned because it hides natural elements of the javascript language) while trying to engage something REALLY separate like SOA using RabbitMQ.

This project deserves kudos, exposure would be improved by removing when.js.

Question Regarding callback api

Is this going to be a new minor/major version release when it is ready, and when do you think it will be ready for consumption?

Support Auto-reconnection

At the moment, when a socket closes it's basically permanent. All kinds of methods are replaced in favor of functions which only throw and there's no reconnect support.

Even in a data center, connections can fail or time out, and reconnection would allow the connection to fail and recover from the failure, instead of simply giving up.

This might need to happen at the connect.js level given the current setup, but that would put messages that have been sent but not acked into an uncertain place where it's unclear if the message has reached the server or just fallen. I know from experience that trying to get reconnection to work well can be tricky. For a counterexample, see node-amqp's reconnection, which appears to leak everywhere.

How can I get full stack of error log amqplib

my code :

var open = amqp.connect('amqp://locahost');
open.then(function(conn) {
var ok = conn.createChannel();
ok = ok.then(function(ch) {
ch.assertQueue(taskName);
var a;
a.get();
})
return ok;
}).then(null, console.log);
}

and error is "TypeError: Cannot call method 'get' of undefined"

I'm expecting a full stack of this error.

How do I ack messages after the callback has executed in Channel#consume?

Apologies for a noob question - I can't figure out how to ack messages after the callback has executed in channel.consume. Is there any way for me to get a handle to the open channel so I can ack the message?

If I close the connection, the message will be put back on the stack and if I open a new channel, the message is invisible to acknowledgement (because the original channel has a lock on it).

Any help would be greatly appreciated.

I'm using your example with the the modification that doWork is now outside the connection loop -

open.then(function(conn) {
    conn.once('SIGINT', function() { conn.close(); });
    return conn.createChannel().then(function(ch) {
        var ok = ch.assertQueue(jobq, {durable: true});
        ok = ok.then(function() { ch.prefetch(1); });
        ok = ok.then(function() {
            ch.consume(jobq, doWork, {noAck: false});
            console.log(" [*] Waiting for messages. To exit press CTRL+C");
        });
        return ok;
    });
}).then(null, console.warn);

var doWork = function(msg) { doSomething}

Memory Leak

NOTE: Second comment contains a memwatch heapDiff

We've noticed that memory is being retained indefinitely when consuming from a channel. I'm hoping that I'm making some poor assumptions or doing something terribly wrong.

The following is a quick example of how to reproduce this. In this example, I'm continuously publishing to a queue every millisecond until one minute has passed. After one minute, I'm clearing the interval. The consumer's memory grows continuously over the course of that minute, but stays consistently high after is stops receiving messages. I've run this same test using the current #master branch, seeing that you've committed a fix to some global leaks. This hasn't fixed the issue.

consumer.js

var amqplib = require('amqplib');
var when = require('when');
var connPromise = amqplib.connect();
var queueName = 'bench_queue';

function getChannelFixture (qn) {
    return connPromise.then(function(conn){
        return conn.createChannel().then(function(ch){
            return when.all([
                ch.prefetch(10),
                ch.assertQueue(qn,{durable:true})
            ]).then(function(){
                return when(ch);
            })
        });
    });
}

getChannelFixture(queueName).then(function(ch){

    ch.consume(queueName,function(m){
        console.log('\tgot message %s',m.content.toString('utf8'))
        setTimeout(function(){
            ch.ack(m);
        },100)
    })

}).then(null,function(err){
    console.warn(err);
});

producer.js:

var amqplib = require('amqplib');
var when = require('when');
var connPromise = amqplib.connect();
var queueName = 'bench_queue';

function getChannelFixture (qn) {
    return connPromise.then(function(conn){
        return conn.createChannel().then(function(ch){
            return ch.assertQueue(qn,{durable:true}).then(function(){
                return when(ch);
            })
        });
    });
}

getChannelFixture(queueName).then(function(ch){
    var ct = 0;
    var pubInterval = setInterval(function(){
        var msg = { ct: ++ct, foo:1, bar:2, baz:3, time: Date.now() };
        console.log('publishing message %s',msg.ct);
        ch.sendToQueue(queueName, new Buffer(JSON.stringify(msg)), {
            deliveryMode: true, 
            contentType: 'application/json' 
        });
    },1);

    setTimeout(function(){
        console.log('stopping publish interval');
        clearInterval(pubInterval)
    },60000)

}).then(null,function(err){
    console.warn(err);
});

Mocking library

For TDD and BDD is would be awesome to have a mocking library like what nock does for http requests. I found one for node-amqp but, obviously, I'd prefer to use amqplib.

Error when an empty message is consumed.

Currently we use an RPC style system that uses status codes in the header (as well as correlationId & replyTo) to flag the status of the call (the body is used to hold results or an error message/stacktrace).

This often results in an empty message body but with the header & properties fields populated.

On this condition we seem to be getting an error at the library level:

events.js:72
        throw er; // Unhandled 'error' event
              ^
Error: Expected content frame after headers
    at Channel.C.closeBecause (/Users/bjc/Workspace/ol-trackingid-tag-upload/node_modules/amqplib/lib/channel.js:183:11)
    at Channel.C.acceptMessageFrame (/Users/bjc/Workspace/ol-trackingid-tag-upload/node_modules/amqplib/lib/channel.js:216:10)
    at Channel.C.accept (/Users/bjc/Workspace/ol-trackingid-tag-upload/node_modules/amqplib/lib/channel.js:305:17)
    at Connection.mainAccept [as accept] (/Users/bjc/Workspace/ol-trackingid-tag-upload/node_modules/amqplib/lib/connection.js:36:33)
    at Socket.go (/Users/bjc/Workspace/ol-trackingid-tag-upload/node_modules/amqplib/lib/frame.js:76:12)
    at Socket.EventEmitter.emit (events.js:92:17)
    at emitReadable_ (_stream_readable.js:408:10)
    at emitReadable (_stream_readable.js:404:5)
    at readableAddChunk (_stream_readable.js:165:9)
    at Socket.Readable.push (_stream_readable.js:127:10)

Sorry not to have a patch or anything more helpful than this, and thanks for all the hard work on this library.

Apologies, just found issue #10. Now if only I could retract this as a duplicate...

Bump version?

I'm encountering the issue fixed by #10, and it would be great to get the fix on NPM.

Nice library by the way, I'm finding it easier to use than node-amqp.

should amqp.node browserification work?

Hullo
I'm trying to use amqp.node in a browser to directly connect to various external rabbitmq connections. The example Consumer works as expected in Node, connecting to a rabbit exchange/queue on another computer. However, when I bundle the consumer up with browserify, it seems that line 113:

sock = require('net').connect(sockopts, onConnect);

is failing to get the require to work, as "Undefined is not a function".

I'm not very experienced with javascript, but if I put the require('net') at the top of the embedded javascript, all is fine and the function is resolved.

Should I be able to just drop the amqp.node into a browser like this?

If I should, then I'll try to pull together the simplest example of the failure, but I don't want to spend a lot of time doing that if I'm trying something stupid in the first place.

Also, if I shouldn't be raising this as an issue, pls show me the correct channel.

regards
Tim

Expected content frame after headers -- on consume

The following code is throwing an error. Can someone please confirm is the error is mine or if it is in amqp.node? Thank you very much. BTW: So fare I like amqp.node, seems much simpler to use overall when compared to node-amqp

mq.conn comes from a home spun sigleton
My publish code works very well so far, just having problems in consuming. Preferably I would like to consume only 1 message at a time.

mq.conn.createChannel().then( function( channel ) {
channel.assertQueue( self.queue_name ).then( function( queue_status ) {
if( queue_status.messageCount ) {
channel.consume( self.queue_name, function( message ) {
channel.ack( message );
channel.close();
pay.send_reply( 200, message );
});
}
else {
channel.close();
pay.send_reply( 204 );
}
});
}).then( null, console.warn);

Here is a partial stack trace

at Socket.go (/usr/local/lib/node_modules/amqplib/lib/frame.js:76:12)"," at Socket.EventEmitter.emit (events.js:92:17)"," at emitReadable_ (_stream_readable.js:408:10)"," at emitReadable (_stream_readable.js:404:5)"," at readableAddChunk (_stream_readable.js:165:9)"," at Socket.Readable.push (_stream_readable.js:127:10)"],"level":"error","message":"uncaughtException: Expected content frame after headers","timestamp":"2013-09-09T21:49:13.259Z"}

Question: relationship between Rabbit.js and amqp.node

This is not an issue but rather a question.

It think it's time to develop RabbitMQ tutorials for Node and we need to pick a client library. I strongly prefer amqp.node to node-amqp (which seems to be abandoned) but
not sure how rabbit.js relates to them.

Which one would you recommend for the tutorials? Also, maybe you would want to lend us a hand with writing them ;)

Thanks.

Tests failing?

I forked the repo so I could swap 'when' for 'bluebird' (I'm getting snobby about my promise libs :P), but the tests won't pass as-is. A random group of tests fails near the end(?) with ECONNRESET and write after end errors. Presumably something closed the channel but the tests went on, oblivious. None of the errors seem to indicate what, though. Any pointers on tracking down what's happening?

channel.ack not acknowledging messages

@squaremo here's the full code example as requested. I created a demo app using the source, it's at https://github.com/jazlalli/amqplib-consumer-example.

you should be able clone it, place your amqp URL in the config.js file and then npm install and node server.js.

In my use case, there are a bunch of message already sitting on the queue, but when processing them, they remain unacked, despite the call to channel.ack.

For reference:
http://stackoverflow.com/q/21526974/1146431

thanks

Publish multiple messages

I'm getting caught up in publishing messages in a loop. I see no method to ensure the data is getting written before exiting the process. Only time it is writing more than one message is if i sleep after writing a batch and before process exit. Here is some sample code

var q = require('q');
var connection = amqp.connect('amqp://localhost').then(function(conn) {
  var deferred = q.defer();
  deferred.resolve(conn);
  return deferred.promise;
});

var channel = connection.then(function(conn) {
  return conn.createChannel().then(function(channel) {
    var deferred = q.defer();
    deferred.resolve(channel);
    return deferred.promise;
  });
});

var exchange = channel.then(function(ch) {
    return ch.assertExchange(exchangeName, 'fanout', {durable: true, autoDelete: false, confirm: true}).then(function() {
      return ch.assertQueue(queueName, {durable: true, exclusive: false, autoDelete: false}).then(function() {
        return ch.bindQueue(queueName, exchangeName, routingKey, {}).then(function() {
          var deferred = q.defer();
          deferred.resolve(ch);
          return deferred.promise;
        });
      });
    });
});

exchange.then(function(ch) {
  for(var i = 1; i < 100; i++) {
    ch.publish(exchangeName, routingKey, new Buffer(i));
  }
  process.exit(0);
});

Expose Basic.Reject in the Channel API

Basic.Nack is great, but I'm connecting to an old RabbitMQ release that doesn't support it. The old server does support Basic.Reject, but it's not exposed in the channel API.

As a work-around I'm importing the AMQP defs and sending the message myself:

sourceChannel.sendImmediately(amqpDefs.BasicReject, {
  deliveryTag: message.fields.deliveryTag,
  requeue: true
});

have assertExchange return exchange info in promise callback?

I like the way assertQueue returns an object that has the queue name on it.

ch.assertQueue('whatever').then(function(q){
  q.queue; // the queue name: whatever
});

Can we get the assertExchange to do the same? Right now, it only seems to return an empty JS object. It would be very helpful for the code I'm writing to have the exchange name returned in the same way, so that I don't have to create a string var in so many places.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.