abreits / amqp-ts Goto Github PK
View Code? Open in Web Editor NEWAmqpSimple, a RabbitMQ tested Amqp library written in and for Typescript
License: MIT License
AmqpSimple, a RabbitMQ tested Amqp library written in and for Typescript
License: MIT License
amqplib
calls callback function provided to connect
twice in some cases (e.g. when trying to connect with wrong username/password). This leads to exponential growth of retries which drains CPU and hangs application completely if retries limit is not set.
Dear all,
The amqp-ts version available in NPM is very obsolete, the last update was two years ago. We realize this when we look for a bug in the library related to automatic reconnections (#27 ). This bug has been fixed in your Git, but has not yet been fixed in the NPM library.
I strongly recommend keeping the NPM version up to date, this is really important for the use and dissemination of the library.
Kind regards,
Douglas Rafael.
It would be great if message.sendTo, queue.send etc... returned a Promise so callers can be assured the send completed correctly.
Any plans to do something like this?
PS: thx for this lib... its a great help!
Hi,
Could you add function to bind / consume queue without declaration them ?
First ,this is very useful project. thank you!
My question : Not every RPC request can be returned efficiently, so we need a timeout mechanism.
Hi,
I am currently trying to develop an app using Ionic 2 and Angular 2 with Typescript version. I decided to use the library amqp-ts to include messaging in my app. I installed the library via npm like:
npm install amqp-ts
Everything went fine and now I've got something like this:
/ app root directory
+ node_modules
- amqp-ts
- lib
- amqp-ts.d.ts
- node_modules
- amqplib
- bluebird
- winston
The problems begin now: I import the library into my component as it is done in the example of the documentation...
import * as Amqp from "amqp-ts";
... and when I try to deploy the app I get the next error messages:
TypeScript error: C:/APPs/Test/Ionic2Angular2App/node_modules/amqp-ts/lib/amqp-ts.d.ts(2,26): Error TS2307: Cannot find module 'bluebird'.
TypeScript error: C:/APPs/Test/Ionic2Angular2App/node_modules/amqp-ts/lib/amqp-ts.d.ts(50,12): Error TS2304: Cannot find name 'Buffer'.
1. The line related to the first error message
2. The line related to the second error message
I hope you can help me, please.
CURRENTLY STATUS: SOLVED
Watch the solution here
I followed your guide, but I found an error when I try to declareEXchange(...)
let exchange = connection.declareExchange("notifications");
==> Here are the error I got from browser console:
Angular is running in the development mode. Call enableProdMode() to enable the production mode. 2bluebird.js:1542 Unhandled rejection TypeError: QS.unescape is not a function at openFrames (http://localhost:4200/vendor.bundle.js:85237:16) at connect (http://localhost:4200/vendor.bundle.js:85315:14) at Object.connect (http://localhost:4200/vendor.bundle.js:84107:3) at Connection.tryToConnect (http://localhost:4200/vendor.bundle.js:83053:17) at http://localhost:4200/vendor.bundle.js:83025:19 at Promise._execute (http://localhost:4200/vendor.bundle.js:88871:9) at Promise._resolveFromExecutor (http://localhost:4200/vendor.bundle.js:91149:18) at new Promise (http://localhost:4200/vendor.bundle.js:90745:10) at Connection.rebuildConnection (http://localhost:4200/vendor.bundle.js:83024:28) at new Connection (http://localhost:4200/vendor.bundle.js:83013:14) at new AppComponent (http://localhost:4200/main.bundle.js:62:26) at createClass (http://localhost:4200/vendor.bundle.js:14408:26) at createDirectiveInstance (http://localhost:4200/vendor.bundle.js:14250:37) at createViewNodes (http://localhost:4200/vendor.bundle.js:15600:49) at createRootView (http://localhost:4200/vendor.bundle.js:15505:5) at callWithDebugContext (http://localhost:4200/vendor.bundle.js:16636:42) at Object.debugCreateRootView [as createRootView] (http://localhost:4200/vendor.bundle.js:16097:12) at ComponentFactory_.create (http://localhost:4200/vendor.bundle.js:13441:46) at ComponentFactoryBoundToModule.create (http://localhost:4200/vendor.bundle.js:7050:29) at ApplicationRef_.bootstrap (http://localhost:4200/vendor.bundle.js:8632:57) at http://localhost:4200/vendor.bundle.js:8419:79 printWarning @ bluebird.js:1542 formatAndLogError @ bluebird.js:1258 fireRejectionEvent @ bluebird.js:1283 Promise._notifyUnhandledRejection @ bluebird.js:729 (anonymous) @ bluebird.js:154 ZoneDelegate.invokeTask @ zone.js:398 onInvokeTask @ core.es5.js:4116 ZoneDelegate.invokeTask @ zone.js:397 Zone.runTask @ zone.js:165 ZoneTask.invoke @ zone.js:460 timer @ zone.js:1540 client?ffdb:40 [WDS] Warnings while compiling. client?ffdb:98 ./~/colors/lib/colors.js 138:29-43 Critical dependency: the request of a dependency is an expression warnings @ client?ffdb:98 sock.onmessage @ socket.js:37 EventTarget.dispatchEvent @ eventtarget.js:51 (anonymous) @ main.js:274 SockJS._transportMessage @ main.js:272 EventEmitter.emit @ emitter.js:50 WebSocketTransport.ws.onmessage @ websocket.js:35 wrapFn @ zone.js:1199 ZoneDelegate.invokeTask @ zone.js:398 Zone.runTask @ zone.js:165 ZoneTask.invoke @ zone.js:460
Do you know what did I missed? thanks.
FYI: i'm using node v7.8.0, npm v4.2.0, @angular/cli: 1.0.0 to create angular app.
I get the below exception when trying to use Exchange.send(...) rather than Exchange.publish(...) even though it is supposed to be deprecated.
Unhandled rejection TypeError: message.sendTo is not a function
at Exchange.send (...\node_modules\amqp-ts\lib\amqp-ts.js:498:17)
at ...\src\service\amqp\AmqpManager.js:48:33
at tryCatcher (...\node_modules\bluebird\js\release\util.js:16:23)
at Promise._settlePromiseFromHandler (...\node_modules\bluebird\js\release\promise.js:547:31)
at Promise._settlePromise (...\node_modules\bluebird\js\release\promise.js:604:18)
at Promise._settlePromise0 (...\node_modules\bluebird\js\release\promise.js:649:10)
at Promise._settlePromises (...\node_modules\bluebird\js\release\promise.js:729:18)
at Promise._fulfill (...\node_modules\bluebird\js\release\promise.js:673:18)
at PromiseArray._resolve (...\node_modules\bluebird\js\release\promise_array.js:127:19)
at PromiseArray._promiseFulfilled (...\node_modules\bluebird\js\release\promise_array.js:145:14)
at Promise._settlePromise (...\node_modules\bluebird\js\release\promise.js:609:26)
at Promise._settlePromise0 (...\node_modules\bluebird\js\release\promise.js:649:10)
at Promise._settlePromises (...\node_modules\bluebird\js\release\promise.js:729:18)
at _drainQueueStep (...\node_modules\bluebird\js\release\async.js:93:12)
at _drainQueue (...\node_modules\bluebird\js\release\async.js:86:9)
at Async._drainQueues (...\node_modules\bluebird\js\release\async.js:102:5)
connection.completeConfiguration() doesn't reject the Promise if the connection string passed is wrong.
//wrong connection string
let connString:string='amqp://donald:duck@ducktales:'+cfg.Port;
let connection = new Amqp.Connection(connString);
let queue = connection.declareQueue("QUEUE_1", { durable: false, prefetch:1, });
queue.activateConsumer(this.onMessage, { noAck: false });
connection.completeConfiguration().then(() => {
$log.debug('[RabbitConsumer] END - Connection completed');
}).catch((err)=>{
//never log this
$log.error('[RabbitConsumer] ERROR: ['+err+']');
});
private onMessage=(message: Amqp.Message)=>{
...
}
Hi,
I am new to this, trying to use this library from my angular project but getting lot of missing library error after installing amqp-ts and trying to create connection. Please check following stackblitz sample.
https://stackblitz.com/edit/angular-ivy-paatz6
If there is any working sample from angular project will be helpful.
Please add feature in RPC server
return Promise
example
queue.activateConsumer((msg:Message) => {
return new Promise((resolve, reject) => {
setTimout(() => resolve({key:'value'}), 1000);
});
});
The example lists the following example code
queue.activateConsumer((message) => {
Should end with }) instead of }
Under https://github.com/abreits/amqp-ts/wiki/Queue-class#rpc, the documentation says that the function has a routingKey argument. However, in fact it does not.
The publish version of this library (on NPM) is not JS.
If we use this library... ts will try to compile... and it will break.. if you have a different version, from the one built, and this lib is kind of old.
When publishing to npm you should publish only the build version (js), not the ts files.
throw error
Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - fast reply consumer does not exist
Hi! I've realized that each .declareExchange or .declareQueue opens a new channel.
Can someone explain why was it designed like that instead of using a single channel?
TypeScript 3.0 introduced the unknown
type. It is type safer than any
type for this case, because it forces you to check message type before using it.
I don't exactly know how but very often i see the tcp connection to the rabbitmq server keeps incresing gradually. The increasing connection is quite problematic as it floods the rabbitmq server where the live connection gets blocked. When the connection increases i can see the following logs porduced by amqp-ts in client application:
Unhandled rejection IllegalOperationError: Channel closed
at Channel.<anonymous> (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:149:11)
at Channel.C._rpc (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:131:8)
at Channel.rpc (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/callback_model.js:73:8)
at Channel.bindQueue (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/callback_model.js:121:17)
at /tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqp-ts/lib/amqp-ts.js:1088:36
at tryCatcher (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/util.js:16:23)
at Promise._settlePromiseFromHandler (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/promise.js:547:31)
at Promise._settlePromise (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/promise.js:604:18)
at Promise._settlePromiseCtx (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/promise.js:641:10)
at _drainQueueStep (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/async.js:97:12)
at _drainQueue (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/async.js:86:9)
at Async._drainQueues (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/async.js:102:5)
at Immediate.Async.drainQueues [as _onImmediate] (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/bluebird/js/release/async.js:15:14)
at runCallback (timers.js:694:18)
at tryOnImmediate (timers.js:665:5)
Unhandled rejection Error: Channel ended, no reply will be forthcoming
at rej (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:190:7)
at Channel.C._rejectPending (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:192:28)
at Channel.C.toClosed (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:160:8)
at Connection.C._closeChannels (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:392:18)
at Connection.C.toClosed (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:399:8)
at Connection.C.onSocketError (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:353:10)
at Connection.emit (events.js:182:13)
at Connection.EventEmitter.emit (domain.js:442:20)
at Socket.go (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:479:12)
at Socket.emit (events.js:182:13)
at Socket.EventEmitter.emit (domain.js:442:20)
at emitReadable_ (_stream_readable.js:534:12)
at process._tickCallback (internal/process/next_tick.js:63:19)
Unhandled rejection Error: Operation failed: QueueDeclare; 405 (RESOURCE-LOCKED) with message "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue '54:b2:03:0b:12:d1_ota' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the original declaration."
at reply (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:127:17)
at Channel.C.accept (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/channel.js:401:7)
at Connection.mainAccept [as accept] (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:63:33)
at Socket.go (/tmp/.mount_mps-ui3OFi3Q/resources/app.asar/node_modules/amqplib/lib/connection.js:476:48)
i.e. there seems to appear IllegalOperationError: Channel closed
along with QueueDeclare; 405 (RESOURCE-LOCKED)
unhandled rejection errors. In addition to that there is no exact steps to recreate the issue as of now, but this issue is arising quite often.
I am receiving the message, with all the right details, and then I am seeing this error. Any ideas?
I'm encountering an issue with message acknowledgment when the connection is reconnected or re-established. After successfully reconnecting, I attempt to acknowledge the received message, but it results in an "IllegalOperationError: Channel closed" error.
Hi and thank you for the project 👍
I was testing some error scenarios and found unexpected behaviour when using the Connection class' reconnectStrategy
.
amqp-ts seems to handle closing connections differently in scenario 1 and 2. Scenario 1 works as expected while scenario 2 doesn't reconnect, but leaves consumers hanging as if nothing happened.
By hooking up the 'close' callback, I see that only Scenario 1 results in a isFatalError=true
from the dependency amqplib
. The 'close' callback also allows me to detect that Scenario 2 doesn't reconnect and results in isFatalError=false
.
If Scenario 1 happens, the 'close' callback is only invoked on the first 'close' event. Since I have to handle Scenario 2 manually this unfortunately renders the reconnect feature unuseful for me, since I can't handle the manual reconnects if a successful reconnect has already happened.
Scenario 1: reconnects as expected ✅
kill -9 <PID>
)true
.Scenario 2: reconnect doesn't happen on CONNECTION-FORCED ❌
sudo systemctl restart rabbitmq-server.service
)false
and error: "Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'".Scenario 3: 'close' callback isn't invoked after succesfull reconnect (scenario 1) ❌
kill -9 <PID>
)sudo systemctl restart rabbitmq-server.service
)Errors while compiling:
node_modules/amqp-ts/lib/amqp-ts.d.ts:9:26 - error TS7016: Could not find a declaration file for module 'amqplib/callback_api'.
node_modules/amqp-ts/lib/amqp-ts.d.ts:10:26 - error TS7016: Could not find a declaration file for module 'bluebird'.
I solved it "temporally" adding these types to my own project:
npm install -D @types/bluebird
npm install -D @types/amqplib
I ran into a delivery loop (event always failed, always redelivered), and found out that allUpTo is false when I pass false to nack, but requeue isn't being passed through.
http://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
This is the suspect line, when I added false as the second parameter, the problem went away
https://github.com/abreits/amqp-ts/blob/master/src/amqp-ts.ts#L393
in the mean time we are using ack instead to deal with this.
I'm unable to listen on events from the Connection class since the Connection class does not extend the EventEmitter eventhough the typings files states that the Connection does extend the EventEmitter, same does the sourcecode of amqplib (https://github.com/squaremo/amqp.node/blob/master/lib/connection.js#L56)
Should I create a pull request or could you fix it yourself? Should be a very quick fix.
Best Regards
Thomas
I'm troubleshooting some slow performance in my application that I've narrowed down to the amqp-ts consumer component. I have a go-lang based sender that sends messages to a queue and a typescript/node component that listens on that queue (using amqp-ts) and sends the response back on a different queue. A basic test script is at the bottom.
It seems like message.ack() returns almost immediately (<1ms), but instrumenting the various parts of my application has shown that the go-lang receiver gets the message and sends a new message before the first message is actually shown as acknowledged in rabbitmq. Said differently:
With prefetch = 1, this results in about 40ms of latency being added to the response. If I bump prefech to 2, then every other message is +40ms with other messages only being 10ms..
What is message.ack() doing? is it acknowledging receipt in the queue (e.g. RabbitMQ) or in the consumer of the queue (e.g. my go-lang based component)?
I'm using RabbitMQ 3.8.8
Code:
import * as Amqp from "amqp-ts";
const conn = new Amqp.Connection(
"amqp://user:pass@rabbitmq:5672/",
{ timeout: 1000 },
{ retries: 1, interval: 1000 },
);
conn.completeConfiguration().then(() => {
const recvQueue = conn.declareQueue("rpc-ugabuga", {
durable: true,
autoDelete: false,
prefetch: 1,
});
const respQueue = conn.declareQueue("ugabuga", {
durable: true,
autoDelete: false,
prefetch: 1,
});
console.log("consumer activating...");
recvQueue.activateConsumer(
(message: Amqp.Message) => {
const req = message.getContent();
const resp = {
correlationId: req.correlationId,
timestamps: req.timestamps,
};
const msg = new Amqp.Message(resp);
msg.properties.correlationId = "correlation-id-1";
respQueue.send(msg);
message.ack();
},
{ consumerTag: "my-consumer", manualAck: true },
);
});
When I enable amqp-ts in my angular 8.0.0 project, I am getting the following error.
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'crypto' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'fs' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'fs' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'http' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'https' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'os' in 'D:\backup_projects\angular\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/exception.js
Module not found: Error: Can't resolve 'os' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/console.js
Module not found: Error: Can't resolve 'os' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'os' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'path' in 'D:\backup_projects\angular\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'path' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connection.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_writable.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_readable.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/logger.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/isstream/isstream.js
Module not found: Error: Can't resolve 'stream' in 'D:\backup_projects\angular\node_modules\isstream'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'tls' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'zlib' in 'D:\backup_projects\angular\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
In the DeclarationOptions there is a prefetch but it is not present in the DeclarationOptions in the compiled lib.
Hi, I'm not sure if I'm doing a noob mistake here, my code is something like this
const connection = await new amqp.Connection("amqp://...");
const queue = await connection.declareQueue('rpc_queue', {durable: false});
const numbers = [5, 10, 15, 20];
console.time("calc");
// eslint-disable-next-line id-length
const calc = numbers.map((n) => queue.rpc(n));
const results : amqp.Message[] = await Promise.all(calc);
results.forEach((response) => {
console.info("getting value ", response.getContent());
});
console.timeEnd("calc");
and I'm getting
(node:81089) UnhandledPromiseRejectionWarning: Error: amqp-ts: Queue.rpc error: Operation failed: BasicConsume; 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - reply consumer already set"
at /Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/lib/amqp-ts.js:804:32
at /Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/callback_model.js:61:10
at /Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/callback_model.js:189:12
at reply (/Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/channel.js:127:14)
at Channel.C.accept (/Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/channel.js:401:7)
at Connection.mainAccept [as accept] (/Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/connection.js:63:33)
at Socket.go (/Users/Admin/Downloads/proj/avanti/bita_dashboard_websocket_backend/node_modules/amqp-ts/node_modules/amqplib/lib/connection.js:476:48)
at Socket.emit (events.js:210:5)
at Socket.EventEmitter.emit (domain.js:475:20)
at emitReadable_ (_stream_readable.js:575:12)
at processTicksAndRejections (internal/process/task_queues.js:79:21)
(node:81089) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:81089) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
it's possible achieve something like what am I trying or it's not possible? thank you so much!
Hello,
I have a question for you, here a use case :
...
// define the consumer for the queue
queue.activateConsumer( async (message) => {
let msg = message.getContent();
if (await service.isExist(message.content.id)) {
// other logic here
}
}, {noAck: true});
I can't do something like this because activateConsumer will call my arrow function without awaiting. So it will be like "fire and forget pattern". if service.isExist fails, I will get an unhandledexception.
The solution would be :
// define the consumer for the queue
queue.activateConsumer((message) => {
let msg = message.getContent();
service.isExist(message.content.id).then((isExist)=> {
if(isExist) {
// other logic here
}
}).catch(doSomething);
}, {noAck: true});
Is there a better alternative ? It would be nice to use es6 features.
Line 533 in a9b6300
err
on this line is implicitly any
, which fails typechecking if you're using strict TS.
I want to use [prefetch](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue)
.
API reference in this wiki looks forget the option.
Hi, i am getting the following error:
> tsc
node_modules/amqp-ts/lib/amqp-ts.d.ts:13:33 - error TS2694: Namespace 'winston' has no exported member 'LoggerInstance'.
13 export declare var log: winston.LoggerInstance;
Found 1 error.
I think that this package is still using the old winston version (2.2.0) and my project is using winston version 3. Maybe thats the issue. In that case can we update this package to use latest winston?
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'crypto' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'fs' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'fs' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'http' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'https' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'net' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/exception.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/console.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'path' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'path' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connection.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_writable.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_readable.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/logger.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/isstream/isstream.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\isstream'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'tls' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'zlib' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
i 「wdm」: Failed to compile.
i 「wdm」: Compiling...
Date: 2019-07-16T08:35:44.021Z - Hash: b43aaf5a02261fe67b79
15 unchanged chunks
Time: 11871ms
WARNING in ./node_modules/amqp-ts/node_modules/colors/lib/colors.js 127:29-43
Critical dependency: the request of a dependency is an expression
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'crypto' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'fs' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'fs' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'http' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'https' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'net' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/exception.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/console.js
Module not found: Error: Can't resolve 'os' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/lib/amqp-ts.js
Module not found: Error: Can't resolve 'path' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'path' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connection.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_writable.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/readable-stream/lib/_stream_readable.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\readable-stream\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/common.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/logger.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/http.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
ERROR in ./node_modules/isstream/isstream.js
Module not found: Error: Can't resolve 'stream' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\isstream'
ERROR in ./node_modules/amqp-ts/node_modules/amqplib/lib/connect.js
Module not found: Error: Can't resolve 'tls' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\amqplib\lib'
ERROR in ./node_modules/amqp-ts/node_modules/winston/lib/winston/transports/file.js
Module not found: Error: Can't resolve 'zlib' in 'E:\BACK_OFFICE\NEWThemeCode\node_modules\amqp-ts\node_modules\winston\lib\winston\transports'
The error traces to
https://github.com/abreits/amqp-ts/blob/master/src/amqp-ts.ts#L17
I've only seen it in the repl, It goes away if I set AMQPTS_APPLICATIONNAME in the env, but it seems a little unnecessary to kill the require if it isn't set.
to reproduce, open a node repl in a folder that has amqp-ts installed in the node_modules. Then require('amqp-ts')
Hi, thanks for this useful library. I was wondering if you have plans to upgrade amqlib to the latest 0.8.1?
First, really appreciate this library, we are using it in our production system and so far so good. The only thing is that the build-up of retries in 1.0.0 causes a really scary side effect when credentials are the reason for the inability to connect:
I appreciate you getting a fix out in 1.0.1, I might just recommend publishing a warning in the npm module for 1.0.0 during install since this can knock over the rabbitmq cluster.
I try to send a message to the queue, after that I see this error.
Queue.onMessage RPC promise returned error: mapper_parsing_exception module=amqp-ts
I cannot understand what I am doing wrong. This is my worker code:
const exchange = connection.declareExchange(process.env.OBJECT_CHILD_QUEUE);
const queue = connection.declareQueue(process.env.OBJECT_CHILD_QUEUE, {
prefetch: 1,
});
queue.bind(exchange);
queue.activateConsumer(
async (message) => {
const messageID = message.getContent();
await service.subscribe(
messageID,
(retry: boolean) => {
if (retry) {
message.nack();
} else {
message.ack();
}
}
);
},
{ noAck: false, manualAck: true }
);
I am using amqp-ts to communicate between my microservices. This is so far the easiest to understand library i was able to found. Thank you for writing it!
However at the moment I'm not sure what to do because I would need to create an RPC worker which gets data from a server and than sends the results in a reply to the other service. Unfortunately, when I try to make multiple request I get this error:
Error: amqp-ts: Queue.rpc error: Operation failed: BasicConsume; 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - reply consumer already set"
I was searching the internet and I understand so far that I need to sort the ansewrs by correlationID to be able to receive multiple messages. But I've found nothing useful about this error. I'm not really sure either how to attach the correlation ID. I even tried to create a new connection for every call but that doesn't seem efficient and it also resulted errors.
Any help is appreciated.
I've been reviewing your code with the idea of possibly using it in production, however it appears that there is an issue with sending a message, the Exchange.send function (and the underlying publish method) don't return promises, so there is no way for the user of the app to confirm that the message they think they sent actually got to the broker.
If I'm reading the code correctly, if the code looses connection to the broker it will try to reconnect and these "in flight" messages may get sent, but in the event of a long outage and a restart of the app these messages will just get lost. Since there is no promise for the sender to wait on, there is also no way of limiting the number of messages stuck waiting on getting published
hi, how to mark the message as persistent or delivery mode propery as 2?
When a connection attempt fails (integration-test provided with incorrect credentials), the Promises bound to:
connection.completeConfiguration()
connection.initialized
are not rejected, instead, they are resolved and all errors are silently ignored. Reconnects continue to happen, even when reconnect-strategy is configured without retries: { retries: 0, interval: 1500 }
Hi,
Could you add passive queue declaration option ?
The function getContent() always returns a string which, in the case of binary data, causes data loss. Maybe the code should look in the contentType to determine if a string value has to be returned and raw data otherwise like this:
Message.prototype.getContent = function () {
console.log("getContent::Message "+this.properties.contentType+" received: " + this.content.length);
var content = this.content;
if(this.properties.contentType == "text/plain") {
console.log("getContent::Message received plain text: " + content.length);
content = this.content.toString();
}
I have tested this with:
cat /usr/share/pixmaps/debian-logo.png | amqp-publish -u amqp://RABBITNODE -e images -C application/octet-stream
and the data is indeed of the correct size.
Function doesn't resolve created promise.
i'm trying to convert native this code using amqp-ts
amqp.connect(configuration.amqpUrl, function (err: any, conn: any) {
conn.createChannel(function (err: any, ch: any) {
ch.assertQueue('', { exclusive: true }, function (err: any, q: any) {
let corr = uuid.v4();
ch.consume(q.queue, function (msg: any) {
if (msg.properties.correlationId == corr && msg.content != null) {
let resData = JSON.parse(msg.content);
res.json(resData);
}
}, { noAck: true });
ch.sendToQueue(queue, new Buffer(JSON.stringify(req)), { correlationId: corr, replyTo: q.queue });
});
});
});
but createChannel , assertQueue not implemented or not there ?
http://www.squaremobius.net/amqp.node/channel_api.html#model_createChannel
Hi,
I have to close channels and connection after message send.
If i try connection.close() it doesn't work, it reconnect immediately...
How i can solve this?
No commits during 2017. Docs seem broken/no API docs. Claims to be production ready, but is it really?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.