amqp-ts's Issues

CPU drain when trying to connect with wrong credentials.

amqplib calls the callback function passed to connect twice in some cases (e.g. when trying to connect with a wrong username or password). This leads to exponential growth of retries, which drains the CPU and hangs the application completely if no retry limit is set.
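
One way to keep a doubly-invoked callback from scheduling two retries is to wrap it so that only the first call gets through. The sketch below is an illustration, not the library's actual fix: `callOnce` and `flakyConnect` are hypothetical names, and `flakyConnect` only imitates a connect function that calls back twice.

```typescript
// Wraps a callback so that only its first invocation is forwarded.
function callOnce<A extends unknown[]>(callback: (...args: A) => void): (...args: A) => void {
  let called = false;
  return (...args: A): void => {
    if (called) {
      return; // ignore any repeated invocation
    }
    called = true;
    callback(...args);
  };
}

// Hypothetical stand-in for a connect function that misbehaves and calls back twice.
function flakyConnect(callback: (err: Error | null) => void): void {
  callback(new Error("ACCESS_REFUSED"));
  callback(new Error("socket closed"));
}

let retriesScheduled = 0;
flakyConnect(
  callOnce((err: Error | null) => {
    if (err) {
      retriesScheduled += 1; // a real handler would schedule exactly one retry here
    }
  }),
);
console.log(retriesScheduled); // prints 1
```

With the guard in place each failed attempt produces at most one retry, so the number of pending retries can no longer double on every failure.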

Node Package Manager (NPM) Deprecated

Dear all,

The amqp-ts version available on NPM is very outdated; the last update was two years ago. We realized this while looking into a bug in the library related to automatic reconnections (#27). This bug has been fixed in your Git repository, but not yet in the NPM package.

I strongly recommend keeping the NPM version up to date. This is really important for the use and dissemination of the library.

Kind regards,
Douglas Rafael.

message.sendTo (etc) don't return Promise

It would be great if message.sendTo, queue.send etc... returned a Promise so callers can be assured the send completed correctly.

Any plans to do something like this?

PS: thx for this lib... it's a great help!

error TS2307: Cannot find module 'bluebird' (SOLVED)

Hi,
I am currently trying to develop an app using Ionic 2 and Angular 2 with TypeScript. I decided to use the amqp-ts library to add messaging to my app. I installed the library via npm like this:

npm install amqp-ts

Everything went fine and now I've got something like this:

/ app root directory
    + node_modules
        - amqp-ts
            - lib 
                -  amqp-ts.d.ts
            - node_modules
                -  amqplib
                -  bluebird
                -  winston

The problems begin now: I import the library into my component as it is done in the example of the documentation...

import * as Amqp from "amqp-ts";

... and when I try to deploy the app I get the following error messages:

TypeScript error: C:/APPs/Test/Ionic2Angular2App/node_modules/amqp-ts/lib/amqp-ts.d.ts(2,26): Error TS2307: Cannot find module 'bluebird'.
TypeScript error: C:/APPs/Test/Ionic2Angular2App/node_modules/amqp-ts/lib/amqp-ts.d.ts(50,12): Error TS2304: Cannot find name 'Buffer'.

1. The line related to the first error message

image

2. The line related to the second error message

image

I hope you can help me, please.

CURRENT STATUS: SOLVED
Watch the solution here

WARNING in ./~/colors/lib/colors.js 138:29-43 Critical dependency: the request of a dependency is an expression

I followed your guide, but I get an error when I call declareExchange(...):

let exchange = connection.declareExchange("notifications");
==> Here is the error I got from the browser console:
Angular is running in the development mode. Call enableProdMode() to enable the production mode.
2bluebird.js:1542 Unhandled rejection TypeError: QS.unescape is not a function
    at openFrames (http://localhost:4200/vendor.bundle.js:85237:16)
    at connect (http://localhost:4200/vendor.bundle.js:85315:14)
    at Object.connect (http://localhost:4200/vendor.bundle.js:84107:3)
    at Connection.tryToConnect (http://localhost:4200/vendor.bundle.js:83053:17)
    at http://localhost:4200/vendor.bundle.js:83025:19
    at Promise._execute (http://localhost:4200/vendor.bundle.js:88871:9)
    at Promise._resolveFromExecutor (http://localhost:4200/vendor.bundle.js:91149:18)
    at new Promise (http://localhost:4200/vendor.bundle.js:90745:10)
    at Connection.rebuildConnection (http://localhost:4200/vendor.bundle.js:83024:28)
    at new Connection (http://localhost:4200/vendor.bundle.js:83013:14)
    at new AppComponent (http://localhost:4200/main.bundle.js:62:26)
    at createClass (http://localhost:4200/vendor.bundle.js:14408:26)
    at createDirectiveInstance (http://localhost:4200/vendor.bundle.js:14250:37)
    at createViewNodes (http://localhost:4200/vendor.bundle.js:15600:49)
    at createRootView (http://localhost:4200/vendor.bundle.js:15505:5)
    at callWithDebugContext (http://localhost:4200/vendor.bundle.js:16636:42)
    at Object.debugCreateRootView [as createRootView] (http://localhost:4200/vendor.bundle.js:16097:12)
    at ComponentFactory_.create (http://localhost:4200/vendor.bundle.js:13441:46)
    at ComponentFactoryBoundToModule.create (http://localhost:4200/vendor.bundle.js:7050:29)
    at ApplicationRef_.bootstrap (http://localhost:4200/vendor.bundle.js:8632:57)
    at http://localhost:4200/vendor.bundle.js:8419:79
printWarning @ bluebird.js:1542
formatAndLogError @ bluebird.js:1258
fireRejectionEvent @ bluebird.js:1283
Promise._notifyUnhandledRejection @ bluebird.js:729
(anonymous) @ bluebird.js:154
ZoneDelegate.invokeTask @ zone.js:398
onInvokeTask @ core.es5.js:4116
ZoneDelegate.invokeTask @ zone.js:397
Zone.runTask @ zone.js:165
ZoneTask.invoke @ zone.js:460
timer @ zone.js:1540
client?ffdb:40 [WDS] Warnings while compiling.
client?ffdb:98 ./~/colors/lib/colors.js 138:29-43 Critical dependency: the request of a dependency is an expression
warnings @ client?ffdb:98
sock.onmessage @ socket.js:37
EventTarget.dispatchEvent @ eventtarget.js:51
(anonymous) @ main.js:274
SockJS._transportMessage @ main.js:272
EventEmitter.emit @ emitter.js:50
WebSocketTransport.ws.onmessage @ websocket.js:35
wrapFn @ zone.js:1199
ZoneDelegate.invokeTask @ zone.js:398
Zone.runTask @ zone.js:165
ZoneTask.invoke @ zone.js:460

Do you know what I missed? Thanks.
FYI: I'm using node v7.8.0, npm v4.2.0, and @angular/cli 1.0.0 to create the Angular app.

Unhandled rejection TypeError: message.sendTo is not a function

I get the below exception when trying to use Exchange.send(...) rather than Exchange.publish(...) even though it is supposed to be deprecated.

Unhandled rejection TypeError: message.sendTo is not a function
at Exchange.send (...\node_modules\amqp-ts\lib\amqp-ts.js:498:17)
at ...\src\service\amqp\AmqpManager.js:48:33
at tryCatcher (...\node_modules\bluebird\js\release\util.js:16:23)
at Promise._settlePromiseFromHandler (...\node_modules\bluebird\js\release\promise.js:547:31)
at Promise._settlePromise (...\node_modules\bluebird\js\release\promise.js:604:18)
at Promise._settlePromise0 (...\node_modules\bluebird\js\release\promise.js:649:10)
at Promise._settlePromises (...\node_modules\bluebird\js\release\promise.js:729:18)
at Promise._fulfill (...\node_modules\bluebird\js\release\promise.js:673:18)
at PromiseArray._resolve (...\node_modules\bluebird\js\release\promise_array.js:127:19)
at PromiseArray._promiseFulfilled (...\node_modules\bluebird\js\release\promise_array.js:145:14)
at Promise._settlePromise (...\node_modules\bluebird\js\release\promise.js:609:26)
at Promise._settlePromise0 (...\node_modules\bluebird\js\release\promise.js:649:10)
at Promise._settlePromises (...\node_modules\bluebird\js\release\promise.js:729:18)
at _drainQueueStep (...\node_modules\bluebird\js\release\async.js:93:12)
at _drainQueue (...\node_modules\bluebird\js\release\async.js:86:9)
at Async._drainQueues (...\node_modules\bluebird\js\release\async.js:102:5)

connection.completeConfiguration() doesn't reject the Promise if the connection string passed is wrong.

connection.completeConfiguration() doesn't reject the Promise if the connection string passed is wrong.

//wrong connection string
let connString:string='amqp://donald:duck@ducktales:'+cfg.Port;
let connection = new Amqp.Connection(connString);

let queue = connection.declareQueue("QUEUE_1", { durable: false, prefetch:1, });
queue.activateConsumer(this.onMessage, { noAck: false });

connection.completeConfiguration().then(() => {
       $log.debug('[RabbitConsumer] END - Connection completed');
}).catch((err)=>{
        //never log this
       $log.error('[RabbitConsumer] ERROR: ['+err+']');
});
private onMessage=(message: Amqp.Message)=>{
...
}
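
Until the promise rejects on its own, a caller can at least bound the wait with a timeout. The helper below is a generic sketch and not part of amqp-ts: `withTimeout` is a hypothetical name, and the 5000 ms in the usage comment is an arbitrary value chosen for illustration.

```typescript
// Rejects if the given promise has not settled within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new Error(`${label} timed out after ${ms} ms`));
    }, ms);
    promise.then(
      (value) => {
        clearTimeout(timer); // settled in time: cancel the pending timeout
        resolve(value);
      },
      (err) => {
        clearTimeout(timer);
        reject(err);
      },
    );
  });
}

// Intended usage with the snippet above (not executed here):
//   withTimeout(connection.completeConfiguration(), 5000, "completeConfiguration")
//     .then(() => $log.debug("configured"))
//     .catch((err) => $log.error(String(err)));
```

This only turns a silent hang into a visible error. It does not stop any reconnect attempts the library may still be making in the background.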

Error: Can't resolve 'crypto'

I am new to AMQP, Node and Angular. I have installed amqp-ts in my project but I am facing certain issues. Is there any prerequisite I am missing?

amqp-ts

Promise for RPC server

Please add a feature to the RPC server: allow the consumer callback to return a Promise.
Example:

queue.activateConsumer((msg:Message) => {

  return new Promise((resolve, reject) => {
      setTimeout(() => resolve({key:'value'}), 1000);
  });

});

wrong number of haakjes (brackets) :)

The example contains the following code:
queue.activateConsumer((message) => {
It should end with }) instead of }

The NPM published version is not JS

The published version of this library (on NPM) is not JS.

If we use this library, TypeScript will try to compile it, and the build will break if our TypeScript version differs from the one the library was built with. This library is also fairly old.

When publishing to npm you should publish only the built version (JS), not the TS files.

Channel for each queue and exchange

Hi! I've noticed that each .declareExchange or .declareQueue call opens a new channel.
Can someone explain why it was designed like that instead of using a single channel?

IllegalOperationError: Channel closed

I don't know exactly how, but very often I see the number of TCP connections to the RabbitMQ server keep increasing gradually. The growing number of connections is quite problematic because it floods the RabbitMQ server to the point where live connections get blocked. When the connections increase, I can see the following logs produced by amqp-ts in the client application:

Unhandled rejection IllegalOperationError: Channel closed
    at Channel.<anonymous> (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:149:11)
    at Channel.C._rpc (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:131:8)
    at Channel.rpc (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/callback_model.js:73:8)
    at Channel.bindQueue (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/callback_model.js:121:17)
    at /tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqp-ts/lib/amqp-ts.js:1088:36
    at tryCatcher (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/promise.js:547:31)
    at Promise._settlePromise (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/promise.js:604:18)
    at Promise._settlePromiseCtx (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/promise.js:641:10)
    at _drainQueueStep (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/async.js:97:12)
    at _drainQueue (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/async.js:86:9)
    at Async._drainQueues (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/async.js:102:5)
    at Immediate.Async.drainQueues [as _onImmediate] (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/async.js:15:14)
    at runCallback (timers.js:694:18)
    at tryOnImmediate (timers.js:665:5)
Unhandled rejection Error: Channel ended, no reply will be forthcoming
    at rej (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:190:7)
    at Channel.C._rejectPending (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:192:28)
    at Channel.C.toClosed (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:160:8)
    at Connection.C._closeChannels (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:392:18)
    at Connection.C.toClosed (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:399:8)
    at Connection.C.onSocketError (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:353:10)
    at Connection.emit (events.js:182:13)
    at Connection.EventEmitter.emit (domain.js:442:20)
    at Socket.go (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:479:12)
    at Socket.emit (events.js:182:13)
    at Socket.EventEmitter.emit (domain.js:442:20)
    at emitReadable_ (_stream_readable.js:534:12)
    at process._tickCallback (internal/process/next_tick.js:63:19)
Unhandled rejection Error: Operation failed: QueueDeclare; 405 (RESOURCE-LOCKED) with message "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue '54:b2:03:0b:12:d1_ota' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the original declaration."
    at reply (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:127:17)
    at Channel.C.accept (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:401:7)
    at Connection.mainAccept [as accept] (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:63:33)
    at Socket.go (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:476:48)

That is, IllegalOperationError: Channel closed appears together with QueueDeclare; 405 (RESOURCE-LOCKED) unhandled rejection errors. There are no exact steps to reproduce the issue as of now, but it arises quite often.

IllegalOperationError: Channel closed

I'm encountering an issue with message acknowledgment when the connection is reconnected or re-established. After successfully reconnecting, I attempt to acknowledge the received message, but it results in an "IllegalOperationError: Channel closed" error.

Missing reconnect and 'close' event

Hi and thank you for the project 👍

I was testing some error scenarios and found unexpected behaviour when using the Connection class' reconnectStrategy.

  1. reconnect doesn't happen if the connection is closed due to "CONNECTION-FORCED"
  2. after a successful reconnect, the 'close' callback isn't invoked

amqp-ts seems to handle closing connections differently in scenario 1 and 2. Scenario 1 works as expected while scenario 2 doesn't reconnect, but leaves consumers hanging as if nothing happened.

By hooking up the 'close' callback, I see that only Scenario 1 results in a isFatalError=true from the dependency amqplib. The 'close' callback also allows me to detect that Scenario 2 doesn't reconnect and results in isFatalError=false.

If Scenario 1 happens, the 'close' callback is only invoked on the first 'close' event. Since I have to handle Scenario 2 manually, this unfortunately makes the reconnect feature useless for me: I can't handle the manual reconnects once a successful reconnect has already happened.

Scenario 1: reconnects as expected ✅

  1. amqp-ts connects (successfully) to a rabbit-instance in a 3-node cluster
  2. terminate the rabbitmq instance the client connected to (with kill -9 <PID>)
  3. amqp-ts reconnects
  4. (repeat step 2 and 3)
    Setting up a callback on 'close' shows that amqplib's isFatalError returns true.

Scenario 2: reconnect doesn't happen on CONNECTION-FORCED ❌

  1. amqp-ts connects (successfully) to a rabbit-instance in a 3-node cluster
  2. restart the rabbitmq service (Ubuntu) the client connected to (with sudo systemctl restart rabbitmq-server.service)
  3. no reconnect
  4. 'close' callback is invoked and consumer is hanging without consuming events
    Setting up a callback on 'close' shows that amqplib's isFatalError returns false and error: "Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'".

Scenario 3: 'close' callback isn't invoked after successful reconnect (scenario 1) ❌

  1. amqp-ts connects (successfully) to a rabbit-instance in a 3-node cluster
  2. terminate the rabbitmq instance the client connected to (with kill -9 <PID>)
  3. amqp-ts reconnects
  4. restart the rabbitmq service (Ubuntu) the client connected to (with sudo systemctl restart rabbitmq-server.service)
  5. 'close' callback is NOT invoked and consumer is hanging without consuming events

missing @types/amqplib and @types/bluebird

Errors while compiling:

node_modules/amqp-ts/lib/amqp-ts.d.ts:9:26 - error TS7016: Could not find a declaration file for module 'amqplib/callback_api'.

node_modules/amqp-ts/lib/amqp-ts.d.ts:10:26 - error TS7016: Could not find a declaration file for module 'bluebird'.

I solved it temporarily by adding these types to my own project:

npm install -D @types/bluebird
npm install -D @types/amqplib

nack(false) still requeues, nack needs allUpTo parameter

I ran into a delivery loop (event always failed, always redelivered), and found out that allUpTo is false when I pass false to nack, but requeue isn't being passed through.
http://www.squaremobius.net/amqp.node/channel_api.html#channel_nack

This is the suspect line; when I added false as the second parameter, the problem went away:
https://github.com/abreits/amqp-ts/blob/master/src/amqp-ts.ts#L393

In the meantime we are using ack instead to deal with this.
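
For reference, the amqplib documentation linked above describes nack as taking the message followed by two optional flags, allUpTo and requeue, with requeue treated as true when it is omitted. The sketch below shows a call that sets both flags explicitly. `NackCapableChannel`, `rejectWithoutRequeue` and the fake channel are hypothetical names introduced only so the example runs without a broker.

```typescript
// Minimal shape of the channel method used here, modelled on amqplib's
// channel.nack(message, allUpTo, requeue).
interface NackCapableChannel<M> {
  nack(message: M, allUpTo?: boolean, requeue?: boolean): void;
}

// Rejects a single message and asks the broker not to put it back on the queue.
function rejectWithoutRequeue<M>(channel: NackCapableChannel<M>, message: M): void {
  channel.nack(message, false, false); // allUpTo = false, requeue = false
}

// Fake channel that records how nack was called (stand-in for a real channel).
const calls: Array<[string, boolean | undefined, boolean | undefined]> = [];
const fakeChannel: NackCapableChannel<string> = {
  nack(message, allUpTo, requeue) {
    calls.push([message, allUpTo, requeue]);
  },
};

rejectWithoutRequeue(fakeChannel, "delivery-1");
console.log(calls);
```

Passing requeue explicitly avoids relying on the default, which is what produces the redelivery loop when a message can never be processed successfully.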

message.ack() takes 40+ milliseconds to actually ack

I'm troubleshooting some slow performance in my application that I've narrowed down to the amqp-ts consumer component. I have a go-lang based sender that sends messages to a queue and a typescript/node component that listens on that queue (using amqp-ts) and sends the response back on a different queue. A basic test script is at the bottom.

It seems like message.ack() returns almost immediately (<1ms), but instrumenting the various parts of my application has shown that the go-lang receiver gets the message and sends a new message before the first message is actually shown as acknowledged in rabbitmq. Said differently:

  1. publish message-1
  2. consume message-1
  3. receive response for message-1
  4. publish message-2
  5. ack finishes for message-1

With prefetch = 1, this adds about 40ms of latency to each response. If I bump prefetch to 2, then every other message takes an extra 40ms, while the remaining messages take only 10ms.

What is message.ack() doing? Is it acknowledging receipt in the queue (e.g. RabbitMQ) or in the consumer of the queue (e.g. my go-lang based component)?

I'm using RabbitMQ 3.8.8

Code:

import * as Amqp from "amqp-ts";

const conn = new Amqp.Connection(
	"amqp://user:pass@rabbitmq:5672/",
	{ timeout: 1000 },
	{ retries: 1, interval: 1000 },
);

conn.completeConfiguration().then(() => {
	const recvQueue = conn.declareQueue("rpc-ugabuga", {
		durable: true,
		autoDelete: false,
		prefetch: 1,
	});

	const respQueue = conn.declareQueue("ugabuga", {
		durable: true,
		autoDelete: false,
		prefetch: 1,
	});

	console.log("consumer activating...");
	recvQueue.activateConsumer(
		(message: Amqp.Message) => {
			const req = message.getContent();
			const resp = {
				correlationId: req.correlationId,
				timestamps: req.timestamps,
			};
			const msg = new Amqp.Message(resp);
			msg.properties.correlationId = "correlation-id-1";
			respQueue.send(msg);

			message.ack();
		},
		{ consumerTag: "my-consumer", manualAck: true },
	);
});

Using amqp-ts for angular 8.0.0 throws error

When I enable amqp-ts in my angular 8.0.0 project, I am getting the following error.

ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'crypto' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'fs' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'fs' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'http' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'https' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'os' in 'D:\backup_projects\angular\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/exception.js
Module not found: Error: Can't resolve 'os' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/console.js
Module not found: Error: Can't resolve 'os' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'os' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'path' in 'D:\backup_projects\angular\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'path' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connection.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_writable.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_readable.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/logger.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/isstream/isstream.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\isstream'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'tls' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'zlib' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'

RPC: how can I send multiples messages to some queue (error reply consumer already set)

Hi, I'm not sure if I'm making a noob mistake here. My code is something like this:

  const connection = await new amqp.Connection("amqp://...");
  const queue = await connection.declareQueue('rpc_queue', {durable: false});
  const numbers = [5, 10, 15, 20];


  console.time("calc");
    // eslint-disable-next-line id-length
  const calc = numbers.map((n) => queue.rpc(n));
  const results : amqp.Message[] = await Promise.all(calc);
  results.forEach((response) => {
    console.info("getting value ", response.getContent());
  });
  console.timeEnd("calc");

and I'm getting

(node:81089) UnhandledPromiseRejectionWarning: Error: amqp-ts: Queue.rpc error: Operation failed: BasicConsume; 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - reply consumer already set"
    at /Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/lib/amqp-ts.js:804:32
    at /Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/callback_model.js:61:10
    at /Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/callback_model.js:189:12
    at reply (/Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/channel.js:127:14)
    at Channel.C.accept (/Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/channel.js:401:7)
    at Connection.mainAccept [as accept] (/Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/connection.js:63:33)
    at Socket.go (/Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/connection.js:476:48)
    at Socket.emit (events.js:210:5)
    at Socket.EventEmitter.emit (domain.js:475:20)
    at emitReadable_ (_stream_readable.js:575:12)
    at processTicksAndRejections (internal/process/task_queues.js:79:21)
(node:81089) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:81089) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Is it possible to achieve what I'm trying to do here, or not? Thank you so much!
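
If the error comes from several queue.rpc calls being in flight at once on the same queue (an assumption based on the "reply consumer already set" message, not confirmed here), one workaround is to issue the calls one after another instead of through Promise.all. The sketch below is generic: `runSequentially` is a hypothetical helper and `fakeRpc` stands in for queue.rpc so the example runs without a broker.

```typescript
// Runs an async task for each input, waiting for one to finish before starting the next.
async function runSequentially<I, O>(inputs: I[], task: (input: I) => Promise<O>): Promise<O[]> {
  const results: O[] = [];
  for (const input of inputs) {
    results.push(await task(input));
  }
  return results;
}

// Hypothetical stand-in for queue.rpc that tracks how many calls overlap.
let inFlight = 0;
let maxInFlight = 0;
async function fakeRpc(n: number): Promise<number> {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  inFlight -= 1;
  return n * 2;
}

const sequentialRun = runSequentially([5, 10, 15, 20], fakeRpc).then((results) => {
  console.log(results, "max calls in flight:", maxInFlight);
  return results;
});
```

With the code from the question this would read `await runSequentially(numbers, (n) => queue.rpc(n))`. The trade-off is that the requests no longer overlap, so total latency is the sum of the individual round trips.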

Take "onMessage: Promise<void>" instead of "onMessage: void" for activateConsumer

Hello,

I have a question for you. Here is a use case:

...
// define the consumer for the queue
queue.activateConsumer( async (message) => {
  let msg = message.getContent();
  if (await service.isExist(message.content.id)) {
    // other logic here
  }
}, {noAck: true});

I can't do something like this because activateConsumer will call my arrow function without awaiting it, so it behaves like a "fire and forget" pattern. If service.isExist fails, I will get an unhandled rejection.

A workaround would be:

// define the consumer for the queue
queue.activateConsumer((message) => {
  let msg = message.getContent();
  service.isExist(message.content.id).then((isExist)=> {
    if(isExist) {
      // other logic here
    }
  }).catch(doSomething);
}, {noAck: true});

Is there a better alternative? It would be nice to be able to use async/await here.
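
One possible alternative, sketched here under the assumption that the consumer callback's return value is ignored, is a small adapter that lets the handler be written with async/await while still routing failures to an error callback. `catchAsync` is a hypothetical helper, not part of amqp-ts, and the message shape is simplified for the example.

```typescript
// Adapts an async handler to a plain callback and forwards any failure to onError.
function catchAsync<M>(
  handler: (message: M) => Promise<void>,
  onError: (err: unknown, message: M) => void,
): (message: M) => void {
  return (message: M): void => {
    // Promise.resolve().then(...) also captures errors thrown synchronously by the handler.
    Promise.resolve()
      .then(() => handler(message))
      .catch((err: unknown) => onError(err, message));
  };
}

// Simplified demo: the handler rejects for negative ids.
const errors: string[] = [];
const consumer = catchAsync(
  async (message: { id: number }) => {
    if (message.id < 0) {
      throw new Error("invalid id");
    }
    // other logic here
  },
  (err) => {
    errors.push(String(err));
  },
);

consumer({ id: 1 });
consumer({ id: -1 });
```

The wrapped function could then be passed to queue.activateConsumer in place of the hand-written .then/.catch chain, keeping the handler body in async/await style.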

Namespace 'winston' has no exported member 'LoggerInstance'.

Hi, I am getting the following error:

> tsc

node_modules/amqp-ts/lib/amqp-ts.d.ts:13:33 - error TS2694: Namespace 'winston' has no exported member 'LoggerInstance'.

13 export declare var log: winston.LoggerInstance;
                 
Found 1 error.

I think this package is still using the old winston version (2.2.0), while my project is using winston version 3. Maybe that's the issue. In that case, can we update this package to use the latest winston?

Not able to find these modules

ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'crypto' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'fs' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'fs' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'http' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'https' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'net' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/exception.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/console.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'path' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'path' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connection.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_writable.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_readable.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/logger.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/isstream/isstream.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\isstream'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'tls' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'zlib' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
i 「wdm」: Failed to compile.
i 「wdm」: Compiling...

Date: 2019-07-16T08:35:44.021Z - Hash: b43aaf5a02261fe67b79
15 unchanged chunks

Time: 11871ms

WARNING in ./node_modules/amqp-ts/node_modules/colors/lib/colors.js 127:29-43
Critical dependency: the request of a dependency is an expression

ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'crypto' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'fs' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'fs' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'http' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'https' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'net' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/exception.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/console.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'path' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'path' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connection.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_writable.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_readable.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/logger.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/isstream/isstream.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\isstream'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'tls' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'zlib' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'

Side effect of bug in 1.0.0 (fixed in 1.0.1)

First, I really appreciate this library; we are using it in our production system and so far so good. The only thing is that the build-up of retries in 1.0.0 causes a really scary side effect when bad credentials are the reason for the inability to connect:

  • try to connect with a user that doesn't exist e.g. guest2:test
  • provision the user without restarting any systems
  • rabbitmq will go down, the system trying to connect opens thousands of connections until it crashes

I appreciate you getting a fix out in 1.0.1, I might just recommend publishing a warning in the npm module for 1.0.0 during install since this can knock over the rabbitmq cluster.

Queue.onMessage RPC promise returned error: mapper_parsing_exception module=amqp-ts

I try to send a message to the queue, and after that I see this error:

Queue.onMessage RPC promise returned error: mapper_parsing_exception module=amqp-ts

I cannot understand what I am doing wrong. This is my worker code:

const exchange = connection.declareExchange(process.env.OBJECT_CHILD_QUEUE);
const queue = connection.declareQueue(process.env.OBJECT_CHILD_QUEUE, {
  prefetch: 1,
});
queue.bind(exchange);
queue.activateConsumer(
  async (message) => {
    const messageID = message.getContent();
    await service.subscribe(
      messageID,
      (retry: boolean) => {
        if (retry) {
          message.nack();
        } else {
          message.ack();
        }
      }
    );
  },
  { noAck: false, manualAck: true }
);

Error when trying to send multiple RPC messages

I am using amqp-ts to communicate between my microservices. It is so far the easiest library to understand that I was able to find. Thank you for writing it!

However, at the moment I'm not sure what to do, because I need to create an RPC worker which gets data from a server and then sends the results in a reply to the other service. Unfortunately, when I try to make multiple requests I get this error:

Error: amqp-ts: Queue.rpc error: Operation failed: BasicConsume; 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - reply consumer already set"

I searched the internet, and I understand so far that I need to sort the answers by correlation ID to be able to receive multiple messages. But I've found nothing useful about this error, and I'm not really sure how to attach the correlation ID either. I even tried to create a new connection for every call, but that doesn't seem efficient and it also resulted in errors.

Any help is appreciated.
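
The error text suggests that a second reply consumer is being registered while one already exists. One common way around that, sketched here under assumptions, is to keep a single reply consumer and match each reply to its caller by correlation ID. The class below is a hypothetical helper and not part of amqp-ts; `RpcCorrelator`, `register` and `handleReply` are names made up for illustration, and the actual publishing and consuming are left out.

```typescript
// Hypothetical helper, not part of amqp-ts: tracks outstanding RPC calls so
// that replies arriving on one shared reply queue reach the right caller.
class RpcCorrelator<T> {
  private lastId = 0;
  private pending = new Map<string, (value: T) => void>();

  // Call before publishing a request and send the returned id as the
  // message's correlationId property. A UUID would work equally well.
  register(timeoutMs: number): { correlationId: string; reply: Promise<T> } {
    const correlationId = String(++this.lastId);
    const reply = new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error("RPC " + correlationId + " timed out"));
      }, timeoutMs);
      this.pending.set(correlationId, (value) => {
        clearTimeout(timer);
        resolve(value);
      });
    });
    return { correlationId, reply };
  }

  // Call from the single reply consumer for every incoming reply.
  // Returns false when no caller is waiting for this id.
  handleReply(correlationId: string, body: T): boolean {
    const resolve = this.pending.get(correlationId);
    if (!resolve) return false;
    this.pending.delete(correlationId);
    resolve(body);
    return true;
  }
}
```

With this shape, replies may arrive in any order: each one resolves only the promise registered under its own correlation ID, and unknown IDs are simply reported as unmatched.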

Publisher confirms

I've been reviewing your code with the idea of possibly using it in production. However, it appears that there is an issue with sending a message: the Exchange.send function (and the underlying publish method) don't return promises, so there is no way for the user of the app to confirm that the message they think they sent actually got to the broker.

If I'm reading the code correctly, if the code loses its connection to the broker it will try to reconnect and these "in flight" messages may get sent, but in the event of a long outage and a restart of the app these messages will just get lost. Since there is no promise for the sender to wait on, there is also no way of limiting the number of messages stuck waiting to be published.
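
For reference, the underlying amqplib library offers confirm channels, where `publish` accepts a callback that fires once the broker has acknowledged or rejected the message. A minimal sketch of turning that callback into a promise could look like the following. `ConfirmPublisher` and `publishConfirmed` are hypothetical names, the interface only mirrors the one method used here, and nothing below is amqp-ts API.

```typescript
// Minimal shape of the part of an amqplib confirm channel used in this sketch.
interface ConfirmPublisher {
  publish(
    exchange: string,
    routingKey: string,
    content: Uint8Array,
    options: object,
    callback: (err: unknown) => void,
  ): boolean;
}

// Hypothetical wrapper: resolves when the broker confirms the message and
// rejects when the broker reports a failure for it.
function publishConfirmed(
  channel: ConfirmPublisher,
  exchange: string,
  routingKey: string,
  content: Uint8Array,
  options: object = {},
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    channel.publish(exchange, routingKey, content, options, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}
```

A caller can then await each publish, or collect the promises and cap how many are outstanding, which addresses the concern about an unbounded number of messages waiting to be published.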

persistent message

Hi, how do I mark a message as persistent, i.e. set the delivery mode property to 2?
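
As a hedged pointer: amqplib, which amqp-ts builds on, documents two publish options for this, `persistent` and `deliveryMode`. The fragment below only shows those option objects. Whether amqp-ts passes them through unchanged is an assumption to verify against the library source.

```typescript
// Publish options in the form amqplib documents them. Either object alone
// marks a message as persistent; `persistent` is the recommended spelling.
const persistentOptions = { persistent: true };
const deliveryModeOptions = { deliveryMode: 2 };

// Assumption: amqp-ts forwards the second constructor argument of
// Amqp.Message to amqplib unchanged, for example
//   new Amqp.Message("payload", persistentOptions)
```

Note that a persistent message only survives a broker restart if the queue it sits in is durable as well.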

Promise not rejected on failed connection attempts.

When a connection attempt fails (integration-test provided with incorrect credentials), the Promises bound to:

    connection.completeConfiguration()
    connection.initialized

are not rejected, instead, they are resolved and all errors are silently ignored. Reconnects continue to happen, even when reconnect-strategy is configured without retries: { retries: 0, interval: 1500 }
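
The behaviour expected here can be sketched as a small retry loop that rejects with the last error once the retry budget is spent. This is an illustration only, not the library's implementation: `connectWithRetry` is a hypothetical helper, and it reads `retries: 0` as "try once and give up", which is the expectation described above and may differ from how amqp-ts itself interprets that value.

```typescript
interface ReconnectStrategy {
  retries: number;  // assumed meaning: extra attempts after the first one
  interval: number; // delay between attempts in milliseconds
}

// Hypothetical helper: runs `attempt` sequentially and rejects with the last
// error when all attempts have failed, instead of resolving silently.
async function connectWithRetry<T>(
  attempt: () => Promise<T>,
  strategy: ReconnectStrategy,
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i <= strategy.retries; i++) {
    try {
      return await attempt();
    } catch (err) {
      lastError = err;
      // Wait before the next attempt, but not after the final one.
      if (i < strategy.retries) {
        await new Promise((resolve) => setTimeout(resolve, strategy.interval));
      }
    }
  }
  throw lastError;
}
```

Because the attempts run one after another inside a single loop, a failure can never schedule more than one follow-up attempt, so the number of connection attempts stays bounded.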

getContent() always returns a string or json

The function getContent() always returns a string which, in the case of binary data, causes data loss. Maybe the code should look in the contentType to determine if a string value has to be returned and raw data otherwise like this:

    Message.prototype.getContent = function () {
      console.log("getContent::Message " + this.properties.contentType + " received: " + this.content.length);

      // Keep the raw Buffer unless the message is declared as plain text.
      var content = this.content;
      if (this.properties.contentType === "text/plain") {
        console.log("getContent::Message received plain text: " + content.length);
        content = this.content.toString();
      }
      return content;
    };

I have tested this with:

cat /usr/share/pixmaps/debian-logo.png | amqp-publish -u amqp://RABBITNODE -e images -C application/octet-stream

and the data is indeed of the correct size.

what about createChannel ?

I'm trying to convert this native amqplib code to amqp-ts:

amqp.connect(configuration.amqpUrl, function (err: any, conn: any) {
    conn.createChannel(function (err: any, ch: any) {
        // Exclusive, server-named queue that receives the reply
        ch.assertQueue('', { exclusive: true }, function (err: any, q: any) {
            let corr = uuid.v4();
            ch.consume(q.queue, function (msg: any) {
                if (msg.properties.correlationId == corr && msg.content != null) {
                    let resData = JSON.parse(msg.content.toString());
                    res.json(resData);
                }
            }, { noAck: true });

            // Buffer.from replaces the deprecated new Buffer(...)
            ch.sendToQueue(queue, Buffer.from(JSON.stringify(req)), { correlationId: corr, replyTo: q.queue });
        });
    });
});

But createChannel and assertQueue don't seem to be implemented or available. Are they missing?
http://www.squaremobius.net/amqp.node/channel_api.html#model_createChannel

Close connection

Hi,

I have to close the channels and the connection after sending a message.
If I try connection.close(), it doesn't work; it reconnects immediately...

How can I solve this?
