Comments (6)
Hi @vahidsabet,
There are a few individual errors/oversights in your code sample which I'll highlight first...
- If you are using a connection url, specify the heartbeat as a query parameter, e.g.
amqp://localhost:5672?heartbeat=30
not as a second string parameter - await channel.bind
- await channel.prefetch
- register an error listener on the connection
- register an error listener on the channel
- handle a connection error or channel error - the easiest option is to restart the application. Alternatively you may need to re-establish both and consume again.
- handle a null message in each consumer (the broker sends this under certain conditions such as queue deletion or broker shutdown)
- handle a poison message (e.g. one which isn't JSON)
The PRECONDITION_FAILED error is occurring because one of your consumers is not acknowledging the message within 180s. This is not a fault with amqplib however. It could be because an error is occurring, preventing channel.ack from being reached. Using wireshark would confirm whether the channel.ack is being sent.
Regarding your questions
- How to consume fanout exchange and some queue efficiently? Need I create consumers separately in different containers and different connection for each?
It depends on whether a single container using a shared connection can handle the throughput of messages. A single container with one consumer would be the most scalable (providing you did not exceed the number of connections the broker can handle), but could also be unnecessary.
- How to consume a queue for db Log, aqueue for bot log, a queue for send data via axios and the last one for emit message?
The way you are doing it is OK providing you fix the above issues
from amqplib.
Thanks dear @cressie176 and appreciate your response as I found you expert of message brokers.
I've changed issues you mentioned as follows:
const mbConnect = async () => {
let conn = await amqp.connect(amqpUrl+"?heartbeat=30")
let channel = await conn.createChannel()
conn.on("error", function(err) {
if (err.message !== "Connection closing") {
console.error('new catch Err' + err.message)
}
});
conn.on("close", function() {
console.error('@avsvahid', "AMQP reconnecting")
return setTimeout(mbConnect , 10000);
})
channel.on("error", function(err) {
console.error('AMQP channel error' + err.message)
return setTimeout(mbConnect , 10000);
})
channel.on("close", function() {
console.error('AMQP channel closed' + err.message)
})
return channel
}
const channel = await mbConnect();
process.once('SIGINT', async () => {
logger.log('got sigint, closing connection ');
await channel.close();
await connection.close();
process.exit(0);
});
await channel.assertExchange(logsEx,"fanout", {durable: true});
const tellogQueue = await channel.assertQueue("tellog");
const dblogQueue = await channel.assertQueue("dblog");
const assetQueue = await channel.assertQueue("getasset");
await channel.bindQueue(tellogQueue.queue,logsEx);
await channel.bindQueue(dblogQueue.queue,logsEx);
await channel.bindQueue(assetQueue.queue,logsEx);
await channel.prefetch(1);
await channel.consume(tellogQueue.queue, async (msg) => {
let mo = JSON.parse(msg.content.toString());
await axios.post('https://api.telegram.org/bot/sendMessage', {
chat_id: chid,
text: "tellogQueue consumed ",
parse_mode:'HTML'
}
.then(async response => {
await channel.ack(msg);
})
.catch(err => {
console.error(err);
})
}, { noAck: false });
await channel.consume(dblogQueue.queue, async (msg) => {
let mo = JSON.parse(msg.content.toString());
console.log('dblogQueue consumed ' + mo.data);
channel.ack(msg);
}, { noAck: false });
await channel.consume(assetQueue.queue, async (msg) => {
let mo = JSON.parse(msg.content.toString());
console.log('assetQueue consumed ' + mo.data);
channel.ack(msg);
}, { noAck: false });
I didn't actually get the point of your latest comments:
- handle a null message in each consumer (the broker sends this under certain conditions such as queue deletion or broker shutdown)
- handle a poison message (e.g. one which isn't JSON)
could you please explain it in my code?
It's my server on docker. Is it ok for my scenario? Transactions come to the queues and consome for different purposes and I most be sure that all the messages arrived to the destination and acked. Fanout is better or topic?
Sometimes I get Error: connect ETIMEDOUT AxiosError [AggregateError]
from amqplib.
handle a null message in each consumer (the broker sends this under certain conditions such as queue deletion or broker shutdown)
If the broker cancels a consumer amqplib will invoke the consumer callback with a null message. You should therefore check for this at the start of your consumer function.
handle a poison message (e.g. one which isn't JSON)
If you receive a bad or "poison" message which causes your consumer function to throw an error, the channel will be closed and the message automatically rolled back. If your application restarts/resubscribes, the message will be redelivered, causing the error to be rethrown indefinitely. There are a couple of ways to avoid this, one is to use a quorum queue with a delivery limit. Another is to catch any errors caused by an invalid message and nack them. In your example, someone might publish a message which isn't JSON, causing the JSON.parse line to fail.
connect ETIMEDOUT AxiosError [AggregateError]
This sounds like it's the HTTP request which is failing intermittently. It's most likely to do with the server or the network, but nothing to do with RabbitMQ or amqplib. You could reduce the chance of it happening by retrying
Fanout is better or topic?
I tend to prefer topics because they are more flexible. I believe fanouts are faster though.
from amqplib.
Dear @cressie176,
Is my mbConnect function safe enough to reconnect on any error?
could you please handle cancel in my script:
await channel.consume(dblogQueue.queue, async (msg) => {
//Handle cancel and nack the message
let mo = JSON.parse(msg.content.toString());
console.log('dblogQueue consumed ' + mo.data);
channel.ack(msg);
}, { noAck: false });
"connect ETIMEDOUT AxiosError [AggregateError]" issue was about using "node:alpine" image
In my scenario, consumers most execute and guarantied for send messages to endpoints.
from amqplib.
Is my mbConnect function safe enough to reconnect on any error?
You're going in the right direction but unfortunately it ends up being a little more complicated. It's possible to get multiple events for the same scenario. For example, if you force close a connection from the RabbitMQ admin UI, you get both a channel close event and a connection close event. Similarly you can get multiple error events from the same connection. Therefore it is best to remove all your event handlers whenever an event is received. You will also need to re-consume once you've created the new channel.
could you please handle cancel in my script:
await channel.consume(dblogQueue.queue, async (msg) => {
if (message === null) {
// reconsume
}
You will need to do this for all consumers not just the dblog queoe
from amqplib.
It resolved dear @cressie176 according your guidance. Thank you very much.
from amqplib.
Related Issues (20)
- no error event on createChannel since V10.0.0 HOT 1
- Consider removing `readable-stream`? HOT 1
- Set a custom header before nack-ing a message HOT 5
- No Error emitted when Queue disappears or is deleted HOT 2
- SDK connecting without errors after RabbitMQ service is shut down HOT 9
- Error emitted before handler is attached causes "Unhandled 'error' event" HOT 3
- consume does not respect promises? HOT 5
- Trailing dot in URL causes TLS failure HOT 2
- docs: channel api github page has broken top links HOT 2
- docs: confusing and inconsistent usage of terms "bork", "splode", "close" HOT 7
- global prefetch become 0 if per-consume prefetch set HOT 2
- ERR_SSL_EMS_NOT_ENABLED HOT 6
- How to determine if channel.ack() is successful? HOT 2
- CPU intensive controller prevents AMQP connection from receiving heartbeat, hence connections gets closed. HOT 2
- How to know if message is published ? HOT 6
- Aborting a Message Processing when RMQ Channel Closes HOT 8
- I'm encountering an "Unexpected close" error HOT 2
- Module not found: Can't resolve 'querystring' HOT 2
- Feature: Add support for AMQP 0.9.1 "update-secret" HOT 11
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from amqplib.