Giter Site home page Giter Site logo

Comments (6)

cressie176 avatar cressie176 commented on June 3, 2024

Hi @vahidsabet,

There are a few individual errors/oversights in your code sample which I'll highlight first...

  1. If you are using a connection url, specify the heartbeat as a query parameter, e.g. amqp://localhost:5672?heartbeat=30 not as a second string parameter
  2. await channel.bind
  3. await channel.prefetch
  4. register an error listener on the connection
  5. register an error listener on the channel
  6. handle a connection error or channel error - the easiest option is to restart the application. Alternatively you may need to re-establish both and consume again.
  7. handle a null message in each consumer (the broker sends this under certain conditions such as queue deletion or broker shutdown)
  8. handle a poison message (e.g. one which isn't JSON)

The PRECONDITION_FAILED error is occurring because one of your consumers is not acknowledging the message within 180s. This is not a fault with amqplib however. It could be because an error is occurring, preventing channel.ack from being reached. Using wireshark would confirm whether the channel.ack is being sent.

Regarding your questions

  • How to consume fanout exchange and some queue efficiently? Need I create consumers separately in different containers and different connection for each?

It depends on whether a single container using a shared connection can handle the throughput of messages. A single container with one consumer would be the most scalable (providing you did not exceed the number of connections the broker can handle), but could also be unnecessary.

  • How to consume a queue for db Log, aqueue for bot log, a queue for send data via axios and the last one for emit message?

The way you are doing it is OK providing you fix the above issues

from amqplib.

vahidsabet avatar vahidsabet commented on June 3, 2024

Thanks dear @cressie176 and appreciate your response as I found you expert of message brokers.
I've changed issues you mentioned as follows:

  const mbConnect = async () => {
  
      let conn = await amqp.connect(amqpUrl+"?heartbeat=30")
      let channel = await conn.createChannel()
    
      conn.on("error", function(err) {
        if (err.message !== "Connection closing") {
          console.error('new catch Err' + err.message)
        }
      });
    
      conn.on("close", function() {
        console.error('@avsvahid', "AMQP reconnecting")
        return setTimeout(mbConnect , 10000);
      })
    
      channel.on("error", function(err) {
         console.error('AMQP channel error' + err.message)
         return setTimeout(mbConnect , 10000);
      })
    
      channel.on("close", function() {
        console.error('AMQP channel closed' + err.message)
      })
    
      return channel
  }

    const channel = await mbConnect();
    process.once('SIGINT', async () => {
        logger.log('got sigint, closing connection ');
        await channel.close();
        await connection.close();
        process.exit(0);
});

await channel.assertExchange(logsEx,"fanout", {durable: true});
const tellogQueue = await channel.assertQueue("tellog");
const dblogQueue = await channel.assertQueue("dblog");
const assetQueue = await channel.assertQueue("getasset");

await  channel.bindQueue(tellogQueue.queue,logsEx);
await  channel.bindQueue(dblogQueue.queue,logsEx);
await channel.bindQueue(assetQueue.queue,logsEx);

await channel.prefetch(1);

await channel.consume(tellogQueue.queue, async (msg) => {
    let mo = JSON.parse(msg.content.toString());

    await axios.post('https://api.telegram.org/bot/sendMessage', {
      chat_id: chid,
      text: "tellogQueue consumed ",
      parse_mode:'HTML'
    }
    .then(async response => {
        await channel.ack(msg);
    })
    .catch(err => {
        console.error(err);
    })
}, { noAck: false });

await channel.consume(dblogQueue.queue, async (msg) => {
    let mo = JSON.parse(msg.content.toString());
    console.log('dblogQueue consumed ' + mo.data);
    channel.ack(msg);
}, { noAck: false });

await channel.consume(assetQueue.queue, async (msg) => {
    let mo = JSON.parse(msg.content.toString());
    console.log('assetQueue consumed ' + mo.data);
    channel.ack(msg);
}, { noAck: false });

I didn't actually get the point of your latest comments:

  • handle a null message in each consumer (the broker sends this under certain conditions such as queue deletion or broker shutdown)
  • handle a poison message (e.g. one which isn't JSON)
    could you please explain it in my code?

It's my server on docker. Is it ok for my scenario? Transactions come to the queues and consome for different purposes and I most be sure that all the messages arrived to the destination and acked. Fanout is better or topic?
Sometimes I get Error: connect ETIMEDOUT AxiosError [AggregateError]

image

from amqplib.

cressie176 avatar cressie176 commented on June 3, 2024

handle a null message in each consumer (the broker sends this under certain conditions such as queue deletion or broker shutdown)

If the broker cancels a consumer amqplib will invoke the consumer callback with a null message. You should therefore check for this at the start of your consumer function.

handle a poison message (e.g. one which isn't JSON)

If you receive a bad or "poison" message which causes your consumer function to throw an error, the channel will be closed and the message automatically rolled back. If your application restarts/resubscribes, the message will be redelivered, causing the error to be rethrown indefinitely. There are a couple of ways to avoid this, one is to use a quorum queue with a delivery limit. Another is to catch any errors caused by an invalid message and nack them. In your example, someone might publish a message which isn't JSON, causing the JSON.parse line to fail.

connect ETIMEDOUT AxiosError [AggregateError]

This sounds like it's the HTTP request which is failing intermittently. It's most likely to do with the server or the network, but nothing to do with RabbitMQ or amqplib. You could reduce the chance of it happening by retrying

Fanout is better or topic?

I tend to prefer topics because they are more flexible. I believe fanouts are faster though.

from amqplib.

vahidsabet avatar vahidsabet commented on June 3, 2024

Dear @cressie176,
Is my mbConnect function safe enough to reconnect on any error?

could you please handle cancel in my script:

await channel.consume(dblogQueue.queue, async (msg) => {
//Handle cancel and nack the message
let mo = JSON.parse(msg.content.toString());
console.log('dblogQueue consumed ' + mo.data);
channel.ack(msg);
}, { noAck: false });

"connect ETIMEDOUT AxiosError [AggregateError]" issue was about using "node:alpine" image

In my scenario, consumers most execute and guarantied for send messages to endpoints.

from amqplib.

cressie176 avatar cressie176 commented on June 3, 2024

Is my mbConnect function safe enough to reconnect on any error?

You're going in the right direction but unfortunately it ends up being a little more complicated. It's possible to get multiple events for the same scenario. For example, if you force close a connection from the RabbitMQ admin UI, you get both a channel close event and a connection close event. Similarly you can get multiple error events from the same connection. Therefore it is best to remove all your event handlers whenever an event is received. You will also need to re-consume once you've created the new channel.

could you please handle cancel in my script:

await channel.consume(dblogQueue.queue, async (msg) => {
if (message === null) {
  // reconsume
}

You will need to do this for all consumers not just the dblog queoe

from amqplib.

vahidsabet avatar vahidsabet commented on June 3, 2024

It resolved dear @cressie176 according your guidance. Thank you very much.

from amqplib.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.