Comments (11)

michalsteyn commented on September 20, 2024

I have uploaded a sample simulating the issue here.

So it makes sense: every time a process restarts, it creates a new unique queue for that process. That queue is clearly used to receive the response messages.

Just to explain our architecture: we develop airport kiosks. Each kiosk has a worker process that executes a workflow. When a kiosk restarts, our microservice environment restarts its worker process. In other words, it would be perfect if we could create a named queue that receives the response messages for each kiosk, e.g. kiosk-1-response, kiosk-2-response, etc.

But how can we create a RequestClient and force it to use a named queue instead of the auto-generated unique process queue? I think this is where we are missing something.

Here are the code snippets from the example for the Requester and the Responder. Note that the Responder is a long-lived microservice; the Requester simulates short-lived worker processes.

Responder:

static void Main()
{
    Console.WriteLine("Starting Responder...");

    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        var host = sbc.Host(new Uri("rabbitmq://localhost"), h => {});

        sbc.ReceiveEndpoint(host, "channel-test", ep =>
        {
            ep.Handler<RequestMessage>(context => context.RespondAsync(
                new ResponseMessage { Text = $"Hallo {context.Message.Text}" }
            ));
        });
    });

    bus.Start();
    Console.WriteLine("Press any key to exit...");
    Console.ReadKey();
    bus.Stop();
}

And here is the Requester:

static void Main()
{
    Console.WriteLine("Starting Requester...");
    Console.WriteLine("Press ctrl-C to exit...");
    while (true)
    {
        SimulateProcessRestart().GetAwaiter().GetResult(); 
    }
}

// Simulates a new short-lived process
static async Task SimulateProcessRestart()
{
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => {});
    });
    await bus.StartAsync();

    // Is there a way to create a RequestClient and bind it to a named response queue?
    var client = bus.CreateRequestClient<RequestMessage>(new Uri("rabbitmq://localhost/channel-test"));
    var response = await client.GetResponse<ResponseMessage>(new RequestMessage {Text = "World"});

    Console.WriteLine($"Received Response: {response.Message.Text}");
    await bus.StopAsync();
}

P.S. Just to be clear, this is a simplified example. In production, each short-lived process can live for hours, and we reuse the bus for all message transactions in that process. We don't create a bus per request; hundreds of requests are typically sent per process instance. But we have many hundreds of kiosks, so after a week or two we end up with channel counts in the hundreds of thousands on our RabbitMQ server.

phatboyg commented on September 20, 2024

Are you retaining an ISendEndpoint reference beyond its immediate use in your code? The reason I ask is that that is a big no-no. If you are only infrequently calling Send on an ISendEndpoint, you should ask the ISendEndpointProvider for it when you need it, using the endpoint address. That way, the cache can behave properly. Every time you call Send on a cached endpoint, it is moved to the current bucket (if it isn't already in it), preventing it from being disposed. However, if you get the send endpoint, hang on to it, and then call Send after the bucket is ready to be disposed, you will be using a likely-disposed send endpoint. Does that make sense?
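
For illustration only (this snippet is not from the thread), here is a minimal sketch of that pattern: asking the ISendEndpointProvider for the endpoint at the point of use rather than caching the ISendEndpoint yourself. The class, message type, and queue name are made up for the example.

using System;
using System.Threading.Tasks;
using MassTransit;

public class OrderSubmitted
{
    public string OrderId { get; set; }
}

public class OrderNotifier
{
    private readonly ISendEndpointProvider _sendEndpointProvider;

    public OrderNotifier(ISendEndpointProvider sendEndpointProvider)
    {
        _sendEndpointProvider = sendEndpointProvider;
    }

    public async Task NotifyAsync(string orderId)
    {
        // Resolve the endpoint on every call; the internal cache keeps frequently
        // used endpoints alive and can safely dispose the ones that go idle.
        var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("rabbitmq://localhost/order-events"));
        await endpoint.Send(new OrderSubmitted { OrderId = orderId });
    }
}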

dmitrynovik commented on September 20, 2024

No, we do not hold references to send endpoints. We use the MessageRequestClient and consumers; the rest happens inside MassTransit.

Example:

public async Task Consume(ConsumeContext<ICountRequest> context)
{
    try
    {
        Log.Trace($"Handling MassTransit ICountRequest: Consumer ID: {_id}");
        var count = await _serviceImplementation.Count();
        await context.RespondAsync<ICountResponse>(new { Count = count }).ConfigureAwait(false);
    }
    catch (Exception e)
    {
        Log.Error(e, "Error Handling ICountRequest");
        throw;
    }
}

michalsteyn commented on September 20, 2024

Hi @phatboyg, just to clarify: we use the request/response pattern.

We use a microservice architecture and sometimes have a huge spike in transient processes. Whenever these instances send a request to our long-lived microservices, GreenPipes ends up creating and caching a RabbitMQ channel in the long-lived microservices when responding to the request.

This is leading to massive channel leaks that are causing our RabbitMQ cluster to crash. After a couple of weeks we have channel counts in the hundreds of thousands in RabbitMQ. Most of these channels are stale but remain cached by GreenPipes.

In an effort to reduce this, we lowered the cache limits to force GreenPipes to evict the stale channels sooner. But now active channels are also getting evicted.

phatboyg commented on September 20, 2024

I guess my simple answer is: why are you creating a new endpoint/response queue for each request? You should be reusing existing response queues; it takes a lot of CPU/memory to create and destroy them constantly. In any high-volume site, you should be sharing response queues across web API services. (A sketch of this reuse follows below.)
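
For what it's worth, a minimal sketch of the reuse being suggested (not code from the thread, but in the same API style as the snippets above): one long-lived bus and one request client, so every request shares the same response queue, instead of a bus per request. The loop count is arbitrary.

static async Task RunRequesterAsync()
{
    // One bus per process, created and started once
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => {});
    });
    await bus.StartAsync();

    // One request client, reused for every request; all responses arrive on the same queue
    var client = bus.CreateRequestClient<RequestMessage>(new Uri("rabbitmq://localhost/channel-test"));

    for (var i = 0; i < 100; i++)
    {
        var response = await client.GetResponse<ResponseMessage>(new RequestMessage { Text = $"World {i}" });
        Console.WriteLine($"Received Response: {response.Message.Text}");
    }

    await bus.StopAsync();
}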

michalsteyn commented on September 20, 2024

Update: @dmitrynovik implemented the following change, and it seems to have solved the problem:

static async Task SimulateProcessRestart()
{
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => {});
        
        // Override bus endpoint queue name (create unique logical name per process name+machine)
        sbc.OverrideDefaultBusEndpointQueueName($"{typeof(Program).FullName}-{Environment.MachineName}");
    });
    await bus.StartAsync();
}

Are we on the right track by using sbc.OverrideDefaultBusEndpointQueueName?

phatboyg commented on September 20, 2024

Yeah, that would reduce the churn and use a consistent name for each machine, instead of creating a new unique one for every process invocation. Just need to make sure the process isn't running multiple times on the same box, since both instances would end up using the same queue.

michalsteyn commented on September 20, 2024

Excellent. Yes, we have mechanisms in place to prevent multiple instances running for the same Kiosk.
This solves our issue. Thanks for the help.

phatboyg commented on September 20, 2024

I need to change it so that temporary response endpoints are not cached. And also don't set up topology.

dmitrynovik commented on September 20, 2024

I need to change it so that temporary response endpoints are not cached. And also don't set up topology.

Hi @phatboyg,
Could you please elaborate on the forthcoming change?

The reason I'm asking is that even with the predefined RabbitMQ queue names, our channel leak problem has not been solved.

phatboyg commented on September 20, 2024

If you have a set list of queue names and you are still seeing channel leaks, I don't think it's MassTransit. If you have dotTrace and can do a memory profile, you should be able to see what is holding on to references to those send endpoints.
