Comments (11)
amqp-ts was designed to make it easy to connect to amqp, without the need to create channels or wait for callbacks or promises to resolve before continuing. It uses the library you mention as a base.
The code you give could be written as follows with amqp-ts in TypeScript:
import * as Amqp from "amqp-ts";
// connect to the amqp server
var connection = new Amqp.Connection(configuration.amqpUrl);
// define a new queue
var queue = connection.declareQueue("", {exclusive: true});
// define the consumer for the queue
queue.activateConsumer( message => {
    let msg = message.getContent();
    if (msg.properties.correlationId == corr && msg.content != null) {
        let resData = JSON.parse(msg.content);
        res.json(resData);
    }
}, {noAck: true});
// send a message to the queue
queue.send(new Amqp.Message({ correlationId: corr, replyTo: q.queue }));
// or, if you want to absolutely make sure that the queue is created before you send something
connection.completeConfiguration().then(() => {
    queue.send(new Amqp.Message({ correlationId: corr, replyTo: q.queue }));
}).catch(err => {
    console.log("An error occurred: " + err.message);
});
from amqp-ts.
Oops, just noticed that I messed up.
This should be better:
import * as Amqp from "amqp-ts";
// connect to the amqp server
var connection = new Amqp.Connection(configuration.amqpUrl);
// define a new queue
var queue = connection.declareQueue("", {exclusive: true});
let corr = uuid.v4();
// define the consumer for the queue
queue.activateConsumer( msg => {
    let content = msg.getContent();
    if (msg.properties.correlationId == corr && content != null) {
        res.json(content); // I don't know what this line does
    }
}, {noAck: true});
// send a message to the queue
queue.send(new Amqp.Message(req, { correlationId: corr, replyTo: q.queue }));
// or, if you want to absolutely make sure that everything is initialized before you send something
connection.completeConfiguration().then(() => {
    queue.send(new Amqp.Message(req, { correlationId: corr, replyTo: q.queue }));
}).catch(err => {
    console.log("An error occurred: " + err.message);
});
from amqp-ts.
Hello @abreits, thanks for the help. I'm referring to an RPC call in my implementation. I have a microservice that consumes messages from the queue, but with the implementation above the API request itself consumes the request from the queue.
What I need may be possible using this code:
queue.rpc(num).then(function(result) {
    console.log(' [.] Got ', result.getContent());
});
The problem is that we cannot pass a Message to the rpc method (so we cannot pass correlationId and replyTo as properties), or I may have misunderstood this. Could you please help me understand?
from amqp-ts.
Do you mean that you have an existing rpc server and you need to write the client, or that you can write both the server and the client? If the latter, you can use the code in tutorial 6:
https://github.com/abreits/amqp-ts/tree/master/tutorials/6_rpc
If however you have an already existing server implementation then you need to do the following:
- connect to the reply_to queue
- connect the consumer that processes the rpc response to the reply_to queue
- connect to the rpc queue
- send the message to the rpc queue (with the correct parameters)
something like this:
// connect to the reply_to queue
// amqp-ts does not work well with autogenerated queue names, so we generate a name
let responseQueueName = "rpc-response-" + uuid.v4();
let rpcResponseQueue = connection.declareQueue(responseQueueName, {exclusive: true});
// create a correlation id (if needed)
let corr = uuid.v4();
// define the consumer function (can also be done inline)
function rpcConsumer (msg: Amqp.Message) {
    let rpcResponse = msg.getContent();
    // process rpc response, e.g.
    if (msg.properties.correlationId == corr && rpcResponse != null) {
        res.json(rpcResponse);
    }
}
// connect the consumer to the queue
rpcResponseQueue.activateConsumer(rpcConsumer, {noAck: true});
// connect to the rpc queue
let rpcQueue = connection.declareQueue(rpcQueueName);
// send a message to the rpc queue (amqplib property names are camelCase)
let rpcRequest = new Amqp.Message(rpcRequestContent, {correlationId: corr, replyTo: responseQueueName});
rpcQueue.send(rpcRequest);
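The consumer above compares every reply against a single correlation id. If several requests share one reply queue, the same idea can be kept in a small lookup table. The following is only a minimal sketch under that assumption: `PendingReplies`, `register` and `dispatch` are hypothetical names, not part of amqp-ts, and no broker is involved.

```typescript
import { randomUUID } from "crypto";

// Hypothetical helper (not part of amqp-ts): tracks pending RPC requests by correlation id.
type Resolver = (content: unknown) => void;

class PendingReplies {
    private pending = new Map<string, Resolver>();

    // Register a request; returns the correlation id to put in the message properties.
    register(resolve: Resolver): string {
        const correlationId = randomUUID();
        this.pending.set(correlationId, resolve);
        return correlationId;
    }

    // Call this from the reply queue consumer.
    // Returns false when the id is missing or does not belong to a pending request.
    dispatch(correlationId: string | undefined, content: unknown): boolean {
        if (correlationId === undefined) {
            return false;
        }
        const resolve = this.pending.get(correlationId);
        if (resolve === undefined) {
            return false;
        }
        // Each reply is delivered at most once.
        this.pending.delete(correlationId);
        resolve(content);
        return true;
    }
}
```

In the consumer one would presumably call something like `replies.dispatch(msg.properties.correlationId, msg.getContent())` and ignore replies for which it returns false.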
from amqp-ts.
Thank you very much!! That did the job!! Thanks again.
from amqp-ts.
Sorry to bother you again. Is there any specific reason to use exclusive = true?
The server cannot access an exclusive queue to send the response.
from amqp-ts.
No reason, I just added it because it was present in your original example. If it causes problems just remove it.
from amqp-ts.
When consuming a message I'm getting this error:
error: Queue.onMessage consumer function returned error: content is not a buffer module=amqp-ts
I thought it was because of the queue options, but it is something else.
from amqp-ts.
According to the error, something went wrong in your consumer function. Try to debug it there.
from amqp-ts.
I dug into the source code and realized I'm calling this function to initialize the consumer:
var connection = new amqp.Connection(configuration["amqpUrl"]);
var queueReq = connection.declareQueue(queue, { durable: false });
queueReq.activateConsumer((message: amqp.Message) => {
    /* some logic goes here */
}, { noAck: true });
When I call queueReq.activateConsumer, the message parameter I receive is already of type Message. But in the source, this line creates a message again:
var message = new Message(msg.content, msg.properties);
even though msg.content is already a Buffer.
from amqp-ts.
That is not a problem: a new Message can be created with a Buffer, which will then be used as is.
See how the content of a new Message object is created:
https://github.com/abreits/amqp-ts/blob/master/src/amqp-ts.ts#L327
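To illustrate the point that a Buffer is used as is, here is a minimal sketch of how message content could be normalized to a Buffer. It does not quote the linked source: `toBuffer` is a hypothetical helper, and the handling of strings and other values (UTF-8 and JSON) is an assumption made for the example.

```typescript
// Hypothetical helper illustrating the idea; not the actual amqp-ts source.
function toBuffer(content: unknown): Buffer {
    if (Buffer.isBuffer(content)) {
        // Already a Buffer: returned unchanged, no copy and no re-encoding.
        return content;
    }
    if (typeof content === "string") {
        // Assumption: strings are encoded as UTF-8.
        return Buffer.from(content, "utf8");
    }
    // Assumption: any other value is JSON-serializable and sent as JSON text.
    return Buffer.from(JSON.stringify(content), "utf8");
}
```

With a helper like this, wrapping content that is already a Buffer a second time is harmless, because the same Buffer instance comes back.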
from amqp-ts.