Giter Site home page Giter Site logo

onebeyond / rascal Goto Github PK

View Code? Open in Web Editor NEW
435.0 17.0 69.0 2.41 MB

A config driven wrapper for amqp.node supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption and channel pooling.

License: MIT License

JavaScript 99.98% Shell 0.02%
rascal

rascal's Issues

Error "Vhost: myvhost refers to an unsupported attribute: encryption"

I used the settings from the documentation here: https://github.com/guidesmiths/rascal#encrypting-messages

When I add the encryption field to the vhost I get this error:

Error "Vhost: myvhost refers to an unsupported attribute: encryption. If I remove the encryption field it works as expected.

This setting does not work:

"publications": {
    "my_q": {
        "queue": "my.q",
        "encryption": "well-known-v1"
    }
},
"encryption": {
    "well-known-v1": {
        "key": "f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315",
        "ivLength": 16,
        "algorithm": "aes-256-cbc"
    }
}

I saw the test's added it to the publication field, if I add it there and remove it from the vhost it works fine. This setting works:

my_q: {
    queue: `my_q`,
    encryption: {
        name: 'well-known',
        key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315',
        ivLength: 16,
        algorithm: 'aes-256-cbc',
    }
}

Any ideas? I'm using "version": "3.2.0".

[Question] What would it happen if the acknowledge message from the consumer times out?

Hello @cressie176,

More than an issue, this is a question.

We have been doing some load test for an application using Rascal. We have seen that sometimes messages hang on RabbitMQ queues as unacknowledged until we restart the consumers. Our theory is the acknowledge message fails due to a ETIMEDOUT or ECONRESET. Is our theory plausible? What are your thoughts? We are thinking to add a timeout, after which, the message would be nacked. Obviously, we would need to be prepared for redeliveries of messages as explained on https://www.rabbitmq.com/confirms.html#automatic-requeueing. The problem would be that the ackOrNack is not a promise we can put into a Promise.all with a timeout, neither does return any value. So I am not sure how we could do this. Do you have any suggestions?

Thanks in advance.

Default publications and subscriptions from one vhost override those from another

Rascal will automatically create a default publication for each queue and exchange, and a default subscription for each queue. The name given to the each is just the queue or exchange name, e.g. q1 or e1.

Publications and subscriptions are hoisted from the vhost block to the root configuration. Since queue and exchange names only have to be unique to a vhost, this can lead to a default publication / subscription from one vhost overridding the default publication / subscription from another vhost.

To fix this we are going to qualify the default publication and subscriptions with the vhost name, e.g. v1/q1 or v1/e1 (/q1 or /e1 for the default vhost). This would be a breaking change. To soften the break an interim release of rascal will create both the qualified and unqualified subscriptions / publications, and warn when an unqualified subscription / publication is used.

Allow webpacking by explicitly requiring tasks

At the moment, webpack does not quite work with Rascal, as it requires the task files dynamically, which webpack does not understand:

https://github.com/guidesmiths/rascal/blob/6f23d6f43d3bc118d303dc630f0f71dfd330f55a/lib/amqp/tasks/index.js#L4

If this was changed to explicitly require the individual tasks, then I think webpack would work with this module.

To further explain my use case, I have a large project that uses Rascal for various services. The project is structured as a single code base, with multiple entrypoints, and we generate a separate webpack bundle for each service, which then become essentially the deployment artifacts. At this time, we have to specifically skip webpacking rascal, and then separately include an npm-installed webpack in each service's container.

What do you think about explicitly / statically requiring the tasks? I would be happy to work on PR if you think this is a good idea.

Reconnection failed when shutdown rabbitMq on server and swith to another cluster rabbit server

Hi,
i would like to create a nodejs script with rascal api, which is listening a rabbit queue and automatiquely reconnect to a RabbitMq server after a Rabbit shutdown and switch to another server (cluster).
please find below my code in attachment, based on the Rascal documentations.
My configuration : rabbit server cluster of 2 servers
rabbitMq 3.7.5
javascript nodejs with rascal api : 4.7.0

index.js.txt
configRascal.js.txt

It runs fine when i start de project , but , when i test the shutdown i have this message :

[2020-03-11T17:13:28.941Z] DEBUG: psf-ms-rascal-1.0.0/15684 on SCU48652 (L:\psf-ms-rascal\src\amq\listeners\index.js:15 in module.exports): Rascal listener initialized --------------
[2020-03-11T17:13:28.943Z] DEBUG: psf-ms-rascal-1.0.0/15684 on SCU48652 (L:\psf-ms-rascal\src\amq\listeners\index.js:19 in module.exports): Rascal listener running fine --------------
events.js:183
throw er; // Unhandled 'error' event

Error: read ECONNRESET
at _errnoException (util.js:1024:11)
at TCP.onread (net.js:615:25)
[nodemon] app crashed - waiting for file changes before starting...

It's normal.

But when i start rabbitmq again , it doesn't reconnect automatiquely.

I dont know how i can fix simply this issue.

this issue could be similar as one of them , but the code looked is completely different. So i need your.
Thanks a lot .
Fbernardpro.

TypeError: Cannot read property 'Symbol(Symbol.toStringTag)' of undefined

Hi, I was trying to add rascal to my project. Here is my code based on examples/simple

const config = {
	'vhosts': {
		'v1': {
			'connection': 'amqp://localhost'
		}
	}
}

Rascal.Broker.create(Rascal.withDefaultConfig(config), function(err, broker) {
	if (err) throw err
        console.log(broker)
})

here is the traceback

blah/node_modules/async/dist/async.js:228
    return supportsSymbol && fn[Symbol.toStringTag] === 'AsyncFunction';

                               ^

TypeError: Cannot read property 'Symbol(Symbol.toStringTag)' of undefined
    at isAsync (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:228:32)
    at wrapAsync (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:232:12)
    at arrayMap (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:1687:21)
    at seq (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:2564:22)
    at Object.compose (/Somewhere/over/the/rainbow/node_modules/async/dist/async.js:2624:16)
    at new Broker (webpack:///./node_modules/rascal/lib/amqp/Broker.js?:35:20)
    at eval (webpack:///./node_modules/rascal/lib/amqp/Broker.js?:21:7)
    at /Somewhere/over/the/rainbow/node_modules/async/dist/async.js:2583:16
    at /Somewhere/over/the/rainbow/node_modules/async/dist/async.js:473:16
    at /Somewhere/over/the/rainbow/node_modules/async/dist/async.js:2521:9

Node v10.10.0
rascal v3.2.0

Looks like the lib is broken

Can't subscribe to message of a specific type

Rabbit has message properties. In this properties we can put a type.
image
Rabbit let you handle ( consume ) messages of a specific type so you can have multiple consumers in one application depending on the type property.

I can't manage how to do that with rascal.

Is it possible ?

How to handle RPC communication?

Sorry to ask this silly question.

I saw the publication.on('return') event listener, but can't figure out how to return message from the subscription.

Connection configuration inconsistency

Hello,
thanks for providing great tool! While I'm using it for a short period, I found out some inconsistency between README and the current implementation in Connection configuration.

  1. Regarding the README which tells me to simply provide connection as a string, connection attribute is of type ConnectionConfig only, not the string itself.
{
  "vhosts": {
    "v1": {
      // this won't work, string is not applicable for ConnectionConfig
      "connection": "amqp://guest:[email protected]:5672/v1?heartbeat=10"
    }
  }
}
  1. Ok, so I tried with the url attribute inside the ConnectionConfig, which does not work either for me in combination with withDefaultConfig
{
  "vhosts": {
    "v1": {
      "connection": {
        // this is fine, but seems like to be ignored or overridden
        "url": "amqp://guest:[email protected]:5672/v1?heartbeat=10"
      }
    }
  }
}

This ends up in a connection issue: Failed to check vhost: /. http://guest:***@localhost:15672 errored with: connect ECONNREFUSED 127.0.0.1:15672.

Seems like my url is being overridden be a default value. I'm not able to configure connection using a string. Only working solution seems to be to pass an object with username, password, hostname attributes supplied.


UPDATE: I've realized the problem number 2 is caused by check: true in the vhost configuration, which is using management API to validate this. I've completely missed that port difference in the log. So right now url attribute of the ConnectionConfig object is used.

There's only a Typo in the README, the point 1 is valid :-)

Add dynamic subscriptions/publications

Is there any way to add more publications (ie. queues) to the Broker after it has been created?
It seems that the configuration is static but in my use case I need a way to dynamically create queues.

Example:

  • I have 3 categories that I map to 3 publications
  • I create the broker
  • 5 minutes later, a new category is added, I should create a new publication for it
    => Any way I can do this without re-creating the whole broker instance ?

Graceful shutdown

Currently shutting down the rascal broker will

  1. Unsubscribe all subscriptions
  2. Disconnect from all vhosts

This means any in-flight unacknowledged messages will be rolled back (and redelivered), and the outcome of any inflight publications is unknown.

It will never be possible to shutdown in a completely graceful manner since Rascal is not responsible for the full unit of work, but it should be possible to delay disconnect until

  1. All in-flight messages have been ack'd/nack'd (or a timeout expires)
  2. All borrowed publication channels have been returned to the pool (or a timeout expires)

The former would require counting messages in and out. The latter requires setting the channel publication pool to drain after pausing channel allocation.

Thoughts @BorePlusPlus ?

Pollution of global object with connectionIndex property

Hi there. I'm working on switching my code to use rascal, and my mocha tests started warning about a global object leak of a "connectionIndex" property.

Adding the --check-leaks mocha option to rascal's own tests cause a similar issue:

  1) Broker
       should assert vhosts:
     Error: global leak detected: 'connectionIndex'
      at /c/Users/ross/code/rascal/test/broker.tests.js:95:7
      at /c/Users/ross/code/rascal/test/broker.tests.js:250:7
      at Immediate._onImmediate (lib/amqp/Broker.js:1:20202)
      at processImmediate (internal/timers.js:439:21)

I believe the culprit is this line, where this ends up being the global object. I'm assuming it should be self... instead.

Client doesn't reconnect on consumer cancelation

Hello,

It appears that when a consumer is canceled, Rascal is staying connected instead of attempting to reconnect and re-establishing its consumer. This could happen on a queue mirror failover or on a queue deletion.

This appears partly due to the line at https://github.com/guidesmiths/rascal/blob/master/lib/amqp/Subscription.js#L55 as the empty message that is sent indicating cancelation is a no-op.

It also appears the Rascal client is sending the correct capabilities header consumer_cancel_notify.
image

Is this expected behavior? Is there a way we could check that consumer/subscription status outside of that logic and manually reconnect from outside the client code?

Subscription Error

Migrated a service over today, all looks fine, but have noticed that after a while I get a subscription error, at which point my service goes into infinite loop of subscription error.

broker.subscribe('incomingEvents', function(err, subscription) {
        subscription
          .on('message', handler)
          .on('error', function(err) {
            logger.error('Subscription Error: ' + err.message);
          });
    });

Wondering what we are supposed to do - should I cancel the subscription and re-subscribe?

Messages to publish are lost when RabbitMQ connection fails

I have this simple publication code:

const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config));

broker.on('error', (error) => {
    console.error(`Broker error: ${error.message}`);
});

setInterval(async function() {
      try {
        const publication = await broker.publish('pub', 'hello world');
        publication.on('error', (error) => {
            console.error(`Publication error: ${error.message}`);
        });
      } catch (error) {
        console.error(`Rascal config error: ${error.message}`);
      }
}, 1000);

It's running fine until I shutdown the RabbitMQ server. Then I see this error in the logs:

Broker error: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"

The messages are not published now, obviously. There is no Publication error in the logs though, which caused some confusion for me.

When I start the RabbitMQ server, I'd expect the in-meantime messages to be published after Rascal reconnects. This doesn't happen, the messages are lost. If I understand the code correctly they're queued in poolQueue: https://github.com/guidesmiths/rascal/blob/a68834d46ccc0b476d4530a21368ca2b234348d6/lib/amqp/Vhost.js#L211 but then after reconnection the queue is overridden with an empty one in https://github.com/guidesmiths/rascal/blob/a68834d46ccc0b476d4530a21368ca2b234348d6/lib/amqp/Vhost.js#L245

My workaround is to add a local queue and pause/resume it based on the connection status:

const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config));

const messagesToPublishQueue = async.queue((_, next) => {
	next();
}, 1);

messagesToPublishQueue.pause();

broker
    .on('error', (error) => {
        console.error(`Broker error: ${error.message}`);
    })
	.on('connect', () => {
		messagesToPublishQueue.resume();
	})
	.on('disconnect', () => {
		messagesToPublishQueue.pause();
	});

setInterval(async function() {
    messagesToPublishQueue.push(null, async () => {
      try {
        const publication = await broker.publish('pub', 'hello world');
        publication.on('error', (error) => {
            console.error(`Publication error: ${error.message}`);
        });
      } catch (error) {
        console.error(`Rascal config error: ${error.message}`);
      }
    });
}, 1000);

Handle publishing message when got connection issue to RabbitMQ server

We have an issue about the gap between service to RabbitMQ
Scenario

  1. Service try to publish message to rabbitMQ
    Service ------Message----> RabbitMQ

  2. While service is publishing the message to rabbitmq, there is something wrong with the connection. So service can not connect to RabbitMQ
    Service ------Message----X RabbitMQ

  3. Rascal will collect message to itself for buffering until the connection comeback
    Buffer(Message)
    ^
    |
    Service X RabbitMQ

  4. Service got restarted in some reasons and comback again without buffer and connect to RabbitMQ without problem

Service -------Connected-----> RabbitMQ

  1. The message gone

Well this case rarely happens if server is stable and never restart itself, but still can happen.
Any suggestions for Rascal to save publishing message somewhere else for doing retry publishing again (Redis, Storage, Etc.)

TypeError: Cannot read property 'counter' of undefined - on the auto-created subscriptions

Hallo!

I've created simple config file(see in the end of message) and used it in the code from examples

const Broker = require('rascal').BrokerAsPromised;
const config = require('./config');
 (async () => {
  try {
    **const broker = await Broker.create(config);**
    broker.on('error', console.error);

But got the error
TypeError: Cannot read property 'counter' of undefined
at validateSubscription (c:\Src\rascal\node_modules\rascal\lib\config\validate.js:142:65)

It seems that for auto-created subscription,
subscription.redeliveries is empty

So
subscription.redeliveries.counter throws error

image

Config.json

{
  "vhosts": {
    "rabbit": {
      "connection": {
        "url": "amqp://usr:pass@app-serv:5742/"
      },
      "queues": [
        "1C"
      ]
    }
  }
}

Nacking a message with confirmation:true does not actually nack

The config is:

        "p2": {
            "vhost": "zhost",
            "exchange": "e2",
            "confirm": true
        }

The subscriber does this:

broker.subscribe('s2', function (err, message, content, next) {
        console.log(content)
        next(new Error("foo"))
    }, { prefetch: 1 })

The publisher receives this: (the debug output shows that rascal receives a nack, however the publisher's completion handler does not get this info

  rascal:Subscription Not acknowledging message: f6e005a6-63f2-4f6e-8199-79bba40eacb6 with options: +5ms {}
publisher { '0': null, '1': 'f6e005a6-63f2-4f6e-8199-79bba40eacb6' }

Publishers code:

broker.publish('p2', 'This is a test message', { routingKey: 'z.p' }, function () {
            console.log("publisher", arguments);
        })

RFC Configuration based queue/exchange deletion

RabbitMQ doesn't allow you to redefine queues. Typically you have to create a new queue + bindings then delete the old one, which is a pain. I think Rascal might be able to automate this but introducing a delete attribute into the queue config

queues: {
  doomed: {
    delete: true
  }
}

Rascal may be able to

  1. Setup all defined exchanges / queues as per the current behaviour (excluding deleted ones)
  2. Unbind the queue
  3. Wait for the queue to be empty
  4. Delete the queue

The are some caveats though:

  • This approach is not completely safe, but it's reasonable in many circumstances
  • Early version of RabbitMQ will destroy the connection is the queue does not exist (therefore use a dedicated connection just in case)
  • The queue would still be bound to the default exchange and so may never drain
  • AMQP may not support programatic removal of all bindings / list current bindings
  • AMQP does include a queue depth function. I would either have to get + nack with requeue which may have side-effects, or use the HTTP api
  • Repeated attempts to delete a queue will fail so I would need to tolerate failures or check for existence

Rinse and repeat for exchanges

How to declare a queue as lazy?

👋 I have a number of queues that I need to connect to that have been predeclared as lazy. I cannot seem to find the correct settings to add to the Rascal config file.

What I have unsuccessfully tried:

// ...
queues: {
  "this.is.a.queue": {
    options: {
      arguments: {
        "x-queue-mode": "lazy"
        // Also tried queueMode: "lazy"
      }
    }
  }
}
// ...
// ...
queues: {
  "this.is.a.queue": {
    options: {
      "x-queue-mode": "lazy"
      // Also tried queueMode: "lazy"
    }
  }
}
// ...

This is the error that is emitted when attempting to connect:

{ Error: Operation failed: QueueDeclare; 406 (PRECONDITION-FAILED) with message
"PRECONDITION_FAILED - inequivalent arg 'x-queue-mode' for queue 'this.is.a.queue'
in vhost '/': received the value 'lazy' of type 'longstr' but current is none"

I am happy to update the documentation and submit a pull request once I've solved this 👍

amqplib missing dependency

Hi team!

I got an error Error: Cannot find module 'amqplib/callback_api' just after requiring the module.

Sample code to test:

const Rascal = require('rascal')
console.log("Hi Rascal!")

I am using:

  • Node v10.15.3
  • NPM v6.9.0
  • OSX 10.14
  • Rascal 4.5.0

Versions affected

  • v4.3.0
  • v4.4.0
  • v4.5.0

Why?
I discovered that in version v4.3.0 the dependecies were modified. So amqplib is part of devDependencies and peerDependencies. But since Npm@3, the way to manage peer dependencies has changed. So it won't install it by default (as expected).

How to fix it?
I was able to fix this bug. In order to do that I relocated amqplib as regular dependency and not devDependencies.

Please confirm if you believe that this is the correct way to fix it.

I will open a PR asap 😉

Support connection max retries

Hi there,

What do you think about supporting an additional flag maxRetries inside connection.retry config?

Without a max retries, there is a change that the app is stuck in a loop reconnecting when there's something wrong externally.

With max retries, eventually the app would exit/crash and it would be easier to see that something is wrong.

Rascal doesn't reconnect if Rabbit is killed

If I kill RabbitMQ and immediately restart it, the Rascal service breaks (even after waiting for RabbitMQ to become fully online again) with the following error next time it attempts to use the connection, e.g. for publishing a message. This also causes my app to crash (I assume this means the error is coming from a stream that I can't add an error handler to, but I haven't investigated this too much).

0|app      | IllegalOperationError: Connection closed (Error: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
0|app      |     at Connection.<anonymous> (/app/node_modules/amqplib/lib/connection.js:359:11)
0|app      |     at ConfirmChannel.C.sendImmediately (/app/node_modules/amqplib/lib/channel.js:64:26)
0|app      |     at ConfirmChannel.C.sendOrEnqueue (/app/node_modules/amqplib/lib/channel.js:75:10)
0|app      |     at ConfirmChannel.C._rpc (/app/node_modules/amqplib/lib/channel.js:131:8)
0|app      |     at ConfirmChannel.Channel.rpc (/app/node_modules/amqplib/lib/callback_model.js:73:8)
0|app      |     at ConfirmChannel.Channel.open (/app/node_modules/amqplib/lib/callback_model.js:86:15)
0|app      |     at CallbackModel.createConfirmChannel (/app/node_modules/amqplib/lib/callback_model.js:285:6)
0|app      |     at createChannel (/app/node_modules/rascal/lib/amqp/Vhost.js:167:38)
0|app      |     at Object.create (/app/node_modules/rascal/lib/amqp/Vhost.js:115:17)
0|app      |     at Pool._createResource (/app/node_modules/generic-pool/lib/generic-pool.js:354:17)
0|app      |     at Pool.dispense [as _dispense] (/app/node_modules/generic-pool/lib/generic-pool.js:314:10)
0|app      |     at Pool.acquire (/app/node_modules/generic-pool/lib/generic-pool.js:436:8)
0|app      |     at /app/node_modules/rascal/lib/amqp/Vhost.js:135:18
0|app      |     at /app/node_modules/rascal/node_modules/async/dist/async.js:4069:9
0|app      |     at process (/app/node_modules/rascal/node_modules/async/dist/async.js:2317:17)

This seems to be because Rascal isn't listening for the connection to close and therefore never attempts to recreate the connection. I can fix this by adding the following line into the Vhost init function:

ctx.connection.on('close', handleConnectionError.bind(null, config));

... such that the code becomes:

this.init = function(next) {
    debug('Initialising vhost: %s', config.name)
    pauseChannelAllocation()
    init(config, {}, function(err, config, ctx) {
        if (err) return next(err)
        self.emit('connect')
        ctx.connection.removeAllListeners('error')
        ctx.connection.once('error', handleConnectionError.bind(null, config))
        forwardEvents(ctx.connection, self, function(eventName) {
            return eventName === 'blocked' || eventName === 'unblocked';
        })
        ctx.connection.on('close', handleConnectionError.bind(null, config));
        connection = ctx.connection
        connectionConfig = ctx.connectionConfig
        resumeChannelAllocation()
        return next(null, self)
    })
    return self
}

With this line added, when RabbitMQ is killed, the service immediately notices and attempts to retry the connection until it succeeds:

0|app      | Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
0|app      | connect ECONNREFUSED 172.18.0.4:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | getaddrinfo ENOTFOUND rabbitmq rabbitmq:5672
0|app      | connect ECONNREFUSED 172.18.0.4:5672
0|app      | connect ECONNREFUSED 172.18.0.4:5672
[...]
0|app      | connect ECONNREFUSED 172.18.0.4:5672
0|app      | connect ECONNREFUSED 172.18.0.4:5672
[ eventually the connection is successful and no more errors are logged ]

Furthermore, while the connection is in the retrying state above, attempting to publish messages will not cause the aforementioned error to be thrown and therefore my app does not crash. I have some try-catches in there that are probably swallowing whatever errors are being thrown instead.

This may be naive as I don't understand how all the moving parts in Rascal/amqplib fit together 😅 Just hoping to get your initial thoughts and I'll be happy to investigate further on any of these points if you need me to.

Why callback of the subscription 'error' event doesn't have 'ackOrNack' parameter as 'message' and 'invalid_content' ?

I don't understand the reason why the callback of the subscription error event doesn't have ackOrNack parameter.
In this way, if I want to handle a recovery strategy in case of an unhandled error inside the message event handler, I need to catch the error. Therefore I don't understand what is the purpose of the error event...

I was expecting that in case of an unhandled error inside the message event handler, Rascal would emit the error event passing the ackOrNack callback as parameter. So, in this case, inside the error event handler I could fire-up my recovery strategy (without the need to catch anything inside the message event handler).

Cancelling a subscription before the session has been opened leads to problems

Calling subscription.subscribe() only starts consuming messages after a message handler has been added using subscription.on('message', ...)

Adding a message handler is a synchronous operation because it uses Node's event emitter. Therefore it is not possible to wait until the subscription has actually be completed.

We have a test that sets up a subscription, executes quickly without using Rascal, then calls subscription.cancel() in the common teardown.

subscription.cancel synchronously marks the session as cancelled, then queues (using async) the action session.close.

This can resulting in the following sequence...

  1. subscription.subscribe
  2. register on message handler
  3. channel.consume
  4. cancel the session
  5. session._open (would normally attach the channel to the session)
  6. session._close

However instead of attaching the channel to the session, session._open yields an error if the session is cancelled. Consequently the session never gets a channel, session._close does nothing, but the channel.consume is still operational.

If a message published to the queue, channel.consume will involve the message handler, which will not be able to acknowledge the message, because the session does not have a reference to the channel.

Possible solutions:

  • Queue session.cancel instead of session._close
  • Still attach the channel the session, even if cancelled.

It would also be nice to emit an 'opened' event once the session has been opened

shutdown() does not gracefully drain / clear generic pools

Related to #50. I've noticed that after trying to do a graceful shutdown(), my process hangs around for about ~30 seconds. Using wtfnode it seems that there is still a timer from generic-pool that keeps the process running for a while:

[WTF Node?] open handles:
- File descriptors: (note: stdio always exists)
  - fd 1 (tty) (stdio)
  - fd 2 (tty) (stdio)
- Timers:
  - (1000 ~ 1000 ms) (anonymous) @ /code/node_modules/generic-pool/lib/generic-pool.js:256

I'm guessing this is part of generic-pool's idle handling. idleTimeoutMillis defaults to 30s which is about the time I'm seeing before actual process exit.

Perhaps the rascal vhost should drain and clear the pools during shutdown?

From the generic-pool docs:

/**
 * Step 3 - Drain pool during shutdown (optional)
 */
// Only call this once in your application -- at the point you want
// to shutdown and stop using this pool.
myPool.drain().then(function() {
  myPool.clear();
});

I'm new to rascal but I'd be happy to work on a PR if you'd like this change.

Connected event listener

Hello, really great work on Rascal, thanks!

Just a question. I can "create" a broker and add a listener for an error event:

const broker = await rascal.BrokerAsPromised.create(rascal.withDefaultConfig(rascalConfigServer))
broker.on("error", onRascalBrokerError)

How can I add a listener for "connected" event instead?

Thanks again.

allow 'bindingKey' to be an array

instead of this:

                "order-created": {
                    "source": "gateway",
                    "destination": "openaccounts_adapter:persist",
                    "bindingKey": "salesforce.v1.notifications.order.created"
                },
                "product-created": {
                    "source": "gateway",
                    "destination": "openaccounts_adapter:persist",
                    "bindingKey": "salesforce.v1.notifications.product.created"
                },
                "billing-run-created": {
                    "source": "gateway",
                    "destination": "openaccounts_adapter:persist",
                    "bindingKey": "salesforce.v1.notifications.billing_run.created"
                }

it would be nice to just write this:

                "created": {
                    "source": "gateway",
                    "destination": "openaccounts_adapter:persist",
                    "bindingKey": ["salesforce.v1.notifications.order.created", "salesforce.v1.notifications.product.created",
"salesforce.v1.notifications.billing_run.created"]
                },

Process crash with "Callback was already called."

Hi there,

I am getting this error.

if (fn === null) throw new Error("Callback was already called.");
                         ^
Error: Callback was already called.
    at /path/to/project/node_modules/async/dist/async.js:903:32
    at /path/to/project/node_modules/async/dist/async.js:2490:13
    at /path/to/project/node_modules/async/dist/async.js:2551:17
    at /path/to/project/node_modules/rascal/lib/amqp/tasks/createConnection.js:24:9
    at /path/to/project/node_modules/async/dist/async.js:4566:26
    at /path/to/project/node_modules/rascal/lib/amqp/tasks/createConnection.js:18:29
    at /path/to/project/node_modules/rascal/lib/amqp/tasks/createConnection.js:32:29
    at /path/to/project/node_modules/amqplib/callback_api.js:16:10
    at /path/to/project/node_modules/amqplib/lib/connect.js:147:12
    at bail (/path/to/project/node_modules/amqplib/lib/connection.js:175:5)

Not sure, how this error is created. No other logs. I am not even sure that this is caused from application code or connection related issue triggered in library code.

Thank you!

"Encryption: name refers to an unsupported attribute: 0" when setting encryption field in subscription to a profile name

Here's the setting:

"s1": {
    queue: `q1`,
    encryption: "well-known",
    contentType: "application/json",
    redeliveries: {
        limit: 5,
        counter: "shared"
    }
}

The readme doesn't show an example under the subscription section. There is a section for it but the example seems incomplete.

If I comment out the encryption field it works.

Outside of the vhost section I have this setting:

"well-known": {
    key: this.cryptoKey,
    ivLength: 16,
    algorithm: "aes-256-cbc"
}

Error on publish confirm

I am trying to publish a message using publish confirm with a publication with mandatory: true flag.
Every time I try to publish a message I get the following error:

Error was:  { TypeError: Cannot read property 'messageId' of undefined
    at EventEmitter.publication.on.on.on (/home/paolo/dev/rabbitMQ-producer.js:15:67)
    at EventEmitter.emit (events.js:189:13)
    at ConfirmChannel.emit (events.js:189:13)
    at /home/paolo/dev/node_modules/amqplib/lib/channel.js:273:10
    at ConfirmChannel.content [as handleMessage] (/home/paolo/dev/node_modules/amqplib/lib/channel.js:326:9)
    at ConfirmChannel.C.acceptMessageFrame (/home/paolo/dev/node_modules/amqplib/lib/channel.js:241:31)
    at ConfirmChannel.C.accept (/home/paolo/dev/node_modules/amqplib/lib/channel.js:394:17)
    at Connection.mainAccept [as accept] (/home/paolo/dev/node_modules/amqplib/lib/connection.js:64:33)
    at Socket.go (/home/paolo/dev/node_modules/amqplib/lib/connection.js:478:48)
    at Socket.emit (events.js:189:13)
    at emitReadable_ (_stream_readable.js:535:12)
    at process._tickCallback (internal/process/next_tick.js:63:19) code: 541 }

If I set mandatory: false everything is fine.

My configurations are:

exchanges: {
  messages: {
    assert: true,
    check: true,
    type: 'fanout',
    options: {
      durable: true,
      autoDelete: false
     }
   }
 }

and

      publications: {
        messages_pub: {
          exchange: 'messages',
          routingKey: '#',
          confirm: true,
          options: {
            persistent: true,
            mandatory: true
          }
        }
      }

The exchange where I am publishing messages has no queues bound.

Rascal: 4.1.0
RabbitMQ: 3.7.8

Retry seems broken on subscription

Hey there. I'm trying to follow the advanced example. I've made one of my handlers immediately throw an error which results in a channel error. My issue is that the retry option is ignored and so I am experiencing hundreds of restarting channels per second.

rascal:Subscription Received message: undefined from queue: import_service:project:import +3ms
rascal:Subscription Handling channel error: crashed :( from import_project using channel: 2ecf933a-7575-4f01-8882-b8e47470352e +5ms
app:log subscription.on('error') +9ms
app:rascal crashed :( +8ms
rascal:Subscription Subscribing to queue: import_service:project:import +0ms
rascal:Vhost Requested channel. Outstanding channel requests: 1 +7ms
rascal:SubscriberSession Removing channel: 2ecf933a-7575-4f01-8882-b8e47470352e from session +8ms
rascal:Vhost Created channel 7879819d-ba7a-4995-b8da-869c69daa79f from connection: c736f742-5ad7-45f1-b7da-33c381788acc +2ms
> Subscription.js: attachErrorHandlers
> Subscription.js: channel.consume callback null { consumerTag: 'amq.ctag-v23v7N-_ej5WSBp4vQscmA' }

It seems the channel is restarted immediately regardless of retry's min property.. I've tried {retry:1000,retry:true,and leaving it out} My code is pretty similar to the advanced example, although for some reason in the example the errors aren't caught and the app stops.

Anyway, my question is, is this expected behavior to have the channel restarting so much or did I go wrong somewhere? Thanks for any advice and for this library!

Also: retry: { delay: 1000 } does nothing despite what the docs suggest. I think it was changed to retry: 1000? #12

Action required: Greenkeeper could not be activated 🚨

🚨 You need to enable Continuous Integration on all branches of this repository. 🚨

To enable Greenkeeper, you need to make sure that a commit status is reported on all branches. This is required by Greenkeeper because we are using your CI build statuses to figure out when to notify you about breaking changes.

Since we did not receive a CI status on the greenkeeper/initial branch, we assume that you still need to configure it.

If you have already set up a CI for this repository, you might need to check your configuration. Make sure it will run on all new branches. If you don’t want it to run on every branch, you can whitelist branches starting with greenkeeper/.

We recommend using Travis CI, but Greenkeeper will work with every other CI service as well.

Once you have installed CI on this repository, you’ll need to re-trigger Greenkeeper’s initial Pull Request. To do this, please delete the greenkeeper/initial branch in this repository, and then remove and re-add this repository to the Greenkeeper integration’s white list on Github. You'll find this list on your repo or organization’s settings page, under Installed GitHub Apps.

Dynamic assertion of queues and exchanges.

I didn't find the way to access the amqplib functions like "assertQueue" or "bindQueue" to dynamically add some exchanges, queues, etc on the configuration.

for my needs as an example , i want to publish on new exchanges that i create dynamically depending on the different parameters that i receive on my API

( Thanks a lot for this library ;) )
using :
rascal : 4.6.2
node : 10.15.3

Change Delay Queue's TTL

In my setup I have multiple queues with various delays and these delays are configurable, so as I made some search if I want to change any config of a queue I need to create a new queue then migrate messages in the old queue to the new one and then delete the old one. I tried to find something to help me achieve this with rascal but couldn't find anything to help me delete a queue/exchange/binding from the exposed API.
I really liked this tool and I'm thinking of using it in my app instead of the amqplib because of the reasons mentioned here.

So do you have any suggestion on how I might do this?

Cluster Connections: can we cancel randomness and keep the given order?

Hello,

Thank you for your great wrapper lib.
I'm trying to setup a RabbitMQ cluster with an active/passive behaviour for high-availability, where I only want to connect to the main node except if it fails.

Is there a way to disable the randomness in the vhost > connections configuration?
I would like the client to systematically connect in the given order.

Thank you!

How/where to handle Heartbeat timeout exception

Hi @cressie176,

I am trying to test the reconnection part of Rascal. When shutting down RabbitMQ, I see the reconnection code working, using the DEBUG mode for Rascal. I see some instances of Heartbet timeout coming up from the amqplib, like:

at Heart.emit (events.js:198:13)',\n ' at Heart.EventEmitter.emit (domain.js:448:20)',\n ' at Heart.runHeartbeat (/Users/carlosgarcia/Documents/mailonline/development/mol-fe/mol-fe-web-push-api/node_modules/amqplib/lib/heartbeat.js:88:17)',\n ' at ontimeout (timers.js:436:11)',\n ' at tryOnTimeout (timers.js:300:5)',\n ' at listOnTimeout (timers.js:263:5)',\n ' at Timer.processTimers (timers.js:223:10)' ]

I have two applications, one producer and one consumer, using the same broker, and while the producer is able to publish messages after the reconnection, the consumer is not able to consume messages even though it seems that is has reconnected and re-established the subscriptions.

I see uncaught exceptions for the hearbeat and unexpected close errors. How should I handle those? I have .on('error') listeners in all the places mentioned in the documentation, when creating the broker and for the subscription and the publication. We use the promise version.

I can see this in the DEBUG rascal logs on the consumer application

rascal:Vhost Initialising vhost: my-host-name rascal:tasks:createConnection Connecting to broker using url: amqp://admin:***@localhost:5673/web-push-api?heartbeat=10&connection_timeout=10000&channelMax=100 rascal:tasks:createConnection Obtained connection: my-connection-hash rascal:tasks:createChannel Creating channel rascal:tasks:assertExchanges Asserting exchange: my-exchange rascal:tasks:assertQueues Asserting queue: subscription rascal:tasks:applyBindings Binding queue: subscription to exchange: my-exchange with binding key: my-binding-key rascal:tasks:closeChannel Closing channel rascal:Vhost vhost: my-host-name was initialised with connection: my-connection-hash

Is normal that it closes the channel?

Checking the RabbitMQ UI Management, it seems the consumer doesn't get attached to the queue.

Please, let me know if you need any more info from my side.

Thanks a lot.

Retry not working, no consumers listed in rabbitmq admin dashboard

I need help figuring out this issue. I use rascal with multiple microservices and it works really well. But I have one microservice that after some amount of time is no longer connected to rabbitmq. I go to the admin dashboard for rabbitmq and see no consumers listed for the queue used by this microservice. In this last instance I see a lot of this error in the logs:

Unexpected close
Channel ended, no reply will be forthcoming

I don't know if that's related but I see it many times, so that makes me think it was able to reconnect. Any idea's on things I can look at to help figure this out? Do you know of any cases where this might happen?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.