Comments (10)
I've found the root cause of this issue. Here are the steps to reproduce, with an auto-recovering connection:
- Create a channel.
- Create a temporary queue (exclusive, not durable, auto delete).
- Start consuming from the temporary queue.
- Publish a message to a queue that will get picked up by some RPC server.
- Receive a message from the temporary queue or timeout.
- Cancel consuming from the queue.
- Close the channel.
At this point, we are done with the temporary queue. It turns out that if we do not call `queueDelete`, there are objects that hang around that we no longer need (specifically, `RecordedQueue` and `RecoveryAwareChannelN`). We cannot call `queueDelete` since we already canceled consuming, and the queue would have deleted itself. If the call to delete the queue is made, an exception will occur and the channel will close.
The only workaround I could find is to call `deleteRecordedQueue` on the channel object. This is nasty to me because that method is marked private. Fortunately, we are using Groovy, which disregards visibility modifiers. There needs to be a more formal method to clear these objects up for this use case, as this will eventually fill the heap on long-running applications.
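To make the growth concrete, here is a minimal, self-contained model of the bookkeeping described above. It does not use the real client: `RecordedTopology`, `declareTemporaryQueue`, `deleteQueue` and `RpcLeakDemo` are hypothetical names invented for this sketch, and the broker interaction is left out entirely. It only assumes that declaring a queue adds a record and that nothing short of an explicit delete removes it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the client's recovery bookkeeping.
// These names are illustrative and are NOT the real amqp-client API.
class RecordedTopology {
    // queue name -> auto-delete flag, kept so the queue could be redeclared on recovery
    private final Map<String, Boolean> recordedQueues = new LinkedHashMap<>();
    private int counter = 0;

    // Declaring a temporary queue records it.
    String declareTemporaryQueue() {
        String name = "tmp-" + (counter++);
        recordedQueues.put(name, true);
        return name;
    }

    // In this model, only an explicit delete drops the record.
    void deleteQueue(String name) {
        recordedQueues.remove(name);
    }

    int recordedQueueCount() {
        return recordedQueues.size();
    }
}

class RpcLeakDemo {
    // One RPC round trip following the steps listed above; the broker side is omitted.
    static void rpcCall(RecordedTopology topology, boolean cleanUp) {
        String replyQueue = topology.declareTemporaryQueue();
        // ... consume, publish the request, wait for the reply or time out, cancel ...
        if (cleanUp) {
            topology.deleteQueue(replyQueue);
        }
    }

    // Runs the given number of RPC calls and reports how many records remain.
    static int recordsAfter(int calls, boolean cleanUp) {
        RecordedTopology topology = new RecordedTopology();
        for (int i = 0; i < calls; i++) {
            rpcCall(topology, cleanUp);
        }
        return topology.recordedQueueCount();
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + recordsAfter(1000, false)); // prints 1000
        System.out.println("with cleanup: " + recordsAfter(1000, true));     // prints 0
    }
}
```

In the model, every RPC call that skips the cleanup step leaves one record behind, which is the pattern that would eventually fill the heap in a long-running application.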
from rabbitmq-java-client.
If a code example would help, you can see this source file.
from rabbitmq-java-client.
`queue.delete` is idempotent in 3.x, so you absolutely can call `Channel#queueDelete`. It's a tricky thing to know when connection recovery information should be deleted for auto-delete queues.
from rabbitmq-java-client.
One thing we can do is to delete the queue on explicit `basic.cancel` if it is auto-deleted. Unfortunately, it may be trickier for server-initiated cancellation. Fortunately, the latter should be rare.
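A rough sketch of that idea, using a simplified model rather than the client's actual internals: `RecoveryCache` and its methods are hypothetical names made up for illustration. The sketch also assumes the record should only be dropped once the last consumer on the queue has been cancelled, since that is when the server removes an auto-delete queue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of recovery bookkeeping; NOT the real amqp-client classes.
class RecoveryCache {
    // queue name -> auto-delete flag
    private final Map<String, Boolean> queues = new HashMap<>();
    // consumer tag -> queue name
    private final Map<String, String> consumers = new HashMap<>();

    void recordQueue(String queue, boolean autoDelete) {
        queues.put(queue, autoDelete);
    }

    void recordConsumer(String consumerTag, String queue) {
        consumers.put(consumerTag, queue);
    }

    // Called when the application explicitly cancels a consumer.
    void onExplicitCancel(String consumerTag) {
        String queue = consumers.remove(consumerTag);
        if (queue == null) {
            return; // unknown tag, nothing to clean up
        }
        // Assumption: drop the record only when no other consumer uses the queue.
        boolean lastConsumer = !consumers.containsValue(queue);
        boolean autoDelete = Boolean.TRUE.equals(queues.get(queue));
        if (lastConsumer && autoDelete) {
            queues.remove(queue);
        }
    }

    boolean hasQueue(String queue) {
        return queues.containsKey(queue);
    }
}
```

Server-initiated cancellation is deliberately not modelled here, because the comment above notes that case is harder to handle.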
from rabbitmq-java-client.
Internal tracker bug: 26364.
from rabbitmq-java-client.
You mentioned `queue.delete` is idempotent in 3.x. Is server 3.x required for this?
from rabbitmq-java-client.
@budjb yes, it's the server that performs the deletion (and decides whether deleting what's not there should be considered a success).
from rabbitmq-java-client.
Fixed in master (as well as we think we can do it at the moment).
from rabbitmq-java-client.
I have this code to auto-recover the connection, but it floods the server with connections.
```java
public class AMQP {
    private ConnectionFactory connectionFactory = new ConnectionFactory();
    private Connection connection;
    private ConnectorThread subscribeThread;
    private final String TAG = "AMQP";
    private final String EXCHANGE = "sample";
    private final String USERNAME = "sample";
    private final String PASSWORD = "sample";
    private final String HOST = "sample";
    private final String ROUTING_KEY = Utils.getIPAddress();
    private Config config;
    private ConnectionOptions options;
    private Channel channel;
    private List<Connection> connections = new ArrayList<>();

    public AMQP(Handler messageHandler) {
        //setupConnectionFactory();
        subscribe(messageHandler);
    }

    private void setupConnectionFactory() {
        // set the heartbeat timeout to 60 seconds
        connectionFactory.setRequestedHeartbeat(60);
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(5672);
        //Lyra config
        /*config = new Config()
                .withConnectionListeners()
                .withRecoveryPolicy(RecoveryPolicies.recoverAlways())
                .withRetryPolicy(new RetryPolicy()
                        .withMaxAttempts(20)
                        .withInterval(Duration.seconds(5))
                        .withMaxDuration(Duration.minutes(5)));
        //Lyra option config for connection
        options = new ConnectionOptions().withUsername(USERNAME)
                .withRequestedHeartbeat(Duration.seconds(20))
                .withPassword(PASSWORD)
                .withHost(HOST)
                .withPort(5672);*/
    }

    /*public boolean isConnected()
    {
        return subscribeThread.isConnectionOpen();
    }*/

    private void subscribe(final Handler handler) {
        subscribeThread = new ConnectorThread(handler);
        subscribeThread.start();
    }

    void closeConnection() {
        Log.d(TAG, "closeConnection();");
        subscribeThread.interrupt();
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    void closeThread() {
        Log.d(TAG, "closeThread();");
        subscribeThread.interrupt();
    }

    private class ConnectorThread extends Thread {
        public boolean connectionLost = true;
        Handler handler;

        public ConnectorThread(Handler handler) {
            this.handler = handler;
        }

        @Override
        public void run() {
            Log.d(TAG, "connectionLost => start: " + connectionLost);
            while (connectionLost) {
                Log.d(TAG, "connectionLost => inside while(); " + connectionLost);
                try {
                    //connection = connectionFactory.newConnection();
                    connection = MySingletonConnection.getInstance().getConnection();
                    //Added connection in array
                    connections.add(connection);
                    Log.d(TAG, "List connections: " + connections.toString());
                    //connection = Connections.create(options, config);
                    channel = connection.createChannel();
                    connectionLost = false;
                    Log.d(TAG, "connectionLost => inside while and try: " + connectionLost);
                    Log.d(TAG, "connection => inside while and try: " + connection.toString());
                    // Declare a queue and bind it to an exchange.
                    com.rabbitmq.client.AMQP.Queue.DeclareOk q = channel.queueDeclare("manager2box-" + ROUTING_KEY, true, false, true, null);
                    channel.queueBind(q.getQueue(), EXCHANGE, ROUTING_KEY);
                    Log.d(TAG, "channel => inside while and try: " + channel.toString());
                    // Create the QueueingConsumer and have it consume from the queue
                    /*QueueingConsumer consumer = new QueueingConsumer(channel);
                    channel.basicConsume(q.getQueue(), false, consumer);
                    while (true) {
                        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                        String message = new String(delivery.getBody());
                        Log.d(TAG, message);
                        Message msg = handler.obtainMessage();
                        Bundle bundle = new Bundle();
                        bundle.putString("commandBundle", message);
                        msg.setData(bundle);
                        handler.sendMessage(msg);
                    }*/
                    Consumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body)
                                throws IOException {
                            String message = new String(body, "UTF-8");
                            Log.d(TAG, message);
                            Message msg = handler.obtainMessage();
                            Bundle bundle = new Bundle();
                            bundle.putString("commandBundle", message);
                            msg.setData(bundle);
                            handler.sendMessage(msg);
                        }
                    };
                    channel.basicConsume(q.getQueue(), true, consumer);
                } catch (Exception e1) {
                    Log.d(TAG, "Connection broken: " + e1.getClass().getName());
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                    e1.printStackTrace();
                    try {
                        Thread.sleep(5000); //sleep and then try again
                        Log.d(TAG, "Thread.sleep(5000); ");
                    } catch (InterruptedException e) {
                        Log.d(TAG, "Thread.sleep(5000); InterruptedException");
                        e.printStackTrace();
                        connectionLost = true;
                        Log.d(TAG, "connectionLost => inside InterruptedException and try: " + connectionLost);
                        //connection.close();
                        flushConnection();
                        break;
                    }
                }
            }
        }

        /*public boolean isConnectionOpen(){
            Log.i(TAG, "connection is open? "+ connectionOpen);
            return connectionOpen;
        }*/
    }

    void flushConnection() {
        Log.v(TAG, "flushConnection()");
        for (Connection connection : connections) {
            if (!connection.isOpen()) try {
                Log.d(TAG, "connection.close() flushConnection(); => " + connection.toString());
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
```
from rabbitmq-java-client.
@Allan-Nava questions and discussions belong on the mailing list. Automatic connection recovery is not responsible for cleaning up auto-delete queues. If a queue never had any consumers, it won't be deleted.
from rabbitmq-java-client.