Giter Site home page Giter Site logo

vertx-rabbitmq-client's Introduction

RabbitMQ Client for Vert.x

Build Status (5.x) Build Status (4.x)

A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

Getting Started

Please see the main documentation on the web-site for a full description:

Running the tests

By default the tests uses a cloud provided RabbitMQ instance.

% mvn test

You can run tests with a local RabbitMQ instance:

% mvn test -Prabbitmq.local

You will need to have RabbitMQ running with default ports on localhost for this to work.

You can setup a RabbitMQ instance with Docker:

docker run --rm --name vertx-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq

vertx-rabbitmq-client's People

Contributors

aangelidis avatar afloarea avatar barmic avatar cescoffier avatar dependabot[bot] avatar elventear avatar greatbadness avatar jonjomckay avatar kdubb avatar kevin-a avatar luramarchanjo avatar mattfoxvog avatar mjedwabn avatar neway13 avatar nscavell avatar ozangunalp avatar paullatzelsperger avatar pmlopes avatar poiuytrez avatar purplefox avatar ruget avatar sammers21 avatar slinkydeveloper avatar tcheutchoua-steve avatar tsegismont avatar tungnguyeneu avatar vietj avatar yaytay avatar yaytayatwork avatar yeikel avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

vertx-rabbitmq-client's Issues

queueDeclare fails when queue TTL is set

I'm experiencing the following issue with 3.5.1:
This code:

JsonObject config = new JsonObject();
config.put("x-message-ttl", 5000L);
rabbitClient.queueDeclare(queueName, isQueueDurable(), false, false, config, result -> {
// the result is failed

Returns the exception as result.cause():

java.io.IOException: null
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:843)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:281)
	at io.vertx.rabbitmq.impl.RabbitMQClientImpl.lambda$queueDeclare$17(RabbitMQClientImpl.java:322)
	at io.vertx.rabbitmq.impl.RabbitMQClientImpl.lambda$forChannel$26(RabbitMQClientImpl.java:429)
	at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:273)
	at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'storage.states.save.retries.5000' in vhost '/workers': received the value '5000' of type 'long' but current is none, class-id=50, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:360)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:225)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
	... 10 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'storage.states.save.retries.5000' in vhost '/workers': received the value '5000' of type 'long' but current is none, class-id=50, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:483)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:320)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:559)
	... 1 common frames omitted

The actual error message seems to be:
#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'storage.states.save.retries.5000' in vhost '/workers': received the value '5000' of type 'long' but current is none, class-id=50, method-id=10)

When I remove the following: config.put("x-message-ttl", 5000L); the code works.

Please also notice that a similar code can be found in the docs: https://vertx.io/docs/vertx-rabbitmq-client/java/#_declare_queue_with_additional_config :

JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);

client.queueDeclare("my-queue", true, false, true, config, queueResult -> {
  if (queueResult.succeeded()) {
    System.out.println("Queue declared!");
  } else {
    System.err.println("Queue failed to be declared!");
    queueResult.cause().printStackTrace();
  }
});

I'm aware that I can also set up TTL on messages directly but I cannot use it in my code (it's a long story)

Can you take a look at this please?

message headers failure

Whenever I receive a message that contains a headers values, and i apply includeProperties, the framework places them in the JsonObject, which causes the JsonObject serialization to fail with message:

io.vertx.core.json.EncodeException: Failed to encode as JSON: No serializer found for class java.io.DataInputStream and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) ) (through reference chain: java.util.LinkedHashMap["properties"]->java.util.LinkedHashMap["headers"]->java.util.HashMap["contentType"]->com.rabbitmq.client.impl.ByteArrayLongString["stream"])

This means that the message then cannot be sent on the event bus and cannot be consumed.
Am i doing something wrong? Or is this indeed an issue?

For me, in order to reproduce - i just added headers to the message.

Thanks.

Support Alternate Exchange and Dead Lettering

As far as I can see there is no possibility to pass a Map into the RabbitMQClient#exchangeDeclare() method which would be needed to pass along configuration parameters such as x-dead-letter-exchange or alternate-exchange.
Please add it to the RabbitMQClient class(es), also to the rxjava variant. Although this isn't specified by AMQP 0.9.1 it is provided by RabbitMQ, so it should be made available.

I will gladly implement the fix and offer a pull request.

For now users are forced to include the vertx-rabbitmq-client at source level which is ugly.

Client basic get should reuse RabbitMQMessage instead of JsonObject

Replace basicGet(String queue, boolean autoAck, Handler<AsyncResult<JsonObject>> resultHandler) with basicGet(String queue, boolean autoAck, Handler<AsyncResult<RabbitMQMessage>> resultHandler).

RabbitMQMessage is already used in basic consumer.

This is a breaking change.

io.vertx.rabbitmq.impl.RabbitMQClientImpl#start wrong async logic

Recently i tried to use some of the Vertx async features(Future, Handlers) with RabbitMQClientImpl. However, i quickly encountered strange behavior with this method. When i map some function with start method of RabbitMQClientImpl there is a situation when mapping function or handler will never be called. When the client encounters java.net.ConnectException: Connection refused(Rabbitmq is down) it will call internally method reconnect() without completing or failing internal future: https://github.com/vert-x3/vertx-rabbitmq-client/blob/master/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java#L338, as a result my application just hangs and even if the connection is restored later application cannot continue to work and need to be restarted. My question is: why cannot we pass future https://github.com/vert-x3/vertx-rabbitmq-client/blob/master/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java#L330 to reconnect method so he will complete it after successful reconnect? I tried this code and it worked:

   private <T>void reconnect(Future<T> future) throws IOException {
      ....
                  try {
                    log.debug("Reconnect attempt # " + attempt);
                    connect();
                    vertx.cancelTimer(id);
                    future.complete();
                    log.info("Successfully reconnected to rabbitmq (attempt # " + attempt + ")");

channel.queueBind(..,..,..,Map<S,O>) missing, needed for use of 'headers' exchange

In order to properly use a headers exchange, you need to pass a proper configuration during queue binding. This is currently not possible, since the RabbitMQClient wrapper doesn't delegate to com.rabbitmq.client.Channel.queueBind(String, String, String, Map<String, Object>).

It should be fairly simple to implement, since this method is only an overload to com.rabbitmq.client.Channel.queueBind(String, String, String), which the wrapper already delegates to. In fact, if you look at the upstream implementation, the later method internally calls the former one, passing null for the Map<String, Object> parameter.

Please, add this part to the RabbitMQClientImpl (and the matching definition to RabbitMQClient):

  @Override
  public void queueBind(String queue, String exchange, String routingKey, JsonObject arguments, Handler<AsyncResult<Void>> resultHandler) {
    forChannel(resultHandler, channel -> {
      channel.queueBind(queue, exchange, routingKey, Utils.asMap(arguments));
      return null;
    });
  }

Not possible to connect to a rabbitmq cluster

When creating a connection it should be possible to specify a list of rabbitmq host:port combinations in order to connect to a random host in a set of known cluster hosts. The connection should be established like this:
connectionFactory.newConnection(new Address[]{new Addres("host1",1234)});

(Address is a class of the rabbitmq client)

basicConsume produces invalid message order on the eventbus channel

I created a simple test program, Sender sends a message with a counter, the Receiver receives it, but on the receiving site the order of the messages are wrong.

https://gist.github.com/tibor-kocsis/a75d5a8cc8dd8d9a5a2f494f5bf48865

If you run the Sender, then a few minutes later the Receiver, the output will be the following:

https://pastebin.com/M2zPPe7w

As you can see, the counter is not incement at the messages that was sent before the Receiver starts. I looked into the implementation, and my guess is that the problem is in the ConsumerHandler class line 58. There is no need for the vertx.runOnContext call, you should just call the handler with the message. The handleDelivery function is called by a rabbitmq-client threadpool, and I guess vertx creates here a new context for each message, so the handler calls will run parallel, which causes the invalid order of the message delivering.

future complete not called after reconnect in RabbitMQClientImpl.start

When using docker-compose my app attempted to connect to RabbitMQ container before RabbitMQ was ready to accept connections. I have connectionRetries=3 so my app correctly tried again. On the second attempt connection succeeded but my app hung and the verticle didn't finishing starting. Looking at RabbitMQClientImpl start method, version 3.5.0 line 338, I noticed future.complete() not called.

Excerpt from logs:

Jan 11, 2018 1:51:58 AM io.vertx.rabbitmq.impl.RabbitMQClientImpl
INFO: Starting rabbitmq client
Jan 11, 2018 1:51:58 AM io.vertx.rabbitmq.impl.RabbitMQClientImpl
SEVERE: Could not connect to rabbitmq
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:47)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:822)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:778)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:868)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.newConnection(RabbitMQClientImpl.java:86)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.connect(RabbitMQClientImpl.java:396)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.lambda$start$20(RabbitMQClientImpl.java:332)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:278)
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:80)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Jan 11, 2018 1:51:58 AM io.vertx.rabbitmq.impl.RabbitMQClientImpl
INFO: Attempting to reconnect to rabbitmq...
Jan 11, 2018 1:52:08 AM io.vertx.rabbitmq.impl.RabbitMQClientImpl
INFO: Successfully reconnected to rabbitmq (attempt # 1)

No support for ConfirmListener

In order for publishers to use message confirmations without blocking their throughput it is necessary to use ConfirmListeners to asynchronously receive confirmation of messages sent.
There is currently no support for this in vertx-rabbitmq-client.

basicConsume handler fails on invalid json message

rabbitmq.basicConsume("queue", "amqp.service_channel.message", consumeResult -> { 
// ... 
});

Consumer handler will thrown an exception and terminated in case of invalid json message from AMQP

DefaultExceptionHandler: Consumer io.vertx.rabbitmq.impl.ConsumerHandler@5918b2f4 (amq.ctag-Yj0RjwWdb2Zg6_pZN4hIPw) method handleDelivery for channel AMQChannel(amqp://[email protected]:5672/,1) threw an exception for channel AMQChannel(amqp://[email protected]:5672/,1):
io.vertx.core.json.DecodeException: Failed to decode:Unexpected character ('t' (code 116)): was expecting double-quote to start field name
at [Source:
Unknown macro: {type}
; line: 1, column: 3]
at io.vertx.core.json.Json.decodeValue(Json.java:76)
at io.vertx.core.json.JsonObject.fromJson(JsonObject.java:796)
at io.vertx.core.json.JsonObject.<init>(JsonObject.java:47)
at io.vertx.rabbitmq.impl.Utils.parse(Utils.java:101)
at io.vertx.rabbitmq.impl.ConsumerHandler.handleDelivery(ConsumerHandler.java:53)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:95)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

SSL option for client

I cannot find any option to turn on SSL, am I simply overlooking this option or is it simply not available?

queueDeclare does not accept not String configuration parameters

Nowadays, the endpoint is defined as following:
void queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, String> config, Handler<AsyncResult> resultHandler);

However, there are parameters like x-message-ttl that only accepts integers.

I have tried to change to Map<String,Object> but I'd got an awful error

GRAVE: io.vertx.codegen.CodeGenProcessor - Could not generate model for queueDeclare(java.lang.String,boolean,boolean,boolean,java.util.Map<java.lang.String,java.lang.Object>,io.vertx.core.Handler<io.vertx.core.AsyncResult<io.vertx.core.json.JsonObject>>): type java.util.Map<java.lang.String,java.lang.Object> is not legal for use for a parameter in code generation
io.vertx.codegen.GenException: type java.util.Map<java.lang.String,java.lang.Object> is not legal for use for a parameter in code generation

If you are able to modify the codegen system to accept this kind of parameter, I can create a PR with this change.

Link between rabbitmq consumer and vertx event bus isnt working when message has headers

// rabbitmq config has "includeProperties": true
RabbitMQClient.create(vertx, config.getJsonObject("rabbitmq"));


// Setup the link between rabbitmq consumer and vertx event bus 
rabbitmq.basicConsume("queue", "amqp.service_channel.message", consumeResult -> {
       // ...
 });

When I send AMQP message with AMQP Headers error occures

Unhandled exception
java.lang.IllegalStateException: Illegal type in JsonObject: class com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
        at io.vertx.core.json.Json.checkAndCopy(Json.java:115) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.json.JsonObject.copy(JsonObject.java:646) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.json.Json.checkAndCopy(Json.java:94) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.json.JsonObject.copy(JsonObject.java:646) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.json.Json.checkAndCopy(Json.java:94) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.json.JsonObject.copy(JsonObject.java:646) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.codecs.JsonObjectMessageCodec.transform(JsonObjectMessageCodec.java:49) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.codecs.JsonObjectMessageCodec.transform(JsonObjectMessageCodec.java:27) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.MessageImpl.<init>(MessageImpl.java:92) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.MessageImpl.copyBeforeReceive(MessageImpl.java:106) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.EventBusImpl.doReceive(EventBusImpl.java:748) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.EventBusImpl.receiveMessage(EventBusImpl.java:713) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPub(EventBusImpl.java:544) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:149) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:134) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.rabbitmq.impl.RabbitMQClientImpl.lambda$null$0(RabbitMQClientImpl.java:53) ~[vertx-rabbitmq-client-3.0.0.jar:na]
        at io.vertx.rabbitmq.impl.RabbitMQClientImpl$$Lambda$75/1185211598.handle(Unknown Source) ~[na:na]
        at io.vertx.rabbitmq.impl.ConsumerHandler.lambda$handleDelivery$18(ConsumerHandler.java:55) ~[vertx-rabbitmq-client-3.0.0.jar:na]
        at io.vertx.rabbitmq.impl.ConsumerHandler$$Lambda$120/1066768999.handle(Unknown Source) ~[na:na]
        at io.vertx.core.impl.ContextImpl.lambda$wrapTask$15(ContextImpl.java:314) ~[vertx-core-3.0.0.jar:na]
        at io.vertx.core.impl.ContextImpl$$Lambda$14/1499867659.run(Unknown Source) [vertx-core-3.0.0.jar:na]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) [netty-common-4.0.28.Final.jar:4.0.28.Fina
l]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) [netty-transport-4.0.28.Final.jar:4.0.28.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [netty-common-4.0.28.Final.jar:4.0.28.Final]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

Consumer cancellation not handled

The consumer class used to consume from a queue only implements handleDelivery and none of the other callbacks. I think at least handleCancel should be implemented not necessarily by directly reregister the consumer (as this could fail due to a queue that has been deleted) but rather by notifiying the User of the Client in some way.

More Info: https://www.rabbitmq.com/consumer-cancel.html

Publishing messages when server is down results in lots of threads and connections

I have a test that sends 1M messages to rabbit (as fast as it can receive HTTP requests).
If the test starts whilst the RabbitMQ server is down the client correctly builds up the requests and the messages do get sent through when the server comes up.
However, at this point the debugger is showing that there are lots (hundreds) of threads in the client process.

I have a secondary issue that if the server is taken down after the connection has been established it breaks the client and I haven't found a way around that.

Note that I'm working with vertx4, milestone 3.

Connection recovery

Currently the vertx-rabbitmq-client (VRC) sets the channel and connection to null on receiving the shutdownCompleted event before attempting to reconnect. During the reconnection process consumers and publishers will not be restored.

The Java RabbitMQ client supports recovery (http://www.rabbitmq.com/api-guide.html#recovery). Is there a specific reason why this hasn't been enabled?

Activation would simply be:

private static Connection newConnection(JsonObject config) throws IOException, TimeoutException {
    ConnectionFactory cf = new ConnectionFactory();
    // ... other code ...
    cf.setAutomaticRecoveryEnabled(true);

    return cf.newConnection();
}

It seems to work, though there doesn't seem to be any logging.

Add connection parameters

The current connection configuration supports only a few parameters

most used:

factory.setRequestedHeartbeat(..);
factory.setHandshakeTimeout(..);

others:

factory.setRequestedChannelMax(..);
factory.setRequestedFrameMax(..);
factory.setNetworkRecoveryInterval(..);

To add here

The correct channel must be used when doing manual acks and using multiple verticles..

This is more a doc issue...

When deploying multiple verticles and using the code as below exceptions get thrown...

String address = "my.address"; 
//+ addressCount.incrementAndGet();
			
vertx.eventBus().consumer(address, msg -> {
    JsonObject json = (JsonObject) msg.body();
client.basicAck(json.getLong("deliveryTag"), false, asyncResult -> {
    if(!asyncResult.succeeded()) {
            //asyncResult.cause().printStackTrace();
        } else {
        }
    });
});

// Setup the link between rabbitmq consumer and event bus address
client.basicConsume("events", address, false, consumeResult -> {
    if (consumeResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    } else {
        consumeResult.cause().printStackTrace();
    }
});

To get around this, we have to make sure that each consumer/channel pair publish to their own eventbus addtress such as: my.address.1, my.address.2 and so on...

The exception:

com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 56, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:194)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:308)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:302)
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1042)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:71)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:369)
    at io.vertx.rabbitmq.impl.RabbitMQClientImpl.lambda$0(RabbitMQClientImpl.java:145)
    at io.vertx.rabbitmq.impl.RabbitMQClientImpl.lambda$22(RabbitMQClientImpl.java:413)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:263)
    at io.vertx.core.impl.OrderedExecutorFactory$OrderedExecutor.lambda$new$0(OrderedExecutorFactory.java:94)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Message encoding

Hello,

In io.vertx.rabbitmq.impl.Utils (here), we use platform dependant charset for encode message.

For decoding we enforce usage of UTF8 if no encoding given.

Maybe we can have a symmetric solution for encoding?

Have a nice day

cause Not connected

all is default config
but alert 'Not connected' every time
succeeded always false

os: win10 x64 jdk1.8.0_181

rabbitClient.queueDeclare bug??

java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:843) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:60) ~[obdpps-0.0.1-fat.jar:?]
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.lambda$queueDeclare$15(RabbitMQClientImpl.java:288) ~[obdpps-0.0.1-fat.jar:?]
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.lambda$forChannel$22(RabbitMQClientImpl.java:386) ~[obdpps-0.0.1-fat.jar:?]
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:278) ~[obdpps-0.0.1-fat.jar:?]
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:80) ~[obdpps-0.0.1-fat.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:360) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:225) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[obdpps-0.0.1-fat.jar:?]
... 9 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_144]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_144]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_144]
at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[?:1.8.0_144]
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[?:1.8.0_144]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:94) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:138) ~[obdpps-0.0.1-fat.jar:?]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:541) ~[obdpps-0.0.1-fat.jar:?]
... 1 more

The following method throw exception above, but sometimes execute successfully:

public static Future rabbitMQClient(Vertx vertx, String queueName){
Future result = Future.future();

    if(ObjectUtils.equals(rabbitClient, null)) {

        final JsonObject config = vertx.getOrCreateContext().config().getJsonObject("rabbitmq");
        logger.trace("rabbitmq config: " + config.encodePrettily());

        RabbitMQOptions options = new RabbitMQOptions();
        options.setUser(config.getString("user", "guest"));
        options.setPassword(config.getString("password", "guest"));
        options.setHost(config.getString("host", "localhost"));
        options.setPort(config.getInteger("port", 5672));

        rabbitClient = RabbitMQClient.create(vertx, options);

        Future<Void> startFut = Future.future();
        rabbitClient.start(startRes -> {
            if (startRes.succeeded()) {
                logger.trace("rabbitmq client started successfully. isOpenChannel: " + rabbitClient.isOpenChannel()
                        + ", isConnected: " + rabbitClient.isConnected());
                startFut.complete();
            } else {
                logger.error("Failed starting rabbitmq client.", startRes.cause());
                startFut.fail(startRes.cause());
            }
        });
        startFut.compose(start -> {
            Future<Void> declareFut = Future.future();
            rabbitClient.queueDeclare(queueName, false, false, false, null, queueDeclareRes -> {
                if (queueDeclareRes.succeeded()) {
                    logger.info("queueName: " + queueName + " declare successfully.");
                    declareFut.complete();

                } else {
                    logger.error("Failed declare queueName: " + queueName, queueDeclareRes.cause());
                    declareFut.fail(queueDeclareRes.cause());

                }
            });
            return declareFut;
        }).compose(declare -> {
            rabbitClient.basicQos(1, basicQosRes -> {
                if (basicQosRes.succeeded()) {
                    logger.trace("set basic Qos=1 successfully.");
                    result.complete(rabbitClient);
                } else {
                    logger.error("Failed set basic Qos=1.", basicQosRes.cause());
                    result.fail(basicQosRes.cause());
                }
            });
        }, result);
    }
    else {
        result.complete(rabbitClient);
    }

    return result;
}

Only one channel per connection

As I can see from source code RabbitMQ client creates one channel on connection.
While common scenario is to create one channel for consumer.
Here is quote from RabbitMQ docs:

Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

DataObject configuration

Currently the client configuration uses a JsonObject, use instead a Vert.x data object with the equivalent attributes.

No way to get connection status

vertx-rabbitmq-client doesn't use Autorecovery Connection feature of native java rabbitmq client.
And interface RabbitMQClient does't have method to get connection from RabbitMQClientImpl.
So there is no legal way to check connection != null or connection.isOpen() if connection to AMQP server was broken.

RabbitMQClient options

the client currently uses a JsonObject config parameter, we should wrap it with an ClientOptions to improve the usability

how to set AMQP Basic Properties from basicConsume method.

By default the code is picking up application/octet-stream as the content type is it possible to inject the properties when delivery is handled (for example application/json text/plain).
body i receive in event bus as : SGVsbG8gV29ybGQh (Hello World!)

Buffering messages possible?

I would like to buffer up to x messages until the next messages will be received.

I'm using basicConsume() to map the rabbitMQ messages to the vertx event bus but my vertx application is consuming thousands of messages which will get processed by some verticles after that. I would like to consume only a few messages from rabbitmq and process them. After a message got processed the next rabbitmq messages could be consumed (mapped to the event bus). I would like to prevent that so much messages get loaded into memory (ram) because this messages would get lost if the application stops or similar.

With the vertx-sqs plugin i could configure the buffer size but i couldn't find such configuration possibility with the vertx-rabbit-client plugin. How would you configure a buffer or build one?

Provide a reliable message publisher

A reliable publisher should:

  • Queue up messages internally until it can successfully call basicPublish.
  • Notify the caller using a robust ID (not delivery tag) when the message is confirmed by rabbit.
    This is a layer above the existing abstraction in vertx-rabbitmq-client, but it's such common boiler plate (and non-trivial) that adding to vertx-rabbitmq-client would be useful.

Note that I am aiming to guarantee at least once delivery, there will be failure modes that result in messages being delivered more than once.
Also, clients that don't need at least once delivery guarantees should not use this publisher.

Consume queue as a stream

motivation: currently queue are consumed via the event bus with the basicConsume method. It does not allow for unsubscription (tag is lost), and does not work well with message acknowledgement.

changes: provide a ReadStream for consuming RabbitMQ queues

Support basicQos(int prefetchCount, boolean global)

The client seems to missing the full Qos (Consumer Prefetch) functionality described here: https://www.rabbitmq.com/consumer-prefetch.html

Specifically it is missing the functionality to set whether the Qos setting should be applied globally or not.

I propose we add a method to match the underlying RabbitMQ client's method to RabbitMQClient:

Interface:

void basicQos(int prefetchCount, boolean global, Handler<AsyncResult<Void>> resultHandler);

Implementation:

  @Override
  public void basicQos(int prefetchCount, boolean global, Handler<AsyncResult<Void>> resultHandler) {
    forChannel(resultHandler, channel -> {
      channel.basicQos(prefetchCount, global);
      return null;
    });
  }

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.