Giter Site home page Giter Site logo

particular / nservicebus.rabbitmq Goto Github PK

View Code? Open in Web Editor NEW
87.0 17.0 57.0 11.62 MB

RabbitMQ transport for NServiceBus

Home Page: https://docs.particular.net/nservicebus/rabbitmq/

License: Other

PowerShell 0.02% C# 99.96% Batchfile 0.02%
nservicebus rabbitmq c-sharp rabbitmq-transport

nservicebus.rabbitmq's Introduction

NServiceBus.RabbitMQ

NServiceBus.RabbitMQ supports sending messages over RabbitMQ using the RabbitMQ .NET Client.

It is part of the Particular Service Platform, which includes NServiceBus and tools to build, monitor, and debug distributed systems.

See the RabbitMQ Transport documentation for more details on how to use it.

Running tests locally

All tests use the default connection string host=localhost. This can be changed by setting the RabbitMQTransport_ConnectionString environment variable.

For developers using Docker containers, the following docker command will quickly setup a container configured to use the default port:

docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

With this setup, the default connection string will work.

Setting up a docker cluster

A 3-node RabbitMQ cluster with a HAProxy load balancer in front and by default mirroring all queues across all 3 nodes can be set up using docker for testing or development purposes by running the following script:

Setup cluster network:

docker network create --driver bridge rabbitnet

Setup cluster:

docker run -d --network rabbitnet --hostname rabbit1 --name rabbit1 -v rabbitmq-data:/var/lib/rabbitmq rabbitmq:3-management
docker run -d --network rabbitnet --hostname rabbit2 --name rabbit2 -v rabbitmq-data:/var/lib/rabbitmq rabbitmq:3-management
docker run -d --network rabbitnet --hostname rabbit3 --name rabbit3 -v rabbitmq-data:/var/lib/rabbitmq rabbitmq:3-management

docker exec rabbit2 rabbitmqctl stop_app
docker exec rabbit2 rabbitmqctl join_cluster rabbit@rabbit1
docker exec rabbit2 rabbitmqctl start_app

docker exec rabbit3 rabbitmqctl stop_app
docker exec rabbit3 rabbitmqctl join_cluster rabbit@rabbit1
docker exec rabbit3 rabbitmqctl start_app

Setup classic queue mirroring:

Note that mirroring of classic queues will be removed in a future version of RabbitMQ. Consider using quorum queues instead.

docker exec rabbit1 rabbitmqctl set_policy ha-all "\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

Create haproxy.cfg file for configuring HAProxy:

global
        log 127.0.0.1   local1
        maxconn 4096
 
defaults
        log     global
        mode    tcp
        option  tcplog
        retries 3
        option redispatch
        maxconn 2000
        timeout connect 5000
        timeout client 50000
        timeout server 50000
 
listen  stats
        bind *:1936
        mode http
        stats enable
        stats hide-version
        stats realm Haproxy\ Statistics
        stats uri /
 
listen rabbitmq
        bind *:5672
        mode            tcp
        balance         roundrobin
        timeout client  3h
        timeout server  3h
        option          clitcpka
        server          rabbit1 rabbit1:5672  check inter 5s rise 2 fall 3
        server          rabbit2 rabbit2:5672  check inter 5s rise 2 fall 3
        server          rabbit3 rabbit3:5672  check inter 5s rise 2 fall 3

listen mgmt
        bind *:15672
        mode            tcp
        balance         roundrobin
        timeout client  3h
        timeout server  3h
        option          clitcpka
        server          rabbit1 rabbit1:15672  check inter 5s rise 2 fall 3
        server          rabbit2 rabbit2:15672  check inter 5s rise 2 fall 3
        server          rabbit3 rabbit3:15672  check inter 5s rise 2 fall 3

Setup HAProxy container, note correct the path where haproxy.cfg is saved.

docker run -d --network rabbitnet --hostname rabbitha --name rabbitha -p 15672:15672 -p 5672:5672 -v ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro haproxy:1.7

Setup quorum queues:

quorum queues

After all these commands have run, a 3-node RabbitMQ cluster will be running that should be accessible via the load balancer.

nservicebus.rabbitmq's People

Contributors

adamralph avatar andreasohlund avatar awright18 avatar boblangley avatar bording avatar danielmarbach avatar davidboike avatar dependabot-preview[bot] avatar dependabot[bot] avatar helenktsai avatar heskandari avatar indualagarsamy avatar internalautomation[bot] avatar janovesk avatar kbaley avatar kentdr avatar mauroservienti avatar mikeminutillo avatar particularbot avatar ramonsmits avatar seanfarmar avatar seanfeldman avatar simoncropp avatar soujay avatar synhershko avatar szymonpobiega avatar timbussmann avatar tmasternak avatar williambza avatar wojcikmike avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

nservicebus.rabbitmq's Issues

Race condition in RabbitMqDequeueStrategy can cause deadlock

Hello, I've recently noticed that RabbitMqDequeueStrategy can deadlock itself.

Use case - NServiceBus is shutting down while RabbitMQ broker connection is unavailable/unstable (especially if CriticalErrorAction is reassigned from default "Fail app" to something more fault tolerant, like ignoring RMQ Broker connection failures).

Technical details:
When exception occures in consumer task (started in StartConsumer method), countdownEvent gets incremented and new consumer task gets scheduled (task recreation). The problem occurs when CancellationToken gets Cancelled in between countdownEvent.TryAddCount and the point when task scheduler processes new consumer task. TPL will not execute task when cancellation token is already Cancelled, thus making countdownEvent.Wait() caller thread to be deadlocked, since countdownEvent was already incremented and will not get decremented (Signal in finally block inside ConsumeMessages task).

A bit of code (from 'master' branch):

        void StartConsumer(string queue)
        {
            var token = tokenSource.Token;
            // ConsumeMessages will not be invoked if token.IsCancellationRequested is true
            Task.Factory
                .StartNew(ConsumeMessages, new ConsumeParams { Queue = queue, CancellationToken = token }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
                .ContinueWith(t =>
                {
                    t.Exception.Handle(ex =>
                    {
                        circuitBreaker.Failure(ex);
                        return true;
                    });

                    if (!tokenSource.IsCancellationRequested)
                    {
                        if (countdownEvent.TryAddCount())
                        {
                            // countdownEvent is incremented before task is actually started
                            StartConsumer(queue);
                        }
                    }
                }, TaskContinuationOptions.OnlyOnFaulted);
        }

Possible fix:
Call countdownEvent.TryAddCount in the beginning of ConsumeMessages task while double-checking token.IsCancellationRequested before and after TryAddCount.

The problem seems to affect all versions of NServiceBus.Transports.RabbitMQ library.

P.S. I couldn't find any documentation about this TPL behavior on MSDN, but you can easily test it, see code example below:

    public class Program
    {
        public static void Main(string[] args)
        {
            var cancellationTokenSource = new System.Threading.CancellationTokenSource();
            var token = cancellationTokenSource.Token;
            cancellationTokenSource.Cancel();
            System.Threading.Tasks.Task.Factory.StartNew(
                obj => System.Console.WriteLine("Task started"),
                token,
                token,
                System.Threading.Tasks.TaskCreationOptions.LongRunning,
                System.Threading.Tasks.TaskScheduler.Default);

            System.Console.ReadLine();
            // Will not write anything to console
        }
    }

Building this repository

I am unable to build this repository from a fresh clone. I was able to build the NServiceBus repository without any problems. Are there any steps I need to take in order to build this locally?

High memory consumption in the Erlang VM (64bit OTP 17) when using NServiceBus.RabbitMQ

I’m seeing some rather strange issues on our production "pilot" box concerning AMPQ stream disconnects. (RabbitMQ) We’re dropping messages randomly. The rate is dropping about 1 a day with 4 customers.

“The AMPQ operation was interrupted: AMPQ close-reason, initiated by library, Code =0, text-“End of Stream”, classId=0,method=0, cause=System.IO.EndOfStreamException Heatbeat missing with heartbeat==5 seconds”

Now to me it seems like RabbitMQ is unavailable for at least 5 seconds? I should note the pilot box were using in production is running on a VM (Windows Server 2008 w/32GB RAM). We have like 16 NServiceBus Services running on that box, With Erlang and Rabbit MQ on it as well.

Our scale out is obviously to migrate Erlang and Rabbit MQ into a clustered Linux (CentOS) area.
Additionally we plan on horizontally scaling our NServiceBus host instances as well. But it seems odd to me that Erlang would just fail to free resources under load. I understand that when Erlang reaches the high water mark for memory it will attempt to "dump" messages from RAM onto disk.

I am seeing issues with RabbitMQ and stress testing where the ErlangVM is crashing when it approaches 2GB. This would be in a stress testing environment and is not the same as production. I was trying to see if i could reproduce the same errors I was seeing in production. When I examined the high water mark in production it's about 13.5GB so Erlang appears to be some what greedy. After some reading it appears you should never run the Erlang VM on a shared service box.

I’m running all of this on one box. RabbitMQ, Erlang and several NserviceBus windows services.

Windows Server 2008 R2 64bit
32GB ram
8 terabyte drive space
Erlang OTP 17
RabbitMQ 3.3.1

We’re running with a few pilot customers on board now. Our message rates are really quite low.
But were seeing these AMPQ end of stream disconnects. And Erlang climbs in memory consumption.

The actual data were putting on the bus (RabbitMQ) is actually very small.

Example:

00000000-0000-0000-0000-000000000000
92cd53c5-6db8-4b95-b10a-24c232180b74
82a63e2f-da94-49cd-895d-f96da5c031fb
82a63e2f-da94-49cd-895d-f96da5c031fb
PartsOrder
TC
ToiServer
{"Guid":"660f6a66-ac99-47a2-868c-802b25376522","TransactionId":"82a63e2f-da94-49cd-895d-f96da5c031fb","CorrelationId":null,"SenderNameCode":null,"SystemIdentifier":"C178775","DestinationNameCode":"TC","DealerCode":"3647","GroupId":"BUS","AccountId":null,"TransactionName":"ProcessPartsOrder","MessageType":"ProcessPartsOrder","ReferenceId":"ORD10STK","MessageStatus":"DMS request","LanguageCode":null,"CountryId":null}

Which is just around 1k

I am seeing the Erlang VM just climb in memory and processes as I seen more and more messages. It finally just crashes with an erlang crash_dump file when I’m at about 4k messages in 5 minutes.
The memory consumption is not bad at all only about 2GB but that seems high to me for the size of the messages I’m sending. Perhaps the client is not sending Ack’s back? It looks like the Erlang VM is very slow with memory collection.

When I run AppDynamics in our stress testing environment I’m hitting a bottle neck in Erlang at about 2GB which is not your issue it’s an Erlang VM issue I think.
I’m just wondering if the RabbitMQ client that is being used is causing this.

P.S. I have crash dumps for the Erlang instance in my stress environment but in production RabbitMQ appears to not crash but "hicups" quite a bit.

Thank you very much!

Eric Cotter

Messages not getting sent when SqlConnection is used inside message handlers

It appears that when custom SQL queries to MS SQL Server (using SqlConnection) are made during the lifetime of a NServiceBus message handler, non-deterministic behavior is resulted. Sometimes ambient transaction is never committed, which results in NServiceBus.RabbitMQ never sending out messages that should be sent. That is apparently because flushing of NSB's unit of work is bound to TransactionCompleted event, which sometimes never launches.

I'm able to work around the issue the following way: instead of letting SqlConnection to automatically enlist to ambient transaction, I'm preventing it using 'Enlist=false' in connection string. I don't use System.Transactions at all with the SqlConnection, but I hook up SqlTransaction using NServiceBus'es IManageUnitsOfWork interface.

The issue can be reproduced: https://github.com/peuramaki/NServiceBus.RabbitMq.Issue26

EDIT: workaround suggested here doesn't fix my production scenario, non-deterministic behavior continues.

It also seems that if I forget to close the SqlConnection (or return it to connection pool), non-deterministic behavior is gone.

Only implicitly create exchanges for types being published. Limit queue ...

Raised by @fhalim
Migrated from by Particular/NServiceBus#1352

Background

We are having a non-NServiceBus endpoint integrate with an NServiceBus endpoint over RabbitMQ. The non-NServiceBus endpoint application creates a temporary queue/exchange pair for itself, does the equivalent of a Bus.Send to the NServiceBus endpoint, expecting a response on the temporary exchange. However, when the NServiceBus endpoint does Bus.Reply, it first tries to create a queue/exchange pair for the recipient and fails because they have already been created, but are temporary instead of permanent.

Fix

This change makes it so that Bus.Send and Bus.Reply do not implicitly attempt to create queue/exchange pairs for the recipient. Only exchanges for the types being published are created implicitly.

Impact

  • If the recipient of a Bus.Send or a Bus.Reply does not exist, this will raise an exception. This behavior is in line with the MSMQ endpoint and I think is what we'd expect when we send a command to an endpoint. Queues will only get created by the subscribers that read from them.

Better default for the RabbitMQ prefetch count

Prefetch count is not defaulted to the same number of threads as the transport is configured to use. The old version defaulted to 1 which will have a negative impact on throughtput and put uneeded preassure on the broker by making it harder to find a free prefetch slot

Multiple Subscribers w/ Same Endpoint Name Don't All Receive Messages

See https://groups.google.com/forum/#!topic/particularsoftware/42FQlZcA7fY

We have an issue where we have a WebAPI server, but it is load balanced. When another process publishes an event, we want both WebAPI servers to receive the message; however, the way that NServiceBus has configured the queues/exchanges, it appears to not support this. Is this true? Both WebAPI servers have the same EndpointName() defined (as "WebService")

So if I go into RabbitMQ Management UI, it shows that there is an exchange of "WebService" and that it binds to a queue named "WebService" and within that queue, it has 2 consumers (both of our WebAPI machines). However, according to http://stackoverflow.com/questions/10620976/rabbitmq-amqp-single-queue-multiple-consumers-for-same-message this yields a round-robin message handling. The messages that are received in the queue will not be picked up by both handlers.

So my question is... how do we deal w/ the situation laid out? I think what we really want is for the queue name to be configurable as well as the exchange. We want both WebAPIs to be part of the same exchange, but have different queue names.


Thought I had this solved with

configuration.ScaleOut().UseUniqueBrokerQueuePerMachine();

However, that still doesn't work. The RabbitMQ topology looks like:

--Exchanges--
WebService
WebService.Machine1
WebService.Machine2

--Queues bound to exchange WebService--
WebService

--Queues bound to exchange WebService.Machine1--
WebService.Machine1

--Queues bound to exchange WebService.Machine2--
WebService.Machine2

--Consumers for queue WebService--
IP for machine 1
IP for machine 2

--Consumers for queue WebService.Machine1--
IP for machine 1

--Consumers for queue WebService.Machine2--
IP for machine 2

When I do a publish, it still appears to be routed to the WebService exchange, which goes to the WebService queue, which does the round-robin handling and only goes to one of the machines, but I need it to go to both. Shouldn't it change the configuration on RabbitMQ to fan out the published messages to all of the machines that are subscribing? It doesn't appear to do this by default?

Request for multiple Transport namespaces

This issue is similar to ParticularLabs/NServiceBus.WindowsServiceBus#10

In my scenario, I have the need to connect to multiple NSB/RabbitMQ services from a single central application. Security policies prevent at least one of the of the remote NSB endpoints from being able to communicate via the Gateway http fallback protocol. A proposed solution to the above issue was to namespace Transport connection strings, which could be applied to this scenario also:

<connectionStrings>
  <add name="NServiceBus/Transport" connectionString="host=default-hostname;virtualhost=vhost;username=...;password=..." />
  <add name="NServiceBus/Transport/NamespaceA" connectionString="namespaceA-hostname;virtualhost=vhostA;username=...;password=..." />
  <add name="NServiceBus/Transport/NamespaceB" connectionString="namespaceB-hostname;virtualhost=vhostB;username=...;password=..." />
  <add name="NServiceBus/Transport/NamespaceC" connectionString="namespaceC-hostnmae;virtualhost=vhostC;username=...;password=..." />
</connectionStrings>

Additionally, being able to make the runtime decision of which transport to use via the .Send() method would be extremely useful. This might be accomplished by embedding the namespace in the destination address of .Send(), or by additional optional parameters to specify the Transport namespace.

But why stop there? What about allowing this central application to be able to consume subscriptions to objects published from each of the Transport namespaces?

Make the name of the callback queue configurable

To allow users to use a stable name for scenarios where machines come and go.

Described by the user:


the scenario is basically many subscribers on vessels we operate.  The vessels have names that are more meaningful than the machine names running the subscriber on the vessel, additionally the machine names are somewhat randomly generated due to the virtualization software used.  So basically I was wondering if we could instead of having:

 <EndPoint>.<RandomlyGeneratedMachineName> 

have:

 <EndPoint>.<VesselName>

These machines are periodically replaced too and when that happens a new name is generated and thus a new queue is created with the new name but the old queue still exists and may have messages in it that will never be read without manual intervention.  We are using MSMQ now but we want to use Rabbit for clustering and centralization reasons.

RabbitMQ Transport seems to leak threads

For certain integration and acceptance tests we're starting the bus up in a self hosted scenario. It appears that the RabbitMQ transport is leaking threads which causes VSTest to not want to shut down.

This might be related to the same PersistentConnection problem that plagued the RabbitMQ installer for creating queues.

I'll provide a sample in the near future.

Support RMQ client 3.4

The 3.4.3 client is now used. Due to the team not following semver we're locking our NuGet dependency on the exact 3.4.3 version

Order of settings

Values provided by users in ConfigSection should take precedence over values provided with Fluent API, while Fluent API overrides default values set by ConfigSeaction.

ConfigSection defaults --> Fluent API --> CofnigSection user values

Endpoint stops processing messages and churns CPU when broker connections are closed

When broker connections is dropped the NServiceBus dequeuer gets into a tight loop causing high CPU. A restart of the endpoint resolves the situation until the next connection close.

Who's affected

  • User on version NServiceBus.RabbitMQ 2.0.0 and up

Symptoms

When the receive connection to the broker gets closed the endpoint will stop processing messages and start churn CPU.

Original report

Issue raised originally in the mailing list

Creation of exchanges should honor durability settings

Symptoms

The exchanges created in the broker is always set to persistent even if the endpoint has specified to use non durable messages.

Who's affected

All users specifying the endpoint to be non durable using config.DisableDurableMessages()

Race condition on shutdown

The following error can be raised when the bus is shutdown:

System.IO.EndOfStreamException was unhandled by user code
  HResult=-2147024858
  Message=SharedQueue closed
  Source=RabbitMQ.Client
  StackTrace:
       at RabbitMQ.Util.SharedQueue`1.EnsureIsOpen()
       at RabbitMQ.Util.SharedQueue`1.Dequeue(Int32 millisecondsTimeout, T& result)
       at NServiceBus.Transports.RabbitMQ.RabbitMqDequeueStrategy.DequeueMessage(QueueingBasicConsumer consumer) in y:\BuildAgent\work\1ccdfa5b068fb66c\src\NServiceBus.RabbitMQ\RabbitMqDequeueStrategy.cs:line 169
       at NServiceBus.Transports.RabbitMQ.RabbitMqDequeueStrategy.Action(Object obj) in y:\BuildAgent\work\1ccdfa5b068fb66c\src\NServiceBus.RabbitMQ\RabbitMqDequeueStrategy.cs:line 111
       at System.Threading.Tasks.Task.Execute()
  InnerException: 

It looks like the connection is closed before the dequeuers are stopped

Make CircuitBreaker intermittent failure delay configurable

The circuit breaker for receive issues can now be configured using

<appSettings>
  <add key="NServiceBus/RabbitMqDequeueStrategy/TimeToWaitBeforeTriggering" value="00:02:00"/>
  <add key="NServiceBus/RabbitMqDequeueStrategy/DelayAfterFailure" value="00:00:05"/>
</appSettings>

Documentation

By the default the RabbitMQ transport will trigger the on critical error action when it continuously fails to receive messages from the broker for 2 minutes. This can now be customized using config

Reply to message incoming via gateway

Symptoms

Request/response fails when doing request/response over the gateway. The following warning is logged

WARN  NServiceBus.Transports.RabbitMQ.RabbitMqTransportMessageExtensions Missmatching replyto address properties found, the native 'RabbitMq.Handler.gateway' will override the one found in the headers 'Msmq.Handler@[some_machine_name]'

Who's affected

All users running the Gateway and is doing reguest/response between sites

Original report

I have following test setup:
-Msmq.Handler endpoint uses MSMQ transport and has gateway feature enabled
-RabbitMq.Handler endpoint uses RabbitMq transport and has gateway feature enableda

Msmq.Handler sends message via gateway to RabbitMq.Handler. Message arrives on RabbitMq.Handler, can be handled, but raises following warning:

2014-12-04 16:04:03.412 WARN NServiceBus.Transports.RabbitMQ.RabbitMqTransportMessageExtensions Missmatching replyto address properties found, the native 'RabbitMq.Handler.gateway' will override the one found in the headers 'Msmq.Handler@[some_machine_name]'

When handling message RabbitMq.Handler tries to do Bus.Reply, with expectation of message going back via gateway to Msmq.Handler. Instead message arrives back on RabbitMq.Handler.

When setup of all involved endpoints is changed to use MSMQ transport this works as expected, Bus.Reply effects in message being transported over gateway back to original endpoint.

EDIT:
Versions of related libraries are as follows:
-NServiceBus v 5.1.2
-NServiceBus.Host v 6.0.0
-NServiceBus.RabbitMQ v 2.0.4
-RabbitMq.Client v 3.3.5
-NServiceBus.Gateway v 1.0.0

Exchange creation failing if local address overridden

Who's affected

All users overriding the local address of the endpoint

Symptoms

The endpoint fails when subscribing throwing the following exception:

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no exchange 'PubSub.Subscriber1.WhenPublishingAnEventWithOverriddenLocalAddress.RabbitMQ' in vhost '/'", classId=40, methodId=30, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply()
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.v0_9_1.Model._Private_ExchangeBind(String destination, String source, String routingKey, Boolean nowait, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.ModelBase.ExchangeBind(String destination, String source, String routingKey)
   at NServiceBus.Transports.RabbitMQ.Routing.ConventionalRoutingTopology.SetupSubscription(IModel channel, Type type, String subscriberName) in c:\Projects\NServiceBus.RabbitMQ\src\NServiceBus.RabbitMQ\Routing\ConventionalRoutingTopology.cs:line 34
   at NServiceBus.Transports.RabbitMQ.RabbitMqSubscriptionManager.Subscribe(Type eventType, Address publisherAddress) in c:\Projects\NServiceBus.RabbitMQ\src\NServiceBus.RabbitMQ\RabbitMqSubscriptionManager.cs:line 18
   at NServiceBus.Unicast.UnicastBus.Subscribe(Type messageType, Predicate`1 condition) in y:\BuildAgent\work\31f8c64a6e8a2d7c\src\NServiceBus.Core\Unicast\UnicastBus.cs:line 390
   at NServiceBus.Unicast.UnicastBus.Subscribe(Type messageType) in y:\BuildAgent\work\31f8c64a6e8a2d7c\src\NServiceBus.Core\Unicast\UnicastBus.cs:line 343
   at NServiceBus.Unicast.UnicastBus.Subscribe[T]() in y:\BuildAgent\work\31f8c64a6e8a2d7c\src\NServiceBus.Core\Unicast\UnicastBus.cs:line 334

Return addresses with RabbitMQ sitting on separate box

I have a NSB setup using RabbitMQ transport, and I'm running Rabbit on separate server (will be RabbitMQ cluster in production).

It looks like that message return addresses are set incorrectly in this scenario.

  1. Service A sends a message to Service B
  2. Saga on Service B is started
  3. Service B does it's thing (long-running)
  4. Service B calls Saga.ReplyToOriginator

In this case the originator of the saga points at queue.NSBMachineA@NSBMachineA
but I think it should point at queue.NSBMachineA@RabbitMQMachine

Currently, the reply never arrives to ServiceA (it does, if I run Rabbit on the same box than NSB).

DefaultRoutingKeyConvention seems to use GetInterfaces.FirstOrDefault incorrectly

@andreasohlund i think there are some problems with this code

https://github.com/Particular/NServiceBus.RabbitMQ/blob/develop/src/NServiceBus.RabbitMQ/Routing/DefaultRoutingKeyConvention.cs#L21

        var interfaces = type.GetInterfaces()
            .Where(i => !IsSystemType(i) && !IsNServiceBusMarkerInterface(i)).ToList();

        var implementedInterface = interfaces.FirstOrDefault();

Note we are calling GetInterfaces then .FirstOrDefault(); on that result

The problem is what is returned from GetInterfaces is ordered based on the source

For example this

public class MyEvent:IMyInterface1,IMyInterface2
{
}

Will return IMyInterface1 as the first from GetInterfaces

While this

public class MyEvent:IMyInterface2,IMyInterface1
{
}

Will return IMyInterface2 as the first from GetInterfaces

So from the users perspective their functionality is the same but the routing key will be different.

Invalid attempt made to decrement the event's count below zero.

Using v1.1.3 and getting this error on RabbitMqDequeuStrategy, line 194.

I don't have a great case to reproduce, only that it's happening on a specific machine repeatedly.

2014-08-11 13:05:54 [Fatal] Repeated failures when communicating with the RabbitMq broker
System.InvalidOperationException: Invalid attempt made to decrement the event's count below zero.
   at System.Threading.CountdownEvent.Signal()
   at NServiceBus.Transports.RabbitMQ.RabbitMqDequeueStrategy.Action(Object obj) in y:\BuildAgent\work\1ccdfa5b068fb66c\src\NServiceBus.RabbitMQ\RabbitMqDequeueStrategy.cs:line 194
   at System.Threading.Tasks.Task.Execute()

When no command binding is added a NullReferenceException is thrown

When the user forgets to add the endpoint mappings for the commands a NullReferenceException is thrown

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <!-- Only add message mappings for commands that need to be sent. RabbitMQ supports centralized pub/sub so message mappings are not required for event subscriptions -->
      <add Assembly="Messages" Namespace="Messages.Commands" Endpoint="Earth" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

Leave out the endpont mapping above, also see VideoStore example.

System.NullReferenceException: Object reference not set to an instance of an object.
at NServiceBus.Transports.RabbitMQ.Routing.ConventionalRoutingTopology.Send(IModel channel, Address address, TransportMessage message, IBasicProperties properties) in y:\BuildAgent\work\1ccdfa5b068fb66c\src\NServiceBus.RabbitMQ\Routing\ConventionalRoutingTopology.cs:line 59
at NServiceBus.Transports.RabbitMQ.RabbitMqMessageSender.<>c__DisplayClass1.<Send>b__0(IModel channel) in y:\BuildAgent\work\1ccdfa5b068fb66c\src\NServiceBus.RabbitMQ\RabbitMqMessageSender.cs:line 13
at NServiceBus.Transports.RabbitMQ.RabbitMqUnitOfWork.ExecuteRabbitMqActions(IList`1 actions) in y:\BuildAgent\work\1ccdfa5b068fb66c\src\NServiceBus.RabbitMQ\RabbitMqUnitOfWork.cs:line 86
at NServiceBus.Transports.RabbitMQ.RabbitMqUnitOfWork.ExecuteActionsAgainstRabbitMq(Object sender, TransactionEventArgs transactionEventArgs) in y:\BuildAgent\work\1ccdfa5b068fb66c\src\NServiceBus.RabbitMQ\RabbitMqUnitOfWork.cs:line 69
at System.Transactions.InternalTransaction.FireCompletion()
at System.Transactions.TransactionStateCommitted.EnterState(InternalTransaction tx)
at System.Transactions.TransactionStateSPC.EnterState(InternalTransaction tx)
at System.Transactions.TransactionStateVolatilePhase1.EnterState(InternalTransaction tx)
at System.Transactions.TransactionStatePhase0.EnterState(InternalTransaction tx)
at System.Transactions.TransactionStateActive.BeginCommit(InternalTransaction tx, Boolean asyncCommit, AsyncCallback asyncCallback, Object asyncState)
at System.Transactions.CommittableTransaction.Commit()
at System.Transactions.TransactionScope.InternalDispose()
at System.Transactions.TransactionScope.Dispose()
at NServiceBus.Unicast.Transport.TransportReceiver.TryProcess(TransportMessage message) in y:\BuildAgent\work\31f8c64a6e8a2d7c\src\NServiceBus.Core\Unicast\Transport\TransportReceiver.cs:line 260
at NServiceBus.Transports.RabbitMQ.RabbitMqDequeueStrategy.Action(Object obj) in y:\BuildAgent\work\1ccdfa5b068fb66c\src\NServiceBus.RabbitMQ\RabbitMqDequeueStrategy.cs:line 138

Error and audit messages are stored as non durable

Currently the messages that being stored in the Audit and Error queues are not triggered as durable. Therefore these messages are lost when the RabbitMQ broker is restarted.

More Info

According to the docs:

that’s in flight inside Rabbit to survive a crash, the message must

  • Have its delivery mode option set to 2 (persistent)
  • Be published into a durable exchange
  • Arrive in a durable queue

The above is not true for messages audited. Discusses with @andreasohlund this could be a core issue. But I'll raise it here for further investigation.

Here are two screenshots

2014-02-10 10_41_52-rabbitmq management

Message in audit.

2014-02-10 10_42_49-rabbitmq management audit

I think it is a design decision to have audits non durable. I just want to make sure that this was the intention.

Videostore: use Group for correlation instead of ConnectionId

The videostore samples use signalr ConnectionId for correlation of messages, this works great locally, but if your connection is far away (as in a production environment), it drops quite easily and will reconnect with a different connectionId. We should use groups instead to keep track of all connections that belong to a user and reply to the group of connections

Add a join method to the OrdersHub that adds the user to a group

public void Join(string clientId)
{
Groups.Add(Context.ConnectionId, clientId);
}

Join the user on Hub start and on reconnect

$.connection.hub.reconnected(function () {
ordersHub.server.join($scope.clientId);
});

$.connection.hub.start(function () {
ordersHub.server.join($scope.clientId);
});

Pass the clientId from the client

ordersHub.server.placeOrder(selectedVideos, $scope.clientId)

And correlate back on the group

public void Handle(OrderPlaced message)
{
var context = GlobalHost.ConnectionManager.GetHubContext();

context.Clients.Group(message.ClientId).orderReceived(new
{
message.OrderNumber,
message.VideoIds
});
}

Invalid operation exception, when trying to auto subscribe to events and when message mapping is not present

Steps to Repro

  1. Have a pub/sub sample that uses RabbitMQ as transport
  2. Add the appropriate assembly redirects for the correct version of NServiceBus.Core and NServiceBus.Host
  3. Remove the EndpointMessageMappings in the unicast bus section for the subscriber.
  4. Notice the error upon startup:
2013-12-09 10:46:09,043 [23] ERROR NServiceBus.Unicast.UnicastBus [(null)] <(nul
l)> - System.InvalidOperationException: No destination could be found for messag
e type Messages.Events.IGotRadarStatistics. Check the <MessageEndpointMappings>
section of the configuration of this endpoint for an entry either for this speci
fic message type or for its assembly.
   at NServiceBus.Unicast.UnicastBus.Subscribe(Type messageType, Predicate`1 con
dition) in y:\BuildAgent\work\31f8c64a6e8a2d7c\src\NServiceBus.Core\Unicast\Unic
astBus.cs:line 377
   at NServiceBus.Unicast.UnicastBus.Subscribe(Type messageType) in y:\BuildAgen
t\work\31f8c64a6e8a2d7c\src\NServiceBus.Core\Unicast\UnicastBus.cs:line 328
   at NServiceBus.AutomaticSubscriptions.AutoSubscriber.Start() in y:\BuildAgent
\work\31f8c64a6e8a2d7c\src\NServiceBus.Core\AutomaticSubscriptions\AutoSubscribe
r.cs:line 24
   at NServiceBus.Unicast.UnicastBus.<>c__DisplayClass1f.<Start>b__1d() in y:\Bu
ildAgent\work\31f8c64a6e8a2d7c\src\NServiceBus.Core\Unicast\UnicastBus.cs:line 7
86 could not be started.

Workaround
Add the message mappings in app.config.

NOTE: This used to work in version 4.2

Introduce retries on sends

With the new pipelines in 4.3 (not exposed until 4.4) we can now very easily do retries on sends (and also fallback to alternate brokers) by just injecting a new behavior into the outgoing pipeline

@fhalim, @thirkcircus, @gsogol Ifaik one of you guys asked for this type of feature?

When using Rabbit.Client version 3.0.4, Host throws an InvalidOperationException

Steps to Repro:
1. Install-package NServiceBus.Host (that will get v4.1.1)
2. Install-package NServiceBus.RabbitMQ (that will use v1.0.5)
3. Add the below assembly redirect in app.config

 <runtime>
    <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
      <dependentAssembly>
        <assemblyIdentity name="NServiceBus" publicKeyToken="9fc386479f8a226c" culture="neutral" />
        <bindingRedirect oldVersion="0.0.0.0-4.1.0.0" newVersion="4.1.0.0" />
      </dependentAssembly>
      <dependentAssembly>
        <assemblyIdentity name="NServiceBus.Core" publicKeyToken="9fc386479f8a226c" culture="neutral" />
        <bindingRedirect oldVersion="0.0.0.0-4.1.0.0" newVersion="4.1.0.0" />
      </dependentAssembly>
    </assemblyBinding>
  </runtime>
4. Use RabbitMQ as transport in EndpointConfig and see the exception below:

image

Workaround:
Use RabbitMQ.Client version 3.0.0

or

Add IWantCustomInitialization and initialize the default container as below and the host starts up fine. Configure.With() rescans the folder.

public class EndpointConfig : IConfigureThisEndpoint, AsA_Server, UsingTransport<RabbitMQ>, IWantCustomInitialization
    {
        public void Init()
        {
            Configure.With()
                .DefaultBuilder();
        }
    }

or

Add NServiceBus.Host.Exe.Config in your bin folder with the contents as below:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <runtime>
    <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
      <dependentAssembly>
        <assemblyIdentity name="RabbitMQ.Client" publicKeyToken="89e7d7c5feba84ce" culture="neutral" />
        <bindingRedirect oldVersion="0.0.0.0-3.0.4.0" newVersion="3.0.4.0" />
      </dependentAssembly>
    </assemblyBinding>
  </runtime>
</configuration>

Scale out on RabbitMQ causes duplicate deliveries to handlers

Raised by @fhalim
Migrated from by Particular/NServiceBus#2182

I’ve noticed that the NSB scale out feature defaults to true on RabbitMQ. This causes funky behavior with the fanout exchange being used in the conventional routing topology. For instance, if Message1 is subscribed to on machine1 and machine2, the routing will end up looking like

            +---------+                                                       
         +-->Message1 +--------------+                                        
         |  +---------+              |                                        
         |                           |                                        
         |                           |                                        
+-----------------+        +-------------------+                              
|Message1_machine1|        | Message1_machine2 |                              
+--------+--------+        +---------+---------+                              

Each message gets delived to both subscribers, which I doubt was the intention of the scale out feature. Seems that the scaleout feature should be disabled for transports that support completing consumers already.

ChannelAllocationException with RabbitMQ while publishing a lot of messages

We are using NServiceBus v5 with RabbitMQ Transport.
We have 6 services that run on the same machine/bus, using NServiceBus.Host.
One of them (InputService) process XML feeds and publish a lot of messages (about 1500) in a foreach loop.

Since the v5 update, this exception is raised in the publish loop of our InputService:

RabbitMQ.Client.Exceptions.ChannelAllocationException: The connection cannot support any more channels. Consider creating a new connection     
at RabbitMQ.Client.Impl.SessionManager.Create()     
at RabbitMQ.Client.Impl.ConnectionBase.CreateModel()     
at NServiceBus.Transports.RabbitMQ.ChannelProvider.GetNewPublishChannel() 
in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\ChannelProvider.cs:line 35     
at NServiceBus.Transports.RabbitMQ.RabbitMqMessagePublisher.Publish(TransportMessage message, PublishOptions publishOptions) 
in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\RabbitMqMessagePublisher.cs:line 24     
at NServiceBus.DispatchMessageToTransportBehavior.InvokeNative(DeliveryOptions deliveryOptions, TransportMessage messageToSend) 
in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\DispatchMessageToTransportBehavior.cs:line 50     
at NServiceBus.DispatchMessageToTransportBehavior.Invoke(OutgoingContext context, Action next) 
in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\DispatchMessageToTransportBehavior.cs:line 28     
at NServiceBus.Transports.RabbitMQ.ForwardCallbackQueueHeaderBehavior.Invoke(OutgoingContext context, Action next) 
in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\ForwardCallbackQueueHeaderBehavior.cs:line 18     
at NServiceBus.SerializeMessagesBehavior.Invoke(OutgoingContext context, Action next) 
in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\SerializeMessagesBehavior.cs:line 38     
at NServiceBus.CreatePhysicalMessageBehavior.Invoke(OutgoingContext context, Action next) 
in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\CreatePhysicalMessageBehavior.cs:line 58     
at NServiceBus.PopulateAutoCorrelationHeadersForRepliesBehavior.Invoke(OutgoingContext context, Action next) 
in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Sagas\PopulateAutoCorrelationHeadersForRepliesBehavior.

Then all other running services fail with this error indefinitely and cannot reconnect:
Failed to connect to Broker: 'localhost', Port: 5672 VHost: '/'. ExceptionMessage: 'None of the specified endpoints were reachable'

The only way to get all these services working again is to stop then restart them all.

Posted in Google Groups:
https://groups.google.com/forum/#!topic/particularsoftware/5gKTYqq6qDc

Channel not disposed if WaitForConfirmsOrDie throws an exception

In ConfirmsAwareChannel class, the call to WaitForConfirmsOrDie can throw if a nack is received or the timeout elapses and this causes the channel not to be disposed.

Who's affected

  • Any user publishing/sending messages with publisher confirms enabled(default)

Symptoms

Causes RabbitMQ resources to stay active for longer then necessary.
After a long period all running services fail to connect to the Broker and the endpoint consumes all the CPU of the machine.

Exceptions seen

The following two exception, then all services fail to connect to the broker.

RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text="End of stream", classId=0, methodId=0, cause=System.IO.EndOfStreamException: Heartbeat missing with heartbeat == 5 seconds     
at RabbitMQ.Client.Impl.ModelBase.WaitForConfirms(TimeSpan timeout, Boolean& timedOut)     
at RabbitMQ.Client.Impl.ModelBase.WaitForConfirmsOrDie(TimeSpan timeout)     
at NServiceBus.Transports.RabbitMQ.ConfirmsAwareChannel.Dispose() 
in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\ConfirmsAwareChannel.cs:line 48     
at NServiceBus.Transports.RabbitMQ.RabbitMqMessagePublisher.Publish(TransportMessage message, PublishOptions publishOptions) 
in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\RabbitMqMessagePublisher.cs:line 26     
at NServiceBus.DispatchMessageToTransportBehavior.InvokeNative(DeliveryOptions deliveryOptions, TransportMessage messageToSend) 
in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\DispatchMessageToTransportBehavior.cs:line 50     
at NServiceBus.DispatchMessageToTransportBehavior.Invoke(OutgoingContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\DispatchMessageToTransportBehavior.cs:line 28     at NServiceBus.Transports.RabbitMQ.ForwardCallbackQueueHeaderBehavior.Invoke(OutgoingContext context, Action next) in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\ForwardCallbackQueueHeaderBehavior.cs:line 18     at NServiceBus.SerializeMessagesBehavior.Invoke(OutgoingContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\SerializeMessagesBehavior.cs:line 38     at NServiceBus.CreatePhysicalMessageBehavior.Invoke(OutgoingContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\CreatePhysicalMessageBehavior.cs:line 58     at NServiceBus.PopulateAut

Exception: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text="Socket exception", classId=0, methodId=0, 
cause=System.Net.Sockets.SocketException (0x80004005): A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond     
at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)     
at RabbitMQ.Client.Impl.SocketFrameHandler_0_9.ReadFrame()     
at RabbitMQ.Client.Impl.ConnectionBase.MainLoopIteration()     
at RabbitMQ.Client.Impl.ConnectionBase.MainLoop()     
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply()     
at RabbitMQ.Client.Framing.Impl.v0_9_1.Model._Private_ChannelOpen(String outOfBand)     
at RabbitMQ.Client.Impl.ConnectionBase.CreateModel()     
at NServiceBus.Transports.RabbitMQ.ChannelProvider.GetNewPublishChannel() 
in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\ChannelProvider.cs:line 35     
at NServiceBus.Transports.RabbitMQ.RabbitMqMessagePublisher.Publish(TransportMessage message, PublishOptions publishOptions) 
in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\RabbitMqMessagePublisher.cs:line 24     
at NServiceBus.DispatchMessageToTransportBehavior.InvokeNative(DeliveryOptions deliveryOptions, TransportMessage messageToSend) 
in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\DispatchMessageToTransportBehavior.cs:line 50     
at NServiceBus.DispatchMessageToTransportBehavior.Invoke(OutgoingContext context, Action next) 
in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\DispatchMessageToTransportBehavior.cs:line 28     
at NServiceBus.Transports.RabbitMQ.ForwardCallbackQueueHeaderBehavior.Invoke(OutgoingContext context, Action next) 
in c:\BuildAgent\work\41ea81d808fdfd62\src\NServiceBus.RabbitMQ\ForwardCallbackQueueHeaderBehavior.cs:line 18 

Interop with Non-NServiceBus Endpoints

When a non-NSB endpoint ((e.g. Spring Integration/Java, etc) needs to publish a message to a NServiceBus endpoint, .Net engineers force non-NSB producers to add NSB-specific semantics:

  • producers have to make sure to properly add a .Net namespace Company.Package.SubPackage.
  • due to NSB's serialization, objects in the XML have to be in Camel Case instead of language's (java, python, whatever) own semantics.
  • message type had to be added to the message header with the exact same
    name as the Exchange. For example, Company.Package.SubPackage.Events.QuoteRequested is the XML
    object we send and the Exchange and had to be defined as the Message Type
    in the message header (see attached image)
  • content_type had to be set to text/xml. A language like Java is defaulting to
    octet-stream. Potentially, telling NServiceBus to treat X as though it's Y can solve an issue.
  • CorrelationID needs to be set to the same value as the MessageID in
    the message header. No reply is being expected. Java producers are confused why they need such a header. Again, potentially, telling NSB to treat some property as the CorrelationID would suffice. Producers can decide what they want to name things and we just need to tell NSB to treat some header as CorrelationID.

In summary, NServiceBus needs to be flexible when a consumer of messages. When used in a company that uses non-.Net technologies, NSB should be allowed to be easily bent to conform what producers dictate via conventions, not the other way around.

quoterequested

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.