Comments (10)

budjb commented on September 8, 2024

I've found the root cause of this issue. Here are the steps to reproduce, with an auto-recovering connection:

  1. Create a channel.
  2. Create a temporary queue (exclusive, not durable, auto delete).
  3. Start consuming from the temporary queue.
  4. Publish a message to a queue that will get picked up by some RPC server.
  5. Receive a message from the temporary queue or timeout.
  6. Cancel consuming from the queue.
  7. Close the channel.

At this point, we are done with the temporary queue. It turns out that if we do not call queueDelete, objects we no longer need stay referenced (specifically, RecordedQueue and RecoveryAwareChannelN). However, we cannot call queueDelete: we have already canceled the consumer, so the auto-delete queue has already deleted itself. If the delete call is made anyway, an exception occurs and the channel closes.
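
The growth described above can be illustrated with a toy model. This is not the client's actual code: `LeakDemo`, `ToyRecoveringConnection`, the `recordedQueues` map, and all method names are hypothetical stand-ins for the bookkeeping an auto-recovering connection keeps so it can redeclare queues after a reconnect. The sketch only shows that a record made at declare time outlives the channel unless something removes it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class LeakDemo {
    // Hypothetical stand-in for the recovery bookkeeping of an auto-recovering connection.
    static final class ToyRecoveringConnection {
        // Queue name -> id of the channel that declared it, kept for redeclaration on recovery.
        private final Map<String, Integer> recordedQueues = new HashMap<>();
        private int nextChannelId = 1;

        int createChannel() {
            return nextChannelId++;
        }

        // Declaring a temporary queue records it for recovery.
        String declareTemporaryQueue(int channelId) {
            String name = "amq.gen-" + UUID.randomUUID();
            recordedQueues.put(name, channelId);
            return name;
        }

        // In this model, closing a channel leaves the recorded queues untouched.
        void closeChannel(int channelId) {
        }

        // Explicit deletion is the only path that drops the record.
        void deleteQueue(String name) {
            recordedQueues.remove(name);
        }

        int recordedQueueCount() {
            return recordedQueues.size();
        }
    }

    // Runs `calls` RPC-style round trips and returns how many queue records remain.
    static int run(int calls, boolean deleteExplicitly) {
        ToyRecoveringConnection conn = new ToyRecoveringConnection();
        for (int i = 0; i < calls; i++) {
            int channel = conn.createChannel();
            String replyQueue = conn.declareTemporaryQueue(channel);
            // ... publish the request, wait for the reply or time out, cancel the consumer ...
            if (deleteExplicitly) {
                conn.deleteQueue(replyQueue);
            }
            conn.closeChannel(channel);
        }
        return conn.recordedQueueCount();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, false)); // one record left behind per call
        System.out.println(run(1000, true));  // no records left behind
    }
}
```

In a long-running application the first case corresponds to the heap growth reported here: every RPC call adds an entry that nothing ever removes.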

The only workaround I could find is to call deleteRecordedQueue on the channel object. This feels nasty to me because that method is marked private. Fortunately, we are using Groovy, which disregards visibility modifiers. There needs to be a more formal way to clean up these objects for this use case, as they will eventually fill the heap in long-running applications.

from rabbitmq-java-client.

budjb commented on September 8, 2024

If a code example would help, you can see this source file.

michaelklishin commented on September 8, 2024

queue.delete is idempotent in 3.x, so you absolutely can call Channel#queueDelete. Knowing when connection recovery information should be deleted for auto-delete queues is the tricky part.

michaelklishin commented on September 8, 2024

One thing we can do is delete the queue on an explicit basic.cancel if it is an auto-delete queue. Unfortunately, this may be trickier for server-initiated cancellation. Fortunately, the latter should be rare.
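
A minimal sketch of that idea, under stated assumptions: `CancelCleanupSketch`, `QueueRecord`, and every method name below are hypothetical and do not reflect the client's real internals. The sketch also assumes the record should only be dropped once the last consumer on the queue is cancelled, since that is when the server removes an auto-delete queue.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class CancelCleanupSketch {
    // Hypothetical record of a queue declared through the connection.
    static final class QueueRecord {
        final boolean autoDelete;
        final Set<String> consumerTags = new HashSet<>();

        QueueRecord(boolean autoDelete) {
            this.autoDelete = autoDelete;
        }
    }

    private final Map<String, QueueRecord> recordedQueues = new HashMap<>();
    private final Map<String, String> queueByConsumerTag = new HashMap<>();

    void recordQueue(String queue, boolean autoDelete) {
        recordedQueues.put(queue, new QueueRecord(autoDelete));
    }

    void recordConsumer(String consumerTag, String queue) {
        QueueRecord record = recordedQueues.get(queue);
        if (record == null) {
            return; // queue was not declared through this connection
        }
        record.consumerTags.add(consumerTag);
        queueByConsumerTag.put(consumerTag, queue);
    }

    // Called when the application explicitly cancels a consumer.
    void onExplicitCancel(String consumerTag) {
        String queue = queueByConsumerTag.remove(consumerTag);
        if (queue == null) {
            return; // unknown consumer tag
        }
        QueueRecord record = recordedQueues.get(queue);
        if (record == null) {
            return;
        }
        record.consumerTags.remove(consumerTag);
        // Drop the recovery record only for auto-delete queues with no consumers left.
        if (record.autoDelete && record.consumerTags.isEmpty()) {
            recordedQueues.remove(queue);
        }
    }

    boolean isRecorded(String queue) {
        return recordedQueues.containsKey(queue);
    }

    public static void main(String[] args) {
        CancelCleanupSketch conn = new CancelCleanupSketch();

        conn.recordQueue("reply-1", true);
        conn.recordConsumer("ctag-1", "reply-1");
        conn.onExplicitCancel("ctag-1");
        System.out.println(conn.isRecorded("reply-1")); // false: auto-delete, last consumer gone

        conn.recordQueue("work", false);
        conn.recordConsumer("ctag-2", "work");
        conn.onExplicitCancel("ctag-2");
        System.out.println(conn.isRecorded("work")); // true: not an auto-delete queue
    }
}
```

Server-initiated cancellation is not modeled here; as noted above, that case is harder because the client only learns about it after the fact.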

michaelklishin commented on September 8, 2024

Internal tracker bug: 26364.

budjb commented on September 8, 2024

You mentioned queue.delete is idempotent in 3.x. Is server 3.x required for this?

michaelklishin commented on September 8, 2024

@budjb yes, it's the server that performs the deletion (and decides whether deleting what's not there should be considered a success).

michaelklishin commented on September 8, 2024

Fixed in master (as well as we think we can do it at the moment).

Allan-Nava commented on September 8, 2024

I have this code to auto-recover the connection, but it ends up flooding connections.

`

public class AMQP{
private ConnectionFactory connectionFactory = new ConnectionFactory();
private Connection connection;
private ConnectorThread subscribeThread;
private final String TAG = "AMQP";
private final String EXCHANGE = "sample";
private final String USERNAME = "sample";
private final String PASSWORD = "sample";
private final String HOST = "sample";
private final String ROUTING_KEY = Utils.getIPAddress();
private Config config;
private ConnectionOptions options;
private Channel channel;
// Typed list so that flushConnection() can iterate over Connection objects.
private final List<Connection> connections = new ArrayList<>();

public AMQP(Handler messageHandler) {
    //setupConnectionFactory();
    subscribe(messageHandler);
}
private void setupConnectionFactory(){
    // set the heartbeat timeout to 60 seconds
    connectionFactory.setRequestedHeartbeat(60);
    connectionFactory.setAutomaticRecoveryEnabled(true);
    connectionFactory.setUsername(USERNAME);
    connectionFactory.setPassword(PASSWORD);
    connectionFactory.setHost(HOST);
    connectionFactory.setPort(5672);

    //Lyra config
    /*config = new Config()
            .withConnectionListeners()
            .withRecoveryPolicy(RecoveryPolicies.recoverAlways())
            .withRetryPolicy(new RetryPolicy()
                    .withMaxAttempts(20)
                    .withInterval(Duration.seconds(5))
                    .withMaxDuration(Duration.minutes(5)));
    //Lyra option config for connection
    options = new ConnectionOptions().withUsername(USERNAME)
            .withRequestedHeartbeat(Duration.seconds(20))
            .withPassword(PASSWORD)
            .withHost(HOST)
            .withPort(5672);*/

}

/*public boolean isConnected()
{
    return subscribeThread.isConnectionOpen();
}*/

private void subscribe(final Handler handler)
{
    subscribeThread = new ConnectorThread(handler);
    subscribeThread.start();
}
void closeConnection(){
    Log.d(TAG, "closeConnection();");
    subscribeThread.interrupt();
    try {
        connection.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
void closeThread()
{
    Log.d(TAG, "closeThread();");
    subscribeThread.interrupt();
}


private class ConnectorThread extends Thread {
    public boolean connectionLost = true;

    Handler handler;

    public ConnectorThread(Handler handler)
    {
        this.handler = handler;
    }

    @Override
    public void run() {
        Log.d(TAG, "connectionLost => start: "+connectionLost);
        while(connectionLost) {
            Log.d(TAG, "connectionLost => inside while(); "+connectionLost);
            try {
                //connection = connectionFactory.newConnection();
                connection = MySingletonConnection.getInstance().getConnection();
                //Added connection in array
                connections.add(connection);
                Log.d(TAG, "List connections: "+connections.toString());
                //connection = Connections.create(options, config);
                channel = connection.createChannel();
                connectionLost = false;
                Log.d(TAG, "connectionLost => inside while and try: "+connectionLost);
                Log.d(TAG, "connection => inside while and try: "+connection.toString());
                // Declare a queue and bind it to an exchange.
                com.rabbitmq.client.AMQP.Queue.DeclareOk q = channel.queueDeclare("manager2box-"+ROUTING_KEY, true, false, true, null);
                channel.queueBind(q.getQueue(), EXCHANGE, ROUTING_KEY);
                Log.d(TAG, "channel => inside while and try: "+channel.toString());
                // Create the QueueingConsumer and have it consume from the queue
                        /*QueueingConsumer consumer = new QueueingConsumer(channel);
                        channel.basicConsume(q.getQueue(), false, consumer);

                        while (true) {
                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            String message = new String(delivery.getBody());
                            Log.d(TAG, message);
                            Message msg = handler.obtainMessage();
                            Bundle bundle = new Bundle();
                            bundle.putString("commandBundle", message);
                            msg.setData(bundle);
                            handler.sendMessage(msg);
                        }*/

                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                        String message = new String(body, "UTF-8");
                        Log.d(TAG, message);
                        Message msg = handler.obtainMessage();
                        Bundle bundle = new Bundle();
                        bundle.putString("commandBundle", message);
                        msg.setData(bundle);
                        handler.sendMessage(msg);
                    }
                };
                channel.basicConsume(q.getQueue(), true, consumer);

            } catch (Exception e1) {
                Log.d(TAG, "Connection broken: " + e1.getClass().getName());
                try {
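                    // channel is still null if the failure happened before createChannel() returned
                    if (channel != null && channel.isOpen()) {
                        channel.close();
                    }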
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                e1.printStackTrace();

                try {
                    Thread.sleep(5000); //sleep and then try again
                    Log.d(TAG, "Thread.sleep(5000); ");
                } catch (InterruptedException e) {
                    Log.d(TAG, "Thread.sleep(5000); InterruptedException");
                    e.printStackTrace();
                    connectionLost = true;
                    Log.d(TAG, "connectionLost => inside InterruptedException and try: "+connectionLost);
                    //connection.close();
                    flushConnection();
                    break;
                }
            }
        }
    }
    /*public boolean isConnectionOpen(){
        Log.i(TAG, "connection is open? "+ connectionOpen);
        return connectionOpen;
    }*/
}

void flushConnection(){
    Log.v(TAG, "flushConnection()");
    for (Connection connection : connections)
    {
        // Close only connections that are still open; closing an already closed one throws.
        if (connection.isOpen()) {
            try {
                Log.d(TAG, "connection.close() flushConnection(); => "+connection.toString());
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

}

`

michaelklishin commented on September 8, 2024

@Allan-Nava questions and discussions belong on the mailing list. Automatic connection recovery is not responsible for cleaning up auto-delete queues. If a queue never had any consumers, it won't be deleted.
