OnDemandSource does not continue sending messages after the RabbitMQ Connection is lost and recovered

marcialrosales commented on May 10, 2024

from reactor-rabbitmq-streams-getting-started.

Comments (15)

MarcialRosales commented on May 10, 2024

It turns out that after a connection drop, OnDemandSource cannot recover properly if
a) it was waiting for a publisher confirmation
and/or
b) we try to send an event while the connection is down
Either of these two cases leaves the OnDemandSource unresponsive once the connection is restored.

If there are no in-flight operations or send attempts while the connection is down, the OnDemandSource fully recovers once the connection is restored.

Reproducer environment:

  1. deploy local rabbitmq cluster: getting-started/bin/deploy-rabbit
  2. deploy schema registry server: getting-started/bin/deploy-schema-registry
  3. launch test application: getting-started/routing/from-rest-to-stream
    enable com.pivotal.rabbitmq: debug in the application.yml to get extra logging

To send an event we run the following command from the getting-started/routing folder:

curl -v -H "Content-Type: application/json" --request POST --data  '{ "id":"122", "category1":"cat1", "category2": "cat", "transport": "train", "value": 1500.0 }' localhost:8080/shipment/

To restart RabbitMQ, run the command again: getting-started/bin/deploy-rabbit

xyloman commented on May 10, 2024

@MarcialRosales this is consistent with my reproduction in our application. Do you have a suggestion or ETA for a fix? I had a few team members asking about this and I have fallen behind on these updates.

xyloman commented on May 10, 2024

@MarcialRosales we might be seeing this even more frequently when leveraging the Service Access Gateway feature of RabbitMQ, because the load balancer will disconnect idle connections. I am still trying to find more details.

MarcialRosales commented on May 10, 2024

Hi @xyloman, I don't have a fix yet, but I now know what is wrong. It turns out that the reactive rabbitmq publisher swallows messages when it fails to send them due to connection failures, i.e. it emits neither an ack nor a nack. reactive rabbitmq streams relies on receiving an ack/nack for every message; without one, it does not request further messages from the upstream source. This makes the app hang.
I am working on either fixing the reactive rabbitmq publisher or adding a timeout mechanism to reactive rabbitmq streams so that it can handle missing publisher confirmations.

I will keep working on it until it is fixed. I should have it before EOD 26th Feb.
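
To illustrate the timeout idea, here is a minimal sketch using only the JDK (Java 9+ for orTimeout). It is not the code of reactor-rabbitmq or of the streams library: ConfirmTimeoutSketch, Outcome and withTimeout are hypothetical names made up for this example. It only shows the principle that a confirmation which never arrives is turned into a nack after a deadline, so whoever is waiting always gets an answer and can request the next message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ConfirmTimeoutSketch {

    /** Outcome of one publish attempt as seen by the waiting side (hypothetical type). */
    enum Outcome { ACK, NACK }

    /**
     * Guarantees that a pending confirmation always completes:
     * if nothing arrives within timeoutMillis, it is reported as NACK.
     * Any other failure of the future is reported as NACK as well.
     */
    static CompletableFuture<Outcome> withTimeout(CompletableFuture<Outcome> confirm,
                                                  long timeoutMillis) {
        return confirm
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // fails with TimeoutException
                .exceptionally(error -> Outcome.NACK);           // map the failure to a nack
    }

    public static void main(String[] args) {
        // Simulates a message "swallowed" during a connection failure:
        // nobody ever completes this future.
        CompletableFuture<Outcome> swallowed = new CompletableFuture<>();
        System.out.println(withTimeout(swallowed, 100).join());

        // A confirmation that does arrive is passed through unchanged.
        CompletableFuture<Outcome> confirmed = CompletableFuture.completedFuture(Outcome.ACK);
        System.out.println(withTimeout(confirmed, 100).join());
    }
}
```

With a nack in hand, the sender can decide to retry, which is also why duplicates become possible after a recovery.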

xyloman commented on May 10, 2024

Thank you @MarcialRosales for the update.

MarcialRosales commented on May 10, 2024

@xyloman we have fixed the problem, but the fix requires reactor-rabbitmq-1.5.2-SNAPSHOT plus additional changes in several projects under reactive-rabbitmq-streams related to upgrading to the latest reactor core and reactor netty http. If reactor-rabbitmq does not cut an official 1.5.2 by Friday, I will ship reactive-rabbitmq-streams with reactor-rabbitmq-1.5.2-SNAPSHOT.

In the meantime, I am doing some regression testing because of the number of library changes made in one shot.

xyloman commented on May 10, 2024

@MarcialRosales thanks for the update, much appreciated. Do you think you will have the chance to test against a RabbitMQ service instance on TAS with Service Access Gateway enabled? I am noticing that we can reproduce this producer issue consistently in apps that connect through the Service Access Gateway, due to the idle connection pruning that the TCP router appears to perform.

MarcialRosales commented on May 10, 2024

@xyloman I have released 0.0.8, which fixes this issue. Please take the following into account before you take this release.

IMPORTANT: This release, 0.0.8, depends on reactor-rabbitmq-1.5.2-SNAPSHOT.jar. The reactor team is releasing 1.5.2 sometime in March (probably March 12th). I did not want to hold you up until then, so in order to take 0.0.8 you need to include an additional repository. You can find here how to include that second repository.

I tested this fix with an application using Service Access Gateway without any issues. Furthermore, I killed producer connections via the management ui to simulate connection drops.

xyloman commented on May 10, 2024

@MarcialRosales I can report that this does appear to be working in my initial tests, so thank you for turning this around. I do believe I am seeing duplicate message delivery during the recovery use case; I will continue to test.

MarcialRosales commented on May 10, 2024

@xyloman Duplicates are totally expected. Here is why:

  1. application emits an event
  2. the stream sends the event
  3. broker sends publisher confirmation
  4. just before the publisher confirmation arrives at the stream library, the connection is dropped
  5. when the connection is recovered, the stream resends all the unconfirmed messages

In other words, when a connection drop occurs, we do not know whether the broker got the message or not. The connection could have been dropped just before the broker got the message, or after it got the message but before the publisher confirmation reached the client.
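
The five steps above can be replayed with a small, self-contained simulation. This is only a sketch under stated assumptions: FakeBroker and FakeStream are hypothetical stand-ins invented for the example and share no code with RabbitMQ or the streams library. The point is simply that the sender must keep every message until it is confirmed, so a lost confirmation leads to a resend of a message the broker already stored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ResendDuplicatesSketch {

    /** Stand-in for the broker: it stores whatever it receives. */
    static final class FakeBroker {
        final List<String> stored = new ArrayList<>();
        void receive(String message) { stored.add(message); }
    }

    /** Stand-in for the stream: keeps each message until its confirmation arrives. */
    static final class FakeStream {
        private final FakeBroker broker;
        private final Map<Long, String> unconfirmed = new LinkedHashMap<>();
        private long nextSequence = 1;

        FakeStream(FakeBroker broker) { this.broker = broker; }

        /** Sends a message and remembers it as unconfirmed. */
        long send(String message) {
            long sequence = nextSequence++;
            unconfirmed.put(sequence, message);
            broker.receive(message);
            return sequence;
        }

        /** Called when a publisher confirmation reaches the client. */
        void onConfirm(long sequence) { unconfirmed.remove(sequence); }

        /** Called after recovery: everything still unconfirmed is sent again. */
        void onConnectionRecovered() {
            for (String message : new ArrayList<>(unconfirmed.values())) {
                broker.receive(message);
            }
        }
    }

    static List<String> simulate() {
        FakeBroker broker = new FakeBroker();
        FakeStream stream = new FakeStream(broker);

        long first = stream.send("shipment-122");
        stream.onConfirm(first);          // confirmation arrived normally

        stream.send("shipment-123");      // broker stored it, but the confirmation is lost
        stream.onConnectionRecovered();   // so the message is sent a second time

        return broker.stored;
    }

    public static void main(String[] args) {
        // prints [shipment-122, shipment-123, shipment-123]
        System.out.println(simulate());
    }
}
```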

The new stream-type queue will prevent duplicates thanks to a new mechanism called publishingId.

By the way, I asked Scott to pass the following questions on to you. The last question refers to the stream type, which is not yet officially released but is available for testing/evaluation.

  1. Do you create your messaging topologies declaratively via this library or do you manage them externally?
  2. Are you using quorum queues today or planning to use them? I am planning on supporting quorum queues and being able to declaratively create them. Would you be interested?
  3. Are you aware of a new queue type called stream? Would you be interested in using it via this library? As you know, what is behind a stream is totally transparent to the producer/consumer, except for the logic that declares the topology, assuming you use the library to declare topologies.

xyloman commented on May 10, 2024

Do you create your messaging topologies declaratively via this library or do you manage them externally?

Yes, we plan on leveraging the ability to declare the topologies via the API. This has been a huge lift from what teams have done in the past, especially around policies. However, more complex policies (e.g. configuring expiry or other policy parameters) do not appear to be exposed; this would be nice to have.

Are you using quorum queues today or planning to use them? I am planning on supporting quorum queues and being able to declaratively create them. Would you be interested?

I have not, but other teams that use quorum queues are interested in using reactive rabbitmq streams and would be blocked by the lack of support. If support were there, I would evaluate it for the workload I am currently working on; I am not sure whether that workload would benefit from quorum queues.

Are you aware of a new queue type called stream? Would you be interested in using it via this library? As you know, what is behind a stream is totally transparent to the producer/consumer, except for the logic that declares the topology, assuming you use the library to declare topologies.

I was not aware of the new stream queue type. However, the ability to do message de-duplication would be a huge win for the workload that I am currently working on, especially given the recovery story you mentioned earlier. That said, we are currently running these workloads on TAS and I am not sure whether the stream capability is supported on that flavor of RabbitMQ.

MarcialRosales commented on May 10, 2024

@xyloman Thank you very much for that feedback! Really appreciated.

Today it is possible to configure a queue with message-ttl. It was not documented, though, and I have updated the docs. See dlx example here. Here is another example I took from the unit tests:

TopologyBuilder buildDestinationQueue(TopologyBuilder builder, String destinationQueueName,
                                      String destination, String... topics) {
    // The dead-letter exchange is named after the queue
    String destinationQueueDlx = destinationQueueName + "-dlx";
    return builder
            .declareQueue(destinationQueueName)
                .boundTo(destination, topics)
                .withDeadLetterExchange(destinationQueueDlx)
                .classic()
                    // message TTL of 60 seconds (Duration is java.time.Duration)
                    .withMessageTTL(Duration.ofSeconds(60))
            .and()
            .then();
}

Right now, you can fully declare a queue and/or exchange via the TopologyBuilder. Everything you can configure today via policies is available via TopologyBuilder, except for quorum queues, which are coming soon. Please let me know if you or your team find some other policy feature missing that you need.

With regard to quorum queues, I am aiming to release support for them before 12th March.

With regard to the stream type, it will not be available in TAS until RabbitMQ 3.9 is released (around June 2021).

With regard to duplicates, what do you use to uniquely identify a message? E.g. are you doing .withAttributes().messageId(messageId()) or using headers? Or is the message id within the payload?

xyloman commented on May 10, 2024

Message id is currently in the payload, but I think we could easily add something like .withAttributes().messageId(r -> r.getId()) without much fuss.
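
Once every message carries a stable id, a consumer can drop the duplicates that a recovery may produce. The following is only a rough sketch of that idea using plain JDK collections. MessageIdDeduplicator and firstDelivery are hypothetical names, not part of the library, and the bounded memory is an assumption of this example: an id that has been evicted will be treated as new again.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MessageIdDeduplicator {

    // Remembers the most recently added ids, oldest first.
    private final Map<String, Boolean> seen;

    MessageIdDeduplicator(final int capacity) {
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // Forget the oldest id once the capacity is exceeded.
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an id is seen, false while it is still remembered. */
    synchronized boolean firstDelivery(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        MessageIdDeduplicator deduplicator = new MessageIdDeduplicator(1000);
        String[] deliveries = { "122", "123", "123" }; // "123" was resent after a recovery
        for (String id : deliveries) {
            String action = deduplicator.firstDelivery(id) ? "process" : "skip duplicate";
            System.out.println(id + " -> " + action);
        }
    }
}
```

The capacity should be chosen to cover at least the number of messages that can be in flight when a connection drops; anything beyond that is a trade-off between memory and safety.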

MarcialRosales commented on May 10, 2024

Hi @xyloman, I would like to share some feedback from a customer about the topology builder that you could pass along to other teams who are thinking of using the library.

#1 (comment)

These are the only policy features not supported via TopologyBuilder as of v0.0.8:

  • quorum queues (will be available in v0.0.9, most likely end of next week)
  • federation (I am not entirely sure if an application needs to deal with federation configuration though)
  • queue TTL (i.e. destroying the entire queue. I am not sure this is useful but it can easily be added if really needed)

MarcialRosales commented on May 10, 2024

Hi @xyloman, v0.0.9 is available with quorum queue support. Unlike v0.0.8, this version does not require you to include a snapshot repository.
