Comments (11)
I have uploaded a sample simulating the issue here.
So it makes sense: every time a process restarts, it creates a new unique queue for that process, and that queue is clearly used to receive the response messages.
Just to explain our architecture: we develop airport kiosks. Each kiosk has a worker process that executes a workflow. When the kiosk restarts, our microservice environment restarts the worker process. So it would be perfect if we could create a named queue that receives the response messages for each kiosk, e.g. kiosk-1-response, kiosk-2-response, etc.
But how can we create a RequestClient and force it to use a named queue instead of the auto-generated unique process queue? I think this is where we are missing something.
Here are the code snippets from the example for the Requester and Responder. Note, the Responder is a long-lived Micro Service. The Requester simulates short-lived worker processes.
Responder:
static void Main()
{
    Console.WriteLine("Starting Responder...");
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        var host = sbc.Host(new Uri("rabbitmq://localhost"), h => {});
        sbc.ReceiveEndpoint(host, "channel-test", ep =>
        {
            ep.Handler<RequestMessage>(context => context.RespondAsync(
                new ResponseMessage { Text = $"Hallo {context.Message.Text}" }
            ));
        });
    });
    bus.Start();
    Console.WriteLine("Press any key to exit...");
    Console.ReadKey();
    bus.Stop();
}
And here is the Requester:
static void Main()
{
    Console.WriteLine("Starting Requester...");
    Console.WriteLine("Press ctrl-C to exit...");
    while (true)
    {
        SimulateProcessRestart().GetAwaiter().GetResult();
    }
}

// This simulates a new short-lived process
static async Task SimulateProcessRestart()
{
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => {});
    });
    await bus.StartAsync();
    // Is there a way to create a RequestClient and bind it to a named response queue?
    var client = bus.CreateRequestClient<RequestMessage>(new Uri("rabbitmq://localhost/channel-test"));
    var response = await client.GetResponse<ResponseMessage>(new RequestMessage { Text = "World" });
    Console.WriteLine($"Received Response: {response.Message.Text}");
    await bus.StopAsync();
}
P.S. Just to be clear, this is a simplified example. In production, each short-lived process can live for hours, and we reuse the bus for all message transactions in that process. We don't create a bus per request; hundreds of requests are typically sent per process instance. But we have many hundreds of kiosks, so after a week or two we end up with channel counts in the hundreds of thousands on our RabbitMQ server.
from greenpipes.
Are you retaining an ISendEndpoint reference beyond the immediate use of it in your code? The reason I ask is that doing so is a big no-no. If you are infrequently calling Send on ISendEndpoint, you should ask for it from the ISendEndpointProvider when you need it, using the endpoint address. That way, the cache can behave properly. Every time you call Send on a cached endpoint, it is moved to the current bucket (if it isn't already in it), preventing it from being disposed. However, if you get the send endpoint, hang on to it, and then call Send after the bucket is ready to be disposed, you will likely be using a disposed send endpoint. Does that make sense?
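As a rough illustration of the advice above, here is a minimal sketch that resolves the send endpoint from the ISendEndpointProvider on every call instead of storing it in a field. The class name OnDemandSender and the way the address is passed in are hypothetical; only GetSendEndpoint and Send are MassTransit calls, and the sketch assumes the MassTransit package is referenced.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical wrapper: keeps the provider and the address, never the endpoint itself.
public class OnDemandSender
{
    readonly ISendEndpointProvider _provider;
    readonly Uri _address;

    public OnDemandSender(ISendEndpointProvider provider, Uri address)
    {
        _provider = provider ?? throw new ArgumentNullException(nameof(provider));
        _address = address ?? throw new ArgumentNullException(nameof(address));
    }

    public async Task Send<T>(T message) where T : class
    {
        // Ask the provider each time so the endpoint always comes from the cache
        // rather than from a reference this class has been holding on to.
        var endpoint = await _provider.GetSendEndpoint(_address).ConfigureAwait(false);
        await endpoint.Send(message).ConfigureAwait(false);
    }
}
```

Since the bus itself is an ISendEndpointProvider, an instance could be created with something like new OnDemandSender(bus, new Uri("rabbitmq://localhost/channel-test")), reusing the address from the sample above.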
from greenpipes.
No, we do not hold references to send endpoints. We use the MessageRequestClient and consumers; the rest happens inside MT.
Example:
public async Task Consume(ConsumeContext<ICountRequest> context)
{
    try
    {
        Log.Trace($"Handling MassTransit ICountRequest: Consumer ID: {_id}");
        var count = await _serviceImplementation.Count();
        await context.RespondAsync<ICountResponse>(new { Count = count }).ConfigureAwait(false);
    }
    catch (Exception e)
    {
        Log.Error(e, "Error Handling ICountRequest");
        throw;
    }
}
from greenpipes.
Hi @phatboyg, just to clarify, we make use of the Request / Response pattern.
We use a microservice architecture and sometimes have a huge spike in transient processes. Whenever these instances send a request to our long-lived microservices, GreenPipes ends up creating and caching a RabbitMQ channel when responding to the request (in the long-lived microservices).
This leads to massive channel leaks that are causing our RabbitMQ cluster to crash. After a couple of weeks, we have channel counts in the hundreds of thousands in RabbitMQ. Most of these channels are stale but remain cached by GreenPipes.
In an effort to reduce this effect, we reduced the cache limits to force GP to evict the stale channels sooner. But now we are experiencing active channels also getting evicted.
from greenpipes.
I guess my simple answer is a question: why are you creating a new endpoint/response queue for each request? You should be reusing existing response queues, since it takes a lot of CPU and memory to create and destroy them constantly. In any high-volume site, you should be sharing response queues on web API services.
from greenpipes.
Update: @dmitrynovik implemented the following change, and it seems to have solved the problem:
static async Task SimulateProcessRestart()
{
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => {});
        // Override bus endpoint queue name (create unique logical name per process name+machine)
        sbc.OverrideDefaultBusEndpointQueueName($"{typeof(Program).FullName}-{Environment.MachineName}");
    });
    await bus.StartAsync();
}
Are we on the right track by using sbc.OverrideDefaultBusEndpointQueueName?
from greenpipes.
Yeah, that would reduce the churn and use a consistent name for each machine, instead of creating a new unique one for every process invocation. Just need to make sure the process isn't running multiple times on the same box, since both instances would end up using the same queue.
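For the per-kiosk naming mentioned earlier in the thread (kiosk-1-response, kiosk-2-response), a small helper can build a stable queue name from a kiosk identifier. This is only a sketch: ResponseQueueNames, ForKiosk and the character rules are assumptions made for illustration, not part of MassTransit.

```csharp
using System;
using System.Linq;

// Hypothetical helper: derives a stable response queue name from a kiosk id.
public static class ResponseQueueNames
{
    public static string ForKiosk(string kioskId)
    {
        if (string.IsNullOrWhiteSpace(kioskId))
            throw new ArgumentException("Kiosk id is required.", nameof(kioskId));

        // Lower-case letters and digits, keep '-' and '_', replace anything else with '-'.
        var safe = new string(kioskId.Trim()
            .Select(c => char.IsLetterOrDigit(c) || c == '-' || c == '_'
                ? char.ToLowerInvariant(c)
                : '-')
            .ToArray());

        return $"kiosk-{safe}-response";
    }
}
```

The result could then be passed to the override shown above, for example sbc.OverrideDefaultBusEndpointQueueName(ResponseQueueNames.ForKiosk("1")), which yields kiosk-1-response. The same caveat applies: only one process per kiosk id should be running at a time.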
from greenpipes.
Excellent. Yes, we have mechanisms in place to prevent multiple instances running for the same Kiosk.
This solves our issue. Thanks for the help.
from greenpipes.
I need to change it so that temporary response endpoints are not cached. And also don’t setup topology.
from greenpipes.
I need to change it so that temporary response endpoints are not cached. And also don’t setup topology.
Hi @phatboyg ,
Could you please elaborate more on the forthcoming change?
The reason I'm asking is that even after having the predefined RabbitMQ queue names, our channel leak problem has not been solved.
from greenpipes.
If you have a set list of queue names and you are still seeing channel leaks, I don't think it's MassTransit. If you have dotTrace and can do a memory profile, you should be able to see what is holding on to references to those send endpoints.
from greenpipes.