A config driven wrapper for amqp.node supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption and channel pooling.

License: MIT License

Rascal

Rascal is a rich pub/sub wrapper around amqplib.

About

Rascal is a rich pub/sub wrapper for the excellent amqplib. One of the best things about amqplib is that it doesn't make assumptions about how you use it. Another is that it doesn't attempt to abstract away AMQP Concepts. As a result the library offers a great deal of control and flexibility, but the onus is on you to adopt appropriate patterns and configuration. You need to be aware that:

  • Messages are not persistent by default and will be lost if your broker restarts
  • Messages that crash your app will be infinitely retried
  • Without prefetch a sudden flood of messages may bust your event loop
  • Dropped connections and borked channels will not be automatically recovered
  • Any connection or channel errors are emitted as "error" events. Unless you handle them or use domains these will cause your application to crash
  • If a message is published using a confirm channel, and the broker fails to acknowledge, the flow of execution may be blocked indefinitely

Rascal seeks to either solve these problems, make them easier to deal with or bring them to your attention by adding the following to amqplib

  • Config driven vhosts, exchanges, queues, bindings, producers and consumers
  • Cluster connection support
  • Transparent content parsing
  • Transparent encryption / decryption
  • Automatic reconnection and resubscription
  • Advanced error handling including delayed, limited retries
  • RPC Support
  • Redelivery protection
  • Channel pooling
  • Flow control
  • Publication timeouts
  • Safe defaults
  • Promise and callback support
  • TDD support

Concepts

Rascal extends the existing RabbitMQ Concepts of Brokers, Vhosts, Exchanges, Queues, Channels and Connections with two new ones

  1. Publications
  2. Subscriptions

A publication is a named configuration for publishing a message, including the destination queue or exchange, routing configuration, encryption profile and reliability guarantees, message options, etc. A subscription is a named configuration for consuming messages, including the source queue, encryption profile, content encoding, delivery options (e.g. acknowledgement handling and prefetch), etc. These must be configured and supplied when creating the Rascal broker. After the broker has been created the subscriptions and publications can be retrieved from the broker and used to publish and consume messages.

Breaking Changes

Please refer to the Change Log

Special Note

RabbitMQ 3.8.0 introduced quorum queues. Although quorum queues may not be suitable in all situations, they provide poison message handling without the need for an external redelivery counter and offer better data safety in the event of a network partition. You can read more about them here and here.

Examples

Async/Await

const Broker = require('rascal').BrokerAsPromised;
const config = require('./config');

(async () => {
  try {
    const broker = await Broker.create(config);
    broker.on('error', console.error);

    // Publish a message
    const publication = await broker.publish('demo_publication', 'Hello World!');
    publication.on('error', console.error);

    // Consume a message
    const subscription = await broker.subscribe('demo_subscription');
    subscription
      .on('message', (message, content, ackOrNack) => {
        console.log(content);
        ackOrNack();
      })
      .on('error', console.error);
  } catch (err) {
    console.error(err);
  }
})();

Callbacks

const Broker = require('rascal').Broker;
const config = require('./config');

Broker.create(config, (err, broker) => {
  if (err) throw err;

  broker.on('error', console.error);

  // Publish a message
  broker.publish('demo_publication', 'Hello World!', (err, publication) => {
    if (err) throw err;
    publication.on('error', console.error);
  });

  // Consume a message
  broker.subscribe('demo_subscription', (err, subscription) => {
    if (err) throw err;
    subscription
      .on('message', (message, content, ackOrNack) => {
        console.log(content);
        ackOrNack();
      })
      .on('error', console.error);
  });
});

See here for more examples.

Avoiding Potential Message Loss

There are three situations when Rascal will nack a message without requeue, leading to potential data loss.

  1. When it is unable to parse the message content and the subscriber has no 'invalid_content' listener
  2. When the subscriber's (optional) redelivery limit has been exceeded and the subscriber has neither a 'redeliveries_error' nor a 'redeliveries_exceeded' listener
  3. When attempting to recover by republishing or forwarding, but the recovery operation fails.

The reason Rascal nacks the message is because the alternatives are to leave the message unacknowledged indefinitely, or to rollback and retry the message in an infinite tight loop. This can DDOS your application and cause problems for your infrastructure. Providing you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events, your messages should be safe.
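As an illustration of the dead letter advice above, here is a minimal sketch of a definitions object that routes rejected messages to a dead letter queue. It assumes the structured config form shown later in this document and the standard RabbitMQ x-dead-letter-exchange queue argument. All names (v1, e1, q1, dead_letters, dead_letter_q) are hypothetical, and you would still need to pass the object through rascal.withDefaultConfig before creating a broker.

```javascript
// Hypothetical names throughout; adapt to your own topology.
const definitions = {
  vhosts: {
    v1: {
      exchanges: {
        e1: {},
        dead_letters: {},
      },
      queues: {
        q1: {
          options: {
            arguments: {
              // Standard RabbitMQ queue argument: messages nacked without
              // requeue (or expired) are republished to this exchange
              'x-dead-letter-exchange': 'dead_letters',
            },
          },
        },
        dead_letter_q: {},
      },
      bindings: {
        b1: { source: 'e1', destination: 'q1' },
        // Catches everything dead lettered from q1
        b2: { source: 'dead_letters', destination: 'dead_letter_q' },
      },
    },
  },
};
```

With something like this in place, a message that Rascal nacks without requeue should end up in dead_letter_q rather than being discarded.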

Very Important Section About Event Handling

amqplib emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. It is insufficient to register a global uncaughtException handler - doing so without registering individual handlers will prevent your application from crashing, but also prevent Rascal from recovering.
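The crash behaviour described above comes from Node's EventEmitter rather than anything specific to Rascal. The sketch below uses a plain EventEmitter as a stand-in (it is not a Rascal object) to show that emitting "error" with no listener throws, while registering a listener lets the error be delivered instead.

```javascript
const { EventEmitter } = require('events');

// Stand-in for any emitter, e.g. a broker, subscription or publication
const emitter = new EventEmitter();

let threw = false;
try {
  // With no 'error' listener registered, emit throws the error
  emitter.emit('error', new Error('Channel closed'));
} catch (err) {
  threw = true;
}

const seen = [];
emitter.on('error', (err) => seen.push(err.message));

// With a listener registered, the error is delivered to it instead
const delivered = emitter.emit('error', new Error('Channel closed'));
```

In a real application the first emit would not be wrapped in a try/catch, so the error would surface as an uncaught exception.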

There are four places where you need to register error handlers.

  1. Immediately after obtaining a broker instance

    broker.on('error', (err, { vhost, connectionUrl }) => {
      console.error('Broker error', err, vhost, connectionUrl);
    });

  2. After subscribing to a channel

    // Async/Await
    try {
      const subscription = await broker.subscribe('s1');
      subscription
        .on('message', (message, content, ackOrNack) => {
          // Do stuff with message
        })
        .on('error', (err) => {
          console.error('Subscriber error', err);
        });
    } catch (err) {
      throw new Error(`Rascal config error: ${err.message}`);
    }

    // Callbacks
    broker.subscribe('s1', (err, subscription) => {
      if (err) throw new Error(`Rascal config error: ${err.message}`);
      subscription
        .on('message', (message, content, ackOrNack) => {
          // Do stuff with message
        })
        .on('error', (err) => {
          console.error('Subscriber error', err);
        });
    });

  3. After publishing a message

    // Async/Await
    try {
      const publication = await broker.publish('p1', 'some text');
      publication.on('error', (err, messageId) => {
        console.error('Publisher error', err, messageId);
      });
    } catch (err) {
      throw new Error(`Rascal config error: ${err.message}`);
    }

    // Callbacks
    broker.publish('p1', 'some text', (err, publication) => {
      if (err) throw new Error(`Rascal config error: ${err.message}`);
      publication.on('error', (err, messageId) => {
        console.error('Publisher error', err, messageId);
      });
    });

  4. After forwarding a message

    // Async/Await
    try {
      const publication = await broker.forward('p1', message);
      publication.on('error', (err, messageId) => {
        console.error('Publisher error', err, messageId);
      });
    } catch (err) {
      throw new Error(`Rascal config error: ${err.message}`);
    }

    // Callbacks
    broker.forward('p1', message, (err, publication) => {
      if (err) throw new Error(`Rascal config error: ${err.message}`);
      publication.on('error', (err, messageId) => {
        console.error('Publisher error', err, messageId);
      });
    });

Other Broker Events

vhost_initialised

The broker emits the vhost_initialised event after recovering from a connection error. An object containing the vhost name and connection url (with obfuscated password) is passed to the event handler. e.g.

broker.on('vhost_initialised', ({ vhost, connectionUrl }) => {
  console.log(`Vhost: ${vhost} was initialised using connection: ${connectionUrl}`);
});

blocked / unblocked

RabbitMQ notifies clients of blocked and unblocked connections, which rascal forwards from the connection to the broker. e.g.

broker.on('blocked', (reason, { vhost, connectionUrl }) => {
  console.log(`Vhost: ${vhost} was blocked using connection: ${connectionUrl}. Reason: ${reason}`);
});
broker.on('unblocked', ({ vhost, connectionUrl }) => {
  console.log(`Vhost: ${vhost} was unblocked using connection: ${connectionUrl}.`);
});

Configuration

Rascal is highly configurable, but ships with what we consider to be sensible defaults (optimised for reliability rather than speed) for production and test environments.

var rascal = require('rascal');
var definitions = require('./your-config.json');
var config = rascal.withDefaultConfig(definitions);

or

var rascal = require('rascal');
var definitions = require('./your-test-config.json');
var config = rascal.withTestConfig(definitions);

We advise you to review these defaults before using them in an environment you care about.

The most common configuration options are

A simple configuration is shown below. You can reference Rascal's JSON schema from the config to enable validation and suggestions in compatible IDEs.

{
  "$schema": "./node_modules/rascal/lib/config/schema.json",
  "vhosts": {
    "/": {
      "connection": {
        "url": "amqp://user:[email protected]:5742/"
      },
      "exchanges": ["demo_ex"],
      "queues": ["demo_q"],
      "bindings": ["demo_ex[a.b.c] -> demo_q"],
      "publications": {
        "demo_pub": {
          "exchange": "demo_ex",
          "routingKey": "a.b.c"
        }
      },
      "subscriptions": {
        "demo_sub": {
          "queue": "demo_q",
          "prefetch": 3
        }
      }
    }
  }
}

Vhosts

connection

The simplest way to specify a connection is with a url

{
  "vhosts": {
    "v1": {
      "connection": "amqp://guest:[email protected]:5672/v1?heartbeat=10"
    }
  }
}

As of Rascal v18.0.0 you must URL encode special characters appearing in the username, password and vhost, e.g. amqp://guest:secr%[email protected]:5672/v1?heartbeat=10
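One way to avoid encoding mistakes is to build the URL programmatically. The helper below is a hypothetical sketch (buildAmqpUrl is not part of Rascal) that uses the built-in encodeURIComponent on the username, password and vhost. The hostname and credentials are placeholders.

```javascript
// Hypothetical helper, not part of Rascal: builds an AMQP connection URL
// with percent-encoded credentials and vhost.
function buildAmqpUrl({ user, password, hostname, port, vhost, heartbeat }) {
  const auth = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
  const path = encodeURIComponent(vhost);
  return `amqp://${auth}@${hostname}:${port}/${path}?heartbeat=${heartbeat}`;
}

const url = buildAmqpUrl({
  user: 'guest',
  password: 'secr#t', // '#' would otherwise start a URL fragment
  hostname: 'localhost',
  port: 5672,
  vhost: 'v1',
  heartbeat: 10,
});
// url is 'amqp://guest:secr%23t@localhost:5672/v1?heartbeat=10'
```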

Alternatively you can specify the individual connection details

{
  "vhosts": {
    "v1": {
      "connection": {
        "slashes": true,
        "protocol": "amqp",
        "hostname": "localhost",
        "user": "guest",
        "password": "guest",
        "port": 5672,
        "vhost": "v1",
        "options": {
          "heartbeat": 5
        },
        "socketOptions": {
          "timeout": 10000
        }
      }
    }
  }
}

Special characters do not need to be encoded when specified in this form.

Any attributes you add to the "options" sub document will be converted to query parameters. Any attributes you add in the "socketOptions" sub document will be passed directly to amqplib's connect method (which hands them off to net or tls). Providing you merge your configuration with the default configuration rascal.withDefaultConfig(config) you need only specify the attributes you want to override:

{
  "vhosts": {
    "v1": {
      "connection": {
        "hostname": "broker.example.com",
        "user": "bob",
        "password": "secret"
      }
    }
  }
}

Rascal also supports automatic connection retries. It's enabled in the default config, or you can enable it specifically as follows.

{
  "vhosts": {
    "v1": {
      "connection": {
        "retry": {
          "min": 1000,
          "max": 60000,
          "factor": 2,
          "strategy": "exponential"
        }
      }
    }
  }
}

or

{
  "vhosts": {
    "v1": {
      "connection": {
        "retry": {
          "min": 1000,
          "max": 5000,
          "strategy": "linear"
        }
      }
    }
  }
}

The exponential configuration will cause rascal to retry the connection at exponentially increasing intervals to a maximum of one minute. The intervals are adjusted by a random amount so that if you have multiple services they will not all reconnect at the same time.

The linear configuration will cause rascal to retry the connection at linearly increasing intervals, between one and five seconds.
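To get a feel for the exponential settings above, the following sketch computes the delay for successive attempts. It is an illustration only and not Rascal's implementation: the formula (min multiplied by factor to the power of the attempt number, capped at max) and the jitter helper with its spread parameter are assumptions.

```javascript
// Assumed formula: min * factor^attempt, capped at max
function exponentialDelay({ min, max, factor }, attempt) {
  return Math.min(max, min * Math.pow(factor, attempt));
}

// Assumed jitter: shifts the delay by up to +/- (spread / 2) of its value.
// `random` is injectable so the behaviour can be checked deterministically.
function jitter(delay, spread = 0.5, random = Math.random) {
  const offset = (random() - 0.5) * spread * delay;
  return Math.round(delay + offset);
}

const retry = { min: 1000, max: 60000, factor: 2 };
const delays = Array.from({ length: 8 }, (_, attempt) => exponentialDelay(retry, attempt));
// delays is [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

Applying jitter to each delay spreads reconnection attempts from multiple services so they do not all hit the broker at the same instant.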

Cluster Connections

If you specify an array of connections instead of a single connection object Rascal will order them as per the connection strategy at startup, and cycle through until it obtains a connection or exhausts all hosts.

{
  "vhosts": {
    "v1": {
      "connectionStrategy": "random",
      "connections": ["amqp://guest:[email protected]:5672/v1?heartbeat=10", "amqp://guest:[email protected]:5672/v1?heartbeat=10", "amqp://guest:[email protected]:5672/v1?heartbeat=10"]
    }
  }
}

The default connection strategy is random, but if you prefer an active/passive configuration you should use fixed.
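The difference between the two strategies can be sketched as follows. This is not Rascal's code: orderConnections is a hypothetical function, and the use of a Fisher-Yates shuffle for the random strategy is an assumption. The host names are placeholders.

```javascript
// Hypothetical illustration of ordering candidate hosts at startup
function orderConnections(connections, strategy = 'random', random = Math.random) {
  const ordered = [...connections]; // never mutate the caller's array
  if (strategy === 'fixed') return ordered; // always try hosts in config order

  // Fisher-Yates shuffle for the 'random' strategy
  for (let i = ordered.length - 1; i > 0; i--) {
    const j = Math.floor(random() * (i + 1));
    [ordered[i], ordered[j]] = [ordered[j], ordered[i]];
  }
  return ordered;
}

const hosts = ['broker1', 'broker2', 'broker3'];
const fixed = orderConnections(hosts, 'fixed');
const shuffled = orderConnections(hosts, 'random');
```

With fixed, the first host acts as the active node and the rest as fallbacks. With random, load is spread across the cluster when many clients start together.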

broker.getConnections()

You can see the list of Rascal managed connections by calling broker.getConnections(). This will return an array similar to the following...

[
  { "vhost": "/", "connectionUrl": "amqp://guest:***@localhost:5672?heartbeat=50&connection_timeout=10000&channelMax=100" },
  { "vhost": "other", "connectionUrl": "amqp://guest:***@localhost:5672/other?heartbeat=50&connection_timeout=10000&channelMax=100" }
]

Management connection configuration

Please note: this functionality is mainly useful in test environments, since it does not create users or grant them permissions to vhosts

The AMQP protocol doesn't support assertion or checking of vhosts, so Rascal uses the RabbitMQ management API to achieve a similar result. The management connection configuration is derived from defaults and the vhost connection, but can be explicitly specified as follows...

{
  "vhosts": {
    "v1": {
      "connection": {
        "hostname": "broker.example.com",
        "user": "bob",
        "password": "secret",
        "management": {
          "protocol": "https",
          "pathname": "prefix",
          "user": "admin",
          "password": "super-secret",
          "options": {
            "timeout": 1000
          }
        }
      }
    }
  }
}

Rascal uses superagent under the hood. URL configuration is also supported.

{
  "vhosts": {
    "v1": {
      "connections": [
        {
          "url": "amqp://guest:[email protected]:5672/v1?heartbeat=10",
          "management": "http://guest:[email protected]:15672"
        },
        {
          "url": "amqp://guest:[email protected]:5672/v1?heartbeat=10",
          "management": "http://guest:[email protected]:15672"
        },
        {
          "url": "amqp://guest:[email protected]:5672/v1?heartbeat=10",
          "management": "http://guest:[email protected]:15672"
        }
      ]
    }
  }
}

You can also supply your own agent via the broker components. Use this when you need to set TLS options.

const superagent = require('superagent-defaults');
const agent = superagent().on('request', (req) => console.log(req.url));
const components = { agent };
const broker = await Broker.create(config, components);

concurrency

If you have a high number of exchanges, queues and bindings you may wish to initialise Rascal using multiple channels to improve startup time. Do this per vhost by setting the concurrency attribute to the number of channels you want to create and use.

{
  "vhosts": {
    "v1": {
      "concurrency": 10
    }
  }
}

assert

When set to true, Rascal will create the vhost if one doesn't exist using the RabbitMQ management API. This requires the management plugin to be installed on the broker and for the management user to have necessary permissions.

{
  "vhosts": {
    "v1": {
      "assert": true
    }
  }
}

check

When set to true, Rascal will check that the vhost exists using the RabbitMQ management API. This requires the management plugin to be installed on the broker and for the management user to have necessary permissions.

{
  "vhosts": {
    "v1": {
      "check": true
    }
  }
}

Channel pooling

Rascal pools the channels it uses for publishing messages. It creates two pools per vhost: one for confirm channels and another for regular channels. The default maximum pool size is 5 and the minimum 1, but neither pool will be created until first use (override this by setting autostart: true). Idle channels are automatically evicted from the pool. The pool configuration can be adjusted through config, which is passed through to the underlying generic-pool library.

{
  "vhosts": {
    "v1": {
      "publicationChannelPools": {
        "regularPool": {
          "max": 10,
          "min": 5,
          "evictionRunIntervalMillis": 10000,
          "idleTimeoutMillis": 60000,
          "autostart": true
        },
        "confirmPool": {
          "max": 10,
          "min": 5,
          "evictionRunIntervalMillis": 10000,
          "idleTimeoutMillis": 60000,
          "autostart": true
        }
      }
    }
  }
}

Unfortunately there is a bug in generic-pool's implementation, which means that if the pool fails to create a channel, it can enter a tight loop, thrashing your CPU and potentially crashing your node process due to a memory leak. While we assess the long term use of pooling, we have put in a workaround. Errors will only be rejected after a configurable delay. This defaults to one second but can be overridden through the rejectionDelayMillis pool attribute. Special thanks to @willthrom for helping diagnose and fix this issue.

Flow Control

amqplib flow control dictates that channels act like stream.Writable when Rascal calls channel.publish or channel.sendToQueue, returning false when the channel is saturated and true if it is not. While it is possible to ignore this and keep publishing messages, it is preferable to apply back pressure to the message source. You can do this by listening to the broker busy and ready events. Busy events are emitted when the number of outstanding channel requests reaches the pool max size, and ready events are emitted when the number of outstanding channel requests falls back down to zero. The pool details are passed to both event handlers so you can take selective action.

broker.on('busy', ({ vhost, mode, queue, size, available, borrowed, min, max }) => {
  if (vhost === 'events') return eventStream.pause();
  console.warn(`vhost ${vhost} is busy`);
});

broker.on('ready', ({ vhost, mode, queue, size, available, borrowed, min, max }) => {
  if (vhost === 'events') return eventStream.resume();
  console.info(`vhost ${vhost} is ready`);
});
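The stream.Writable contract mentioned above can be seen in isolation using Node's own stream module. The sketch below does not involve Rascal or amqplib. It uses a deliberately tiny Writable (the sink and source objects are hypothetical stand-ins) to show write returning false once the high water mark is reached, at which point the source is paused until "drain" fires.

```javascript
const { Writable } = require('stream');

// A deliberately tiny sink: the high water mark is two pending objects
const sink = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(chunk, encoding, callback) {
    setImmediate(callback); // pretend each write takes a moment
  },
});

// Hypothetical message source which can be paused
const source = { paused: false };

const firstOk = sink.write('message 1'); // true: still below the high water mark
const secondOk = sink.write('message 2'); // false: high water mark reached

if (!secondOk) {
  source.paused = true; // apply back pressure to the message source
  sink.once('drain', () => {
    source.paused = false; // safe to resume once the backlog has cleared
  });
}
```

The busy and ready broker events play the same role as the false return value and the "drain" event here, but at the level of the channel pool.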

namespace

Running automated tests against shared queues and exchanges is problematic. Messages left over from a previous test run can cause assertions to fail. Rascal has several strategies which help you cope with this problem, one of which is to namespace your queues and exchanges.

{
  "vhosts": {
    "v1": {
      "namespace": true
    }
  }
}

If you specify "namespace": true Rascal will prefix the queues and exchanges it creates with a uuid. Alternatively you can specify your own namespace, "namespace": "foo". Namespaces are also useful if you want to use a single vhost locally but multiple vhosts in other environments.

Exchanges

assert

Setting assert to true will cause Rascal to create the exchange on initialisation. If the exchange already exists and has the same configuration (type, durability, etc) everything will be fine, however if the existing exchange has a different configuration an error will be returned. Assert is enabled in the default configuration.

check

If you don't want to create exchanges on initialisation, but still want to validate that they exist, set assert to false and check to true

{
  "vhosts": {
    "v1": {
      "exchanges": {
        "e1": {
          "assert": false,
          "check": true
        }
      }
    }
  }
}

type

Declares the exchange type. Must be one of direct, topic, headers or fanout. The default configuration sets the exchange type to "topic" unless overridden.

options

Define any further configuration in an options block

{
  "vhosts": {
    "v1": {
      "exchanges": {
        "e1": {
          "type": "fanout",
          "options": {
            "durable": false
          }
        }
      }
    }
  }
}

Refer to the amqplib documentation for further exchange options.

Queues

assert

Setting assert to true will cause Rascal to create the queue on initialisation. If the queue already exists and has the same configuration (durability, etc) everything will be fine, however if the existing queue has a different configuration an error will be returned. Assert is enabled in the default configuration.

check

If you don't want to create queues on initialisation, but still want to validate that they exist, set assert to false and check to true

{
  "vhosts": {
    "v1": {
      "queues": {
        "q1": {
          "assert": false,
          "check": true
        }
      }
    }
  }
}

purge

Enable to purge the queue during initialisation. Useful when running automated tests

{
  "vhosts": {
    "v1": {
      "queues": {
        "q1": {
          "purge": true
        }
      }
    }
  }
}

replyTo

Sometimes you want to publish a message, and have the consumer of the message send a reply to the same application instance that published the original message. This can be difficult if your application is deployed using multiple instances which share a common configuration. Quite often the solution is to make your application stateless so it doesn't matter which instance receives the reply. An alternative is to mark the queue as a reply queue using the replyTo property.

{
  "queues": {
    "q1": {
      "replyTo": true
    }
  },
  "publications": {
    "p1": {
      "exchange": "e1",
      "replyTo": "q1"
    }
  }
}

When true, Rascal will append a uuid to the queue name so that it is unique for each instance of the application. Use this in conjunction with the publication replyTo property to automatically set the replyTo property on outbound messages to the unique queue name. You may also want to make the queue non durable and exclusive too (see below).

options

Define any further configuration in an options block

{
  "queues": {
    "q1": {
      "options": {
        "durable": false,
        "exclusive": true
      }
    }
  }
}

To define a queue with extensions such as x-queue-type add arguments to the options block, e.g.

{
  "queues": {
    "q1": {
      "options": {
        "durable": false,
        "arguments": {
          "x-message-ttl": 65000,
          "x-queue-mode": "lazy"
        }
      }
    }
  }
}

Refer to the amqplib documentation for further queue options.

streams

Rascal supports RabbitMQ Streams via x-queue-type argument, i.e.

{
  "queues": {
    "q1": {
      "options": {
        "arguments": {
          "x-queue-type": "stream"
        }
      }
    }
  }
}

The Stream Plugin and associated binary protocol extension are not supported.

Streams are not a replacement for regular messaging. Instead, they are best suited to situations where you can tolerate occasional message loss and need higher throughput, such as sampling web based analytics.

When working with streams you need to think carefully about data retention. Unless you specify retention configuration, messages will never be deleted and eventually you will run out of space. Conversely, if you automatically delete messages based on queue size or age, they may be lost without ever being read.

You also need to think about how you will track the consumer offset. Typically you will need to store this in a database after successfully processing the message and use it to tell the broker where to resume from after your application restarts. For example...

  const initialOffset = (await loadOffset('/my-queue')) || 'first';

  const overrides = {
    options: {
      arguments: {
        'x-stream-offset': initialOffset
      }
    }
  };

  const subscription = await broker.subscribe('/my-queue', overrides);

  subscription.on('message', async (message, content, ackOrNack) => {
    const currentOffset = message.properties.headers['x-stream-offset'];
    try {
      await handleMessage(content);
      await updateOffset('/my-queue', currentOffset);
    } catch (err) {
      await handleError('/my-queue', currentOffset, err);
    } finally {
      ackOrNack(); // Streams do not support nack so do not pass the error argument
    }
  });

However, if your application is offline for too long, and messages are still being published to the stream, it may not be able to resume from where you left off, since those messages may have been deleted. Furthermore, if your application consumes messages concurrently, you need to think about how you will recover should one fail. If you naively override the previously saved offset, you may be replacing a higher/later offset with a lower/older one, causing your application to restart from the wrong point. Finally, you also need to decide what to do if the message cannot be processed. You cannot simply replay the message since you are working with a stream, rather than a queue. You could cancel the subscription and resume from the current offset, but this will lead to duplicates if you have been consuming messages concurrently. Alternatively you could republish the failures to a dead letter queue and process them separately.

For the above reasons, we only recommend considering streams when you genuinely need the extra throughput.
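The concurrent consumption problem described above can be reduced by tracking which offsets are still in flight, rather than blindly saving the most recent one. The class below is a hypothetical in-memory sketch (OffsetTracker is not part of Rascal). It assumes numeric offsets and that resuming from the offset after the last completed message is what you want. Persisting the result is left out.

```javascript
// Hypothetical helper, not part of Rascal
class OffsetTracker {
  constructor() {
    this.inFlight = new Set(); // offsets received but not yet processed
    this.highestCompleted = null;
  }

  start(offset) {
    this.inFlight.add(offset);
  }

  complete(offset) {
    this.inFlight.delete(offset);
    if (this.highestCompleted === null || offset > this.highestCompleted) {
      this.highestCompleted = offset;
    }
  }

  // The offset to resume from after a restart: the oldest unfinished
  // message if there is one, otherwise the one after the newest finished
  resumeFrom() {
    if (this.inFlight.size > 0) return Math.min(...this.inFlight);
    return this.highestCompleted === null ? 'first' : this.highestCompleted + 1;
  }
}

const tracker = new OffsetTracker();
[10, 11, 12].forEach((offset) => tracker.start(offset));
tracker.complete(12);
tracker.complete(10);
const whileInFlight = tracker.resumeFrom(); // 11, because 11 has not finished
tracker.complete(11);
const afterAll = tracker.resumeFrom(); // 13
```

Resuming from the oldest unfinished offset means some already processed messages may be seen again after a restart, so handlers still need to tolerate duplicates.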

bindings

You can bind exchanges to exchanges, or exchanges to queues.

{
  "vhosts": {
    "v1": {
      "exchanges": {
        "e1": {}
      },
      "queues": {
        "q1": {}
      },
      "bindings": {
        "b1": {
          "source": "e1",
          "destination": "q1",
          "destinationType": "queue",
          "bindingKey": "foo"
        }
      }
    }
  }
}

When using Rascal's defaults, destinationType will default to "queue" and "bindingKey" will default to "#" (although this is only applicable for topics anyway)

Should you want to bind a destination to the same source with multiple binding keys, instead of duplicating the configuration you can specify an array of binding keys using either the "bindingKey" or "bindingKeys" attribute

{
  "vhosts": {
    "v1": {
      "exchanges": {
        "e1": {}
      },
      "queues": {
        "q1": {}
      },
      "bindings": {
        "b1": {
          "source": "e1",
          "destination": "q1",
          "destinationType": "queue",
          "bindingKeys": ["foo", "bar"]
        }
      }
    }
  }
}

If you want to bind to a headers exchange specify the appropriate binding options

{
  "vhosts": {
    "v1": {
      "exchanges": {
        "e1": {
          "type": "headers"
        }
      },
      "queues": {
        "q1": {}
      },
      "bindings": {
        "b1": {
          "source": "e1",
          "destination": "q1",
          "options": {
            "x-match": "all",
            "foo": "bar"
          }
        }
      }
    }
  }
}

Publications

Now that you've bound your queues and exchanges, you need to start sending them messages. This is where publications come in.

{
  "publications": {
    "p1": {
      "vhost": "v1",
      "exchange": "e1",
      "routingKey": "foo"
    }
  }
}

broker.publish('p1', 'some message');

If you prefer to send messages to a queue

{
  "publications": {
    "p1": {
      "vhost": "v1",
      "queue": "q1"
    }
  }
}

To save you entering the vhost you can nest publications inside the vhost block. Rascal also creates default publications for every queue and exchange so providing you don't need to specify any additional options you don't need to include a publications block at all. Auto created publications have the following configuration

{
  "publications": {
    "/e1": {
      "vhost": "/",
      "exchange": "e1",
      "autoCreated": true
    },
    "v1/q1": {
      "vhost": "v1",
      "queue": "q1",
      "autoCreated": true
    }
  }
}

Rascal supports text, buffers and anything it can JSON.stringify. Rascal will automatically set the content type to text/plain for strings, application/json for objects and application/octet-stream when encrypting messages. Alternatively you can explicitly set the content type through the contentType option.
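The content type rules above can be summarised in a few lines. This is a sketch of the described behaviour rather than Rascal's actual code: inferContentType is a hypothetical function, and leaving the content type unset for unencrypted buffers is an assumption, since the text does not say what happens in that case.

```javascript
// Hypothetical illustration of the content type rules described above
function inferContentType(payload, { encrypted = false } = {}) {
  if (encrypted) return 'application/octet-stream';
  if (typeof payload === 'string') return 'text/plain';
  if (Buffer.isBuffer(payload)) return undefined; // assumption: left unset
  return 'application/json'; // anything else is passed through JSON.stringify
}

const forText = inferContentType('some message');
const forObject = inferContentType({ id: 1 });
const forEncrypted = inferContentType('some message', { encrypted: true });
```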

The broker.publish method is overloaded to accept a runtime routing key or options.

broker.publish('p1', 'some message', callback);
broker.publish('p1', 'some message', 'some.routing.key', callback);
broker.publish('p1', 'some message', {
  routingKey: 'some.routing.key',
  options: { messageId: 'foo', expiration: 5000 },
});
await broker.publish('p1', 'some message');
await broker.publish('p1', 'some message', 'some.routing.key');
await broker.publish('p1', 'some message', {
  routingKey: 'some.routing.key',
  options: { messageId: 'foo', expiration: 5000 },
});

The callback parameters are err (indicating the publication could not be found) and publication. Listen to the publication's "success" event to obtain confirmation that the message was successfully published (when using confirm channels) and the "error" event to handle errors. The "return" event will be emitted when the message was successfully published but not routed. It is possible to access the messageId from all handlers, either via the supplied messageId or the returned message itself (see below)

If you specify the "mandatory" option (or use Rascal's defaults) you can also listen for returned messages (i.e. messages that were not delivered to any queues). Before a message is returned, you will still get the "success" event, since the message was successfully published. A common mistake is to resolve a promise from the "success" event handler and reject it from the "return" event handler. Because the promise has already been resolved by the time the message is returned, the rejection has no effect.

broker.publish('p1', 'some message', (err, publication) => {
  if (err) throw err; // publication didn't exist
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('return', (message) => {
      console.warn('Message was returned: ', message.properties.messageId);
    });
});

try {
  const publication = await broker.publish('p1', 'some message');
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('return', (message) => {
      console.warn('Message was returned: ', message.properties.messageId);
    });
} catch (err) {
  // publication didn't exist
}

One publish option you should be aware of is "persistent". Unless persistent is true, your messages will be discarded when you restart Rabbit. Despite having an impact on performance Rascal sets this in its default configuration.

Refer to the amqplib documentation for further publish options.

It's important to realise that even though publication emits a "success" event, this offers no guarantee that the message has been sent UNLESS you use a confirm channel. Providing you use Rascal's defaults publications will always be confirmed.

{
  "publications": {
    "p1": {
      "exchange": "e1",
      "vhost": "v1",
      "confirm": true
    }
  }
}

Publishing to a queue via the default exchange

If you would like to publish directly to a queue, but you don't know the queue name ahead of time, you can use the fact that all queues are automatically bound to the default exchange with a routing key that is the same as the queue name.

You can publish directly to the queue:

broker.publish('/', 'content', 'q1', (err, publication) => { ... });

See the "default-exchange" in the examples directory for a full working example.

Timeouts

When you publish a message using a confirm channel, amqplib will wait for an acknowledgement that the message was safely received by the broker, and in a clustered environment replicated to all nodes. If something goes wrong, the broker will not send the acknowledgement, amqplib will never execute the callback, and the associated flow of execution will never be resumed. Rascal guards against this by adding publication timeouts. If the timeout expires, Rascal will close the channel and emit an error event from the publication. However, there will still be an unavoidable memory leak, as amqplib's callback will never be cleaned up. The default timeout is 10 seconds but can be overridden in config. The setting is ignored for normal channels and can be disabled by specifying 0.

{
  "publications": {
    "p1": {
      "exchange": "e1",
      "vhost": "v1",
      "confirm": true,
      "timeout": 10000
    }
  }
}
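
The general technique of guarding a callback that may never fire can be sketched as follows. This is an illustration only, not Rascal's implementation: guardWithTimeout is a hypothetical helper, and it omits the channel closing that Rascal performs when a publication times out.

```javascript
// Hypothetical helper (not Rascal's implementation): invokes `callback` exactly once,
// either with the operation's result or with a timeout error.
function guardWithTimeout(operation, timeoutMs, callback) {
  let settled = false;
  const timer = setTimeout(() => {
    if (settled) return;
    settled = true;
    callback(new Error(`Timed out after ${timeoutMs}ms`));
  }, timeoutMs);

  operation((err, result) => {
    if (settled) return; // a late acknowledgement is ignored
    settled = true;
    clearTimeout(timer);
    callback(err, result);
  });
}

const results = [];
const record = (err, result) => results.push(err ? err.message : result);

// An operation that is acknowledged straight away
guardWithTimeout((done) => done(null, 'confirmed'), 50, record);

// An operation that is never acknowledged, like a confirm that never arrives
guardWithTimeout(() => {}, 50, record);

setTimeout(() => console.log(results), 100);
```

Note that the guard cannot release whatever the abandoned operation is still holding on to, which mirrors the memory leak described above.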

If you start experiencing publication timeouts you may find it useful to monitor the publication statistics via the publication.stats object, which includes the duration of Rascal's low level publish operations.

Aborting

Rascal uses a channel pool to publish messages. Access to the channel pool is synchronised via an in memory queue, which will be paused if the connection to the broker is temporarily lost. Consequently instead of erroring, publishes will be held until the connection is re-established. If you would rather abort under these circumstances, you can listen for the publication 'paused' event, and call publication.abort(). When the connection is re-established any aborted messages will be dropped instead of published.

broker.publish('p1', 'some message', (err, publication) => {
  if (err) throw err; // publication didn't exist
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('paused', (messageId) => {
      console.warn('Publication was paused. Aborting message: ', messageId);
      publication.abort();
    });
});

try {
  const publication = await broker.publish('p1', 'some message');
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('paused', (messageId) => {
      console.warn('Publication was paused. Aborting message: ', messageId);
      publication.abort();
    });
} catch (err) {
  // publication didn't exist
}

Encrypting messages

Rascal can be configured to automatically encrypt outbound messages.

{
  "vhosts": {
    "v1": {
      "exchanges": ["e1"]
    }
  },
  "publications": {
    "p1": {
      "exchange": "e1",
      "vhost": "v1",
      "confirm": true,
      "encryption": "well-known-v1"
    }
  },
  "encryption": {
    "well-known-v1": {
      "key": "f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315",
      "ivLength": 16,
      "algorithm": "aes-256-cbc"
    }
  }
}

Rascal will set the content type for encrypted messages to 'application/octet-stream'. It stashes the original content type in a header. Providing you use a correctly configured subscription, the message will be automatically decrypted, and normal content handling applied.
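
To make the roles of key, ivLength and algorithm concrete, here is a minimal sketch using Node's built-in crypto module. It is not Rascal's implementation: the encrypt and decrypt helpers are hypothetical, and how the initialisation vector travels with the message (here it is simply returned alongside the ciphertext) is an assumption.

```javascript
const { randomBytes, createCipheriv, createDecipheriv } = require('node:crypto');

// Same shape as the "well-known-v1" profile above
const profile = {
  key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315',
  ivLength: 16,
  algorithm: 'aes-256-cbc',
};

// Hypothetical helper: encrypts a Buffer with a fresh random IV
function encrypt(profile, content) {
  const key = Buffer.from(profile.key, 'hex'); // 64 hex characters = 32 bytes, as aes-256 requires
  const iv = randomBytes(profile.ivLength);
  const cipher = createCipheriv(profile.algorithm, key, iv);
  const encrypted = Buffer.concat([cipher.update(content), cipher.final()]);
  return { iv: iv.toString('hex'), encrypted };
}

// Hypothetical helper: reverses encrypt, given the same profile and IV
function decrypt(profile, iv, encrypted) {
  const key = Buffer.from(profile.key, 'hex');
  const decipher = createDecipheriv(profile.algorithm, key, Buffer.from(iv, 'hex'));
  return Buffer.concat([decipher.update(encrypted), decipher.final()]);
}

const { iv, encrypted } = encrypt(profile, Buffer.from('some message'));
const decrypted = decrypt(profile, iv, encrypted).toString();
console.log(decrypted); // some message
```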

Forwarding messages

Sometimes you want to forward a message to a publication. This may be part of a shovel program for transferring messages between vhosts, or because you want to ensure a sequence in some workflow, but do not need to modify the original message. Rascal supports this via broker.forward. The syntax is similar to broker.publish except that you pass in the original message you want to be forwarded instead of the message payload. If the publication or overrides don't specify a routing key, the original routing key will be maintained. The message will also be CC'd with an additional routing key of <queue>.<routingKey>, which can be useful for some retry scenarios.

broker.forward('p1', message, overrides, (err, publication) => {
  if (err) throw err; // publication didn't exist
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('return', (message) => {
      console.warn('Message was returned: ', message.properties.messageId);
    });
});

try {
  const publication = await broker.forward('p1', message, overrides);
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('return', (message) => {
      console.warn('Message was returned: ', message.properties.messageId);
    });
} catch (err) {
  // publication didn't exist
}

Prior to version 10.0.0, if you used Rascal to consume a forwarded message, the subscriber would automatically restore the original routing key and exchange to the message.fields before emitting it. This was added to support the delayed retry loop advanced recovery strategy, but should not have been applied to broker.forward. From version 10.0.0 this behaviour has been disabled for broker.forward but you can turn it back on by setting restoreRoutingHeaders to true in the overrides. You can also disable this behaviour in the forward and republish recovery strategies by setting restoreRoutingHeaders to false.

Since there is no native, transactional support for forwarding in amqplib, you are at risk of receiving duplicate messages when using broker.forward.

Subscriptions

The real fun begins with subscriptions

{
  "subscriptions": {
    "s1": {
      "queue": "e1",
      "vhost": "v1"
    }
  }
}

broker.subscribe('s1', (err, subscription) => {
  if (err) throw err; // subscription didn't exist
  subscription
    .on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    })
    .on('error', (err) => {
      console.error('Subscriber error', err);
    });
});

try {
  const subscription = await broker.subscribe('s1');
  subscription
    .on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    })
    .on('error', (err) => {
      console.error('Subscriber error', err);
    });
} catch (err) {
  // subscription didn't exist
}

It's very important that you handle errors emitted by the subscriber. If you do not, an underlying channel error will bubble up to the uncaught error handler and crash your node process.

Prior to Rascal 4.0.0 it was also very important not to go async between getting the subscription and listening for the message or error events. If you did, you risked leaking messages and not handling errors. For Rascal 4.0.0 and beyond, subscriptions are lazily applied when you add the message handler. Because registering event handlers is synchronous, but setting up RabbitMQ consumers is asynchronous, we've also added the subscribed event in case you need to wait until the subscription has been successfully established.

Rascal supports text, buffers and anything it can JSON.parse, providing the contentType message property is set correctly. Text messages should be set to "text/plain" and JSON messages to "application/json". Other content types will be returned as a Buffer. If the publisher doesn't set the contentType or you want to override it you can do so in the subscriber configuration.

{
  "subscriptions": {
    "s1": {
      "queue": "e1",
      "vhost": "v1",
      "contentType": "application/json"
    }
  }
}
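
The content handling rules described above can be sketched as a small function. This is an illustration of the rules only, not Rascal's implementation, and parseContent is a hypothetical name.

```javascript
// Hypothetical helper (not Rascal's implementation): picks a representation
// for the message body based on its content type.
function parseContent(contentType, body) {
  switch (contentType) {
    case 'text/plain':
      return body.toString();
    case 'application/json':
      return JSON.parse(body.toString()); // throws if the body is not valid JSON
    default:
      return body; // other content types are left as a Buffer
  }
}

const text = parseContent('text/plain', Buffer.from('hello'));
const json = parseContent('application/json', Buffer.from('{"id":1}'));
const raw = parseContent('application/octet-stream', Buffer.from([1, 2, 3]));
console.log(text, json, raw);
```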

The broker.subscribe method also accepts an options parameter which will override options specified in config

broker.subscribe('s1', { prefetch: 10, retry: false }, callback);

const subscription = await broker.subscribe('s1', { prefetch: 10, retry: false });

The arguments passed to the message event handler are function(message, content, ackOrNack), where message is the raw message, content is the message content (a buffer, text, or object) and ackOrNack is a callback. The ackOrNack callback should only be used for messages which were not auto-acknowledged, i.e. where { "options": { "noAck": true } } was not set by the subscription configuration or the options passed to broker.subscribe. For more details on acking or nacking messages see Message Acknowledgement and Recovery Strategies.

As with publications, you can nest subscriptions inside the vhost block. Rascal creates default subscriptions for every queue so providing you don't need to specify any additional options you don't need to include a subscriptions block at all.

Subscribe All

You can subscribe to multiple subscriptions using broker.subscribeAll.

broker.subscribeAll((err, subscriptions) => {
  if (err) throw err; // one or more subscriptions didn't exist
  subscriptions.forEach((subscription) => {
    subscription
      .on('message', (message, content, ackOrNack) => {
        // Do stuff with message
      })
      .on('error', (err) => {
        console.error('Subscriber error', err);
      });
  });
});

try {
  const subscriptions = await broker.subscribeAll();
  subscriptions.forEach((subscription) => {
    subscription
      .on('message', (message, content, ackOrNack) => {
        // Do stuff with message
      })
      .on('error', (err) => {
        console.error('Subscriber error', err);
      });
  });
} catch (err) {
  // One or more subscriptions didn't exist
}

subscribeAll takes a filter so you can ignore subscriptions if required. This is especially useful for ignoring Rascal's default subscriptions, e.g.

broker.subscribeAll(
  (s) => !s.autoCreated,
  (err, subscriptions) => {
    if (err) throw err; // one or more subscriptions didn't exist
    subscriptions.forEach((subscription) => {
      subscription
        .on('message', (message, content, ackOrNack) => {
          // Do stuff with message
        })
        .on('error', (err) => {
          console.error('Subscriber error', err);
        });
    });
  }
);

try {
  const subscriptions = await broker.subscribeAll((s) => !s.autoCreated);
  subscriptions.forEach(subscription => {
    subscription.on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    }).on('error', (err) => {
      console.error('Subscriber error', err)
    })
  });
} catch(err) {
  // One or more subscriptions didn't exist
}

Invalid Messages

If Rascal can't parse the content (e.g. the message had a content type of 'application/json' but the content was not JSON), it will emit an 'invalid_content' event.

broker.subscribe('s1', (err, subscription) => {
  if (err) throw err; // subscription didn't exist
  subscription
    .on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    })
    .on('error', (err) => {
      console.error('Subscriber error', err);
    })
    .on('invalid_content', (err, message, ackOrNack) => {
      console.error('Invalid content', err);
      ackOrNack(err);
    });
});

try {
  const subscription = await broker.subscribe('s1');
  subscription
    .on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    })
    .on('error', (err) => {
      console.error('Subscriber error', err);
    })
    .on('invalid_content', (err, message, ackOrNack) => {
      console.error('Invalid content', err);
      ackOrNack(err);
    });
} catch (err) {
  // subscription didn't exist
}

If the message has not been auto-acknowledged you should ackOrNack it. If you do not listen for the invalid_content event rascal will nack the message (without requeue) and emit an error event instead, leading to message loss if you have not configured a dead letter exchange/queue.

Handling Cancel Notifications

The RabbitMQ broker may cancel the consumer if the queue is deleted or the node on which the queue is located fails. amqplib handles this by delivering a null message. When Rascal receives the null message it will:

  1. Emit a cancel event from the subscription.
  2. Emit an error event from the subscription if the cancel event was not handled
  3. Optionally attempt to resubscribe as per normal retry configuration. If the queue was deleted rather than being failed over, the queue will not automatically be re-created and retry attempts will fail indefinitely.
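
The first two steps above can be modelled with a plain EventEmitter. This is a hypothetical sketch rather than Rascal's source code: notifyCancelled is an invented name, and passing an Error to the cancel handler is an assumption.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical model of steps 1 and 2 (not Rascal's source code).
// EventEmitter#emit returns false when no listener is registered for the event.
function notifyCancelled(subscription) {
  const err = new Error('Consumer was cancelled by the broker');
  const handled = subscription.emit('cancel', err);
  if (!handled) subscription.emit('error', err);
}

const events = [];

// A subscription that listens for 'cancel' only receives the cancel event
const withCancelHandler = new EventEmitter();
withCancelHandler.on('cancel', () => events.push('cancel'));
withCancelHandler.on('error', () => events.push('error'));
notifyCancelled(withCancelHandler);

// A subscription without a 'cancel' listener receives an error event instead
const withoutCancelHandler = new EventEmitter();
withoutCancelHandler.on('error', () => events.push('error'));
notifyCancelled(withoutCancelHandler);

console.log(events); // [ 'cancel', 'error' ]
```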

Decrypting messages

Rascal can be configured to automatically decrypt inbound messages.

{
  "vhosts": {
    "v1": {
      "queues": ["e1"]
    }
  },
  "subscriptions": {
    "s1": {
      "queue": "e1",
      "vhost": "v1"
    }
  },
  "encryption": {
    "well-known-v1": {
      "key": "f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315",
      "ivLength": 16,
      "algorithm": "aes-256-cbc"
    }
  }
}

Any message that was published using the "well-known-v1" encryption profile will be automatically decrypted by the subscriber.

Dealing With Redeliveries

If your app crashes before acknowledging a message, the message will be rolled back. It is common for node applications to automatically restart, however if the crash was caused by something in the message content, the app will crash and restart indefinitely, thrashing the host. Prior to version 3.8.0, RabbitMQ didn't allow you to limit the number of redeliveries per message or provide a redelivery count. This is now possible using quorum queues, but for those on older versions, or in situations where quorum queues are not appropriate, subscribers can be configured with a redelivery counter and will update the message.properties.headers.rascal.redeliveries header with the number of hits. If the number of redeliveries exceeds the subscriber's limit, the subscriber will emit a "redeliveries_exceeded" event, which can be handled by your application, e.g.

"subscriptions": {
    "s1": {
        "vhost": "/",
        "queue": "q1",
        "redeliveries": {
            "limit": 10,
            "counter": "<counter name>"
        }
    }
},
"redeliveries": {
    "counters": {
        "<counter name>": {
            "type": "<counter type>",
            "size": 1000
        }
    }
}

broker.subscribe('s1', (err, subscription) => {
  if (err) throw err; // subscription didn't exist
  subscription
    .on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    })
    .on('error', (err) => {
      console.error('Subscriber error', err);
    })
    .on('redeliveries_exceeded', (err, message, ackOrNack) => {
      console.error('Redeliveries exceeded', err);
      ackOrNack(err);
    });
});

try {
  const subscription = await broker.subscribe('s1');
  subscription
    .on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    })
    .on('error', (err) => {
      console.error('Subscriber error', err);
    })
    .on('redeliveries_exceeded', (err, message, ackOrNack) => {
      console.error('Redeliveries exceeded', err);
      ackOrNack(err);
    });
} catch (err) {
  // subscription didn't exist
}

If you do not listen for the redeliveries_exceeded event, Rascal will nack the message without requeue, leading to message loss if you have not configured a dead letter exchange/queue.

Rascal provides three counter implementations:

  1. stub - this is the default and does nothing.
  2. inMemory - useful only for testing since if your node process crashes, the counter will be vaporised too
  3. inMemoryCluster - like the inMemory, but since the counter resides in the master it survives worker crashes.

Of the three only inMemoryCluster is useful in production, and then only if you are using clustering. See the advanced example for how to configure it.

Implementing your own counter

If your application is not clustered, but you still want to protect yourself from redeliveries, you need to implement your own counter backed by something like redis. In times of high message volumes the counter will be hit hard so you should make sure it's fast and resilient to failure/slow responses from the underlying store.

See here for a redis backed counter.
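
As a rough idea of what a counter has to do, the sketch below keeps a bounded, least-recently-used count per key. It is hypothetical: the incrementAndGet(key, next) shape is an assumption that should be checked against the counters shipped with Rascal, and because it is held in process memory it suffers the same weakness as inMemory. A production counter would keep the same logic but store the counts in something like redis.

```javascript
// Hypothetical counter factory. The incrementAndGet(key, next) shape is an assumption,
// so compare it with the counters that ship with Rascal before relying on it.
function createCounter({ size = 1000 } = {}) {
  const counts = new Map(); // a Map preserves insertion order, so the first key is the oldest

  return {
    incrementAndGet(key, next) {
      const redeliveries = (counts.get(key) || 0) + 1;
      counts.delete(key); // re-insert so the key becomes the most recently used
      counts.set(key, redeliveries);
      if (counts.size > size) {
        counts.delete(counts.keys().next().value); // evict the least recently used key
      }
      next(null, redeliveries);
    },
  };
}

const counter = createCounter({ size: 2 });
const seen = [];
const record = (err, redeliveries) => seen.push(redeliveries);
counter.incrementAndGet('m1', record);
counter.incrementAndGet('m1', record);
counter.incrementAndGet('m2', record);
counter.incrementAndGet('m3', record); // evicts m1, the least recently used key
counter.incrementAndGet('m1', record); // m1 is counted from scratch again
console.log(seen); // [ 1, 2, 1, 1, 1 ]
```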

Message Acknowledgement and Recovery Strategies

For messages which are not auto-acknowledged (the default) calling ackOrNack() with no arguments will acknowledge it. Calling ackOrNack(err) will nack the message using Rascal's default recovery strategy (nack with requeue). Calling ackOrNack(err, recoveryOptions) will trigger the specified recovery strategy or strategies. You can also acknowledge all outstanding messages on the channel by calling ackOrNack(null, { all: true }).

When using the callback API, you can call ackOrNack without a callback and errors will be emitted by the subscription. Alternatively you can specify a callback as the final argument irrespective of what other arguments you provide.

When using the promises API, ackOrNack will work as for the callback API unless you explicitly set promisifyAckOrNack to true on the subscription. If you do enable this feature, be sure to catch rejections.

Nack (Reject or Dead Letter)

ackOrNack(err, { strategy: 'nack' });

Nack causes the message to be discarded or routed to a dead letter exchange if configured. You can also negatively acknowledge all outstanding messages on a channel as follows

ackOrNack(err, { strategy: 'nack', all: true });

Nack with Requeue

ackOrNack(err, { strategy: 'nack', defer: 1000, requeue: true });

The defer option is not mandatory, but without it you are likely to retry your message thousands of times a second. Even then requeuing is an inadequate strategy for error handling, since the message will be rolled back to the front of the queue and there is no simple way to detect how many times the message has been redelivered.

Dead lettering is a good option for invalid messages but with one major flaw - because the message cannot be modified it cannot be annotated with the error details. This makes it difficult to do anything useful with messages once dead lettered.

Republish

ackOrNack(err, { strategy: 'republish', defer: 1000 });

An alternative to nacking is to republish the message back to the queue it came from. This has the advantage that the message will be resent to the back of the queue, allowing other messages to be processed and potentially fixing errors relating to ordering.

Rascal keeps track of the number of republishes so you can limit the number of attempts. Whenever you specify a number of attempts you should always chain a fallback strategy, otherwise if the attempts are exceeded your message will be neither acked nor nacked.

ackOrNack(err, [{ strategy: 'republish', defer: 1000, attempts: 10 }, { strategy: 'nack' }]);

Rascal also annotates the message with details of the error in message.properties.headers.rascal.<queue>.error, which can be useful if you eventually dead letter it.

Before using republish please consider the following:

  1. Rascal will copy message properties from the original message to the republished one. If you set an expiration time on the original message this will also be copied, effectively resetting it.

  2. Rascal will ack the original message after successfully publishing the copy. This does not take place in a distributed transaction so there is a potential of the original message being rolled back after the copy has been published (the dead-letter delay loop also suffers from this).

  3. Rascal will republish the original message using a confirm channel. If the publish fails, the original message will not be nacked (you should mitigate this by chaining recovery strategies).

  4. Republishing has the side-effect of clearing message.fields.exchange and setting message.fields.routingKey to the queue name. These fields may be used by your application or by the broker when routing rejected messages to a dead letter exchange. Rascal mitigates the first of these problems by restoring the original values before passing the message to the consumer, however if the message is subsequently rejected/nacked, this information will no longer be available on the broker's copy of the message and may still cause problems for dead letter routing. To resolve this you can bind the dead letter queue to the dead letter exchange twice, using both the original routing key pattern and the queue name. Alternatively you can specify an explicit dead letter routing key by way of the x-dead-letter-routing-key argument when defining the queue.

Republish with immediate nack

As mentioned previously, dead lettering invalid messages is a good strategy with one flaw - since there is no way to modify the message you cannot annotate it with failure details. A solution to this is to republish with attempts = 1 and then nack it to a dead letter exchange. The problem with this approach is that invalid messages will always be processed twice. To work around this, set immediateNack to true in the recovery options. This will instruct Rascal to nack the message immediately instead of emitting the 'message' event.

ackOrNack(err, { strategy: 'republish', immediateNack: true });

If you ever want to resend the message to the same queue you will have to remove the properties.headers.rascal.<queue>.immediateNack header first.

Forward

Instead of republishing the message to the same queue you can forward it to a Rascal publication. You should read the section entitled Forwarding messages to understand the risks of this.

ackOrNack(err, { strategy: 'forward', publication: 'some_exchange' });

Danger: As with the Republish strategy, you can limit the number of forward attempts. Whenever you specify a number of attempts you should always chain a fallback strategy, otherwise if the attempts are exceeded your message will be neither acked nor nacked.

Furthermore, if the message is forwarded but cannot be routed (e.g. due to an incorrect binding), the message will be returned after Rascal receives a 'success' event from amqplib. Consequently the message will have been ack'd. Any subsequent fallback strategy which attempts to ack or nack the message will fail, and so the message may be lost. The subscription will emit an error event under such circumstances.

ackOrNack(err, [
  {
    strategy: 'forward',
    publication: 'some_exchange',
    defer: 1000,
    attempts: 10,
  },
  { strategy: 'nack' },
]);

You can also override the publication options

ackOrNack(err, [
  {
    strategy: 'forward',
    publication: 'some_exchange',
    options: { routingKey: 'custom.routing.key' },
  },
  { strategy: 'nack' },
]);

One use of the forward recovery strategy is to send messages to a wait queue which will dead-letter them after a period of time. Repeated dead lettering causes some versions of RabbitMQ to crash. If you encounter this issue upgrade RabbitMQ or specify xDeathFix: true which will delete any x-death headers on the message before forwarding.

Ack

Acknowledges the message, guaranteeing that it will be discarded in the event you also have a dead letter exchange configured. Sometimes useful in automated tests or when chaining a sequence of other recovery strategies.

ackOrNack(err, { strategy: 'ack' });

Chaining Recovery Strategies

By chaining Rascal's recovery strategies and leveraging some of RabbitMQ's lesser used features you can achieve some quite sophisticated error handling. A simple combination of republish and nack (with dead letter) will enable you to retry the message a maximum number of times before dead lettering it.

ackOrNack(err, [
  {
    strategy: 'republish',
    defer: 1000,
    attempts: 10,
  },
  {
    strategy: 'nack',
  },
]);
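
One way to reason about a chain is as "use the first strategy that still has attempts left". The sketch below models that idea only. It is not Rascal's source code: selectStrategy is a hypothetical function, and the exact point at which Rascal considers the attempts exceeded may differ.

```javascript
// Hypothetical model (not Rascal's source code): returns the first strategy in the
// chain whose attempts limit has not been reached, or undefined if none is left.
function selectStrategy(strategies, attemptsSoFar) {
  return strategies.find((s) => s.attempts === undefined || attemptsSoFar < s.attempts);
}

const chain = [{ strategy: 'republish', defer: 1000, attempts: 10 }, { strategy: 'nack' }];
const withoutFallback = [{ strategy: 'republish', defer: 1000, attempts: 10 }];

const early = selectStrategy(chain, 3).strategy; // 'republish'
const exhausted = selectStrategy(chain, 10).strategy; // 'nack'
const stranded = selectStrategy(withoutFallback, 10); // undefined
console.log(early, exhausted, stranded);
```

The undefined result for the chain without a fallback corresponds to the situation warned about earlier, where the message is neither acked nor nacked.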

Far more sophisticated strategies are achievable, for example the following retry, back off and fail loop:

  1. Producer publishes a message with the routing key "a.b.c" to the "jobs" topic exchange
  2. The message is routed to the "incoming" queue. The "incoming" queue is configured with a dead letter exchange.
  3. The message is consumed from the queue. An error occurs, triggering the recovery process.
  4. The message is decorated with two CC routing keys, "delay.5m" and "incoming.a.b.c", and published to the delay exchange
  5. The message is routed to the "5-minute" queue, which is configured to dead letter the message after five minutes.
  6. After five minutes the message is dead lettered to the "retry" topic exchange.
  7. The message is routed back to the original queue using the CC routing key "incoming.a.b.c"

Steps 3 - 7 will repeat up to attempts times. If all attempts fail...

  1. The consumer dead letters the message, routing it to the "dead-letters" exchange
  2. The message is routed to the "dead-letters" queue

prefetch

Prefetch limits the number of unacknowledged messages a subscription can have outstanding. It's a great way to ensure that you don't overload your event loop or a downstream service. Rascal's default configuration sets the prefetch to 10 which may seem low, but we've managed to knock out firewalls, breach AWS thresholds and all sorts of other things by setting it to higher values.

channelPrefetch

Channel prefetch is like prefetch but operates at a channel rather than a consumer level. Since Rascal uses a dedicated channel per subscriber there is rarely any point setting it. Moreover, a channel prefetch has a considerable overhead, especially in a clustered environment so can significantly impact performance. The only reason to set a channel prefetch is if you want to adjust the prefetch dynamically after the subscription has started, and without cancelling the subscription, e.g.

broker.subscribe('s1', { prefetch: 0, channelPrefetch: 5 }, (err, subscription) => {
  if (err) throw err;
  subscription.on('message', (message, content, ackOrNack) => {
    ackOrNack();
    const prefetch = tunePrefetch();
    subscription.setChannelPrefetch(prefetch, (err) => {
      if (err) throw err;
    });
  });
});

const subscription = await broker.subscribe('s1', { prefetch: 0, channelPrefetch: 5 });
subscription.on('message', async (message, content, ackOrNack) => {
  ackOrNack();
  const prefetch = tunePrefetch();
  await subscription.setChannelPrefetch(prefetch);
});

Don't forget to zero the regular consumer prefetch when specifying a channelPrefetch to prevent potential conflicts.

retry

If an error occurs on the channel (which will happen if you accidentally acknowledge a message twice), then it becomes unusable and no more messages will be delivered. Rascal listens to the channel's error event and, assuming you are using its defaults, will automatically attempt to resubscribe to a new channel after a one second delay. You can disable or customise this in your configuration or in the call to subscribe.

// Does not retry. This will cause an error to be emitted which unhandled will crash your process. See [Subscriber Events](#subscriber-events)
broker.subscribe('s1', { prefetch: 10, retry: false }, callback);

// Retries without delay.
broker.subscribe('s1', { prefetch: 10, retry: true }, callback);

// Retries after a one second interval.
broker.subscribe('s1', { prefetch: 10, retry: { delay: 1000 } }, callback);

// Does not retry. This will cause an error to be emitted which unhandled will crash your process. See [Subscriber Events](#subscriber-events)
await broker.subscribe('s1', { prefetch: 10, retry: false });

// Retries without delay.
await broker.subscribe('s1', { prefetch: 10, retry: true });

// Retries after a one second interval.
await broker.subscribe('s1', { prefetch: 10, retry: { delay: 1000 } });

Subscriber Events

amqplib emits error events from the channel. These can happen for a number of reasons, but a common cause is acknowledging the same message twice. The subscriber will listen for channel errors so it can automatically re-subscribe but still emits them so they can be reported by your application. If you do not listen to these events or handle them in a domain they will cause your application to crash.

Defaults

Configuring each vhost, exchange, queue, binding, publication and subscription explicitly wouldn't be much fun. Not only does Rascal ship with default production and test configuration files, but you can also specify your own defaults in your configuration files by adding a "defaults" sub document.

{
  "defaults": {
    "vhosts": {
      "exchanges": {
        "assert": true,
        "type": "topic"
      },
      "queues": {
        "assert": true
      },
      "bindings": {
        "destinationType": "queue",
        "bindingKey": "#"
      }
    },
    "publications": {
      "vhost": "/",
      "confirm": true,
      "options": {
        "persistent": true
      }
    },
    "subscriptions": {
      "vhost": "/",
      "prefetch": 10,
      "retry": {
        "delay": 1000
      },
      "redeliveries": {
        "counter": {
          "size": 1000
        },
        "limit": 1000
      }
    }
  }
}
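
The effect of a "defaults" sub document can be pictured as a deep merge in which explicitly configured values win. The sketch below illustrates that idea only. applyDefaults is a hypothetical helper, and Rascal's actual merge rules may differ in detail.

```javascript
// Hypothetical helper (not Rascal's implementation): fills in any values missing
// from `config` using `defaults`, recursing into nested plain objects.
function applyDefaults(config, defaults) {
  const isPlainObject = (value) => value !== null && typeof value === 'object' && !Array.isArray(value);
  const result = { ...config };
  for (const [key, value] of Object.entries(defaults)) {
    if (result[key] === undefined) {
      result[key] = value;
    } else if (isPlainObject(result[key]) && isPlainObject(value)) {
      result[key] = applyDefaults(result[key], value);
    }
  }
  return result;
}

const subscriptionDefaults = { vhost: '/', prefetch: 10, retry: { delay: 1000 } };

// An explicit prefetch is kept, while vhost and retry come from the defaults
const s1 = applyDefaults({ queue: 'q1', prefetch: 5 }, subscriptionDefaults);

// An explicit retry: false is kept rather than replaced by the default delay
const s2 = applyDefaults({ queue: 'q2', retry: false }, subscriptionDefaults);

console.log(s1, s2);
```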

Cancelling subscriptions

You can cancel subscriptions as follows

broker.subscribe('s1', (err, subscription) => {
  if (err) throw err; // subscription didn't exist
  subscription.cancel((err) => {
    console.error(err);
  });
});

try {
  const subscription = await broker.subscribe('s1');
  await subscription.cancel();
} catch (err) {
  // subscription didn't exist or could not be cancelled
}

Cancelling a subscription will stop consuming messages, but leave the channel open until any outstanding messages have been acknowledged, or the timeout specified through the closeTimeout subscription property is exceeded.

Shutdown

You can shut down the broker by calling await broker.shutdown() or broker.shutdown(cb).

Bonus Features

Shorthand Notation

Rascal configuration can get rather verbose, so you can use the shorthand notation

{
  "exchanges": {
    "e1": {},
    "e2": {}
  },
  "queues": {
    "q1": {},
    "q2": {}
  },
  "bindings": {
    "b1": {
      "source": "e1",
      "destination": "q1"
    },
    "b2": {
      "source": "e2",
      "destination": "q2",
      "bindingKeys": ["bk1", "bk2"]
    }
  }
}

is equivalent to...

{
  "exchanges": ["e1", "e2"],
  "queues": ["q1", "q2"],
  "bindings": ["e1 -> q1", "e2[bk1, bk2] -> q2"]
}
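
To show how the binding shorthand maps onto the longhand form, here is a hypothetical parser. It is not Rascal's implementation and only covers the two shapes shown above.

```javascript
// Hypothetical parser (not Rascal's implementation) for strings such as
// "e1 -> q1" and "e2[bk1, bk2] -> q2".
function parseBinding(shorthand) {
  const match = /^\s*([^\[\s]+)\s*(?:\[([^\]]*)\])?\s*->\s*(\S+)\s*$/.exec(shorthand);
  if (!match) throw new Error(`Invalid binding: ${shorthand}`);
  const [, source, keys, destination] = match;
  const binding = { source, destination };
  if (keys !== undefined) {
    binding.bindingKeys = keys.split(',').map((key) => key.trim());
  }
  return binding;
}

const b1 = parseBinding('e1 -> q1');
const b2 = parseBinding('e2[bk1, bk2] -> q2');
console.log(b1); // { source: 'e1', destination: 'q1' }
console.log(b2); // { source: 'e2', destination: 'q2', bindingKeys: [ 'bk1', 'bk2' ] }
```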

If you need to specify exchange, queue or binding parameters you can mix and match string and object configuration...

{
  "exchanges": {
    "e1": {},
    "e2": {
      "type": "fanout"
    }
  }
}

is equivalent to...

{
  "exchanges": [
    "e1",
    {
      "name": "e2",
      "type": "fanout"
    }
  ]
}

Connect

Rascal is a rich pub/sub wrapper and as such hides much of the amqplib channel api. If you need to access this you can programmatically establish a connection to a vhost as follows.

broker.connect('/', (err, connection) => {
  if (err) throw new Error(`Connection error: ${err.message}`);
  // profit
});

try {
  const connection = await broker.connect('/');
  // profit
} catch (err) {
  throw new Error(`Connection error: ${err.message}`);
}

This will leverage Rascal's cluster connection support, but you will be responsible for error handling and disconnection.

Nuke, Purge and UnsubscribeAll

In a test environment it's useful to be able to nuke your setup between tests. The specifics will vary based on your test runner, but assuming you were using Mocha...

afterEach((done) => {
  broker.nuke(done);
});

afterEach(async () => {
  await broker.nuke();
});

It can be costly to nuke between tests, so if you want the tear down to be quicker use purge and unsubscribeAll instead.

// Callback API
afterEach((done) => {
  async.series([broker.unsubscribeAll, broker.purge], done);
});

after((done) => {
  broker.nuke(done);
});

// Promise API
afterEach(async () => {
  await broker.unsubscribeAll();
  await broker.purge();
});

after(async () => {
  await broker.nuke();
});

Bounce

Bounce disconnects and reinitialises the broker.

// Callback API
beforeEach((done) => {
  broker.bounce(done);
});

// Promise API
beforeEach(async () => {
  await broker.bounce();
});

Shovels

RabbitMQ enables you to transfer messages between brokers using the Shovel plugin. You can do something similar with Rascal by connecting a subscription to a publication. Shovels rely on Rascal's 'forward' feature, so all the caveats about duplicate messages apply.

{
  "shovels": {
    "x1": {
      "subscription": "s1",
      "publication": "p1"
    }
  }
}

or in shorthand

{
  "shovels": ["s1 -> p1"]
}

Running the tests

npm test

You'll need a RabbitMQ server running locally with default configuration. If that's too much trouble, try installing Docker and running the following: npm run docker

rascal's People

Contributors

adriantam, bettiolo, boreplusplus, charlieharris1, cressie176, csabapalfi, dependabot[bot], dmfaux, flashcrafter, gaganshera, glasseyes42, huikaihoo, jakehoward, juliendangers, justkant, kkhanhluu, kolahzary, marcbachmann, matej-prokop, mmcgarr, paololaurenti, pgte, rossj, rouanw, scottk, snyk-bot, ulisesgascon, zehelein, zijin-m

rascal's Issues

Default publications and subscriptions from one vhost override those from another

Rascal will automatically create a default publication for each queue and exchange, and a default subscription for each queue. The name given to each is just the queue or exchange name, e.g. q1 or e1.

Publications and subscriptions are hoisted from the vhost block to the root configuration. Since queue and exchange names only have to be unique within a vhost, this can lead to a default publication / subscription from one vhost overriding the default publication / subscription from another vhost.

To fix this we are going to qualify the default publication and subscriptions with the vhost name, e.g. v1/q1 or v1/e1 (/q1 or /e1 for the default vhost). This would be a breaking change. To soften the break an interim release of rascal will create both the qualified and unqualified subscriptions / publications, and warn when an unqualified subscription / publication is used.
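
The collision and the proposed fix can be sketched as follows. Both functions are hypothetical illustrations of the naming scheme described above, not Rascal's implementation, and the vhosts object is a simplified stand-in for real configuration.

```javascript
// Hypothetical sketch of the proposed naming scheme.
function qualify(vhostName, name) {
  // The default vhost is "/", so its qualified names look like "/q1".
  return vhostName === '/' ? `/${name}` : `${vhostName}/${name}`;
}

// Hoists the default publication for every queue into one root-level map.
function hoistDefaultPublications(vhosts, { qualified }) {
  const publications = {};
  for (const [vhostName, vhost] of Object.entries(vhosts)) {
    for (const queueName of vhost.queues) {
      const key = qualified ? qualify(vhostName, queueName) : queueName;
      publications[key] = { vhost: vhostName, queue: queueName };
    }
  }
  return publications;
}

const vhosts = { v1: { queues: ['q1'] }, v2: { queues: ['q1'] } };
// Unqualified: v2's q1 silently replaces v1's q1.
console.log(hoistDefaultPublications(vhosts, { qualified: false }));
// Qualified: both defaults survive.
console.log(hoistDefaultPublications(vhosts, { qualified: true }));
```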

Subscription Error

I migrated a service over today and all looks fine, but I have noticed that after a while I get a subscription error, at which point my service goes into an infinite loop of subscription errors.

broker.subscribe('incomingEvents', function(err, subscription) {
        subscription
          .on('message', handler)
          .on('error', function(err) {
            logger.error('Subscription Error: ' + err.message);
          });
    });

Wondering what we are supposed to do - should I cancel the subscription and re-subscribe?

Error on publish confirm

I am trying to publish a message using publisher confirms, with a publication that has the mandatory: true flag set.
Every time I try to publish a message I get the following error:

Error was:  { TypeError: Cannot read property 'messageId' of undefined
    at EventEmitter.publication.on.on.on (/home/paolo/dev/rabbitMQ-producer.js:15:67)
    at EventEmitter.emit (events.js:189:13)
    at ConfirmChannel.emit (events.js:189:13)
    at /home/paolo/dev/node_modules/amqplib/lib/channel.js:273:10
    at ConfirmChannel.content [as handleMessage] (/home/paolo/dev/node_modules/amqplib/lib/channel.js:326:9)
    at ConfirmChannel.C.acceptMessageFrame (/home/paolo/dev/node_modules/amqplib/lib/channel.js:241:31)
    at ConfirmChannel.C.accept (/home/paolo/dev/node_modules/amqplib/lib/channel.js:394:17)
    at Connection.mainAccept [as accept] (/home/paolo/dev/node_modules/amqplib/lib/connection.js:64:33)
    at Socket.go (/home/paolo/dev/node_modules/amqplib/lib/connection.js:478:48)
    at Socket.emit (events.js:189:13)
    at emitReadable_ (_stream_readable.js:535:12)
    at process._tickCallback (internal/process/next_tick.js:63:19) code: 541 }

If I set mandatory: false everything is fine.

My configurations are:

exchanges: {
  messages: {
    assert: true,
    check: true,
    type: 'fanout',
    options: {
      durable: true,
      autoDelete: false
     }
   }
 }

and

      publications: {
        messages_pub: {
          exchange: 'messages',
          routingKey: '#',
          confirm: true,
          options: {
            persistent: true,
            mandatory: true
          }
        }
      }

The exchange where I am publishing messages has no queues bound.

Rascal: 4.1.0
RabbitMQ: 3.7.8

Rascal doesn't reconnect if Rabbit is killed

If I kill RabbitMQ and immediately restart it, the Rascal service breaks (even after waiting for RabbitMQ to become fully online again) with the following error next time it attempts to use the connection, e.g. for publishing a message. This also causes my app to crash (I assume this means the error is coming from a stream that I can't add an error handler to, but I haven't investigated this too much).

0|app      | IllegalOperationError: Connection closed (Error: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
0|app      |     at Connection.<anonymous> (/app/node_modules/amqplib/lib/connection.js:359:11)
0|app      |     at ConfirmChannel.C.sendImmediately (/app/node_modules/amqplib/lib/channel.js:64:26)
0|app      |     at ConfirmChannel.C.sendOrEnqueue (/app/node_modules/amqplib/lib/channel.js:75:10)
0|app      |     at ConfirmChannel.C._rpc (/app/node_modules/amqplib/lib/channel.js:131:8)
0|app      |     at ConfirmChannel.Channel.rpc (/app/node_modules/amqplib/lib/callback_model.js:73:8)
0|app      |     at ConfirmChannel.Channel.open (/app/node_modules/amqplib/lib/callback_model.js:86:15)
0|app      |     at CallbackModel.createConfirmChannel (/app/node_modules/amqplib/lib/callback_model.js:285:6)
0|app      |     at createChannel (/app/node_modules/rascal/lib/amqp/Vhost.js:167:38)
0|app      |     at Object.create (/app/node_modules/rascal/lib/amqp/Vhost.js:115:17)
0|app      |     at Pool._createResource (/app/node_modules/generic-pool/lib/generic-pool.js:354:17)
0|app      |     at Pool.dispense [as _dispense] (/app/node_modules/generic-pool/lib/generic-pool.js:314:10)
0|app      |     at Pool.acquire (/app/node_modules/generic-pool/lib/generic-pool.js:436:8)
0|app      |     at /app/node_modules/rascal/lib/amqp/Vhost.js:135:18
0|app      |     at /app/node_modules/rascal/node_modules/async/dist/async.js:4069:9
0|app      |     at process (/app/node_modules/rascal/node_modules/async/dist/async.js:2317:17)

This seems to be because Rascal isn't listening for the connection to close and therefore never attempts to recreate the connection. I can fix this by adding the following line into the Vhost init function:

ctx.connection.on('close', handleConnectionError.bind(null, config));

... such that the code becomes:

this.init = function(next) {
    debug('Initialising vhost: %s', config.name)
    pauseChannelAllocation()
    init(config, {}, function(err, config, ctx) {
        if (err) return next(err)
        self.emit('connect')
        ctx.connection.removeAllListeners('error')
        ctx.connection.once('error', handleConnectionError.bind(null, config))
        forwardEvents(ctx.connection, self, function(eventName) {
            return eventName === 'blocked' || eventName === 'unblocked';
        })
        ctx.connection.on('close', handleConnectionError.bind(null, config));
        connection = ctx.connection
        connectionConfig = ctx.connectionConfig
        resumeChannelAllocation()
        return next(null, self)
    })
    return self
}

With this line added, when RabbitMQ is killed, the service immediately notices and attempts to retry the connection until it succeeds:

0|app      | Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
0|app      | connect ECONNREFUSED 172.18.0.4:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | connect ECONNREFUSED 172.18.0.4:5672
0|app      | connect ECONNREFUSED 172.18.0.4:5672
[...]
0|app      | connect ECONNREFUSED 172.18.0.4:5672
0|app      | connect ECONNREFUSED 172.18.0.4:5672
[ eventually the connection is successful and no more errors are logged ]

Furthermore, while the connection is in the retrying state above, attempting to publish messages will not cause the aforementioned error to be thrown and therefore my app does not crash. I have some try-catches in there that are probably swallowing whatever errors are being thrown instead.

This may be naive as I don't understand how all the moving parts in Rascal/amqplib fit together 😅 Just hoping to get your initial thoughts and I'll be happy to investigate further on any of these points if you need me to.

[Question] What would happen if the acknowledge message from the consumer times out?

Hello @cressie176,

More than an issue, this is a question.

We have been load testing an application that uses Rascal. We have seen that messages sometimes hang on RabbitMQ queues as unacknowledged until we restart the consumers. Our theory is that the acknowledgement fails due to an ETIMEDOUT or ECONNRESET error. Is our theory plausible? What are your thoughts? We are thinking of adding a timeout, after which the message would be nacked. Obviously, we would need to be prepared for redelivered messages, as explained at https://www.rabbitmq.com/confirms.html#automatic-requeueing. The problem is that ackOrNack is not a promise we can put into a Promise.all with a timeout, nor does it return any value, so I am not sure how we could do this. Do you have any suggestions?
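
One possible shape for such a timeout is to wrap the message handler instead of ackOrNack itself. The sketch below assumes Rascal's handler signature (message, content, ackOrNack), where calling ackOrNack() acks and ackOrNack(err) nacks; withAckTimeout is a hypothetical helper. It only covers a handler that fails to settle in time, not an acknowledgement lost on the wire.

```javascript
// Hypothetical wrapper: nacks the message if the handler has not called
// ackOrNack within timeoutMs, and ignores any later (duplicate) calls.
function withAckTimeout(handler, timeoutMs) {
  return (message, content, ackOrNack) => {
    let settled = false;
    const settle = (...args) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      ackOrNack(...args);
    };
    const timer = setTimeout(
      () => settle(new Error(`Handler timed out after ${timeoutMs}ms`)),
      timeoutMs
    );
    handler(message, content, settle);
  };
}

// Usage (subscription obtained from broker.subscribe):
// subscription.on('message', withAckTimeout(handleMessage, 30000));
```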

Thanks in advance.

Can't subscribe to message of a specific type

RabbitMQ messages have properties, and one of these properties is a type.
RabbitMQ lets you handle (consume) messages of a specific type, so you can have multiple consumers in one application depending on the type property.

I can't work out how to do that with Rascal.

Is it possible ?
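
One way to approximate this, assuming a single subscription receives every message, is to dispatch on message.properties.type inside the handler. createTypeRouter below is a hypothetical helper; the handler signature (message, content, ackOrNack) is the one Rascal passes to 'message' listeners.

```javascript
// Hypothetical helper: picks a handler based on the AMQP "type" property and
// falls back to a default handler for unknown or missing types.
function createTypeRouter(handlers, fallback) {
  return (message, content, ackOrNack) => {
    const type = message.properties && message.properties.type;
    const known = Object.prototype.hasOwnProperty.call(handlers, type);
    const handler = known ? handlers[type] : fallback;
    handler(message, content, ackOrNack);
  };
}

// Usage (type names are illustrative):
// subscription.on('message', createTypeRouter(
//   { 'order.created': handleOrderCreated },
//   (message, content, ackOrNack) => ackOrNack(new Error('Unsupported type'))
// ));
```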

shutdown() does not gracefully drain / clear generic pools

Related to #50. I've noticed that after trying to do a graceful shutdown(), my process hangs around for about 30 seconds. Using wtfnode it seems that there is still a timer from generic-pool that keeps the process running for a while:

[WTF Node?] open handles:
- File descriptors: (note: stdio always exists)
  - fd 1 (tty) (stdio)
  - fd 2 (tty) (stdio)
- Timers:
  - (1000 ~ 1000 ms) (anonymous) @ /code/node_modules/generic-pool/lib/generic-pool.js:256

I'm guessing this is part of generic-pool's idle handling. idleTimeoutMillis defaults to 30s which is about the time I'm seeing before actual process exit.

Perhaps the rascal vhost should drain and clear the pools during shutdown?

From the generic-pool docs:

/**
 * Step 3 - Drain pool during shutdown (optional)
 */
// Only call this once in your application -- at the point you want
// to shutdown and stop using this pool.
myPool.drain().then(function() {
  myPool.clear();
});

I'm new to rascal but I'd be happy to work on a PR if you'd like this change.

Retry not working, no consumers listed in rabbitmq admin dashboard

I need help figuring out this issue. I use rascal with multiple microservices and it works really well. But I have one microservice that after some amount of time is no longer connected to rabbitmq. I go to the admin dashboard for rabbitmq and see no consumers listed for the queue used by this microservice. In this last instance I see a lot of this error in the logs:

Unexpected close
Channel ended, no reply will be forthcoming

I don't know if that's related but I see it many times, so that makes me think it was able to reconnect. Any ideas on things I can look at to help figure this out? Do you know of any cases where this might happen?

Graceful shutdown

Currently shutting down the rascal broker will

  1. Unsubscribe all subscriptions
  2. Disconnect from all vhosts

This means any in-flight unacknowledged messages will be rolled back (and redelivered), and the outcome of any inflight publications is unknown.

It will never be possible to shut down in a completely graceful manner since Rascal is not responsible for the full unit of work, but it should be possible to delay disconnect until

  1. All in-flight messages have been ack'd/nack'd (or a timeout expires)
  2. All borrowed publication channels have been returned to the pool (or a timeout expires)

The former would require counting messages in and out. The latter requires setting the channel publication pool to drain after pausing channel allocation.

Thoughts @BorePlusPlus ?
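
The "counting messages in and out" idea could look roughly like this. createInFlightTracker is a hypothetical sketch, not Rascal's implementation: enter() would be called when a message is delivered, leave() when it is ack'd or nack'd, and drain() before disconnecting.

```javascript
// Hypothetical in-flight counter with a bounded wait.
function createInFlightTracker() {
  let inFlight = 0;
  let waiters = [];

  const notifyIfIdle = () => {
    if (inFlight > 0) return;
    waiters.forEach((resolve) => resolve(true));
    waiters = [];
  };

  return {
    enter() {
      inFlight++;
    },
    leave() {
      inFlight = Math.max(0, inFlight - 1);
      notifyIfIdle();
    },
    // Resolves true once nothing is in flight, or false if the timeout expires first.
    drain(timeoutMs) {
      if (inFlight === 0) return Promise.resolve(true);
      return new Promise((resolve) => {
        const timer = setTimeout(() => {
          waiters = waiters.filter((waiter) => waiter !== onIdle);
          resolve(false);
        }, timeoutMs);
        const onIdle = (value) => {
          clearTimeout(timer);
          resolve(value);
        };
        waiters.push(onIdle);
      });
    },
  };
}

// Usage: await tracker.drain(5000); then disconnect from the vhosts.
```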

How/where to handle Heartbeat timeout exception

Hi @cressie176,

I am trying to test Rascal's reconnection behaviour. When shutting down RabbitMQ, I can see the reconnection code working, using Rascal's DEBUG mode. I see some instances of a heartbeat timeout coming up from amqplib, like:

    at Heart.emit (events.js:198:13)
    at Heart.EventEmitter.emit (domain.js:448:20)
    at Heart.runHeartbeat (/Users/carlosgarcia/Documents/mailonline/development/mol-fe/mol-fe-web-push-api/node_modules/amqplib/lib/heartbeat.js:88:17)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10)

I have two applications, one producer and one consumer, using the same broker, and while the producer is able to publish messages after the reconnection, the consumer is not able to consume messages even though it seems that it has reconnected and re-established the subscriptions.

I see uncaught exceptions for the heartbeat and unexpected close errors. How should I handle those? I have .on('error') listeners in all the places mentioned in the documentation: when creating the broker, and for the subscription and the publication. We use the promise version.

I can see this in the DEBUG rascal logs on the consumer application

rascal:Vhost Initialising vhost: my-host-name
rascal:tasks:createConnection Connecting to broker using url: amqp://admin:***@localhost:5673/web-push-api?heartbeat=10&connection_timeout=10000&channelMax=100
rascal:tasks:createConnection Obtained connection: my-connection-hash
rascal:tasks:createChannel Creating channel
rascal:tasks:assertExchanges Asserting exchange: my-exchange
rascal:tasks:assertQueues Asserting queue: subscription
rascal:tasks:applyBindings Binding queue: subscription to exchange: my-exchange with binding key: my-binding-key
rascal:tasks:closeChannel Closing channel
rascal:Vhost vhost: my-host-name was initialised with connection: my-connection-hash

Is it normal that it closes the channel?

Checking the RabbitMQ UI Management, it seems the consumer doesn't get attached to the queue.

Please, let me know if you need any more info from my side.

Thanks a lot.

Dynamic assertion of queues and exchanges.

I couldn't find a way to access amqplib functions such as "assertQueue" or "bindQueue" in order to dynamically add exchanges, queues, etc. to the configuration.

For example, I want to publish to new exchanges that I create dynamically, depending on the parameters that my API receives.

( Thanks a lot for this library ;) )
using :
rascal : 4.6.2
node : 10.15.3

Error "Vhost: myvhost refers to an unsupported attribute: encryption"

I used the settings from the documentation here: https://github.com/guidesmiths/rascal#encrypting-messages

When I add the encryption field to the vhost I get this error:

Error "Vhost: myvhost refers to an unsupported attribute: encryption". If I remove the encryption field it works as expected.

This setting does not work:

"publications": {
    "my_q": {
        "queue": "my.q",
        "encryption": "well-known-v1"
    }
},
"encryption": {
    "well-known-v1": {
        "key": "f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315",
        "ivLength": 16,
        "algorithm": "aes-256-cbc"
    }
}

I saw that the tests add it to the publication, and if I add it there and remove it from the vhost it works fine. This setting works:

my_q: {
    queue: `my_q`,
    encryption: {
        name: 'well-known',
        key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315',
        ivLength: 16,
        algorithm: 'aes-256-cbc',
    }
}

Any ideas? I'm using "version": "3.2.0".

Messages to publish are lost when RabbitMQ connection fails

I have this simple publication code:

const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config));

broker.on('error', (error) => {
    console.error(`Broker error: ${error.message}`);
});

setInterval(async function() {
      try {
        const publication = await broker.publish('pub', 'hello world');
        publication.on('error', (error) => {
            console.error(`Publication error: ${error.message}`);
        });
      } catch (error) {
        console.error(`Rascal config error: ${error.message}`);
      }
}, 1000);

It's running fine until I shutdown the RabbitMQ server. Then I see this error in the logs:

Broker error: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"

The messages are not published now, obviously. There is no Publication error in the logs though, which caused some confusion for me.

When I start the RabbitMQ server again, I'd expect the messages published in the meantime to be sent after Rascal reconnects. This doesn't happen; the messages are lost. If I understand the code correctly they're queued in poolQueue: https://github.com/guidesmiths/rascal/blob/a68834d46ccc0b476d4530a21368ca2b234348d6/lib/amqp/Vhost.js#L211 but then after reconnection the queue is overwritten with an empty one in https://github.com/guidesmiths/rascal/blob/a68834d46ccc0b476d4530a21368ca2b234348d6/lib/amqp/Vhost.js#L245

My workaround is to add a local queue and pause/resume it based on the connection status:

const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config));

const messagesToPublishQueue = async.queue((_, next) => {
    next();
}, 1);

messagesToPublishQueue.pause();

broker
    .on('error', (error) => {
        console.error(`Broker error: ${error.message}`);
    })
    .on('connect', () => {
        messagesToPublishQueue.resume();
    })
    .on('disconnect', () => {
        messagesToPublishQueue.pause();
    });

setInterval(async function() {
    messagesToPublishQueue.push(null, async () => {
      try {
        const publication = await broker.publish('pub', 'hello world');
        publication.on('error', (error) => {
            console.error(`Publication error: ${error.message}`);
        });
      } catch (error) {
        console.error(`Rascal config error: ${error.message}`);
      }
    });
}, 1000);

Support connection max retries

Hi there,

What do you think about supporting an additional flag maxRetries inside connection.retry config?

Without a maximum number of retries, there is a chance that the app gets stuck in a reconnection loop when something is wrong externally.

With max retries, eventually the app would exit/crash and it would be easier to see that something is wrong.
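
The requested behaviour can be sketched generically as a bounded retry loop. Everything here is hypothetical: connect stands for any function returning a promise, and maxRetries / delayMs are illustrative option names, not existing Rascal configuration.

```javascript
// Hypothetical bounded retry: one initial attempt plus up to maxRetries retries.
async function connectWithMaxRetries(connect, { maxRetries = 5, delayMs = 1000 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await connect();
    } catch (err) {
      lastError = err;
      // Wait before the next attempt, but not after the final failure.
      if (attempt < maxRetries) {
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw new Error(`Giving up after ${maxRetries} retries: ${lastError.message}`);
}
```

A caller could let the final rejection propagate so the process exits, which makes the external failure visible to whatever supervises the app.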

Connected event listener

Hello, really great work on Rascal, thanks!

Just a question. I can "create" a broker and add a listener for an error event:

const broker = await rascal.BrokerAsPromised.create(rascal.withDefaultConfig(rascalConfigServer))
broker.on("error", onRascalBrokerError)

How can I add a listener for "connected" event instead?

Thanks again.

RFC Configuration based queue/exchange deletion

RabbitMQ doesn't allow you to redefine queues. Typically you have to create a new queue + bindings then delete the old one, which is a pain. I think Rascal might be able to automate this by introducing a delete attribute into the queue config

queues: {
  doomed: {
    delete: true
  }
}

Rascal may be able to

  1. Setup all defined exchanges / queues as per the current behaviour (excluding deleted ones)
  2. Unbind the queue
  3. Wait for the queue to be empty
  4. Delete the queue

There are some caveats though:

  • This approach is not completely safe, but it's reasonable in many circumstances
  • Early versions of RabbitMQ will destroy the connection if the queue does not exist (therefore use a dedicated connection just in case)
  • The queue would still be bound to the default exchange and so may never drain
  • AMQP may not support programmatic removal of all bindings / listing current bindings
  • AMQP does include a queue depth function. I would either have to get + nack with requeue which may have side-effects, or use the HTTP api
  • Repeated attempts to delete a queue will fail so I would need to tolerate failures or check for existence

Rinse and repeat for exchanges
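
Steps 2 to 4 above could be sketched against a raw channel as follows. This assumes a channel that follows amqplib's promise API (unbindQueue, checkQueue, deleteQueue); retireQueue and its bindings, pollMs and timeoutMs parameters are hypothetical, and the caveats listed above still apply.

```javascript
// Hypothetical sketch of "unbind, wait until empty, delete".
async function retireQueue(channel, queue, bindings, { pollMs = 1000, timeoutMs = 60000 } = {}) {
  // Step 2: unbind so no new messages are routed via the listed exchanges.
  for (const { source, bindingKey } of bindings) {
    await channel.unbindQueue(queue, source, bindingKey);
  }

  // Step 3: poll the queue depth until consumers have drained it, or give up.
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const { messageCount } = await channel.checkQueue(queue);
    if (messageCount === 0) break;
    if (Date.now() >= deadline) {
      throw new Error(`Queue ${queue} did not drain within ${timeoutMs}ms`);
    }
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }

  // Step 4: delete, asking the broker to refuse if a message arrived meanwhile.
  await channel.deleteQueue(queue, { ifEmpty: true });
}
```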

Why callback of the subscription 'error' event doesn't have 'ackOrNack' parameter as 'message' and 'invalid_content' ?

I don't understand the reason why the callback of the subscription error event doesn't have ackOrNack parameter.
In this way, if I want to handle a recovery strategy in case of an unhandled error inside the message event handler, I need to catch the error. Therefore I don't understand what is the purpose of the error event...

I was expecting that in case of an unhandled error inside the message event handler, Rascal would emit the error event passing the ackOrNack callback as parameter. So, in this case, inside the error event handler I could fire-up my recovery strategy (without the need to catch anything inside the message event handler).
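
Until something like that exists, the catch can at least be written once and reused. The wrapper below is a hypothetical sketch: it assumes the handler signature (message, content, ackOrNack) and that ackOrNack(err) nacks the message using the configured recovery strategy.

```javascript
// Hypothetical wrapper: nacks the message if the handler throws or rejects,
// and makes sure ackOrNack is only ever called once per message.
function nackOnError(handler) {
  return async (message, content, ackOrNack) => {
    let settled = false;
    const settleOnce = (...args) => {
      if (settled) return;
      settled = true;
      ackOrNack(...args);
    };
    try {
      await handler(message, content, settleOnce);
    } catch (err) {
      settleOnce(err);
    }
  };
}

// Usage: subscription.on('message', nackOnError(handleMessage));
```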

Process crash with "Callback was already called."

Hi there,

I am getting this error.

if (fn === null) throw new Error("Callback was already called.");
                         ^
Error: Callback was already called.
    at /path/to/project/node_modules/async/dist/async.js:903:32
    at /path/to/project/node_modules/async/dist/async.js:2490:13
    at /path/to/project/node_modules/async/dist/async.js:2551:17
    at /path/to/project/node_modules/rascal/lib/amqp/tasks/createConnection.js:24:9
    at /path/to/project/node_modules/async/dist/async.js:4566:26
    at /path/to/project/node_modules/rascal/lib/amqp/tasks/createConnection.js:18:29
    at /path/to/project/node_modules/rascal/lib/amqp/tasks/createConnection.js:32:29
    at /path/to/project/node_modules/amqplib/callback_api.js:16:10
    at /path/to/project/node_modules/amqplib/lib/connect.js:147:12
    at bail (/path/to/project/node_modules/amqplib/lib/connection.js:175:5)

I'm not sure how this error arises. There are no other logs. I am not even sure whether it is caused by application code or by a connection-related issue triggered in library code.

Thank you!

Handle publishing message when got connection issue to RabbitMQ server

We have an issue with the gap between our service and RabbitMQ.
Scenario

  1. The service tries to publish a message to RabbitMQ
    Service ------Message----> RabbitMQ

  2. While the service is publishing the message, something goes wrong with the connection, so the service cannot reach RabbitMQ
    Service ------Message----X RabbitMQ

  3. Rascal buffers the message internally until the connection comes back
    Buffer(Message)
    ^
    |
    Service X RabbitMQ

  4. The service is restarted for some reason and comes back without the buffer, then connects to RabbitMQ without problems
    Service -------Connected-----> RabbitMQ

  5. The message is gone

This case rarely happens if the server is stable and never restarts, but it can still happen.
Are there any suggestions for having Rascal save messages being published somewhere else (Redis, storage, etc.) so that publishing can be retried?

Change Delay Queue's TTL

In my setup I have multiple queues with various delays, and these delays are configurable. From what I have read, if I want to change any configuration of a queue I need to create a new queue, migrate the messages from the old queue to the new one, and then delete the old one. I looked for something in Rascal to help me achieve this, but couldn't find anything in the exposed API for deleting a queue, exchange or binding.
I really liked this tool and I'm thinking of using it in my app instead of the amqplib because of the reasons mentioned here.

So do you have any suggestion on how I might do this?

Action required: Greenkeeper could not be activated 🚨

🚨 You need to enable Continuous Integration on all branches of this repository. 🚨

To enable Greenkeeper, you need to make sure that a commit status is reported on all branches. This is required by Greenkeeper because we are using your CI build statuses to figure out when to notify you about breaking changes.

Since we did not receive a CI status on the greenkeeper/initial branch, we assume that you still need to configure it.

If you have already set up a CI for this repository, you might need to check your configuration. Make sure it will run on all new branches. If you don’t want it to run on every branch, you can whitelist branches starting with greenkeeper/.

We recommend using Travis CI, but Greenkeeper will work with every other CI service as well.

Once you have installed CI on this repository, you’ll need to re-trigger Greenkeeper’s initial Pull Request. To do this, please delete the greenkeeper/initial branch in this repository, and then remove and re-add this repository to the Greenkeeper integration’s white list on Github. You'll find this list on your repo or organization’s settings page, under Installed GitHub Apps.

Connection configuration inconsistency

Hello,
Thanks for providing a great tool! Although I've only been using it for a short time, I found an inconsistency between the README and the current implementation of the connection configuration.

  1. The README tells me to simply provide the connection as a string, but the connection attribute only accepts a ConnectionConfig, not a string.
{
  "vhosts": {
    "v1": {
      // this won't work, string is not applicable for ConnectionConfig
      "connection": "amqp://guest:[email protected]:5672/v1?heartbeat=10"
    }
  }
}
  2. OK, so I tried the url attribute inside the ConnectionConfig, which did not work for me either in combination with withDefaultConfig
{
  "vhosts": {
    "v1": {
      "connection": {
        // this is fine, but seems like to be ignored or overridden
        "url": "amqp://guest:[email protected]:5672/v1?heartbeat=10"
      }
    }
  }
}

This ends up in a connection issue: Failed to check vhost: /. http://guest:***@localhost:15672 errored with: connect ECONNREFUSED 127.0.0.1:15672.

It seems like my url is being overridden by a default value. I'm not able to configure the connection using a string. The only working solution seems to be to pass an object with the username, password and hostname attributes supplied.

UPDATE: I've realized that problem number 2 is caused by check: true in the vhost configuration, which uses the management API to validate the vhost. I completely missed the port difference in the log. So the url attribute of the ConnectionConfig object is in fact used.

There's only a typo in the README; point 1 is still valid :-)

How to handle RPC communication?

Sorry to ask this silly question.

I saw the publication.on('return') event listener, but can't figure out how to return a message from the subscription.

Add dynamic subscriptions/publications

Is there any way to add more publications (ie. queues) to the Broker after it has been created?
It seems that the configuration is static but in my use case I need a way to dynamically create queues.

Example:

  • I have 3 categories that I map to 3 publications
  • I create the broker
  • 5 minutes later, a new category is added, I should create a new publication for it
    => Any way I can do this without re-creating the whole broker instance ?

allow 'bindingKey' to be an array

instead of this:

                "order-created": {
                    "source": "gateway",
                    "destination": "openaccounts_adapter:persist",
                    "bindingKey": "salesforce.v1.notifications.order.created"
                },
                "product-created": {
                    "source": "gateway",
                    "destination": "openaccounts_adapter:persist",
                    "bindingKey": "salesforce.v1.notifications.product.created"
                },
                "billing-run-created": {
                    "source": "gateway",
                    "destination": "openaccounts_adapter:persist",
                    "bindingKey": "salesforce.v1.notifications.billing_run.created"
                }

it would be nice to just write this:

                "created": {
                    "source": "gateway",
                    "destination": "openaccounts_adapter:persist",
                    "bindingKey": [
                        "salesforce.v1.notifications.order.created",
                        "salesforce.v1.notifications.product.created",
                        "salesforce.v1.notifications.billing_run.created"
                    ]
                },

How to declare a queue as lazy?

👋 I have a number of queues that I need to connect to that have been predeclared as lazy. I cannot seem to find the correct settings to add to the Rascal config file.

What I have unsuccessfully tried:

// ...
queues: {
  "this.is.a.queue": {
    options: {
      arguments: {
        "x-queue-mode": "lazy"
        // Also tried queueMode: "lazy"
      }
    }
  }
}
// ...
// ...
queues: {
  "this.is.a.queue": {
    options: {
      "x-queue-mode": "lazy"
      // Also tried queueMode: "lazy"
    }
  }
}
// ...

This is the error that is emitted when attempting to connect:

{ Error: Operation failed: QueueDeclare; 406 (PRECONDITION-FAILED) with message
"PRECONDITION_FAILED - inequivalent arg 'x-queue-mode' for queue 'this.is.a.queue'
in vhost '/': received the value 'lazy' of type 'longstr' but current is none"

I am happy to update the documentation and submit a pull request once I've solved this 👍

Cluster Connections: can we cancel randomness and keep the given order?

Hello,

Thank you for your great wrapper lib.
I'm trying to setup a RabbitMQ cluster with an active/passive behaviour for high-availability, where I only want to connect to the main node except if it fails.

Is there a way to disable the randomness in the vhost > connections configuration?
I would like the client to systematically connect in the given order.

Thank you!

Pollution of global object with connectionIndex property

Hi there. I'm working on switching my code to use rascal, and my mocha tests started warning about a global object leak of a "connectionIndex" property.

Adding the --check-leaks mocha option to rascal's own tests causes a similar issue:

  1) Broker
       should assert vhosts:
     Error: global leak detected: 'connectionIndex'
      at /c/Users/ross/code/rascal/test/broker.tests.js:95:7
      at /c/Users/ross/code/rascal/test/broker.tests.js:250:7
      at Immediate._onImmediate (lib/amqp/Broker.js:1:20202)
      at processImmediate (internal/timers.js:439:21)

I believe the culprit is this line, where this ends up being the global object. I'm assuming it should be self... instead.

Cancelling a subscription before the session has been opened leads to problems

Calling subscription.subscribe() only starts consuming messages after a message handler has been added using subscription.on('message', ...)

Adding a message handler is a synchronous operation because it uses Node's event emitter. Therefore it is not possible to wait until the subscription has actually been completed.

We have a test that sets up a subscription, executes quickly without using Rascal, then calls subscription.cancel() in the common teardown.

subscription.cancel synchronously marks the session as cancelled, then queues (using async) the action session.close.

This can result in the following sequence...

  1. subscription.subscribe
  2. register on message handler
  3. channel.consume
  4. cancel the session
  5. session._open (would normally attach the channel to the session)
  6. session._close

However instead of attaching the channel to the session, session._open yields an error if the session is cancelled. Consequently the session never gets a channel, session._close does nothing, but the channel.consume is still operational.

If a message is published to the queue, channel.consume will invoke the message handler, which will not be able to acknowledge the message, because the session does not have a reference to the channel.

Possible solutions:

  • Queue session.cancel instead of session._close
  • Still attach the channel to the session, even if cancelled.

It would also be nice to emit an 'opened' event once the session has been opened.
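The sequence above can be reproduced with a toy model. `ToySession` and its `attachWhenCancelled` flag are hypothetical and only mimic the behaviour described in this report; they are not Rascal's session implementation. The flag switches between the reported behaviour and the second proposed solution.

```javascript
// Toy model of the race described above; not Rascal's actual code.
class ToySession {
  constructor({ attachWhenCancelled }) {
    this.attachWhenCancelled = attachWhenCancelled;
    this.cancelled = false;
    this.channel = null;
  }

  cancel() {
    this.cancelled = true;
  }

  // Mirrors session._open: refuses to attach the channel once cancelled,
  // unless the proposed fix (attachWhenCancelled) is enabled.
  open(channel) {
    if (this.cancelled && !this.attachWhenCancelled) {
      return new Error('session has been cancelled');
    }
    this.channel = channel;
    return null;
  }

  // Mirrors session._close: does nothing when no channel is attached.
  close() {
    if (!this.channel) return;
    this.channel.consuming = false;
    this.channel = null;
  }
}

// Runs steps 3 to 6 and reports whether the consumer is still active afterwards.
function consumerLeaks(options) {
  const channel = { consuming: false };
  const session = new ToySession(options);
  channel.consuming = true; // 3. channel.consume
  session.cancel();         // 4. cancel the session
  session.open(channel);    // 5. session._open
  session.close();          // 6. session._close
  return channel.consuming;
}

const leaksToday = consumerLeaks({ attachWhenCancelled: false });
const leaksWithFix = consumerLeaks({ attachWhenCancelled: true });
```

In this model the consumer stays active when the channel is not attached, and is cleaned up when it is.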

"Encryption: name refers to an unsupported attribute: 0" when setting encryption field in subscription to a profile name

Here's the setting:

"s1": {
    queue: `q1`,
    encryption: "well-known",
    contentType: "application/json",
    redeliveries: {
        limit: 5,
        counter: "shared"
    }
}

The readme doesn't show an example under the subscription section. There is a section for it but the example seems incomplete.

If I comment out the encryption field it works.

Outside of the vhost section I have this setting:

"well-known": {
    key: this.cryptoKey,
    ivLength: 16,
    algorithm: "aes-256-cbc"
}
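One possible reading of the `0` in the error message, assuming the validator iterates over the keys of the `encryption` value as if it were a map of profiles: the keys of a string are its character indexes. The snippet below only demonstrates that JavaScript behaviour; `profileNames` is a hypothetical stand-in, not Rascal's validation code, and the key value is a placeholder.

```javascript
// Hypothetical stand-in for a validator that treats `encryption` as a map of profiles.
function profileNames(encryption) {
  return Object.keys(encryption);
}

// A bare profile name: the "keys" are the character indexes "0", "1", ...
const fromString = profileNames('well-known');

// An object keyed by profile name: the key is the profile name itself.
const fromObject = profileNames({
  'well-known': { key: 'placeholder-key', ivLength: 16, algorithm: 'aes-256-cbc' },
});
```

If that is what happens, giving the subscription an object keyed by profile name instead of the bare name may avoid the error; this has not been verified against Rascal.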

Allow webpacking by explicitly requiring tasks

At the moment, webpack does not quite work with Rascal, as it requires the task files dynamically, which webpack does not understand:

https://github.com/guidesmiths/rascal/blob/6f23d6f43d3bc118d303dc630f0f71dfd330f55a/lib/amqp/tasks/index.js#L4

If this was changed to explicitly require the individual tasks, then I think webpack would work with this module.

To further explain my use case, I have a large project that uses Rascal for various services. The project is structured as a single code base, with multiple entrypoints, and we generate a separate webpack bundle for each service, which then become essentially the deployment artifacts. At this time, we have to specifically skip webpacking rascal, and then separately include an npm-installed webpack in each service's container.

What do you think about explicitly / statically requiring the tasks? I would be happy to work on PR if you think this is a good idea.

TypeError: Cannot read property 'counter' of undefined - on the auto-created subscriptions

Hello!

I've created a simple config file (see the end of this message) and used it with the code from the examples:

const Broker = require('rascal').BrokerAsPromised;
const config = require('./config');

(async () => {
  try {
    const broker = await Broker.create(config); // line highlighted by the reporter
    broker.on('error', console.error);
    // ...
  } catch (err) {
    console.error(err);
  }
})();

But I got this error:
TypeError: Cannot read property 'counter' of undefined
at validateSubscription (c:\Src\rascal\node_modules\rascal\lib\config\validate.js:142:65)

It seems that for auto-created subscriptions, subscription.redeliveries is undefined, so accessing subscription.redeliveries.counter throws an error.
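A defensive read would avoid the TypeError. The sketch below assumes a hypothetical helper, `getRedeliveryCounter`; the real check lives in lib/config/validate.js and may look different.

```javascript
// Hypothetical guard; not the actual code in lib/config/validate.js.
function getRedeliveryCounter(subscription) {
  // Fall back to an empty object when no redeliveries block is configured.
  const redeliveries = subscription.redeliveries || {};
  return redeliveries.counter; // undefined when no counter is configured
}

// An auto-created subscription with no redeliveries block, as in this report.
const autoCreated = { queue: '1C' };
// A subscription that configures redeliveries explicitly.
const explicit = { queue: '1C', redeliveries: { limit: 5, counter: 'shared' } };

const autoCreatedCounter = getRedeliveryCounter(autoCreated);
const explicitCounter = getRedeliveryCounter(explicit);
```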


Config.json

{
  "vhosts": {
    "rabbit": {
      "connection": {
        "url": "amqp://usr:pass@app-serv:5742/"
      },
      "queues": [
        "1C"
      ]
    }
  }
}

amqplib missing dependency

Hi team!

I got an error Error: Cannot find module 'amqplib/callback_api' just after requiring the module.

Sample code to test:

const Rascal = require('rascal')
console.log("Hi Rascal!")

I am using:

  • Node v10.15.3
  • NPM v6.9.0
  • OSX 10.14
  • Rascal 4.5.0

Versions affected

  • v4.3.0
  • v4.4.0
  • v4.5.0

Why?
I discovered that in version v4.3.0 the dependencies were modified, so amqplib is now part of devDependencies and peerDependencies. But since npm@3, the way peer dependencies are managed has changed, so amqplib is no longer installed by default (as expected).

How to fix it?
I was able to fix this bug by relocating amqplib to the regular dependencies instead of devDependencies.

Please confirm if you believe that this is the correct way to fix it.

I will open a PR asap 😉

Reconnection fails when RabbitMQ is shut down on one server and the client should switch to another cluster node

Hi,
I would like to create a Node.js script using the Rascal API that listens to a RabbitMQ queue, automatically reconnects after a RabbitMQ shutdown, and switches to another server in the cluster.
Please find my code attached below; it is based on the Rascal documentation.
My configuration: a RabbitMQ cluster of 2 servers
RabbitMQ 3.7.5
JavaScript (Node.js) with Rascal 4.7.0

index.js.txt
configRascal.js.txt

It runs fine when I start the project, but when I test the shutdown I get this message:

[2020-03-11T17:13:28.941Z] DEBUG: psf-ms-rascal-1.0.0/15684 on SCU48652 (L:\psf-ms-rascal\src\amq\listeners\index.js:15 in module.exports): Rascal listener initialized --------------
[2020-03-11T17:13:28.943Z] DEBUG: psf-ms-rascal-1.0.0/15684 on SCU48652 (L:\psf-ms-rascal\src\amq\listeners\index.js:19 in module.exports): Rascal listener running fine --------------
events.js:183
throw er; // Unhandled 'error' event

Error: read ECONNRESET
at _errnoException (util.js:1024:11)
at TCP.onread (net.js:615:25)
[nodemon] app crashed - waiting for file changes before starting...

This is expected.

But when I start RabbitMQ again, it doesn't reconnect automatically.

I don't know how to fix this issue simply.

This issue could be similar to one of the existing ones, but the code involved looks completely different, so I need your help.
Thanks a lot.
Fbernardpro.
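The `Unhandled 'error' event` line in the log is standard Node.js behaviour: an EventEmitter throws when it emits `error` and nothing is listening. Whether a missing listener is the cause here is an assumption, since the attached files are not shown. The sketch below uses a plain `EventEmitter` as a stand-in for the broker; `emitConnectionError` is a hypothetical helper.

```javascript
const { EventEmitter } = require('events');

// Emits an 'error' event and reports whether it was handled or thrown.
function emitConnectionError(emitter) {
  try {
    emitter.emit('error', new Error('read ECONNRESET'));
    return 'handled';
  } catch (err) {
    return `thrown: ${err.message}`;
  }
}

// Stand-in for a broker with no 'error' listener registered.
const withoutListener = new EventEmitter();

// Stand-in for a broker with an 'error' listener registered.
const withListener = new EventEmitter();
const seen = [];
withListener.on('error', (err) => seen.push(err.message));

const unhandledResult = emitConnectionError(withoutListener);
const handledResult = emitConnectionError(withListener);
```

Without a listener the error is thrown and, outside a try/catch, would crash the process before any recovery could run; with a listener it is simply delivered to the handler.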

Client doesn't reconnect on consumer cancelation

Hello,

It appears that when a consumer is canceled, Rascal is staying connected instead of attempting to reconnect and re-establishing its consumer. This could happen on a queue mirror failover or on a queue deletion.

This appears partly due to the line at https://github.com/guidesmiths/rascal/blob/master/lib/amqp/Subscription.js#L55 as the empty message that is sent indicating cancelation is a no-op.

It also appears the Rascal client is sending the correct capabilities header consumer_cancel_notify.

Is this expected behavior? Is there a way we could check that consumer/subscription status outside of that logic and manually reconnect from outside the client code?

Nacking a message with confirmation:true does not actually nack

The config is:

        "p2": {
            "vhost": "zhost",
            "exchange": "e2",
            "confirm": true
        }

The subscriber does this:

broker.subscribe('s2', function (err, message, content, next) {
        console.log(content)
        next(new Error("foo"))
    }, { prefetch: 1 })

The publisher receives this (the debug output shows that Rascal receives a nack, but the publisher's completion handler does not get this information):

  rascal:Subscription Not acknowledging message: f6e005a6-63f2-4f6e-8199-79bba40eacb6 with options: +5ms {}
publisher { '0': null, '1': 'f6e005a6-63f2-4f6e-8199-79bba40eacb6' }

Publisher's code:

broker.publish('p2', 'This is a test message', { routingKey: 'z.p' }, function () {
            console.log("publisher", arguments);
        })

Retry seems broken on subscription

Hey there. I'm trying to follow the advanced example. I've made one of my handlers immediately throw an error which results in a channel error. My issue is that the retry option is ignored and so I am experiencing hundreds of restarting channels per second.

rascal:Subscription Received message: undefined from queue: import_service:project:import +3ms
rascal:Subscription Handling channel error: crashed :( from import_project using channel: 2ecf933a-7575-4f01-8882-b8e47470352e +5ms
app:log subscription.on('error') +9ms
app:rascal crashed :( +8ms
rascal:Subscription Subscribing to queue: import_service:project:import +0ms
rascal:Vhost Requested channel. Outstanding channel requests: 1 +7ms
rascal:SubscriberSession Removing channel: 2ecf933a-7575-4f01-8882-b8e47470352e from session +8ms
rascal:Vhost Created channel 7879819d-ba7a-4995-b8da-869c69daa79f from connection: c736f742-5ad7-45f1-b7da-33c381788acc +2ms
> Subscription.js: attachErrorHandlers
> Subscription.js: channel.consume callback null { consumerTag: 'amq.ctag-v23v7N-_ej5WSBp4vQscmA' }

It seems the channel is restarted immediately regardless of retry's min property. I've tried retry: 1000, retry: true, and leaving it out. My code is pretty similar to the advanced example, although for some reason in the example the errors aren't caught and the app stops.

Anyway, my question is, is this expected behavior to have the channel restarting so much or did I go wrong somewhere? Thanks for any advice and for this library!

Also: retry: { delay: 1000 } does nothing despite what the docs suggest. I think it was changed to retry: 1000? #12

TypeError: Cannot read property 'Symbol(Symbol.toStringTag)' of undefined

Hi, I was trying to add rascal to my project. Here is my code, based on examples/simple:

const config = {
	'vhosts': {
		'v1': {
			'connection': 'amqp://localhost'
		}
	}
}

Rascal.Broker.create(Rascal.withDefaultConfig(config), function(err, broker) {
	if (err) throw err
	console.log(broker)
})

Here is the traceback:

blah/node_modules/async/dist/async.js:228
    return supportsSymbol && fn[Symbol.toStringTag] === 'AsyncFunction';
                               ^

TypeError: Cannot read property 'Symbol(Symbol.toStringTag)' of undefined
    at isAsync (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:228:32)
    at wrapAsync (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:232:12)
    at arrayMap (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:1687:21)
    at seq (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:2564:22)
    at Object.compose (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:2624:16)
    at new Broker (webpack:///./node_modules/rascal/lib/amqp/Broker.js?:35:20)
    at eval (webpack:///./node_modules/rascal/lib/amqp/Broker.js?:21:7)
    at /Somewhere/over/the/rainbow/node_modules/async/dist/async.js:2583:16
    at /Somewhere/over/the/rainbow/node_modules/async/dist/async.js:473:16
    at /Somewhere/over/the/rainbow/node_modules/async/dist/async.js:2521:9

Node v10.10.0
rascal v3.2.0

Looks like the lib is broken
