nimbusapi / nimbus

Nimbus is a .NET client library to provide an easy abstraction over common messaging frameworks.

Home Page: http://nimbusapi.com/

License: MIT License

C# 99.00% PowerShell 0.68% Dockerfile 0.10% Shell 0.23%

nimbus's Introduction

Nimbus

Nimbus is a .NET client library to provide an easy abstraction over common messaging frameworks.

Developing using Nimbus

For more information, go to the Nimbus website (http://nimbusapi.com/) or our Documentation Wiki.

Developing Nimbus itself

git clone <this repository>
docker-compose up -d
dotnet test

Development infrastructure

There are two docker-compose files. The docker-compose.yml file will spin up:

The integration tests are configured to run via the appsettings.json file within the build pipeline using standard Docker single-token service names. Locally, test configuration is overridden via the appsettings.Development.json file that points all of the services to the ports on localhost exposed by Docker.

nimbus's People

Contributors

aaronpowell, damianmac, dependabot[bot], droyad, fallin, gertjvr, gitter-badger, jeremycade, jfbosch, lukeschafer, ma499, michaelnoonan, nblumhardt, nhuhuynh, patrick-ellume, stefansedich, tathamoddie, teyc, uglybugger, zaccharles

nimbus's Issues

Outgoing and Incoming Message Filters

Is filter the best term? It's synonymous with the MVC filters, so probably...

e.g.

IFilterOutgoingMessages
IFilterOutgoingMessagesOf<T>
IFilterIncomingMessages
IFilterIncomingMessagesOf<T>

Possible definitions

public interface IFilterOutgoingMessagesOf<T> {
    T FilterOut(T message);
    BrokeredMessage FilterOut(BrokeredMessage brokeredMessage, T originalMessage);
}
public interface IFilterIncomingMessagesOf<T> {
    T FilterIn(BrokeredMessage brokeredMessage, T originalMessage);
}

Registering
Would normally just register the components in the container, but it may be useful to have something like:

new BusBuilder().Configure()
    .WithOutgoingMessageFilter<DataAnnotationValidator>() // DataAnnotationValidator : IFilterOutgoingMessages
    .WithOutgoingMessageFilter<DataBusMutator>() // DataBusMutator : IFilterOutgoingMessages, IFilterIncomingMessages
    .WithIncomingMessageFilter<DataBusMutator>() // DataBusMutator : IFilterOutgoingMessages, IFilterIncomingMessages
    .WithOutgoingMessageFilter<

The implementations can return the original input (modified or not) or return new instances. They can throw an exception to cancel the incoming or outgoing message. Filters would be transient per message (to allow state if we need to peek at multiple stages of the message).

Example usages:

Outgoing filter to run validation on the message using validation attributes (built-in?)
Outgoing filter to detect 'DataBus' and apply changes to the message - this might be better if it can also modify the BrokeredMessage
Incoming filter to detect 'DataBus' and apply changes to the message - this might be better if it can also read the BrokeredMessage

Build order was broken but this wasn't picked up by Team City

I think it makes sense for TC to do a git clean before building. I pulled the latest source code and a clean build failed due to the build order: Nimbus.IntegrationTests built before the Autofac and Windsor IntegrationTest libraries.

I've committed a fix for the project build ordering in 2c1b9f1, but TC should have picked this up earlier in my opinion.

Serialization of interfaces

Does Nimbus support that? I've had to put lots of DataContract/DataMember and KnownType attributes all over my messages to get it working...

Can I swap the default serializer?

I mean things like:

    public class TimelineNotification : IBusCommand
    {
        public ITimelineEventData Data { get; set; }
    }

Log Level for failing messages

Consider logging dispatch exceptions as Warnings until the message will be dead-lettered at which point escalate it to an Error. Couple of thoughts about why:

  1. Transient failures will generally be solved by retries - improving the signal-to-noise ratio
  2. Persistent failures (like poison messages) would be logged as a single error at the time when it is detected as poisonous - again improving signal-to-noise ratio
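
A minimal sketch of the escalation rule proposed above, assuming the dispatcher can see the message's delivery count and the configured maximum number of delivery attempts. The names LogLevel, DispatchFailureLogging and LevelFor are hypothetical and not part of Nimbus.

```csharp
using System;

// Hypothetical log levels; a real logger would supply its own.
public enum LogLevel { Warning, Error }

public static class DispatchFailureLogging
{
    // deliveryCount: how many times the message has been delivered so far (1-based).
    // maxDeliveryAttempts: the attempt after which the message is dead-lettered.
    public static LogLevel LevelFor(int deliveryCount, int maxDeliveryAttempts)
    {
        // Only the final failed attempt, the one that leads to dead-lettering,
        // is reported as an error; earlier failures are expected to be retried.
        return deliveryCount >= maxDeliveryAttempts ? LogLevel.Error : LogLevel.Warning;
    }
}
```

With a maximum of five attempts, attempts one to four would log a Warning and the fifth an Error.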

Nimbus Local (Same Machine) Messaging

I've been looking through the Nimbus source to identify the ease with which local messaging can be implemented. I don't want to move around too much infrastructural cheese, but in terms of scalability I think it's a great feature.

At the moment would I be right in saying the message flow (one-way) is roughly as follows (sorry about the crummy text-diagrams):
[Bus] -> Execute operation to -> [Sender (eg. ICommandSender)] -> Get queue (IMessageSender) from -> [MessagingFactory] -> Create queue if necessary in -> [QueueManager] -> Send message to queue <- Gets messages <- [Message Pump] -> Sends to -> [Dispatcher] -> Creates instance of -> [Handler] -> Handles

So in short:
Outgoing:
[Bus] -> [Sender] -> [MessagingFactory] -> Queue
Incoming:
Queue -> [MessagePump] -> [Dispatcher] -> [Handler]

My early thoughts are to wrap an F# Mailbox Processor in the IMessageSender and IMessageReceiver interfaces and for each Azure Queue that is created, create an equivalent local queue.

When the MessagingFactory passes the queue to the sender it will first try to find a local queue. If it does, and the Bus has opted in to local messaging, return it over the Azure queue. If not, return the Azure queue. It would look like this:

Outgoing:
[Bus] -> [Sender] -> [MessagingFactory] -> Local or External? -> Queue
Incoming (Local):
Queue -> [Dispatcher] -> [Handler]
Incoming (External):
Azure Queue -> [Message Pump] -> Local Queue -> [Dispatcher] -> [Handler]

In the message pump, route all Azure Queue traffic through the equivalent local queue which will then call the dispatcher, rather than calling the dispatcher through the pump. This way, if you have multiple services running on the same machine you can optimise bandwidth and performance by messaging via the F# queues instead of Azure, but also receive and handle traffic from the Azure queues in exactly the same way.

I think the important things to note would be: local messaging is less resilient, so it would be opt in to return local queues. Whether or not the Bus opts in, still build the local queues and route MessagePump traffic through them anyway because this makes no difference in that case.

Does anyone have anything to add or potential caveats they can see? I'm playing around with the concept locally in a fork. If I implement this without breaking existing functionality, would it be pulled into the main branch or is it outside the scope of Nimbus?

Incoming messages do not have parallelism.

The SBWS/ASB MessagePump only allows concurrent handling of messages AFTER .Complete (and friends) is called. Until the message is 'complete' it will only call 1 handler at a time. This can be worked around by using a custom message pump instead of the in-built one. This issue is not obvious when processing commands quickly (more-or-less in-proc), but if a handler has to make an external API/resource call, it holds up many messages.

You can see an example of this in the following gist:
https://gist.github.com/lukeschafer/9797555

Obviously sub in your queue name and connection string. Toggle 'useCustomMessagePump' to see the difference.

Delayed Abandon

Delayed abandons would be useful for retry backoffs. E.g. instead of

await message.AbandonAsync(exception.ExceptionDetailsAsProperties(_clock.UtcNow));

you can

if (_settings.DelayedAbandon.HasValue) Thread.Sleep(_settings.DelayedAbandon.GetValueOrDefault());
await message.AbandonAsync(exception.ExceptionDetailsAsProperties(_clock.UtcNow));

It's safe (durable) to wait in-proc like this as a) it's already on its own thread, and b) if the process dies the peeklock will simply expire and the message will become available again

Optional:

Exponential backoffs - when abandoning a message, use DeliveryCount, or add the last-waited/abandon count to the message properties and read this when backing off.
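
A rough sketch of how the exponential backoff could be computed from DeliveryCount. AbandonBackoff, DelayFor, baseDelay and maxDelay are hypothetical names used for illustration only; the doubling factor and the cap are assumptions, not something the proposal specifies.

```csharp
using System;

public static class AbandonBackoff
{
    // deliveryCount: number of times the message has been delivered (1 on first delivery).
    // baseDelay: wait applied after the first failed delivery.
    // maxDelay: upper bound so the wait stays well inside the peek-lock duration.
    public static TimeSpan DelayFor(int deliveryCount, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (deliveryCount < 1) deliveryCount = 1;

        // Cap the exponent so very large delivery counts cannot produce huge values.
        var exponent = Math.Min(deliveryCount - 1, 30);

        // Double the delay for each previous delivery: base, 2x, 4x, 8x, ...
        var milliseconds = baseDelay.TotalMilliseconds * Math.Pow(2, exponent);

        return milliseconds >= maxDelay.TotalMilliseconds
            ? maxDelay
            : TimeSpan.FromMilliseconds(milliseconds);
    }
}
```

The result would replace the fixed delay in the snippet above, for example by passing message.DeliveryCount as the first argument. The cap matters because a wait longer than the peek-lock would let the lock expire before the abandon call runs.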

Happy to implement everything above if you agree.

Remove unnecessary finalizers

You shouldn't declare finalizers unless you have to. You should only declare a finalizer if you have unmanaged resources to dispose. The CLR deals with finalizable objects differently, even if SuppressFinalize is called. More info on MSDN.

public class Bus : IBus, IDisposable
{
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    // No need in finalizer
    ~Bus()
    {
        Dispose(false);
    }

    protected virtual void Dispose(bool disposing)
    {
        // Because no unmanaged resources here
        if (!disposing) return;
        //...
    }
}

Take advantage of serilog's structured logging

It would be nice if, when using the serilog sink, it destructured things like the message objects. I had a bit of a play with trying it in the SerilogLogger or within Serilog, however it didn't seem right.

The best solution I can think of is some way of specifying a different message format for serilog than for the console logger:

  1. Using a key that then looks it up out of a dictionary of formats depending on the logger
  2. In the console logger, reformatting the serilog style format into a string.format format:
    regex replace "{@?[^}]}"
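
As an illustration of option 2, here is a minimal sketch that rewrites a serilog-style template into a string.Format composite format by numbering the placeholders in order. TemplateConverter and ToCompositeFormat are hypothetical names. The pattern adds a `+` quantifier, which is an assumption about what the regex above was meant to be. Escaped braces, format specifiers such as {Elapsed:000}, and repeated property names are not handled.

```csharp
using System;
using System.Text.RegularExpressions;

public static class TemplateConverter
{
    // Matches {Name} and {@Name}. The "+" quantifier is an assumption.
    private static readonly Regex Placeholder = new Regex(@"\{@?[^}]+\}");

    // Replaces each named placeholder with a positional one: {0}, {1}, ...
    public static string ToCompositeFormat(string serilogTemplate)
    {
        var index = 0;
        return Placeholder.Replace(serilogTemplate, _ => "{" + index++ + "}");
    }
}
```

For example, "Handled {@Message} in {Elapsed} ms" becomes "Handled {0} in {1} ms", which the console logger could pass straight to string.Format with the same arguments.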

NuGet package dependencies don't require corresponding versions when built together

Nimbus.MessageContracts depends on Nimbus. Both are built as part of the same solution and packed via nuget.exe as part of the same build pass.

Looks like the call to nuget.exe pack doesn't set package version dependencies, so Nimbus itself only has a dependency on Nimbus.MessageContracts v0.0.1 no matter what version the Nimbus package is.

Exceptions on Dispose causing long shutdown times

We've been experiencing an intermittent issue with shutting down a Windows Service that uses Nimbus - it can take up to 10 minutes to shut down. I finally reproduced this locally and have some exception stack traces (see end of post).

Inspecting the call stack of the threads that are still running I can see the following lines of code are blocked waiting for other tasks to complete:

Line 121 of Bus.cs: Task.WaitAll(messagePumpStopTasks);
Line 87 of NimbusMessageReceiver in the FetchBatchInternal method: await Task.Delay(100);

After letting it run for about 30-60 seconds I started seeing these exceptions being thrown:

2014-04-15 13:15:23.694 +10:00 [Error] Worker exception in "NimbusQueueMessageReceiver" for "q.nextgen.messaging.contracts.commands.updateeventsettingsbuscommand"
System.AggregateException: One or more errors occurred. ---> Microsoft.ServiceBus.Messaging.MessagingCommunicationException: Error during communication with Service Bus. Check the connection information, then retry. ---> System.ServiceModel.CommunicationObjectFaultedException: Internal Server Error: The server did not provide a meaningful reply; this might be caused by a premature session shutdown..TrackingId:ce8706ce-c459-41a8-b6e5-5398f85055d9, Timestamp:15/04/2014 3:15:19 AM

Server stack trace: 


Exception rethrown at [0]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.DuplexCorrelationAsyncResult.End(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.RequestAsyncResult.<GetAsyncSteps>b__4(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [1]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.RedirectBindingElement.RedirectContainerChannelFactory`1.RedirectContainerSessionChannel.RequestAsyncResult.<>c__DisplayClass17.<GetAsyncSteps>b__a(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [2]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Sbmp.RedirectBindingElement.RedirectContainerChannelFactory`1.RedirectContainerSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.RequestAsyncResult.<GetAsyncSteps>b__4(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [3]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.SbmpMessageReceiver.EndReceiveCommand(IAsyncResult result, IEnumerable`1& messages)
   --- End of inner exception stack trace ---
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.RetryReceiveAsyncResult.TryReceiveEnd(IAsyncResult r, IEnumerable`1& messages)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndTryReceive(IAsyncResult result, IEnumerable`1& messages)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndReceiveBatch(IAsyncResult result)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Nimbus.Infrastructure.MessageSendersAndReceivers.NimbusQueueMessageReceiver.<FetchBatch>d__3.MoveNext() in c:\readify\Centium\NextGen\Nimbus\src\Nimbus\Infrastructure\MessageSendersAndReceivers\NimbusQueueMessageReceiver.cs:line 36
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
   at System.Threading.Tasks.Task`1.get_Result()
   at Nimbus.Infrastructure.MessageSendersAndReceivers.NimbusMessageReceiver.<FetchBatchInternal>d__17.MoveNext() in c:\readify\Centium\NextGen\Nimbus\src\Nimbus\Infrastructure\MessageSendersAndReceivers\NimbusMessageReceiver.cs:line 93
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Nimbus.Infrastructure.MessageSendersAndReceivers.NimbusMessageReceiver.<Worker>d__d.MoveNext() in c:\readify\Centium\NextGen\Nimbus\src\Nimbus\Infrastructure\MessageSendersAndReceivers\NimbusMessageReceiver.cs:line 48
---> (Inner Exception #0) Microsoft.ServiceBus.Messaging.MessagingCommunicationException: Error during communication with Service Bus. Check the connection information, then retry. ---> System.ServiceModel.CommunicationObjectFaultedException: Internal Server Error: The server did not provide a meaningful reply; this might be caused by a premature session shutdown..TrackingId:ce8706ce-c459-41a8-b6e5-5398f85055d9, Timestamp:15/04/2014 3:15:19 AM

Server stack trace: 


Exception rethrown at [0]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.DuplexCorrelationAsyncResult.End(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.RequestAsyncResult.<GetAsyncSteps>b__4(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [1]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.RedirectBindingElement.RedirectContainerChannelFactory`1.RedirectContainerSessionChannel.RequestAsyncResult.<>c__DisplayClass17.<GetAsyncSteps>b__a(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [2]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Sbmp.RedirectBindingElement.RedirectContainerChannelFactory`1.RedirectContainerSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.RequestAsyncResult.<GetAsyncSteps>b__4(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [3]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.SbmpMessageReceiver.EndReceiveCommand(IAsyncResult result, IEnumerable`1& messages)
   --- End of inner exception stack trace ---
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.RetryReceiveAsyncResult.TryReceiveEnd(IAsyncResult r, IEnumerable`1& messages)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndTryReceive(IAsyncResult result, IEnumerable`1& messages)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndReceiveBatch(IAsyncResult result)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Nimbus.Infrastructure.MessageSendersAndReceivers.NimbusQueueMessageReceiver.<FetchBatch>d__3.MoveNext() in c:\readify\Centium\NextGen\Nimbus\src\Nimbus\Infrastructure\MessageSendersAndReceivers\NimbusQueueMessageReceiver.cs:line 36<---

Queue naming collision possibility

It would be possible to have a naming collision with queues.

Azure queues are lowercased, but it would be possible to have two message types with the same name and different casing. An edge case, but something we should test for at scan time.

Perhaps a dictionary of known queue names, and if we try to spin one up twice we go bang.
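
A minimal sketch of that idea, assuming queue names are compared case-insensitively because they end up lowercased. QueueNameRegistry and Register are hypothetical names, not existing Nimbus types.

```csharp
using System;
using System.Collections.Generic;

public sealed class QueueNameRegistry
{
    // Case-insensitive keys, so "q.MyMessage" and "q.mymessage" are the same queue.
    private readonly Dictionary<string, Type> _known =
        new Dictionary<string, Type>(StringComparer.OrdinalIgnoreCase);

    // Records the queue name for a message type, or throws if a different
    // message type has already claimed a name that differs only by casing.
    public void Register(string queueName, Type messageType)
    {
        Type existing;
        if (_known.TryGetValue(queueName, out existing) && existing != messageType)
        {
            throw new InvalidOperationException(
                "Queue name '" + queueName + "' is already used by " + existing.FullName +
                " and cannot also be used by " + messageType.FullName + ".");
        }

        _known[queueName] = messageType;
    }
}
```

Calling Register for every discovered message type at scan time would surface the collision at startup rather than at the first send.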

NuGet packages appear to be published even when a test run is unsuccessful

I re-enabled integration tests and pushed some changes that broke the test suite but not compilation. The build (1.0.0.129) was marked as failed but there's still a package on nuget.org for that build number.

I've checked and all the TC build steps are set to only execute if the previous step was successful. Odd.

Infinite loop in BatchingMessageSender when SendBatch(toSend) fails

We've had several incidents where a server goes into an endless loop until it crashes with an OutOfMemoryException (about once per day over the last week). I've finally been able to reproduce the issue and I have a theory about the cause.

We've got 600MB of the following log message:

2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)
2014-05-08 07:36:59.537 +00:00 [Debug] Flushing outbound message queue "q.mymessage" (1 messages)

Looking at the code in https://github.com/NimbusAPI/Nimbus/blob/master/src/Nimbus/Infrastructure/MessageSendersAndReceivers/BatchingMessageSender.cs#L75

If the underlying MessageSender throws immediately, say it gets disposed, the BatchingMessageSender will go into an endless loop and never recover because the MessageSender isn't recycled.

A couple of thoughts:

  1. It sounds like this is part of a larger issue on recycling messaging clients
  2. Is the RetryPolicy.Default (exponential backoff) in the MessageSender not sufficient?

Option to use different object serialisers for BrokeredMessage, which is currently the binary XML serialiser.

Currently when a new instance of BrokeredMessage is created, the binary XML serialiser is used (default option). I would like the option to specify a custom serialiser (that implements XmlObjectSerializer) when I build the BusBuilderConfiguration in my application setup.

This option would be useful when using non .NET clients and when using Service Bus Explorer to view messages.

I've listed two possible implementations below that could be used.

Example 1
Implementation

BusBuilderConfiguration WithSerializer<T>(this BusBuilderConfiguration configuration) where T : XmlObjectSerializer;

Usage

.WithSerializer<DataContractSerializer>()

Example 2
Implementation

BusBuilderConfiguration WithSerializer(this BusBuilderConfiguration configuration, Type serializerType);

Usage

.WithSerializer(typeof(DataContractSerializer))

Allow long-running handlers to keep locks on their BrokeredMessage instances

When we pop a message from a queue we're given a lock on that message for a short period of time. The idea is that if our handler hangs or crashes without calling .Complete or .Abandon then the message will be returned to the queue so that someone else can handle it.

There's a trade-off between 1) detecting handlers that have crashed and giving someone else a chance at handling the message and 2) allowing long-running handlers to do their job.

I'm thinking about either of:

  • an optional interface on IHandleXXX<..> along the lines of IAmALongRunningTask that would allow the dispatcher to ask the handler whether it was still alive.
  • simply attaching a watcher to any handler task to check if it's still alive.

Any thoughts? @DamianMac? @michaelnoonan?

Is Period the best parts separator for Queue/Topic/Subscription Paths?

The default parts separator for creating the Path to service bus entities in Nimbus is '.' (a period), which is fine in most situations, though it can be problematic in URLs.

A specific example is that by default IIS will treat a request ending with something that looks like a file as a request for a static resource and returns 404 when it can't find the file on the file system. You can work around this by adding a trailing '/' to the URL or by specifically adding an exception in web.config.

I'm just wondering if period is the most appropriate parts separator?

Perhaps Nimbus could offer some customisable conventions?

AzureQueueManager does not update descriptions if they have changed

The EnsureXXXExists() commands on AzureQueueManager only check whether the queue/topic/subscription exists. If the description changes after the queue/topic/subscription has been created, Nimbus will not update it.

An appropriate place to check for correctness and update if required may be in the WarmUp method, after task0 and task1 have completed.

MessageReceivers going loopy with CommunicationObjectFaultedExceptions

We've noticed that after running for a period of time the MessageReceivers can end up going into a CPU-expensive loop requiring a recycle of the application to recover.

I was able to repro, by sheer accident, on my local machine and we noticed that after pausing the console app for a minute while taking the stack trace, the app seemed to recover, but I'm not quite sure what state the app was in afterwards.

Microsoft.ServiceBus.Messaging.MessagingCommunicationException: Error during communication with Service Bus. Check the connection information, then retry. ---> System.ServiceModel.CommunicationObjectFaultedException: Internal Server Error: The server did not provide a meaningful reply; this might be caused by a premature session shutdown..TrackingId:2894fb95-7070-4e81-975f-fb71e7eaa5de, Timestamp:22/04/2014 12:46:37 AM

Server stack trace: 


Exception rethrown at [0]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.DuplexCorrelationAsyncResult.End(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.RequestAsyncResult.<GetAsyncSteps>b__4(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [1]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.RedirectBindingElement.RedirectContainerChannelFactory`1.RedirectContainerSessionChannel.RequestAsyncResult.<>c__DisplayClass17.<GetAsyncSteps>b__a(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [2]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Sbmp.RedirectBindingElement.RedirectContainerChannelFactory`1.RedirectContainerSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.RequestAsyncResult.<GetAsyncSteps>b__4(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)

Exception rethrown at [3]: 
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.SbmpMessageReceiver.EndReceiveCommand(IAsyncResult result, IEnumerable`1& messages)
   --- End of inner exception stack trace ---
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Common.AsyncResult`1.End(IAsyncResult asyncResult)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.RetryReceiveAsyncResult.TryReceiveEnd(IAsyncResult r, IEnumerable`1& messages)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndTryReceive(IAsyncResult result, IEnumerable`1& messages)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndReceiveBatch(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.SubscriptionClient.EndReceiveBatch(IAsyncResult result)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Nimbus.Infrastructure.MessageSendersAndReceivers.NimbusSubscriptionMessageReceiver.<FetchBatch>d__3.MoveNext() in c:\\projects\\nextgenrelational\\Nimbus\\src\\Nimbus\\Infrastructure\\MessageSendersAndReceivers\\NimbusSubscriptionMessageReceiver.cs:line 42<---

Allow disconnected/offline startup

No, not via magic pixie dust or the NSA's QUANTUM gadgetry; merely that we should at least allow applications to launch even when they can't connect to a bus endpoint.

It might be nice to allow commands and events to be queued for later transmission but that's a separate issue - I don't really have an opinion yet as to whether that's a responsibility for the bus or for the app. Either way, it's not what this issue is about.

What do you want to see in a Wiki?

OK folks

One of my little holiday tasks is going to be putting some documentation together about the concepts and usage of Nimbus.

Open to any ideas or suggestions as to what this might look like. What would you like to see? Or, if there are examples of projects "doing it right", that would be cool too.

Warn me if my handler is async

public class MyHandler : IHandleCommand<MyMessage>
{
   public async void Handle(MyMessage busCommand)
   {
      throw new InvalidOperationException("HA HA HA, you can't catch me!");
   }
}

This message will be viewed as handled, and therefore not retried/put in error queue.

Perhaps Nimbus should enforce this (throw upfront) when I have an async void implementation of the Handle method?

It's easy to miss (we did) and can cause a lot of pain.
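One way such an upfront check could work is sketched below. It relies on the compiler marking async methods with AsyncStateMachineAttribute, so a void-returning method carrying that attribute is "async void". The class and method names (AsyncVoidHandlerCheck, ThrowIfAnyAsyncVoid, FireAndForgetHandler) are hypothetical and not part of Nimbus; where Nimbus would actually call such a check is an open question.

```csharp
using System;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical helper, not a Nimbus API.
public static class AsyncVoidHandlerCheck
{
    // True when the method returns void and was compiled from an async method.
    public static bool IsAsyncVoid(MethodInfo method)
    {
        return method.ReturnType == typeof(void)
            && method.GetCustomAttribute<AsyncStateMachineAttribute>() != null;
    }

    // Throws at startup if any public Handle method on the type is async void.
    public static void ThrowIfAnyAsyncVoid(Type handlerType)
    {
        var offenders = handlerType
            .GetMethods(BindingFlags.Instance | BindingFlags.Public)
            .Where(m => m.Name == "Handle" && IsAsyncVoid(m))
            .ToArray();

        if (offenders.Length > 0)
            throw new InvalidOperationException(
                handlerType.FullName + " has an async void Handle method; " +
                "exceptions thrown after its first await cannot be observed by the caller.");
    }
}

// Stand-in handler used only to demonstrate the check.
public class FireAndForgetHandler
{
    public async void Handle(string busCommand)
    {
        await Task.Yield();
        throw new InvalidOperationException("unobserved");
    }
}
```

Calling AsyncVoidHandlerCheck.ThrowIfAnyAsyncVoid(typeof(FireAndForgetHandler)) throws, whereas a handler with a plain synchronous void Handle passes.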

Create an auditing event that allows the entire message stream to be consumed

We're going to need this for auditing, at least, but more likely we'll want it for an "Ops Dash" style feature before that.

I'm thinking just something along the lines of:

public class TheAuditors: IHandleMulticastEvent<AuditEvent>
{
    ...
}

where the AuditEvent looks something like this:

public class AuditEvent: IBusEvent
{
    public BrokeredMessage AuditedMessage { get; set; }
}

Thoughts?

Subscription names cannot be longer than 50 characters

7e90b59 introduced a change to subscription naming and we happen to have a lot of Request message names that cause the Subscription Path to break the 50 character limit imposed by Service Bus.

One example is: nextgen.appserver.getrecordsmodifiedsincerequesthandler

public static string SubscriptionNameFor(string applicationName, string instanceName, Type handlerType)
{
     return Sanitize(string.Join(".", new[] {applicationName, instanceName, handlerType.Name}));
}

The offending code in the Microsoft.ServiceBus assembly in NamespaceManager (v2.1.3.0):

NamespaceManager.CheckValidEntityName(description.TopicPath, 260, "description.TopicPath");
NamespaceManager.CheckValidEntityName(description.Name, 50, false, "description.Name");

This issue was also raised in Particular/NServiceBus.Azure#19
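One possible workaround, sketched here purely as an illustration, is to truncate over-long names and append a short hash of the full name so that distinct handlers still map to distinct subscriptions. SubscriptionNameShortener and Shorten are hypothetical names, and the choice of SHA-256 with an 8-character suffix is an assumption rather than anything Nimbus does.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not a Nimbus API.
public static class SubscriptionNameShortener
{
    // Limit enforced by NamespaceManager.CheckValidEntityName for subscription names.
    private const int MaxLength = 50;

    public static string Shorten(string name)
    {
        if (name.Length <= MaxLength) return name;

        using (var sha = SHA256.Create())
        {
            var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(name));

            // First 4 bytes of the hash as 8 lowercase hex characters.
            var suffix = BitConverter.ToString(hash, 0, 4).Replace("-", "").ToLowerInvariant();

            // Keep as much of the readable prefix as fits, then ".", then the suffix.
            var prefix = name.Substring(0, MaxLength - suffix.Length - 1);
            return prefix + "." + suffix;
        }
    }
}
```

The result is deterministic, so every instance of an application derives the same shortened name. The trade-off is that the tail of the handler name is no longer readable in the Service Bus portal.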

Add a .WithDefaultBroker(...) extension method to BusBuilderConfigurationExtensions

Our sample code currently looks like this for non-container installations:

var typeProvider = new AssemblyScanningTypeProvider(Assembly.GetExecutingAssembly());

var messageBroker = new DefaultMessageBroker(typeProvider);

var bus = new BusBuilder().Configure()
                        .WithNames("MyTestSuite", Environment.MachineName)
                        .WithConnectionString(CommonResources.ConnectionString)
                        .WithTypesFrom(typeProvider)
                        .WithCommandBroker(messageBroker)
                        .WithRequestBroker(messageBroker)
                        .WithMulticastEventBroker(messageBroker)
                        .WithCompetingEventBroker(messageBroker)
                        .WithMulticastRequestBroker(messageBroker)
                        .WithDefaultTimeout(TimeSpan.FromSeconds(10))
                        .Build();

It would be nice if it could look like this instead:

var typeProvider = new AssemblyScanningTypeProvider(Assembly.GetExecutingAssembly());

var messageBroker = new DefaultMessageBroker(typeProvider);

var bus = new BusBuilder().Configure()
                        .WithNames("MyTestSuite", Environment.MachineName)
                        .WithConnectionString(CommonResources.ConnectionString)
                        .WithTypesFrom(typeProvider)
                        .WithDefaultBroker(messageBroker)
                        .WithDefaultTimeout(TimeSpan.FromSeconds(10))
                        .Build();

Per-handler ability to specify maximum concurrent handler instances

Some handlers need to have a set maximum number of instances. We've seen this in the wild with spinning up a pool of PhantomJS instances, for instance, as well as where third-party APIs limit incoming connections.

We need a nice way to specify how many concurrently-executing instances of a handler of a particular type we should permit, as well as a sensible default.
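As a rough sketch of the mechanics (not a proposal for the actual configuration surface), a per-type SemaphoreSlim can gate dispatch. HandlerThrottle, its Run method and the limitFor callback are all hypothetical names; how the limit would be declared (attribute, builder option, convention) is left open.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not a Nimbus API.
public class HandlerThrottle
{
    private readonly ConcurrentDictionary<Type, SemaphoreSlim> _semaphores =
        new ConcurrentDictionary<Type, SemaphoreSlim>();

    // Returns the maximum number of concurrent instances for a handler type.
    private readonly Func<Type, int> _limitFor;

    public HandlerThrottle(Func<Type, int> limitFor)
    {
        _limitFor = limitFor;
    }

    // Runs the dispatch delegate once a slot for this handler type is free.
    public async Task Run(Type handlerType, Func<Task> dispatch)
    {
        var semaphore = _semaphores.GetOrAdd(handlerType, t => new SemaphoreSlim(_limitFor(t)));

        await semaphore.WaitAsync();
        try
        {
            await dispatch();
        }
        finally
        {
            // Always free the slot, even if the handler throws.
            semaphore.Release();
        }
    }
}
```

A default could then be expressed as, for example, new HandlerThrottle(t => 4), with specific handler types mapped to lower limits inside the callback.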

Should Nimbus have a "Handler Filter" feature?

Working with Nimbus on a project, I encountered a situation where I needed to update a database from within a handler. In this solution, the front-end web project used ASP.NET MVC's Action Filter feature to create and maintain an ambient unit of work. However, I don't have this option for my handlers and need to manage it myself within the "Handle" method. As I encountered this, I thought it might be a useful feature for Nimbus to have.

My initial thoughts are that I'd want some method of registering a class which Nimbus could resolve within the lifetime scope that it will use to resolve and call the handler. This class will have an interface something like this.

public interface IHandlerFilter
{
    void OnPreHandle();
    void OnPostHandle();
}

This interface is obviously very basic and requires some fleshing out. I plan to do some prototype work but wanted to make sure the idea was something you guys were interested in and get some input on possible issues and direction.

Should Nimbus use the "Async" postfix for public async methods like Send?

The fact these methods are Task-returning makes it fairly obvious, just a thought.

I personally forgot to await a call to Bus.Send() and didn't catch the exception that occurred due to an assembly registration error... Seeing something like SendAsync without an await might provide an early warning sign that something's amiss.

Outbound queue collection thread safety

Possibly a heisenbug, but I got an InvalidOperationException in BatchingMessageSender at the end of FlushMessages(), where it checks _outboundQueue.Any(), with ye olde "Collection was modified; enumeration operation may not execute."

The Any() call is outside of a lock; would this be the culprit?

Running up again with the same scenario worked, so I can't easily repro this.
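If the unlocked Any() is indeed the cause, one fix is to answer "is anything left?" inside the same lock that guards every other access to the collection. The sketch below is not the real BatchingMessageSender: OutboundQueueSketch, TakeBatch and the use of List<T> as the backing store are assumptions made for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the sender's internal queue, not Nimbus code.
public class OutboundQueueSketch<T>
{
    private readonly object _mutex = new object();
    private readonly List<T> _outboundQueue = new List<T>();

    public void Enqueue(T message)
    {
        lock (_mutex)
        {
            _outboundQueue.Add(message);
        }
    }

    // Removes up to batchSize messages and reports whether more remain.
    // Both steps happen under one lock, so no enumeration can race with Enqueue.
    public T[] TakeBatch(int batchSize, out bool moreRemaining)
    {
        lock (_mutex)
        {
            var batch = _outboundQueue.Take(batchSize).ToArray();
            _outboundQueue.RemoveRange(0, batch.Length);
            moreRemaining = _outboundQueue.Count > 0;
            return batch;
        }
    }
}
```

The caller can then decide whether to schedule another flush from moreRemaining instead of re-inspecting the collection after the lock has been released.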

Do not abstract/hijack the IoC container registration

One friction point encountered with frameworks that abstract registration into the container, as Nimbus does right now, is that this often makes it really hard to deal with more advanced scenarios, like customising how a particular handler is registered or testing the registration (a convention test).

Are there any plans to explore alternative ways of integrating with a container, to make it more flexible?

Additional diagnostic properties on dispatch failure

When a dispatch failure occurs the Exception Type, Message and StackTrace are recorded as message properties. Add timestamp, failed path, executing identity and machine name to help diagnose dispatch failures.

Improved dead-letter support

Rather than everyone rolling their own dead-letter support, I believe it makes sense to support several dead-letter features as first-class citizens. Some examples would be:

  1. Creating a dead-letter message pump pipeline that could potentially be used to move dead-letters to a central queue (making detection and management easier), move expired dead-letters to a central queue or ignore them completely, or just leave them where they are. Could be a potential extensibility point.
  2. Adding enough diagnostic data to messages for choosing the appropriate compensation action for any dead-letters
  3. First-class support methods for reliably Destroying and Replaying dead-letters

PathFactory can generate invalid Queue or Topic names in certain circumstances

I set up a very basic script in LINQPad to try Nimbus out, but got the following error response from Azure Service Bus when I tried to run it:

The remote server returned an error: (400) Bad Request. SubCode=40000. 'https://myappname.servicebus.windows.net/Q.UserQuery+TestCommand/?api-version=2013-10' contains character(s) that is not allowed by Service Bus. Entity segments can contain only letters, numbers, periods (.), hyphens (-), and underscores (_)..TrackingId:60e8d3be-f322-4da9-adb3-ebc120d858b9_G8,TimeStamp:12/11/2013 12:44:48 AM

I don't really know much of how .NET names types internally, but this page seems to indicate that a "+" is used for nested classes: http://msdn.microsoft.com/en-us/library/w3f99sx1(v=vs.110).aspx

Michael
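A minimal sketch of one way to guard against this, based only on the character set named in the error message above (letters, numbers, periods, hyphens and underscores): replace anything else with a period before using a type name as an entity segment. EntityNameSanitizer is a hypothetical name and is not the PathFactory implementation; the choice of "." as the replacement character is an assumption.

```csharp
using System.Text.RegularExpressions;

// Hypothetical helper, not the Nimbus PathFactory.
public static class EntityNameSanitizer
{
    // Matches any character outside the set Service Bus accepts in an entity segment.
    private static readonly Regex Disallowed =
        new Regex(@"[^A-Za-z0-9._\-]", RegexOptions.Compiled);

    public static string Sanitize(string name)
    {
        return Disallowed.Replace(name, ".");
    }
}
```

With this, EntityNameSanitizer.Sanitize("Q.UserQuery+TestCommand") yields "Q.UserQuery.TestCommand". Note that two different type names could collapse to the same sanitized name, which a real implementation would need to consider.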
