Giter Site home page Giter Site logo

rabbitmq / rabbitmq-java-client Goto Github PK

View Code? Open in Web Editor NEW
1.2K 112.0 569.0 67.77 MB

RabbitMQ Java client

Home Page: https://www.rabbitmq.com/java-client.html

License: Other

Makefile 0.06% Java 98.29% Python 1.19% Groovy 0.11% Erlang 0.04% Shell 0.30%
rabbitmq java

rabbitmq-java-client's Introduction

RabbitMQ Java Client

Maven Central Build Status

This repository contains source code of the RabbitMQ Java client. The client is maintained by the RabbitMQ team at Broadcom.

Dependency (Maven Artifact)

This client releases are independent of RabbitMQ server releases and can be used with RabbitMQ server 3.x. They require Java 8 or higher.

Stable

Maven

pom.xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.21.0</version>
</dependency>

Gradle

build.gradle
compile 'com.rabbitmq:amqp-client:5.21.0'

Snapshots

Maven

pom.xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.22.0-SNAPSHOT</version>
</dependency>

Snapshots are available on the Sonatype OSS snapshot repository:

pom.xml
<repositories>
  <repository>
    <id>ossrh</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
    <releases>
      <enabled>false</enabled>
    </releases>
  </repository>
</repositories>

Gradle

build.gradle
compile 'com.rabbitmq:amqp-client:5.22.0-SNAPSHOT'

Snapshots are available on the Sonatype OSS snapshot repository:

build.gradle
repositories {
  maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
  mavenCentral()
}

4.x Series

As of 1 January 2021 the 4.x branch is no longer supported.

Experimenting with JShell

You can experiment with the client from JShell. This requires Java 9 or more.

git clone https://github.com/rabbitmq/rabbitmq-java-client.git
cd rabbitmq-java-client
./mvnw test-compile jshell:run
...
import com.rabbitmq.client.*
ConnectionFactory cf = new ConnectionFactory()
Connection c = cf.newConnection()
...
c.close()
/exit

Building from Source

Getting the Project and its Dependencies

git clone [email protected]:rabbitmq/rabbitmq-java-client.git
cd rabbitmq-java-client
make deps

Building the JAR File

./mvnw clean package -Dmaven.test.skip

Launching Tests with the Broker Running in a Docker Container

Run the broker:

docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq

Launch "essential" tests (takes about 10 minutes):

./mvnw verify \
    -Drabbitmqctl.bin=DOCKER:rabbitmq \
    -Dit.test=ClientTestSuite,FunctionalTestSuite,ServerTestSuite

Launch a single test:

./mvnw verify \
    -Drabbitmqctl.bin=DOCKER:rabbitmq \
    -Dit.test=DeadLetterExchange

Launching Tests with a Local Broker

The tests can run against a local broker as well. The rabbitmqctl.bin system property must point to the rabbitmqctl program:

./mvnw verify \
       -Dtest-broker.A.nodename=rabbit@$(hostname) \
       -Drabbitmqctl.bin=/path/to/rabbitmqctl \
       -Dit.test=ClientTestSuite,FunctionalTestSuite,ServerTestSuite

To launch a single test:

./mvnw verify \
       -Dtest-broker.A.nodename=rabbit@$(hostname) \
       -Drabbitmqctl.bin=/path/to/rabbitmqctl \
       -Dit.test=DeadLetterExchange

Contributing

Versioning

This library uses semantic versioning.

Support

See the RabbitMQ Java libraries support page for the support timeline of this library.

License

This package, the RabbitMQ Java client library, is triple-licensed under the Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("AL").

This means that the user can consider the library to be licensed under any of the licenses from the list above. For example, you may choose the Apache Public License 2.0 and include this client into a commercial product. Projects that are licensed under the GPLv2 may choose GPLv2, and so on.

rabbitmq-java-client's People

Contributors

0x6e6562 avatar abousselmi avatar acogoluegnes avatar aschmolck avatar changlinli avatar dependabot[bot] avatar dhakimtrx avatar dimas avatar dpw avatar dumbbell avatar essiene avatar gustf avatar hogimn avatar jsoref avatar kjnilsson avatar lukebakken avatar michaelklishin avatar ogolberg avatar rabbitmq-ci avatar rade avatar reepong avatar rjbaucells avatar slayful avatar spring-operator avatar tonyg avatar tsotnikov avatar videlalvaro avatar vikinghawk avatar xuwei-k avatar yannrobert avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rabbitmq-java-client's Issues

WorkPool possible memory leak in Set<K> unlimited

Short story: unregisterKey(K key) and unregisterAllKeys() does not unregister the key from Set<K> unlimited, which seems to result in leaks over time.

Longer story:

In our (Spring) application, we dynamically spin up and spin down queues and SimpleMessageListenerContainers as needed. I can describe the workflow in more detail if anyone really cares. After a day or two of decent load, our memory usage tends to grow very large. Analyzing a heap dump showed that the majority of memory was being used by a few hundred thousand ShutdownSignalExceptions and ChannelNs and various other related objects that still had references, all of which led back to this one Set in the WorkPool class.

It seems like a simple solution would be to add two lines which also call unlimited.remove, and I can create a Pull Request if desired, but I'm hoping that someone more familiar with the code can determine if this would cause any problems.

Thanks for all you folks do on this project. It's been fun implementing it into my system.

Link to feasible patch: https://gist.github.com/xathien/ec9ca35224868a524873

Add JMX monitoring

It would be very useful to get at least some basic statistics published via JMX. E.g. number of messages published, consumed, rejected, number of threads used etc.

Make connection handshake timeout configurable

Currently I'm trying to connect to RabbitMQ cluster behind HAProxy (also tried with Ngninx 1.9 over TCP) from a Java Client and it takes around 7 seconds to establish the connection.

I noticed that in com.rabbitmq.client.impl.AMQConnection connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2) is hard-coded to 5 seconds. So, I'm getting Timeout Exception.

I was seeing the code and HANDSHAKE_TIMEOUT is configured to 10 seconds.

public static final int HANDSHAKE_TIMEOUT = 10000;

And in the code below connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2) the reply is waiting for 5 seconds causing the timeout exception.

    AMQP.Connection.Start connStart = null;
    AMQP.Connection.Tune connTune = null;
    try {
        connStart =
                (AMQP.Connection.Start) connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2).getMethod();

        _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

The stack trace exception is

java.util.concurrent.TimeoutException
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:367)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:293)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:621)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:648)
    at Consumer.main(Consumer.java:39)

I'm using amqp-client 3.5.4 jar. A snippet of the code is:

ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("user");
            factory.setPassword("******");
            factory.setRequestedHeartbeat(58);
            factory.setConnectionTimeout(60000);
            factory.setHost("my.domain.com");
            factory.setPort(5670);
            connection = factory.newConnection();

It seems if HANDSHAKE_TIMEOUT could be configured programmatically in the ConnectionFactory parameters it should be enough to fix this problem.

nextDelivery method returning null

The nextDelivery method is returning null all the time after my program run about an hour every time.I don't know what happened.I am just think that the LinkedBlockingQueue is empty after some time and not filled with another.
My program is running with three threads in a thread pool and sharing a connection but the channel is independent. I am waiting for your response please.

Connection thread dies

Hi,
We have an issue where a thread seems to die, sometimes. We create a channel in a static method:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Util.getServerProperty("messagequeue.server"));
factory.setUsername(Util.getProperty("username",MESSAGEQUEUE_PROPERTIES));
factory.setPassword(Util.getProperty("password",MESSAGEQUEUE_PROPERTIES));
factory.setVirtualHost(Util.getProperty("virtualhost",MESSAGEQUEUE_PROPERTIES));
connection = factory.newConnection();
channel = connection.createChannel();

And then publish something:

channel.basicPublish(exchangeName, routingKey, mandatory,basicProperties,message.getBytes("UTF-8"));

At a certain point in time, that basicPublish fails to return (since logging is present after that point and that does not show up). It gets called many times and does not return. There are no exceptions logged by the RabbitMQ client. The only thing I can see is that the AMQP thread died (because jstacks were made during that time). And I have no idea how that can happen without closing the channel (since it will always call _frameHandler.close(); ).

This is happening on a few servers, 3 of 20. The others are fine. The servers were busy at the time (but nothing extreme).

We are using rabbitmq-client.jar from 3.1.1. I have not found a way to reproduce it.

Needless to say, a restart from the application solves any issues.

Revamp Channel docs

Channel's JavaDocs also haven't seen an update in a while, link to stuff that is no longer relevant and don't link to the tutorials and API guide.

com.rabbitmq.client.RpcClient incorrectly deals with wrong/missing correlationId

Using version 3.0.1:

In com.rabbitmq.client.RpcClient, around line 197, there's this function:

            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                synchronized (_continuationMap) {
                    String replyId = properties.getCorrelationId();
                    BlockingCell<Object> blocker = _continuationMap.get(replyId);
                    _continuationMap.remove(replyId);
                    blocker.set(body);                    
                }
            }

The issue is, 'blocker' might be null - if we get a reply to an old request that we no longer know about. The result is blocker.set(body) throws a NullPointerException and the message is then marked for retry and goes back in the queue and it creates an infinite loop.

We fixed this by replacing:

                    blocker.set(body);

with:

                    if (blocker != null) {
                        blocker.set(body);
                    } else {
                        logger.warn("blocking cell was null, ignoring response for it");
                    }

To reproduce - just resend any old jsonrpc reply message that was created by an earlier client and you'll see it NPE.

In our testing, we ran into this when we had a call that was issued once every 15 seconds or so, and took a few seconds to complete. If we restarted the client while the server was still processing a message, we'd end up with that stray reply stuck in the queue - and the new client endlessing NPEing in a loop.

Automatic recovery can leave auto-delete queues info behind

We have a long-running application that takes in HTTP requests and proxies those requests to a worker application via messages over a RabbitMQ broker.

I updated the RabbitMQ java client to 3.3.0 so we can utilize the auto recovering functionality. It works, well, however, I've seen memory usage steadily climb. Analyzing the heap, I see quite a few objects associated with the recovery classes hanging around in memory, and they look to be the culprit of the increasing memory usage. For example, I see 53,143 instances of ConsumerDispatcher, AutorecoveringChannel, RecordedQueue, RecoveryAwareChannelN, and 53,140 AMQImpl$Channel$Close and ShutdownSignalException. Skimming over the text of those exceptions, they seem to be clean shutdowns. All of these instances are held in hash maps that all end up in the autorecovering connection object, which in my example is sitting at 247.3MB of usage.

This app uses RPC-style requests. A temporary queue is created, along with a channel. A message is sent to some queue, and the application consumes via that channel until a message is received or a timeout occurs. In both cases, the consuming is canceled and the channel is closed. At the time the heap dump was taken, there were 3 active requests, which account for the difference in the instance counts.

I'm curious why all of those class instances are hanging around. I'm not sure if it's a bug with the client or our own code. There should be some way to clear out these old instances. Any insight would be appreciated.

AutorecoveringChannel does not re-register ShutdownListeners

When an AutorecoveringChannel performs its automaticallyRecover task, it loses the ShutdownListener's that were previously registered with it.

This is because it's private RecoveryAwareChannelN delegate is reset to a new RecoveryAwareChannelN and the listeners previously associated with the old delegate are now lost.

I'm not sure if this is intentional, but in my opinion this is not expected behavior. Given an AutorecoveringChannel in my hand with a few ShutdownListeners that I registered, when I call automaticallyRecover() on it then I would expect that the listeners that I had registered before are still there.

A little context on what I'm trying to do:
I create one (auto-recovering) connection with two channels for two consumers.

I have a ShutdownListener (with some logging and recovery) registered on the channels for when an occasional exception bubbles up.

Every once in a while I have a network failure that kills my channels and connection. When auto-recovery kicks in, it creates two new channels (internally, as a private delegate of the AutoRecoveringChannel) and my listeners are lost.

So, essentially what ends up happening is my ShutdownListeners are only called back up until the first network failure. After that, they are lost and I do not get called back.

Match pika implementation of URI parsing.

As detailed here, there is a difference in how this library and pika interpret the same URI. The pika solution seems cleaner, and I wanted to check for interest in my forming a pull request.

If the URI is

CLOUDAMQP_URL=amqp://guest:guest@localhost:5672/

and the connection is established with

factory.setUri(System.getenv("CLOUDAMQP_URL"));

As things stand, this results in a vhost of "" and a connection error in a default rabbitmq setup (which has a default vhost of /). Is there interest in a pull request to return a vhost of / instead of "", to make the default behavior match the default rabbitmq setup?

Migrate build system away from Ant

Ant has taken us quite far but it's showing its age. I feel we should follow what most modern (and very likely future) JVM-based projects at Pivotal use: Gradle. We'll be able to piggyback on the existing Spring team infrastructure and have nightly snapshots published to Bintray, Spring repositories. Releases also can then easily be published to the two mentioned above plus Maven Central.

Finally, Gradle is just a nicer and more widely used tool. Good idea for making it easier to contribute.

Client recovery not working

I have 2 node cluster and I am currently performing recovery test. I have network interval set to 1 sec, but actual recovery is taking around 1 mt.

    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(1000);

Steps

  1. Create a mirror queue
  2. Start producing and consuming
  3. Shutdown one of the rabbitmq-server
  4. Look at the logs. Logs show following message for about 1 mt and then recovers:

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)

Exception doesn't hint at the root cause when virtual host is missing

Here is some code that reproduces the issue:

package com.bluesoftdev.bug.rmq

import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory

class RMQBadVHostNameConnectionError {

    static String host = 'localhost'
    static String username = 'cruise'
    static String password = 'cru1s3'
    static String vhost = '/cruise'

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);
        factory.setHost(host);
        Connection conn = factory.newConnection();
        conn.close();
    }
}

the virutal host named in the "vhost" variable does not exist on my local RMQ.

Here is the useless error message that was returned:

java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:648)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:678)
    at com.rabbitmq.client.ConnectionFactory$newConnection$3.call(Unknown Source)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:45)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:112)
    at com.bluesoftdev.bug.rmq.RMQBadVHostNameConnectionError.main(RMQBadVHostNameConnectionError.groovy:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 13 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
    at java.lang.Thread.run(Thread.java:745)

There is no indication that the error was caused by the bad virtual host name.

Stuck test suite & Jenkins' inability to recover

Note: I file the issue here because the Java client is involved in the stuck test and I don't know yet what's going on, but I don't have the time to study this right now.

The culprit is a timed out or aborted Jenkins build: Jenkins is unable to kill all involved processes (and doesn't notice the problem). Then it starts new builds which try to "lock" the node, fail to do so, try again forever, eventually consuming all their stack frames and segfault.

An example is this aborted build:
http://rabbit-ci.lon.pivotallabs.com:8080/job/RabbitMQ%20Server/3734/

Followed by this build which segfaults:
http://rabbit-ci.lon.pivotallabs.com:8080/job/xref%20%28plugins%20individually%29/4019/

Here are the running stuck processes on the Jenkins slave:

jenkins  20497  0.0  0.0  12880  2688 ?        S    May19   1:46 /opt/erlang/r16b03/lib/erlang/erts-5.10.4/bin/epmd -daemon
jenkins  23906  0.0  0.0   4176   580 ?        S    05:44   0:00 sh -c make run-background-node > /tmp/rabbitmq-hare-mnesia/startup_log 2> /tmp/rabbitmq-hare-mnesia/startup_err
jenkins  23909  0.0  0.0  10640  1824 ?        S    05:44   0:00 make run-background-node
jenkins  24097  0.0  0.0   4176   580 ?        S    05:44   0:00 /bin/sh -c RABBITMQ_NODE_IP_ADDRESS="0.0.0.0" RABBITMQ_NODE_PORT="5673" RABBITMQ_LOG_BASE="/tmp" RABBITMQ_MNESIA_DIR="/tmp/rabbitmq-hare-mnesia" RABBITMQ_PLUGINS_EXPAND_DIR="/tmp/rabbitmq-hare-plugins-scratch" \ ?RABBITMQ_NODE_ONLY=true \ ?RABBITMQ_SERVER_START_ARGS="-rabbit ssl_listeners [{\"0.0.0.0\",5670}] -rabbit ssl_options [{cacertfile,\"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/testca/cacert.pem\"},{certfile,\"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/server/cert.pem\"},{keyfile,\"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/server/key.pem\"},{verify_code,1}] -rabbit auth_mechanisms ['PLAIN','AMQPLAIN','EXTERNAL','RABBIT-CR-DEMO']" \ ?./scripts/rabbitmq-server
jenkins  24098  0.6  1.0 192300 41704 ?        Sl   05:44   2:16 /opt/erlang/r16b03/lib/erlang/erts-5.10.4/bin/beam.smp -W w -K true -A30 -P 1048576 -- -root /opt/erlang/r16b03/lib/erlang -progname erl -- -home /var/lib/jenkins -- -pa ./scripts/../ebin -noshell -noinput -sname hare -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -rabbit tcp_listeners [{"0.0.0.0",5673}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/tmp/hare.log"} -rabbit sasl_error_logger {file,"/tmp/hare-sasl.log"} -rabbit enabled_plugins_file "/does-not-exist" -rabbit plugins_dir "./scripts/../plugins" -rabbit plugins_expand_dir "/tmp/rabbitmq-hare-plugins-scratch" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/tmp/rabbitmq-hare-mnesia" -rabbit ssl_listeners [{"0.0.0.0",5670}] -rabbit ssl_options [{cacertfile,"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/testca/cacert.pem"},{certfile,"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/server/cert.pem"},{keyfile,"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/server/key.pem"},{verify_code,1}] -rabbit auth_mechanisms ['PLAIN','AMQPLAIN','EXTERNAL','RABBIT-CR-DEMO'] -kernel inet_dist_listen_min 25673 -kernel inet_dist_listen_max 25673
root     24250  0.1  2.0 178356 82980 ?        Ss   02:30   0:44 /usr/bin/ruby1.8 /usr/bin/puppet agent
jenkins  24337  0.0  0.0  10796   368 ?        Ss   05:44   0:00 inet_gethost 4
jenkins  24338  0.0  0.0  17100   804 ?        S    05:44   0:00 inet_gethost 4
root     24363  0.0  0.0  71260  3528 ?        Ss   05:45   0:00 sshd: jenkins [priv]
jenkins  24365  0.0  0.0  71916  2232 ?        S    05:45   0:01 sshd: jenkins@notty
jenkins  24387  0.0  0.0  10752  1212 ?        Ss   05:45   0:00 bash -c cd "/var/lib/jenkins" && java  -jar slave.jar
jenkins  24388  0.0  2.0 1568768 83180 ?       Sl   05:45   0:15 java -jar slave.jar
jenkins  25757  0.0  0.0   4180   576 ?        S    05:37   0:00 /bin/sh -xe /tmp/hudson8511670185586124855.sh
jenkins  25759  0.0  0.0   4180   684 ?        S    05:37   0:00 /bin/sh -e /usr/local/bin/run-server-tests
jenkins  25766  0.0  0.0   4180   432 ?        S    05:37   0:00 /bin/sh -e /usr/local/bin/run-server-tests
jenkins  26320  0.0  0.0  10368  1336 ?        S    05:39   0:00 make COVER=false all
ntp      26452  0.0  0.0  34864  2100 ?        Ss   Apr13   2:49 /usr/sbin/ntpd -p /var/run/ntpd.pid -g -c /var/lib/ntp/ntp.conf.dhcp -u 107:114
jenkins  26537  0.0  0.0   4176   576 ?        S    05:39   0:00 /bin/sh -c OK=true && \ make prepare && \ { make -C ../rabbitmq-server run-tests || { OK=false; echo '\n============' '\nTESTS FAILED' '\n============\n'; } } && \ { make run-qpid-testsuite || { OK=false; echo '\n============' '\nTESTS FAILED' '\n============\n'; } } && \ { ( cd ../rabbitmq-java-client && MAKE=make ant test-suite ) || { OK=false; echo '\n============' '\nTESTS FAILED' '\n============\n'; } } && \ make cleanup && { $OK || echo '\n============' '\nTESTS FAILED' '\n============\n'; } && $OK
daemon   26683  0.0  0.0  16672   152 ?        Ss   Mar31   0:00 /usr/sbin/atd
root     26796  0.0  0.0  49932  1236 ?        Ss   Mar31   0:00 /usr/sbin/sshd
root     26798  0.0  0.0      0     0 ?        S    May18   2:21 [kworker/1:1]
jenkins  29828  0.0  0.0   4176   572 ?        S    05:40   0:00 sh -c make run-background-node > /tmp/rabbitmq-rabbit-mnesia/startup_log 2> /tmp/rabbitmq-rabbit-mnesia/startup_err
jenkins  29831  0.0  0.0  10640  1824 ?        S    05:40   0:00 make run-background-node
jenkins  30019  0.0  0.0   4176   576 ?        S    05:40   0:00 /bin/sh -c RABBITMQ_NODE_IP_ADDRESS="0.0.0.0" RABBITMQ_NODE_PORT="5672" RABBITMQ_LOG_BASE="/tmp" RABBITMQ_MNESIA_DIR="/tmp/rabbitmq-rabbit-mnesia" RABBITMQ_PLUGINS_EXPAND_DIR="/tmp/rabbitmq-rabbit-plugins-scratch" \ ?RABBITMQ_NODE_ONLY=true \ ?RABBITMQ_SERVER_START_ARGS="-rabbit ssl_listeners [{\"0.0.0.0\",5671}] -rabbit ssl_options [{cacertfile,\"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/testca/cacert.pem\"},{certfile,\"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/server/cert.pem\"},{keyfile,\"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/server/key.pem\"},{verify_code,1}] -rabbit auth_mechanisms ['PLAIN','AMQPLAIN','EXTERNAL','RABBIT-CR-DEMO']" \ ?./scripts/rabbitmq-server
jenkins  30020  0.7  1.1 261436 45724 ?        Sl   05:40   2:34 /opt/erlang/r16b03/lib/erlang/erts-5.10.4/bin/beam.smp -W w -K true -A30 -P 1048576 -- -root /opt/erlang/r16b03/lib/erlang -progname erl -- -home /var/lib/jenkins -- -pa ./scripts/../ebin -noshell -noinput -sname rabbit@rabbit-ci-slave1 -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -rabbit tcp_listeners [{"0.0.0.0",5672}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/tmp/[email protected]"} -rabbit sasl_error_logger {file,"/tmp/[email protected]"} -rabbit enabled_plugins_file "/does-not-exist" -rabbit plugins_dir "./scripts/../plugins" -rabbit plugins_expand_dir "/tmp/rabbitmq-rabbit-plugins-scratch" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/tmp/rabbitmq-rabbit-mnesia" -rabbit ssl_listeners [{"0.0.0.0",5671}] -rabbit ssl_options [{cacertfile,"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/testca/cacert.pem"},{certfile,"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/server/cert.pem"},{keyfile,"/tmp/test/rabbitmq-public-umbrella/rabbitmq-test/certs/server/key.pem"},{verify_code,1}] -rabbit auth_mechanisms ['PLAIN','AMQPLAIN','EXTERNAL','RABBIT-CR-DEMO'] -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672
jenkins  30179  0.0  0.0  10796   364 ?        Ss   05:40   0:00 inet_gethost 4
jenkins  30180  0.0  0.0  17100   812 ?        S    05:40   0:00 inet_gethost 4
jenkins  30942  0.1  3.6 1318924 147060 ?      Sl   05:42   0:34 /opt/java/latest/bin/java -classpath /usr/share/ant/lib/ant-launcher.jar:/usr/share/java/xmlParserAPIs.jar:/usr/share/java/xercesImpl.jar -Dant.home=/usr/share/ant -Dant.library.dir=/usr/share/ant/lib org.apache.tools.ant.launch.Launcher -cp  test-suite
jenkins  31312  0.0  0.8 1311756 35004 ?       Sl   05:42   0:10 /opt/java/jdk1.6.0_26/jre/bin/java -classpath /tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/lib/commons-cli-1.1.jar:/tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/lib/commons-io-1.2.jar:/tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/lib/junit.jar:/tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/build/classes:/tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/build/test/classes:/usr/share/ant/lib/junit.jar:/usr/share/java/ant-launcher-1.8.2.jar:/usr/share/ant/lib/ant.jar:/usr/share/ant/lib/ant-junit.jar:/usr/share/ant/lib/ant-junit4.jar org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner com.rabbitmq.client.test.server.ServerTests filtertrace=true haltOnError=false haltOnFailure=false formatter=org.apache.tools.ant.taskdefs.optional.junit.OutErrSummaryJUnitResultFormatter showoutput=false outputtoformatters=true logfailedtests=true logtestlistenerevents=false formatter=org.apache.tools.ant.taskdefs.optional.junit.PlainJUnitResultFormatter,/tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/build/TEST-com.rabbitmq.client.test.server.ServerTests.txt formatter=org.apache.tools.ant.taskdefs.optional.junit.XMLJUnitResultFormatter,/tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/build/TEST-com.rabbitmq.client.test.server.ServerTests.xml crashfile=/tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/junitvmwatcher7096656096857623053.properties propsfile=/tmp/test/rabbitmq-public-umbrella/rabbitmq-java-client/junit6938551034434136649.properties

ConnectionFactory.newConnection() fails in Spring ApplicationContext

I have a weird issue to report. If I create a org.rabbitmq.client.ConnectionFactory in Java like so:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");

I'm able to create connections with newConnection() just fine.

However, if I create this instance in a Spring ApplicationContext like so:

<?xml version="1.0"?>
<beans ...>
    <bean class="org.rabbitmq.client.ConnectionFactory"/>
</beans>

then

ApplicationContext applicationContext = new ClasspathXmlApplicationContext("applicationContext.xml");
ConnectionFactory connectionFactory = applicationContext.getBean(ConnectionFactory.class);
connectionFactory.setHost("localhost");

If I try to call newConnection() with this ConnectionFactory created by Spring, then I get a weird exception:

Caused by: java.lang.IllegalArgumentException: invalid value in table
    at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306)
    at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246)
    at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120)
    at com.rabbitmq.client.impl.MethodArgumentWriter.writeTable(MethodArgumentWriter.java:139)
    at com.rabbitmq.client.impl.AMQImpl$Connection$StartOk.writeArgumentsTo(AMQImpl.java:161)
    at com.rabbitmq.client.impl.Method.toFrame(Method.java:83)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:102)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:298)
    at com.rabbitmq.client.impl.AMQChannel.quiescingRpc(AMQChannel.java:233)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:224)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:202)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:340)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
    at org.tkassembled.rabbitmq.publisher.Application.main(Application.java:47)

I've debugged my code and found that the _clientProperties variable in ConnectionFactory is the only thing that differs from the non-Spring instance and the Spring instance.

Non-Spring:

{product=RabbitMQ, information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Java, capabilities={exchange_exchange_bindings=true, consumer_cancel_notify=true, basic.nack=true, publisher_confirms=true}, copyright=Copyright (C) 2007-2012 VMware, Inc., version=2.8.4}

Spring:

{environment=StandardEnvironment {activeProfiles=[], defaultProfiles=[default], propertySources=[systemProperties,systemEnvironment]}, systemProperties={java.runtime.name=OpenJDK Runtime Environment, sun.boot.library.path=/usr/lib/jvm/java-6-openjdk/jre/lib/amd64, java.vm.version=20.0-b12, java.vm.vendor=Sun Microsystems Inc., java.vendor.url=http://java.sun.com/, path.separator=:, java.vm.name=OpenJDK 64-Bit Server VM, file.encoding.pkg=sun.io, sun.java.launcher=SUN_STANDARD, user.country=US, sun.os.patch.level=unknown, java.vm.specification.name=Java Virtual Machine Specification, user.dir=/home/rfkrocktk/Documents/Projects/Work/rabbitmq-demo-publisher, java.runtime.version=1.6.0_24-b24, java.awt.graphicsenv=sun.awt.X11GraphicsEnvironment, java.endorsed.dirs=/usr/lib/jvm/java-6-openjdk/jre/lib/endorsed, os.arch=amd64, java.io.tmpdir=/tmp, line.separator=
, java.vm.specification.vendor=Sun Microsystems Inc., os.name=Linux, sun.jnu.encoding=UTF-8, java.library.path=/usr/lib/jvm/java-6-openjdk/jre/lib/amd64/server:/usr/lib/jvm/java-6-openjdk/jre/lib/amd64:/usr/lib/jvm/java-6-openjdk/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib/jni:/lib:/usr/lib, java.specification.name=Java Platform API Specification, java.class.version=50.0, sun.management.compiler=HotSpot 64-Bit Tiered Compilers, os.version=3.0.0-22-generic, user.home=/home/rfkrocktk, user.timezone=America/Los_Angeles, java.awt.printerjob=sun.print.PSPrinterJob, file.encoding=UTF-8, java.specification.version=1.6, java.class.path=/home/rfkrocktk/Documents/Projects/Work/rabbitmq-demo-publisher/target/classes:/home/rfkrocktk/.m2/repository/com/rabbitmq/amqp-client/2.8.4/amqp-client-2.8.4.jar:/home/rfkrocktk/.m2/repository/org/springframework/spring-beans/3.1.2.RELEASE/spring-beans-3.1.2.RELEASE.jar:/home/rfkrocktk/.m2/repository/org/springframework/spring-context/3.1.2.RELEASE/spring-context-3.1.2.RELEASE.jar:/home/rfkrocktk/.m2/repository/org/springframework/spring-aop/3.1.2.RELEASE/spring-aop-3.1.2.RELEASE.jar:/home/rfkrocktk/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar:/home/rfkrocktk/.m2/repository/org/springframework/spring-expression/3.1.2.RELEASE/spring-expression-3.1.2.RELEASE.jar:/home/rfkrocktk/.m2/repository/org/springframework/spring-asm/3.1.2.RELEASE/spring-asm-3.1.2.RELEASE.jar:/home/rfkrocktk/.m2/repository/org/springframework/spring-context-support/3.1.2.RELEASE/spring-context-support-3.1.2.RELEASE.jar:/home/rfkrocktk/.m2/repository/org/springframework/spring-core/3.1.2.RELEASE/spring-core-3.1.2.RELEASE.jar:/home/rfkrocktk/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:/home/rfkrocktk/.m2/repository/org/slf4j/slf4j-api/1.6.5/slf4j-api-1.6.5.jar:/home/rfkrocktk/.m2/repository/org/slf4j/jul-to-slf4j/1.6.5/jul-to-slf4j-1.6.5.jar:/home/rfkrocktk/.m2/repository/org/slf4j/jcl-over-slf4j/1.6.5/jcl-over-slf4j-1.6.5.jar:/home/rfkrocktk/.m2/repository/ch/qos/logback/logback-classic/1.0.4/logback-classic-1.0.4.jar:/home/rfkrocktk/.m2/repository/ch/qos/logback/logback-core/1.0.4/logback-core-1.0.4.jar, user.name=rfkrocktk, java.vm.specification.version=1.0, sun.java.command=org.tkassembled.rabbitmq.publisher.Application, java.home=/usr/lib/jvm/java-6-openjdk/jre, sun.arch.data.model=64, user.language=en, java.specification.vendor=Sun Microsystems Inc., java.vm.info=mixed mode, java.version=1.6.0_24, java.ext.dirs=/usr/lib/jvm/java-6-openjdk/jre/lib/ext:/usr/java/packages/lib/ext, sun.boot.class.path=/usr/lib/jvm/java-6-openjdk/jre/lib/resources.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/rt.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/sunrsasign.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/jsse.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/jce.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/charsets.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/netx.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/plugin.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/rhino.jar:/usr/lib/jvm/java-6-openjdk/jre/lib/modules/jdk.boot.jar:/usr/lib/jvm/java-6-openjdk/jre/classes, java.vendor=Sun Microsystems Inc., file.separator=/, java.vendor.url.bug=http://java.sun.com/cgi-bin/bugreport.cgi, sun.io.unicode.encoding=UnicodeLittle, sun.cpu.endian=little, sun.desktop=gnome, sun.cpu.isalist=}, systemEnvironment={LIBVA_DRIVER_NAME=xvba, LIBVA_DRIVERS_PATH=/usr/lib/va/drivers, GTK_MODULES=canberra-gtk-module:canberra-gtk-module, GNOME_KEYRING_CONTROL=/tmp/keyring-FVomTs, XDG_SESSION_PATH=/org/freedesktop/DisplayManager/Session7, SSH_AGENT_PID=14768, SESSION_MANAGER=local/work-macbook:@/tmp/.ICE-unix/14732,unix/work-macbook:/tmp/.ICE-unix/14732, GNOME_DESKTOP_SESSION_ID=this-is-deprecated, COMPIZ_CONFIG_PROFILE=ubuntu, XDG_SESSION_COOKIE=1caa5669b1efc2337b52f8b40000001c-1342060648.248511-507012802, GDMSESSION=ubuntu, MANDATORY_PATH=/usr/share/gconf/ubuntu.mandatory.path, XDG_DATA_DIRS=/usr/share/ubuntu:/usr/share/gnome:/usr/local/share/:/usr/share/, PWD=/home/rfkrocktk, GIO_LAUNCHED_DESKTOP_FILE=/home/rfkrocktk/.local/share/applications/alacarte-made.desktop, LOGNAME=rfkrocktk, GPG_AGENT_INFO=/tmp/keyring-FVomTs/gpg:0:1, GIO_LAUNCHED_DESKTOP_FILE_PID=9244, SSH_AUTH_SOCK=/tmp/keyring-FVomTs/ssh, LD_LIBRARY_PATH=/usr/lib/jvm/java-6-openjdk/jre/lib/amd64/server:/usr/lib/jvm/java-6-openjdk/jre/lib/amd64:/usr/lib/jvm/java-6-openjdk/jre/../lib/amd64, SHELL=/bin/bash, LIBGL_DRIVERS_PATH=/usr/lib/fglrx/dri:/usr/lib32/fglrx/dri, DBUS_SESSION_BUS_ADDRESS=unix:abstract=/tmp/dbus-nVK2hmDwIH,guid=52b6ec40d3390d2ad6e6e482000630c6, GNOME_KEYRING_PID=14723, IBUS_NO_SNOOPER_APPS=synapse, PATH=/usr/lib/lightdm/lightdm:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games, XDG_CONFIG_DIRS=/etc/xdg/xdg-ubuntu:/etc/xdg, XDG_CURRENT_DESKTOP=Unity, DESKTOP_SESSION=ubuntu, DISPLAY=:2, USER=rfkrocktk, UBUNTU_MENUPROXY=libappmenu.so, HOME=/home/rfkrocktk, XAUTHORITY=/home/rfkrocktk/.Xauthority, DEFAULTS_PATH=/usr/share/gconf/ubuntu.default.path, XDG_SEAT_PATH=/org/freedesktop/DisplayManager/Seat0, USERNAME=rfkrocktk, LANG=en_US.UTF-8}, messageSource=org.springframework.context.support.DelegatingMessageSource@55d7fc31, applicationEventMulticaster=org.springframework.context.event.SimpleApplicationEventMulticaster@56ee20fe}

Both of these instances are created in the same JVM run and classpath, so I have no idea why the properties would vary.

You can download my Maven project that demonstrates this bug here: http://ge.tt/1tQ1xaK/v/0?c

Switch to NIO[2]

Reported and discussed in some depth on rabbitmq-discuss.

Internal bug tracker: #23327.

The only solution is switching to NIO. NIO would yield little improvement for the client the way it works today but worth moving to for this issue.

Upgrade JUnit to 4.x

We've been on JUnit 3 for over 7 years. It's time to move to the latest and greatest.

jsonrpc calls do not escape single quotes properly, causes some jsonrpc calls to hang

Using version 3.0.1. While using com.rabbitmq.client.RpcClient it appears there is a json encoding bug in com.rabbitmq.tools.json.JSONReader.

To reproduce, try an RPC call to a method like this:

 public String test123() { return "test'n"; }

Per our tests, the call hangs indefinitely. It traces down to the following:

In com.rabbitmq.tools.json.JSONReader - on line 90 it reads:

if (c == '"' || c == '\'') {

Per 42a7281 It looks like this was added as a fix so strings could be surrounded by single or double quotes. However, this incidentally makes it so unescaped single quotes inside a double quoted string are no longer handled properly.

The result is that the JSONReader goes into an infinite loop while trying to parse. On line 132:

while (token != OBJECT_END) {

This remains true, because it never gets a proper end, because the escaping is thrown off.

Possible solutions:

  • One simple change would be to modify com.rabbitmq.tools.json.JSONWriter so that it escapes single quotes with a backslash. This doesn't solve the fact that the json parsing is broken, but would at least work around this particular issue.
  • Fixing the json parsing code so it's actually (or at least more so) json compliant is of course more work but would handle more edge cases.

As an overall suggestion, I would recommend not writing your own json parsing code, and instead using something like http://code.google.com/p/json-smart/ It's simple to use, and would get rid of these kinds of edge case headaches on this. RabbitMQ already has it's own code base to worry about, not really much benefit to implementing json parsing like this. You might even be able to copy and paste out of it to bundle a minimal version, assuming the license is compatible (not sure on that).

ConnectionFactory createConnection with automatic recovery returns Connection type

Creating a connection through the connection factory with automatic recovery enabled returns the correct object (AutorecoveringConnection), but since the return type of the function is Connection, the methods in AutorecoveringConnection are unavailable without casting the return value, which isn't natural. In fact, most users are probably unaware that such a class even exists since it doesn't appear in website's API guide anywhere.

I don't know exactly how many differences there are, but I was specifically looking for the addRecoveryListener method. To not complicate things too much, a simple solution could be to expose the method in the Connection interface, but only implement it in AutorecoveringConnection.

Shut shutdown executor service correctly

We didn't cover this in #87. One sensible strategy would be to count open connections and shut down the service when it reaches zero, while starting it when it goes to a positive number.

Potential Out of Memory in `QueueingConsumer`

The QueueingConsumer adds all received messages to the _queue buffer.

When messages are received faster from the channel than someone processes them and thereby removes them from the _queue the _queue will grow and eat heap memory until out of memory occurs.

Confirms stop working if you send > 2^63-1 msgs

As title says, publisher confirms won't work anymore once the max value of a Long is exceeded. There won't be any exceptions, it will silently stop adding messages to the unconfirmedSet. Although this number is high, I think this issue should still be documented.

Client may hang during initial connection

There was an issue reported to Lyra that seems to have more to do with the rabbitmq-java-client. Basically the problem is that when rabbitmq connections are proxied through an AWS elastic load balancer, ELB might accept a TCP connection but not respond to the initial handshake which leaves the client hanging forever. ELB may even close the connection, but I believe the BlockingCell is just left waiting forever. Here's a call stack from a mocked up test that reproduces this scenario:

Thread [lyra-recovery-1] (Suspended)    
    waiting for: BlockingValueOrException<V,E>  (id=28) 
    Object.wait(long) line: not available [native method]   
    BlockingValueOrException<V,E>(Object).wait() line: 503  
    BlockingValueOrException<V,E>(BlockingCell<T>).get() line: 50   
    BlockingValueOrException<V,E>(BlockingCell<T>).uninterruptibleGet() line: 89    
    BlockingValueOrException<V,E>.uninterruptibleGetValue() line: 33    
    AMQChannel$SimpleBlockingRpcContinuation(AMQChannel$BlockingRpcContinuation<T>).getReply() line: 348    
    AMQConnection.start() line: 294 
    ConnectionFactory.newConnection(ExecutorService, Address[]) line: 603   
    ConnectionHandler$3.call() line: 243    
    ConnectionHandler$3.call() line: 236    
    ConnectionHandler(RetryableResource).callWithRetries(Callable<T>, RecurringPolicy<?>, RecurringStats, Set<Class<Exception>>, boolean, boolean) line: 51 
    ConnectionHandler.createConnection(RecurringPolicy<?>, Set<Class<Exception>>, boolean) line: 236    
    ConnectionHandler.recoverConnection() line: 271 
    ConnectionHandler.access$100(ConnectionHandler) line: 41    
    ConnectionHandler$ConnectionShutdownListener$1.run() line: 95   
    ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1110  
    ThreadPoolExecutor$Worker.run() line: 603   
    Thread.run() line: 722  

The first idea that comes to my mind is that everything that happens inside AMQConnection.start() should be covered by the connection timeout setting and/or an eventual connection closure should unblock the BlockingCell.

Discrepancy in method descriptions for socket configuration

The setSocketConfigurator() method on com.rabbitmq.client.ConnectionFactory suggests that the configuration is applied after the socket has been opened, while the com.rabbitmq.client.SocketConfigurator interface seems to suggest the configuration is applied before the socket has been opened.

connection read timeout will be ignored in AMQConnection

hi,
there may be a bug in com.rabbitmq.client.impl.AMQConnection#start()。
in this method,after MainLoop start,default read timeout is 10s.
if there is something wrong with the socket,_frameHandler.readFrame() while be null (line:515).then
in handleSocketTimeout method, because of _heartbeat == 0, timeout excepiton will be ignored ,and this method
will retry and retry for ever.
please modify the default _heartbeat not quals to zero, thank you all.

Deprecation warnings with Java 8 / OpenJDK

ant gave me the following snippets while building for jdk8.


    [javac] Note: /builddir/build/BUILD/rabbitmq-java-client-3.4.1/src/com/rabbitmq/client/impl/ChannelN.java uses or overrides a deprecated API.
    [javac] Note: Recompile with -Xlint:deprecation for details.
    [javac] Note: /builddir/build/BUILD/rabbitmq-java-client-3.4.1/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java uses unchecked or unsafe operations.
    [javac] Note: Recompile with -Xlint:unchecked for details.
    [javac] 16 warnings

test-build-param:
    [mkdir] Created dir: /builddir/build/BUILD/rabbitmq-java-client-3.4.1/build/test/classes
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/build.xml:210: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 156 source files to /builddir/build/BUILD/rabbitmq-java-client-3.4.1/build/test/classes
    [javac] warning: [options] bootstrap class path not set in conjunction with -source 1.5
    [javac] warning: [options] source value 1.5 is obsolete and will be removed in a future release
    [javac] warning: [options] target value 1.5 is obsolete and will be removed in a future release
    [javac] warning: [options] To suppress warnings about obsolete options, use -Xlint:-options.
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/ConfirmBase.java:45: warning: '_' used as an identifier
    [javac]                         } catch (InterruptedException _) {
    [javac]                                                       ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/ConfirmBase.java:57: warning: '_' used as an identifier
    [javac]         } catch (TimeoutException _) {
    [javac]                                   ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/functional/Confirm.java:241: warning: '_' used as an identifier
    [javac]         } catch (IllegalStateException _) {}
    [javac]                                        ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/functional/QueueSizeLimit.java:104: warning: '_' used as an identifier
    [javac]         } catch (InterruptedException _) { }
    [javac]                                       ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:258: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:265: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:271: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:277: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:283: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:289: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:295: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:301: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:308: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/client/test/server/Permissions.java:314: warning: '_' used as an identifier
    [javac]             public void with(String _) throws IOException {
    [javac]                                     ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] /builddir/build/BUILD/rabbitmq-java-client-3.4.1/test/src/com/rabbitmq/examples/TestMain.java:218: warning: '_' used as an identifier
    [javac]         } catch (InterruptedException _) { } // ignore
    [javac]                                       ^
    [javac]   (use of '_' as an identifier might not be supported in releases after Java SE 8)
    [javac] Note: Some input files use unchecked or unsafe operations.
    [javac] Note: Recompile with -Xlint:unchecked for details.
    [javac] 19 warnings

Please handle InterruptedExceptions correctly

Class com.rabbitmq.utility.BlockingCell has several while loops catching InterruptedExceptions and ignoring them. This makes it impossible to shutdown the app correctly; you have to kill it brutally. The correct handling of an InterruptedException in a while loop would be:

try {
  ...
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  return;
}

There are probably other classes swallowing InterruptedExceptions as well; I'm just reporting the one I found while analyzing what causes our app to hang.

For background info on thread interruption, see e.g.: http://www.ibm.com/developerworks/library/j-jtp05236/

waitForConfirms may never be true on high traffic server

I am actually running into this issue right now and have worked around it for now by closing the channel and creating a new one, which clears the unconfirmedSet and resolves the issue for a while. (By the way, is there anyway to access that set for debugging purposes?) Imagine several threads going through a common object to send messages. The only way I've figured so far to know which message failed to send is to waitForConfirms after every publish. We pass it a timeout so we don't end up hanging forever for any reason. The problem we see is that waitForConfirms is timing out quite frequently, always during high traffic situations. When I looked at the implementation of that method, it looks like it waits for the entire set to be empty before returning. You're probably beginning to see the problem. If you have multiple threads sending messages and some of them waiting for their message to be confirmed, if there is always something in the set, then it just never happens. I read that publishing to the same channel, although not recommended, over multiple threads was okay because access to the channel was serialized.

I don't know whether this is a bug, design flaw, or user error. How can I check that a confirm was received for the message that was just sent instead of all messages? Or maybe my understanding of what waitForConfirms is supposed to be used for is wrong. I've included an example usage of how I am sending and then checking if it sent. Forgive me, my code is in scala, but it's almost java so I think you'll be okay.

val basicPublishTimeout: Long = 400

rabbitClient().basicPublish(exchange.getOrElse("amq.topic"), routingKey.getOrElse(""), props, message.get.getBytes("UTF-8"))
// lets us know if the message was successfully sent
if (!waitForConfirms(basicPublishTimeout)) {
    throw new IOException("Message was nack'd by broker and will be resent.")
}

/**
 * Waits for all sent messages to be acknowledged up to timeout. Throws TimeoutException if times out.
 * Returns true if all messages were acknowledged and false if any were explicitly not acknowledged.
 * @inheritdoc
 */
def waitForConfirms(timeout: Long = 0): Boolean = try
  if (timeout == 0) channel map { _.waitForConfirms() } get
  else channel map { _.waitForConfirms(timeout) } get
catch {
  case e: lang.IllegalStateException => logger.error("The channel is not set up to confirm messages.")
    false
}

QueueingConsumer.nextDelivery can hang forever, even if the underlying connection dies

When calling nextDelivery to grab the next message from QueueingConsumer, it is calling the blocking method on the queue, take(), which will block forever.

https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/com/rabbitmq/client/QueueingConsumer.java#L215

The problem occurs then if the underlying channel dies, then the thread that called nextDelivery will now be waiting forever.

The solution we had to in our code that uses this was to use the timeout version, and manually check if the channel had closed while we were waiting every time.

Possible solution: make both versions of nextDelivery use a timeout (and spin in the case of the no-timeout version), check if the channel died, and throw a ShutdownSignalException in that case.

Possible problem with autorecovery and thread safety

Hi,

In an application I'm working on, I ended up having two threads using the same channel, and one of the threads closed the channel while the other was issuing a basicGet.
I'm using the version 3.3.3 of the client.
It seems that the basicGet thread doesn't get the information that the close was intentional, and tries to recover.
I got a couple of these stacks:

Caught an exception when recovering topology Caught an exception while recovering queue edis.32: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering queue edis.32: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:461)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverEntities(AutorecoveringConnection.java:422)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:48)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:345)
at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
at java.lang.Thread.run(Thread.java:744)
Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:231)
at com.rabbitmq.client.impl.recovery.RecordedQueue.recover(RecordedQueue.java:36)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:448)
... 7 more

Not possible to create pretty JSON from headers.

AMQP.BasicProperties.getHeaders() returns a Map<String, Object> where the strings are of type LongStringHelper.ByteArrayLongString that is a private internal class which makes it impossible to write custom serializers for this class. Using Google Gson for instance, always inserts a "bytes": key before the value. While it is easy to write a serializer that converts the resulting byte[], I'd like to have the string value only without the "bytes": key.

In short, how can I produce a regular Java string out of LongStringHelper.ByteArrayLongString?

Add logging to DefaultExceptionHandler.java

Hi All,

I am losing information about exceptions inside RabbitMQ Java Client because default error handle write it to STDERR (!) instead of logging it to SLF4F, log4j, etc. Without it I can't redirect errors to Syslog or Airbreak and lose them.

Could you please add logging here and here?

Thanks

Automatic recovery is not terminated by Connection.close

I have encountered a problem with clean tomcat shutdown when a single broker is shutdown before shutting down tomcat. The connection and channels are configured for auto recovery. If I startup tomcat (which connects to the broker and declares some exchanges/queues) then shutdown tomcat everything shuts down cleanly BUT if I stop the broker before shutting down then I get the following message:

SEVERE: The web application [/xyz] appears to have started a thread named [AMQP Connection127.0.0.1:5672] but has failed to stop it. This is very likely to create a memory leak

As far as I can tell I am closing the connection and channels (using abort rather than close).

Thanks to the responder on the forum thread (https://groups.google.com/forum/#!topic/rabbitmq-users/JOZl7VCWSNg) I am creating this issue with the hope of getting a fix into the next release. It is a problem because it leaves tomcat in a half shutdown state (admins might not realise it it has not shutdown properly before attempting to re-start).

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.