jackrabbit's Introduction

Jackrabbit

This is a fork of hunterloftis/jackrabbit.

Jackrabbit is a very opinionated abstraction built on top of amqplib focused on usability and implementing several messaging patterns on RabbitMQ.

Simple Example

// producer.js
'use strict';

const jackrabbit = require('@pager/jackrabbit');
const rabbit = jackrabbit(process.env.RABBIT_URL);

rabbit
  .default()
  .publish('Hello World!', { key: 'hello' })
  .on('drain', rabbit.close);

// consumer.js
'use strict';

const jackrabbit = require('@pager/jackrabbit');
const rabbit = jackrabbit(process.env.RABBIT_URL);

rabbit
  .default()
  .queue({ name: 'hello' })
  .consume(onMessage, { noAck: true });

function onMessage(data) {
  console.log('received:', data);
}

Ack/Nack Consumer Example

'use strict';

const jackrabbit = require('@pager/jackrabbit');
const rabbit = jackrabbit(process.env.RABBIT_URL);

rabbit
  .default()
  .queue({ name: 'important_job' })
  .consume(function(data, ack, nack, msg) {
    try {
      // process data...
      // and ACK on success
      ack();
    } catch (err) {
      // or alternatively NACK on failure
      nack();
    }
  });

More Examples

For now, the best usage help can be found in the examples, which map 1-to-1 with the official RabbitMQ tutorials.

Installation

npm install --save @pager/jackrabbit

Tests

The tests are set up with Docker + Docker-Compose, so you don't need to install rabbitmq (or even node) to run them:

$ docker-compose up

Reconnection

Jackrabbit is a wrapper for amqplib, and amqplib does NOT support reconnection.

This project tries to recover a lost connection gracefully. If it fails to do so, it emits an error event and then exits the current process with code 1.

Our approach to reconnection is recording all the exchanges and queues created through jackrabbit. Once a connection is lost, we will try to create a new one, update the existing exchange and queue references, initialize a new channel for each queue, and bind each queue's consumers to their new channel. This should be transparent to any users of this lib.

You can configure some basic parameters of the reconnection process with some env vars:

Name | Default | Description
RABBIT_RECONNECTION_TIMEOUT | 2000 | Milliseconds between reconnection attempts. The first attempt is always immediate.
RABBIT_RECONNECTION_RETRIES | 20 | Number of retries before erroring out and killing the node process.
RABBIT_RECONNECTION_EXACT_TIMEOUT | false | To prevent total outages on HA services, a random overhead of 0-10% is added to the reconnection timeout by default. Set this option to true to disable that behaviour.
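
The way these three settings interact can be sketched as follows. This is only an illustration of the documented behaviour, not jackrabbit's actual implementation: the helper names (readConfig, delayFor, reconnect) are hypothetical, and treating "retries" as attempts made after the first immediate one is an assumption.

```javascript
'use strict';

// Read the documented env vars, falling back to the documented defaults.
const readConfig = (env = process.env) => ({
  timeout: Number(env.RABBIT_RECONNECTION_TIMEOUT || 2000),
  retries: Number(env.RABBIT_RECONNECTION_RETRIES || 20),
  exact: env.RABBIT_RECONNECTION_EXACT_TIMEOUT === 'true'
});

// Delay before a given attempt: none for the first one, otherwise the
// configured timeout plus 0-10% random overhead unless `exact` is set.
const delayFor = (attempt, { timeout, exact }, random = Math.random) => {
  if (attempt === 0) {
    return 0;
  }
  return exact ? timeout : Math.round(timeout * (1 + random() * 0.1));
};

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Call `connect` (any function returning a promise) until it succeeds
// or the retries are exhausted, then rethrow the last error.
const reconnect = async (connect, config, random = Math.random) => {
  let lastError;
  for (let attempt = 0; attempt <= config.retries; attempt++) {
    await sleep(delayFor(attempt, config, random));
    try {
      return await connect();
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
};
```

The jitter matters when many instances lose the same broker at once: without it they would all retry on the same schedule.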

jackrabbit's Issues

The automated release is failing 🚨

🚨 The automated release from the master branch failed. 🚨

I recommend you give this issue a high priority, so other packages depending on you could benefit from your bug fixes and new features.

You can find below the list of errors reported by semantic-release. Each one of them has to be resolved in order to automatically publish your package. I’m sure you can resolve this 💪.

Errors are usually caused by a misconfiguration or an authentication problem. With each error reported below you will find explanation and guidance to help you to resolve it.

Once all the errors are resolved, semantic-release will release your package the next time you push a commit to the master branch. You can also manually restart the failed CI job that runs semantic-release.

If you are not sure how to resolve this, here is some links that can help you:

If those don’t help, or if this issue is reporting something you think isn’t right, you can always ask the humans behind semantic-release.


Cannot push to the Git repository.

semantic-release cannot push the version tag to the branch master on the remote Git repository with URL https://x-access-token:[secure]@github.com/pagerinc/jackrabbit.git.

This can be caused by:


Good luck with your project ✨

Your semantic-release bot 📦🚀

Creating a Queue object without asserting an Exchange first

Using jackrabbit, there doesn't seem to be a way to bind a new queue to an existing exchange.

The only way to bind queues and exchanges there is:

rabbit.direct('bar').queue({ name: 'foo' })

This is problematic if the exchange already exists in RabbitMQ with options (like durable) that don't match the ones jackrabbit asserts, because in that case the call throws an error.

In other libraries, you typically have the option to bind queues and exchanges only based on their name.

In amqplib, there is bindQueue.

There doesn't seem to be a way to get jackrabbit's queue object without going through an exchange object first.
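
For comparison, a name-only binding done directly with amqplib might look like the sketch below. It bypasses jackrabbit entirely and assumes you already hold an amqplib channel (promise API); bindToExistingExchange is a hypothetical helper name, while checkExchange, assertQueue and bindQueue are amqplib channel methods.

```javascript
'use strict';

// `channel` is an amqplib channel, e.g. from `await connection.createChannel()`.
const bindToExistingExchange = async (channel, { exchange, queue, pattern = '' }) => {
  // checkExchange only verifies that the exchange exists; it does not
  // redeclare it, so its options (durable, etc.) are never compared.
  await channel.checkExchange(exchange);
  // The queue is still asserted with amqplib's default options here.
  await channel.assertQueue(queue);
  await channel.bindQueue(queue, exchange, pattern);
  return queue;
};
```

Usage would be along the lines of `await bindToExistingExchange(channel, { exchange: 'bar', queue: 'foo', pattern: 'foo' })`. Note that a failed checkExchange closes the channel, so the caller needs a fresh channel after an error.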

Help with rpcClient/rpcServer

I'm getting the message on the rpcServer side, but the reply() doesn't wind up back on the requesting side.... It looks like the appropriate queues are being created, and the work/rpc queue of 'rir' is being ack'd but no response is coming through on the client side.

Server:

  const { server, queue } = config.mq;
  const rabbit = jackrabbit(server);
  const exchange = rabbit.default();
  exchange.rpcServer(queue, handleRequest({ listenerNumber, log, config }));
  log.debug({ message: `listener ${listenerNumber} Started`, listenerNumber, pid: process.pid });

Server handle-request

export default ({ listenerNumber, log, config }) => (request, reply) => {
  const ctx = { listenerNumber, config, log, request };
  handleRequest(ctx)
    .catch(handleError(ctx) /* Error handler not hit */)
    .then(response => {
      log.response({ response });
      reply(response);
    });
};

Client:

import jackrabbit from '@pager/jackrabbit';
// Will add a context method for doing an RPC request to the worker

export default (ctx, next) => {
  if (!ctx.app.rpc) {
    const { server, queue } = ctx.app.config.mq;
    const rabbit = jackrabbit(server);
    const exchange = rabbit.default();

    ctx.app.rpc = request =>
      new Promise((resolve, reject) => {
        try {
          exchange.rpcClient(queue, request, resolve);
        } catch (error) {
          reject(error);
        }
      });
  }
  ctx.rpc = ctx.app.rpc;
  return next();
};

I'm plugging it into my koa app... the resolve never hits, I tried replacing it with console.log ... Is the Excl (exclusive) feature setting an issue? Sorry, I'm not very familiar with AMQP or rabbit, I used the upstream library a few years ago for this, but no longer have access to that code, and really just started on what I'm currently working on.

memory leaks

The rpcClient code in exchange.js has some memory leaks.
I found 3 places with problems:

1 - the publish function is storing a correlationId in a pendingReplies object, but it never clears that correlationId when receiving the reply.
So this object is gonna keep growing on each message sent. This is particularly dangerous on Pager APIs where the exchange object is instantiated only once per server.

2 - Inside the rpcClient function three functions are created and passed to the publish method (timeout, onNotFound and onReply). None of these functions clears the corresponding reply on the pendingReplies object, so the problem with (1) repeats.

3 - The publish function is adding a listener to the return event on the channel, but it never clears that listener, so this causes another memory leak and a node warning:
Warning: Possible EventEmitter memory leak detected. 11 return listeners added. Use emitter.setMaxListeners() to increase limit
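
One possible shape for a fix to points 1 and 2 is a small registry that always removes an entry when it settles, whether by reply or by timeout. This is a sketch only: createPendingReplies and its methods are hypothetical names, not jackrabbit's code, which according to the report keeps a plain pendingReplies object.

```javascript
'use strict';

const createPendingReplies = () => {
  const pending = new Map();

  // Register a callback for a correlationId and arm a timeout for it.
  const add = (correlationId, onReply, onTimeout, timeoutMs) => {
    const timer = setTimeout(() => {
      pending.delete(correlationId); // forget the entry on timeout
      onTimeout(new Error(`RPC ${correlationId} timed out`));
    }, timeoutMs);
    pending.set(correlationId, { onReply, timer });
  };

  // Deliver a reply exactly once, then forget the entry.
  const settle = (correlationId, payload) => {
    const entry = pending.get(correlationId);
    if (!entry) {
      return false; // late, duplicate or unknown reply
    }
    clearTimeout(entry.timer);
    pending.delete(correlationId);
    entry.onReply(payload);
    return true;
  };

  return { add, settle, size: () => pending.size };
};
```

For point 3, the analogous idea would be to attach the channel's return listener once per channel rather than once per publish, or to remove it with removeListener when the request settles.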

Client provided name / friendly name

Hi,

it would be nice to be able to set the name for the connection to make it easy to identify clients in rabbitmq

We used it like this in amqplib directly, but it should be easy to add :)

amqp.connect(this.rabbitmqconn, {clientProperties: {connection_name: 'print-service'}});

Especially in kubernetes where the source ips of the connections are not always that helpful you can add valuable information this way

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Repository problems

Renovate tried to run on this repository, but found these problems.

  • WARN: Fallback to renovate.json file as a preset is deprecated, please use a default.json file instead.

Detected dependencies

cloudbuild
cloudbuild.yaml
  • node 18.12
  • node 18.12
  • node 18.12
docker-compose
docker-compose.yml
  • rabbitmq 3.10.5-management-alpine@sha256:afceef7d5a4e9b3f5a0bec1e5b78bface72a2ca1a364a5e3918ca7eb453549fa
dockerfile
Dockerfile
  • node 18.12-alpine3.16@sha256:9eff44230b2fdcca57a73b8f908c8029e72d24dd05cac5339c79d3dedf6b208b
github-actions
.github/workflows/release.yml
  • actions/checkout v3
  • actions/setup-node v3
  • actions/cache v3
npm
package.json
  • amqplib 0.8.x
  • lodash.assignin 4.x
  • uuid 8.x
  • @pager/semantic-release-config 2.x
  • chai 4.x
  • dotenv 16.3.1
  • eslint 7.x
  • eslint-config-hapi 12.x
  • eslint-plugin-hapi 4.x
  • mocha 10.x
  • semantic-release 21.x
  • sinon 15.x
  • node >= v10.18.0
nvm
.nvmrc
  • node 18.12

Limit to one message processed at a time?

This is more of a question than an issue -- possibly a feature request.

Is there a way to limit this library from dispatching more than one message at a time? We use this for web worker jobs, and I would like to run more workers, rather than more than one job at a time. Sometimes a worker gets killed due to a node going down, or some other problem, and this causes multiple jobs to fail, rather than just one. It would be nice to be able to limit to one active message being processed.

I couldn't figure out how to do this from the documentation.
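
At the amqplib level, this kind of limit is normally expressed with a prefetch count of 1 together with manual acknowledgements. The sketch below works on a raw amqplib channel rather than through jackrabbit; consumeOneAtATime is a hypothetical helper, and whether jackrabbit exposes an equivalent queue option is not confirmed here.

```javascript
'use strict';

// `channel` is an amqplib channel; `handler` may be sync or return a promise.
const consumeOneAtATime = async (channel, queue, handler) => {
  // The broker will not deliver a second message to this channel
  // until the first one has been acked or nacked.
  await channel.prefetch(1);
  return channel.consume(queue, async (msg) => {
    if (msg === null) {
      return; // the consumer was cancelled by the broker
    }
    try {
      await handler(msg);
      channel.ack(msg);
    } catch (err) {
      // Requeue so that another worker can pick the job up.
      channel.nack(msg, false, true);
    }
  }, { noAck: false });
};
```

With this in place, a worker that dies mid-job holds at most one unacknowledged message, which the broker then redelivers.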

unhandled "Unexpected close"

Hi,

I'm getting an unexpected close error and jackrabbit does not reconnect...

I'm not sure where the disconnect is happening right now, but I thought jackrabbit would handle disconnects/reconnects?

node:events:371
      throw er; // Unhandled 'error' event
      ^
Error: Unexpected close
    at succeed (/usr/src/app/node_modules/@pager/jackrabbit/node_modules/amqplib/lib/connection.js:272:13)
    at onOpenOk (/usr/src/app/node_modules/@pager/jackrabbit/node_modules/amqplib/lib/connection.js:254:5)
    at /usr/src/app/node_modules/@pager/jackrabbit/node_modules/amqplib/lib/connection.js:166:32
    at /usr/src/app/node_modules/@pager/jackrabbit/node_modules/amqplib/lib/connection.js:160:12
    at Socket.recv (/usr/src/app/node_modules/@pager/jackrabbit/node_modules/amqplib/lib/connection.js:499:12)
    at Object.onceWrapper (node:events:513:28)
    at Socket.emit (node:events:394:28)
    at emitReadable_ (node:internal/streams/readable:571:12)
    at processTicksAndRejections (node:internal/process/task_queues:82:21)
Emitted 'error' event on CallbackModel instance at:
    at Connection.emit (node:events:394:28)
    at Connection.C.onSocketError (/usr/src/app/node_modules/@pager/jackrabbit/node_modules/amqplib/lib/connection.js:353:10)
    at Socket.emit (node:events:406:35)
    at endReadableNT (node:internal/streams/readable:1329:12)
    at processTicksAndRejections (node:internal/process/task_queues:83:21)
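
The "Unhandled 'error' event" line in this trace is standard Node.js behaviour: an EventEmitter that emits 'error' with no listener attached throws the error instead. The sketch below reproduces that with a plain EventEmitter standing in for the connection. It does not show where such a listener would have to be attached inside jackrabbit, which is an open question in this report.

```javascript
'use strict';

const { EventEmitter } = require('events');

// Stand-in for the real connection object.
const connection = new EventEmitter();

// Without a listener, emitting 'error' throws the error itself.
let threw = false;
try {
  connection.emit('error', new Error('Unexpected close'));
} catch (err) {
  threw = true;
}

// With a listener, the same event is delivered instead of thrown,
// which gives the application a chance to reconnect or shut down.
const seen = [];
connection.on('error', (err) => seen.push(err.message));
connection.emit('error', new Error('Unexpected close'));
```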

New Relic dependency

Hi,

there is a new "New Relic" dependency, which is causing warnings.
As there is no documentation on what or why this was added it looks like this was added for some of your other/internal projects ?

So just to be sure, can this still be regarded as a generic rabbitmq wrapper library or is this heading for a pagerinc specific lib ?
No judgement, just to know where this project is heading...

Incompatibility with Node 0.12

package.json states that this library is compatible with "node": ">= 0.10.x".

In practice it doesn't work on Node 0.12, because exchange.js uses an arrow function.

var timeout = setTimeout(() => {

Error:

$ node -v
v0.12.18
$ node src/worker.js
/.../node_modules/@pager/jackrabbit/lib/exchange.js:214
      var timeout = setTimeout(() => {
                                ^
SyntaxError: Unexpected token )

queue.consume now has a race condition in 4.8.0

Updated to 4.8.0 to test the new patch and noticed queue.consume was broken.

Kept getting this error message: "onMessage is not defined".

After going through the changes from 4.7.1 to 4.8.0, I think I discovered what the issue is.

4.7.1

jackrabbit/lib/queue.js

Lines 42 to 55 in f8dc44c

function consume(callback, options) {
  if( ready ){
    const opts = extend({}, DEFAULT_CONSUME_OPTIONS, options);
    channel.consume(emitter.name, onMessage, opts, onConsume);
  }
  else {
    emitter.once('ready', function() {
      const opts = extend({}, DEFAULT_CONSUME_OPTIONS, options);
      channel.consume(emitter.name, onMessage, opts, onConsume);
    });
  }
  function onMessage(msg) {

4.8.0

jackrabbit/lib/queue.js

Lines 28 to 42 in d0aab9e

const consume = (callback, consumeOptions) => {
  if ( ready ){
    const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions);
    channel.consume(emitter.name, onMessage, opts, onConsume);
  }
  else {
    emitter.once('ready', () => {
      const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions);
      channel.consume(emitter.name, onMessage, opts, onConsume);
    });
  }
  const onMessage = (msg) => {

In 4.7.1, queue.consume's onMessage is a function.
In 4.8.0, queue.consume's onMessage is a variable that later in the code gets an anonymous function assigned to it.

Seemingly no difference, but a very big one in practice.

If a connection has not yet been established, queue.consume will set up a listener for the connection event before trying to use onMessage in the consume function. This is most likely the path your code is taking.

If the connection has been established before queue.consume executes, then it immediately executes the consume logic using onMessage.

The problem in 4.8.0 is that if the connection is already active, it tries to consume before the onMessage variable has been assigned the anonymous function. Unlike a function declaration, a const is not hoisted with its value, so onMessage is still uninitialized at that point and referencing it throws an error.

You can experience this by delaying your queue.consume call with setTimeout.
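
The difference can be reproduced in isolation. The sketch below is a stripped-down model of the two code shapes, not the library's code: consumeBroken, consumeFixed and the subscribe callback are hypothetical stand-ins for queue.consume and channel.consume.

```javascript
'use strict';

// 4.8.0 shape: the handler is a `const` declared after its first use.
const consumeBroken = (ready, subscribe) => {
  if (ready) {
    // Throws a ReferenceError: `onMessage` is declared below but not
    // yet initialized when this line runs.
    subscribe(onMessage);
  }
  const onMessage = (msg) => msg;
};

// One possible fix: declare the handler before any code path uses it.
// (A hoisted function declaration, as in 4.7.1, works as well.)
const consumeFixed = (ready, subscribe) => {
  const onMessage = (msg) => msg;
  if (ready) {
    subscribe(onMessage);
  }
};
```

When ready is false the broken version appears to work, because the deferred callback only runs after the const has been initialized. That is why the bug shows up only when the connection is already established.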

Action Required: Fix Renovate Configuration

There is an error with this repository's Renovate configuration that needs to be fixed. As a precaution, Renovate will stop PRs until it is resolved.

Error type: Cannot find preset's package (github>pagerinc/infra-renovate-config). Note: this is a nested preset so please contact the preset author if you are unable to fix it yourself.

ECONNRESET after a while

I keep getting the ECONNRESET error after a while. Any idea what might be the cause?

I'm using jackrabbit along with CloudAMQP.

Error: write ECONNRESET
    at afterWriteDispatched (internal/stream_base_commons.js:154:25)
    at writeGeneric (internal/stream_base_commons.js:145:3)
    at Socket._writeGeneric (net.js:784:11)
    at Socket._write (net.js:796:8)
    at doWrite (_stream_writable.js:442:12)
    at writeOrBuffer (_stream_writable.js:426:5)
    at Socket.Writable.write (_stream_writable.js:317:11)
    at Connection.C.sendBytes (D:\Projects\microServices\services\users\node_modules\amqplib\lib\connection.js:519:15)
    at Connection.C.sendHeartbeat (D:\Projects\microServices\services\users\node_modules\amqplib\lib\connection.js:523:15)
    at Heart.<anonymous> (D:\Projects\microServices\services\users\node_modules\amqplib\lib\connection.js:433:12)
    at Heart.emit (events.js:310:20)
    at Heart.EventEmitter.emit (domain.js:482:12)
    at Heart.runHeartbeat (D:\Projects\microServices\services\users\node_modules\amqplib\lib\heartbeat.js:88:17)
    at listOnTimeout (internal/timers.js:549:17)
    at processTimers (internal/timers.js:492:7)

Compatibility with node below v14

The engines section of package.json sets the node version to > 10, but the use of the optional chaining operator (I didn't check for other features) makes the module incompatible with Node.js 13 or lower.
Example: /lib/jackrabbit.js:30:
return Boolean(connection?.connection?.stream?.writable);

The code could use a polyfill (or a lib like dot-prop), or at least the required Node.js version should be set to 14.
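
A third option is to spell the check out with plain && chains, which older Node.js versions can parse. The sketch below is one way to write it; the function name isWritable is hypothetical, and only the property path comes from the line quoted above.

```javascript
'use strict';

// Equivalent of Boolean(connection?.connection?.stream?.writable)
// without the optional chaining operator.
const isWritable = (connection) => {
  return Boolean(
    connection &&
    connection.connection &&
    connection.connection.stream &&
    connection.connection.stream.writable
  );
};
```

Because the result is wrapped in Boolean, the && chain yields the same true/false answer as the optional chaining version for every input.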

CircleCI hangs when mocha isn't force-exiting tests

As of mocha >= 4, tests do not auto-exit without a flag - orphaned async/unresolved promises/open connections will force a test suite to remain running indefinitely

Jackrabbit's mocha lib is now at 6 and the mocha options in package.json set the force-exit flag to true to work around the hanging (but passing) tests in CircleCI

I have not been able to replicate in docker-compose or locally (tests exit). This issue is to track cleaning up the tests to close out.

A good example is this test: https://github.com/pagerinc/jackrabbit/blob/master/test/jackrabbit.test.js#L58-L78 - even after adding the .close() on the connection, the test will hang in mocha

Update amqplib to v0.8.0 for supporting Node v16

Description

When installing the package on Node v16 with npm, the following warning is shown:

npm WARN EBADENGINE Unsupported engine {
npm WARN EBADENGINE   package: ‘[email protected]’,
npm WARN EBADENGINE   required: { node: ‘>=0.8 <=15’ },
npm WARN EBADENGINE   current: { node: ‘v16.9.1’, npm: ‘7.21.1’ }
npm WARN EBADENGINE }

This was fixed by amqplib in their latest release, 0.8.0. It would be great if we could bump the dependency in this project accordingly.

Looks like there is a PR by bot #725

Reference

Release notes: https://github.com/squaremo/amqp.node/releases/tag/v0.8.0
