zarusz / slimmessagebus

Lightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.

License: Apache License 2.0

C# 99.44% PowerShell 0.55% Shell 0.01%
message-bus messaging azure-event-hubs pub-sub request-response c-sharp azure-service-bus redis apache-kafka kafka bus azure avro rabbitmq ddd mqtt dotnet

slimmessagebus's Introduction

SlimMessageBus

SlimMessageBus is a client façade for message brokers for .NET. It comes with implementations for specific brokers (RabbitMQ, Kafka, Azure EventHub, MQTT, Redis Pub/Sub) and in-memory message passing (in-process communication). SlimMessageBus additionally provides a request-response implementation over message queues.


The v2 release is available (see migration guide). The v3 release is under construction.

Key elements of SlimMessageBus

  • Consumers:
    • IConsumer<in TMessage> - subscriber in pub/sub (or queue consumer)
    • IRequestHandler<in TRequest, TResponse> & IRequestHandler<in TRequest> - request handler in request-response
  • Producers:
    • IPublishBus - publisher in pub/sub (or queue producer)
    • IRequestResponseBus - sender in req/resp
    • IMessageBus - extends IPublishBus and IRequestResponseBus
  • Misc:
    • IRequest<out TResponse> & IRequest - a marker for request messages
    • MessageBus - static accessor for current context IMessageBus

Docs

Packages

Name | Description
SlimMessageBus | The core API for SlimMessageBus

Transport providers
.Host.AzureEventHub | Transport provider for Azure Event Hubs
.Host.AzureServiceBus | Transport provider for Azure Service Bus
.Host.Kafka | Transport provider for Apache Kafka
.Host.Memory | Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required)
.Host.MQTT | Transport provider for MQTT
.Host.RabbitMQ | Transport provider for RabbitMQ
.Host.Redis | Transport provider for Redis
.Host.Sql (pending) | Transport provider implementation for SQL database message passing

Serialization
.Host.Serialization.Json | Serialization plugin for JSON (Newtonsoft.Json library)
.Host.Serialization.SystemTextJson | Serialization plugin for JSON (System.Text.Json library)
.Host.Serialization.Avro | Serialization plugin for Avro (Apache.Avro library)
.Host.Serialization.Hybrid | Plugin that delegates serialization to other serializers based on message type
.Host.Serialization.GoogleProtobuf | Serialization plugin for Google Protobuf

Plugins
.Host.AspNetCore | Integration for ASP.NET Core
.Host.Interceptor | Core interface for interceptors
.Host.FluentValidation | Validation for messages based on FluentValidation
.Host.Outbox.Sql | Transactional Outbox using SQL
.Host.Outbox.DbContext | Transactional Outbox using EF DbContext
.Host.AsyncApi | AsyncAPI specification generation via Saunter

Typically the application layers (domain model, business logic) only need to depend on SlimMessageBus which is the facade, and ultimately the application hosting layer (ASP.NET, Console App, Windows Service) will reference and configure the other packages (SlimMessageBus.Host.*) which are the messaging transport providers and additional plugins.

Samples

Basic usage

Some service (or domain layer) publishes a message:

IMessageBus bus; // injected

await bus.Publish(new SomeMessage());

Another service (or application layer) handles the message:

public class SomeMessageConsumer : IConsumer<SomeMessage>
{
   public async Task OnHandle(SomeMessage message)
   {
       // handle the message
   }
}

Note: It is also possible to avoid having to implement the interface IConsumer<T> (see here).

The bus also supports request-response implemented via queues, topics or in-memory - depending on the chosen transport provider. The sender side sends a request message:

var response = await bus.Send(new SomeRequest());

Note: It is possible to configure the bus to timeout a request when the response does not arrive within the allotted time (see here).

The receiving side handles the request and replies:

public class SomeRequestHandler : IRequestHandler<SomeRequest, SomeResponse>
{
   public async Task<SomeResponse> OnHandle(SomeRequest request)
   {
      // handle the request message and return a response
      return new SomeResponse { /* ... */ };
   }
}

The bus will ask the DI container to provide the consumer instances (SomeMessageConsumer, SomeRequestHandler).
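
The samples above use SomeMessage, SomeRequest and SomeResponse without showing them. A minimal sketch of what such contracts could look like follows. The property names (Text, Left, Right, Result) are hypothetical and only illustrate the shape; IRequest<TResponse> is the marker interface listed under Key elements.

```csharp
using SlimMessageBus;

// Hypothetical contracts; property names are illustrative,
// not taken from the library or its samples.

// Pub/sub message: a plain type.
public record SomeMessage
{
    public string Text { get; init; } = string.Empty;
}

// Request message: IRequest<TResponse> ties the request to its response type,
// which lets bus.Send(new SomeRequest()) infer SomeResponse.
public record SomeRequest : IRequest<SomeResponse>
{
    public int Left { get; init; }
    public int Right { get; init; }
}

// Response message returned by the request handler.
public record SomeResponse
{
    public int Result { get; init; }
}
```

Init-only properties are used here so that the object-initializer style from the handler sample (new SomeResponse { /* ... */ }) keeps working.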

There is also support for one-way request-response.

Configuration

The Microsoft.Extensions.DependencyInjection is used to compose the bus:

// IServiceCollection services;

services.AddSlimMessageBus(mbb =>
{
   mbb
      // First child bus - in this example Kafka transport
      .AddChildBus("Bus1", (builder) =>
      {
         builder
            .Produce<SomeMessage>(x => x.DefaultTopic("some-topic"))
            .Consume<SomeMessage>(x => x.Topic("some-topic")
               //.WithConsumer<SomeMessageConsumer>() // Optional: can be skipped as IConsumer<SomeMessage> will be resolved from DI
               //.KafkaGroup("some-kafka-consumer-group") // Kafka: Consumer Group
               //.SubscriptionName("some-azure-sb-topic-subscription") // Azure ServiceBus: Subscription Name
            )
            // ...
            // Use Kafka transport provider (requires SlimMessageBus.Host.Kafka package)
            .WithProviderKafka(cfg => { cfg.BrokerList = "localhost:9092"; }); // requires SlimMessageBus.Host.Kafka package
            // Use Azure Service Bus transport provider
            //.WithProviderServiceBus(cfg => { ... }) // requires SlimMessageBus.Host.AzureServiceBus package
            // Use Azure Event Hub transport provider
            //.WithProviderEventHub(cfg => { ... }) // requires SlimMessageBus.Host.AzureEventHub package
            // Use Redis transport provider
            //.WithProviderRedis(cfg => { ... }) // requires SlimMessageBus.Host.Redis package
            // Use RabbitMQ transport provider
            //.WithProviderRabbitMQ(cfg => { ... }) // requires SlimMessageBus.Host.RabbitMQ package
            // Use in-memory transport provider
            //.WithProviderMemory(cfg => { ... }) // requires SlimMessageBus.Host.Memory package
      })

      // Add other bus transports (as child bus), if needed
      //.AddChildBus("Bus2", (builder) => {  })

      // Scan assembly for consumers, handlers, interceptors, and register into MSDI
      .AddServicesFromAssemblyContaining<SomeMessageConsumer>()
      //.AddServicesFromAssembly(Assembly.GetExecutingAssembly());

      // Add JSON serializer
      .AddJsonSerializer(); // requires SlimMessageBus.Host.Serialization.Json or SlimMessageBus.Host.Serialization.SystemTextJson package
});

The configuration can be modularized.

Use Case: Domain Events (in-process pub/sub messaging)

This example shows how SlimMessageBus and SlimMessageBus.Host.Memory can be used to implement the Domain Events pattern. The provider passes messages in the same process (no external message broker is required).

The domain event is a simple POCO:

// domain event
public record OrderSubmittedEvent(Order Order, DateTime Timestamp);

The domain event handler implements the IConsumer<T> interface:

// domain event handler
public class OrderSubmittedHandler : IConsumer<OrderSubmittedEvent>
{
   public Task OnHandle(OrderSubmittedEvent e)
   {
      // ...
      return Task.CompletedTask;
   }
}

The domain event handler (consumer) is obtained from the MSDI at the time of event publication. The event publish enlists in the ongoing scope (web request scope, external message scope of the ongoing message).

In the domain model layer, the domain event gets raised:

// aggregate root
public class Order
{
   public Customer Customer { get; }
   public OrderState State { get; private set; }

   private IList<OrderLine> lines = new List<OrderLine>();
   public IEnumerable<OrderLine> Lines => lines.AsEnumerable();

   public Order(Customer customer)
   {
      Customer = customer;
      State = OrderState.New;
   }

   public OrderLine Add(string productId, int quantity) { }

   public Task Submit()
   {
      State = OrderState.Submitted;

      // Raise domain event
      return MessageBus.Current.Publish(new OrderSubmittedEvent(this, DateTime.UtcNow));
   }
}

Sample logic executed by the client of the domain model:

var john = new Customer("John", "Whick");

var order = new Order(john);
order.Add("id_machine_gun", 2);
order.Add("id_grenade", 4);

await order.Submit(); // events fired here

Notice the static MessageBus.Current property is configured to resolve a scoped IMessageBus instance (web request-scoped or pick-up message scope from a currently processed message).
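
Where the static accessor is not wanted, the same event could presumably be published through an injected IMessageBus instead, as in the basic usage sample. The sketch below assumes a hypothetical application service named OrderSubmissionService; Order and OrderSubmittedEvent are the types shown above.

```csharp
using System;
using System.Threading.Tasks;
using SlimMessageBus;

// Hypothetical application service; not part of the library or its samples.
public class OrderSubmissionService
{
    private readonly IMessageBus _bus;

    // IMessageBus is supplied by the DI container.
    public OrderSubmissionService(IMessageBus bus) => _bus = bus;

    public Task PublishSubmitted(Order order)
    {
        // Assumption: the caller has already moved the order to the Submitted state.
        return _bus.Publish(new OrderSubmittedEvent(order, DateTime.UtcNow));
    }
}
```

The trade-off is that the event is no longer raised from inside the aggregate, so the domain model stays free of the static accessor but the caller becomes responsible for publishing.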

The SlimMessageBus configuration for the in-memory provider looks like this:

//IServiceCollection services;

// Configure the message bus
services.AddSlimMessageBus(mbb =>
{
   mbb.WithProviderMemory();
   // Find types that implement IConsumer<T> and IRequestHandler<T, R> and declare producers and consumers on the mbb
   mbb.AutoDeclareFrom(Assembly.GetExecutingAssembly());
   // Scan assembly for consumers, handlers, interceptors, and register into MSDI
   mbb.AddServicesFromAssemblyContaining<OrderSubmittedHandler>();
});

For the ASP.NET project, set up the MessageBus.Current helper (if you want to use it, and pick up the current web-request scope):

services.AddSlimMessageBus(mbb =>
{
   // ...
   mbb.AddAspNet(); // requires SlimMessageBus.Host.AspNetCore package
});
services.AddHttpContextAccessor(); // This is required by the SlimMessageBus.Host.AspNetCore plugin

See the complete sample for ASP.NET Core where the handler and bus are web-request scoped.

Use Case: MediatR replacement

The SlimMessageBus in-memory provider can replace the MediatR library:

  • It has similar semantics and has the interceptor pipeline enabling the addition of custom behavior.
  • The generic interceptors can introduce common behavior like logging, authorization or audit of messages.
  • The FluentValidation plugin can introduce request/command/query validation.
  • The external communication can be layered on top of SlimMessageBus which allows having one library for in-memory and out-of-process messaging (Hybrid Provider).
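
The interceptor pipeline idea from the list above can be illustrated without the library. The sketch below is not the SlimMessageBus interceptor API: HandlerDelegate, IPipelineStep, LoggingStep and Pipeline are hypothetical stand-ins that only show how steps wrap a handler so that cross-cutting behavior (here, logging) runs around it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Usage: one logging step wrapped around a handler that doubles its input.
var result = await Pipeline.Run(
    21,
    new IPipelineStep<int, int>[] { new LoggingStep<int, int>() },
    x => Task.FromResult(x * 2));
Console.WriteLine(result); // 42

// Hypothetical stand-in types; NOT the SlimMessageBus interceptor interfaces.
public delegate Task<TResponse> HandlerDelegate<TResponse>();

public interface IPipelineStep<TRequest, TResponse>
{
    Task<TResponse> Handle(TRequest request, HandlerDelegate<TResponse> next);
}

// Logs before and after the rest of the pipeline runs.
public sealed class LoggingStep<TRequest, TResponse> : IPipelineStep<TRequest, TResponse>
{
    public async Task<TResponse> Handle(TRequest request, HandlerDelegate<TResponse> next)
    {
        Console.WriteLine($"Handling {typeof(TRequest).Name}");
        var response = await next();
        Console.WriteLine($"Handled {typeof(TRequest).Name}");
        return response;
    }
}

public static class Pipeline
{
    // Wraps the handler with the steps so that the first step runs outermost.
    public static Task<TResponse> Run<TRequest, TResponse>(
        TRequest request,
        IEnumerable<IPipelineStep<TRequest, TResponse>> steps,
        Func<TRequest, Task<TResponse>> handler)
    {
        HandlerDelegate<TResponse> next = () => handler(request);
        foreach (var step in steps.Reverse())
        {
            var inner = next; // capture the current tail of the chain
            next = () => step.Handle(request, inner);
        }
        return next();
    }
}
```

Authorization, auditing or validation steps would follow the same shape: do something before calling next, after it, or skip it entirely by throwing.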

See the CQRS and FluentValidation samples.

Use Case: Request-response over Kafka topics

See sample.

Features

  • Types of messaging patterns supported:
    • Publish-subscribe
    • Request-response
    • Queues
    • A hybrid of the above (e.g. Kafka with multiple topic consumers in one group)
  • Modern async/await syntax and TPL
  • Fluent configuration
  • SourceLink support
  • Because SlimMessageBus is a facade, chosen messaging transports can be swapped without impacting the overall application architecture.

Principles

  • The core of SlimMessageBus is "slim"
    • Simple, common and friendly API to work with messaging systems
    • No external dependencies.
    • The core interface can be used in the domain model (e.g. Domain Events)
  • Plugin architecture:
    • Message serialization (JSON, Avro, Protobuf)
    • Use your favorite messaging broker as a provider by simply pulling a NuGet package
    • Add transactional outbox pattern or message validation
  • No threads created (pure TPL)
  • Async/Await support
  • Fluent configuration
  • Logging is done via Microsoft.Extensions.Logging.Abstractions so that you can connect to your favorite logger provider.

License

Apache License 2.0

Build

cd src
dotnet build
dotnet pack --output ../dist

NuGet packages end up in the dist folder.

Testing

To run tests you need to update the secrets.txt to match your cloud infrastructure or local infrastructure. SMB has some message brokers set up on Azure for integration tests (secrets not shared).

Run all tests:

dotnet test

Run all tests except integration tests that require local/cloud infrastructure:

dotnet test --filter Category!=Integration

Credits

Thanks to Gravity9 for providing an Azure subscription that allows running the integration test infrastructure.


Thanks to the following service cloud providers for providing free instances for our integration tests:

If you want to help and sponsor, please write to me.

slimmessagebus's People

Contributors

achehre, baoduy, dependabot[bot], diseks, etherza, jeffdoolittle, nover, spragalas, zarusz, ziurek


slimmessagebus's Issues

Missing MaxAutoRenewDuration on Azure Service Bus implementation

Hi @zarusz

I'm looking for a way to change the default MaxAutoRenewDuration parameter. On Azure Service Bus, a message becomes visible again after 5 minutes if the user doesn't mark it as completed manually.
In our case, processing a single message can take up to 2 hours. Right now we handle that by passing the MaxAutoRenewDuration parameter to MessageHandlerOptions.

Unfortunately, BaseConsumer does not set this parameter.

Is there any method that we can use to achieve the same result?
Or could we make a pull request with the changes (though you would probably have to tell us how you would like it done)?

[Host.Memory] Publishing messages into memory transport are blocking until execution is done

Hi, thank you for providing this easy-to-start and easy-to-use message bus library. I searched for a while for a message bus with exactly these features and this simplicity. For one of my projects, I plan to start with a simple self-contained application running in a single process, where all parts are connected through an in-memory message bus. But I want to have the ability to scale out to multiple processes/servers without having to rewrite the whole application. This plan looks easy to achieve with your library.

I have a question regarding the pub/sub behavior of the MemoryMessageBus. As I understand it, pub/sub messaging using the Publish method should work like a queue where I put things in and stop worrying about how long the execution takes. On the other hand, request-response messaging using the Send method with the await keyword should block the execution of my calling method until the response has arrived.
If I see it correctly, this behavior is supported by all available transport implementations but not by the MemoryMessageBus. The MemoryMessageBus awaits the execution of the work even if I use the Publish method. I did not see a note about it in the docs, but there is an example in the DomainEvents sample projects. The OrderSubmittedHandler returns a delay of one second, which blocks the Post method in the OrdersController for that second before it can respond to the HTTP request.

Is this the intentional behavior? And what is the reason for it? Maybe I simply have a wrong understanding about the intended behavior of general pub/sub messaging in the message bus.

Cheers from Germany

Request response is not working with Kafka as transport

When I set Kafka as the provider, request-response does not work.

I am using Sample.Simple.ConsoleApp

  1. I changed the provider to Kafka with var provider = Provider.Kafka; in the CreateMessageBus function
  2. I am using a local on-premise Kafka broker
  3. As I am not using SSL in Kafka, I commented out the addssl function for producerconfig and consumerconfig
  4. When I run the program I can see the request and response topics for Multiply in Kafka, but I get a timeout after this statement:
    var response = await bus.Send(new MultiplyRequest { Left = a, Right = b });

[Question] Is multi-topic (Kafka) consumer supported?

Hi, congrats on the abstraction library. I have recently come across it, and there is an aspect of it I wanted to check with you guys.

One of Kafka's best practices is to reuse a consumer by subscribing it to multiple topics. Is such a setup supported? I see most Topic methods take only a string (e.g., ConsumerBuilder.Topic(string topic)) and not a list of strings, but perhaps it is supported some other way. I should say up front that I haven't dug too deep into the implementation classes yet.

Thanks in advance.
Regards,
JP

Hybrid bus provider (route messages to different transports)

I've been thinking about building a hybrid bus implementation where you would be able to compose multiple bus implementations (e.g. in-memory, Azure Service Bus) so that some of the bus messages are routed via one provider (in-memory) and other messages via another provider (Azure Service Bus). That opens up a lot of possibilities.

Concurrent message consumers not working

Hi, we can't seem to get messages to be consumed concurrently in our .NET Core 3.1 web app. Here is the code:

MessageBusBuilder

public class MyMessageBusBuilder
{
    private const int NUM_CONSUMER_INSTANCES = 10;

    public IMessageBus Build(IServiceProvider provider)
    {
        return MessageBusBuilder.Create()
            .PerMessageScopeEnabled(true)
            .Produce<ClientRegisteredMessage>(x => x
                .DefaultTopic("test topic"))
            .Consume<ClientRegisteredMessage>(x => x
                .Topic("test topic")
                .WithConsumer<ClientRegisteredMessageConsumer>()
                .Instances(NUM_CONSUMER_INSTANCES))
            .WithSerializer(new JsonMessageSerializer())
            .WithDependencyResolver(new LookupDependencyResolver(provider.GetRequiredService))
            .WithProviderRedis(new RedisMessageBusSettings("redis connection string"))
            .Build();
    }
}

Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    var messageBusBuilder = provider.GetRequiredService<MyMessageBusBuilder>();

    services
        .AddSingleton<IMessageBus>(messageBusBuilder.Build)
        .AddTransient<ClientRegisteredMessageConsumer>();
}

public void Configure(IApplicationBuilder app)
{
    // Force the singleton SMB instance to be created on app start 
    app.ApplicationServices.GetRequiredService<IMessageBus>();
}

At startup, after the "Creating consumers" log entry we see only one "Creating consumer X for topic Y and message type Z" message logged. We are seeing only one message processed at a time.

We have looked through the docs and samples, including https://zarusz.github.io/SlimMessageBus/docs/intro.html#concurrently-processed-messages, and are not sure if we are missing something or if there is a bug.

Reflective registration of producers/consumers in samples seem off

I'm trying to use the .Do(...) reflective registration of producers and consumers as outlined in the samples repository of the WebApi: https://github.com/zarusz/SlimMessageBus/blob/master/src/Samples/Sample.DomainEvents.WebApi/Startup.cs#L98

However, when I use this registration method for my one sample producer/consumer, the following is logged:

[20:01:35 DBG] Found a base type of Namespace.XyzEvent that is configured in the bus: System.Object

Seems to me like the sample is missing the following piece of code down in the .ForEach(find => ..) method:

  builder.Produce(find.EventType,
                            x => x.DefaultTopic(x.Settings.MessageType.Name));

I'll happily contribute a PR fixing it if you agree :)

What is the full IMessageBus lifecycle?

So I read all the available documents (all the Readme.md files), and I would like to get clarification on the lifecycle of the IMessageBus.

Let's take the Domain Events sample as an example. In a typical web application, incoming request made -> create an event -> publish the event -> some handler receives it and process it -> happy day.

Now, the sample uses MessageBus.Current which, as it mentions, depends on the DI setup (scoped, singleton, etc.).

The question is: if it's scoped (as the sample sets), will all the messages (from different sessions) be delivered and consumed?

And for the event consumer, I have a scenario that uses the in-memory transport, where each instance of the consumer only processes certain messages (of the same type).

Maybe I should use "topic" for this?

[Host.Memory] DI Message scope should be joined if a scope has already been started by an outer bus

Consider a hybrid bus composed of the Memory bus for domain events and Azure Service Bus for out-of-process events. Consuming an external ASB message creates a scope. If the memory bus is then used during message processing to send and later handle domain events, it ends up creating another nested child scope.

This might not be desired and instead of creating a new child scope, the memory bus should join the scope started by the azure service bus message consumption.

Hotspot:
https://github.com/zarusz/SlimMessageBus/blob/master/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs#L123

Make IRequestMessage optional

Please consider making the IRequestMessage interface optional. It would allow defining an infrastructure message bus adapter without specific message bus dependencies at the application level. I believe the Send method could be extended with another generic parameter, TResponse, which would indicate the expected response.

Azure Service Bus questions

Hi, I would like to ask about a few things:

  1. How do I configure it for a situation where different types of events are sent to the same topic and I would like to have one subscription? The reason is to preserve the order of messages around one aggregate. Is this possible?

  2. Is it possible to configure serialization based on a header from the message? I don't want to use type.Name, because if someone changes the type name everything will fail, and as we know people often rename things during refactoring. I would prefer a constant that maps to the type to deserialize into. What do you think?

  3. Are there any plans to support Azure Service Bus sessions? Currently it fails.

Finally, this is very, very good code. I can't remember the last time I saw such a high-quality repository!

Thumbs up for you!

Add support for key in consumer builder

Hi, is it possible to add support for a key in the consumer builder?
I am talking about this line:

ConsumerBuilderFactory = (config) => new ConsumerBuilder<Ignore, byte[]>(config);

A key is helpful when someone wants to separate messages by type within one topic and let a consumer listen only to its own typed partition, or when someone needs to implement specific consumers per partition key; this could also be a consumer per saga.
For example, I can define the partition key in the producer, based on the message type name:

public static MessageBusBuilder AddProducer<TMessage>(
    this MessageBusBuilder builder, string defaultProduceTopic)
{
    builder.Produce<TMessage>(x =>
    {
        x.DefaultTopic(defaultProduceTopic);
        x.KeyProvider((request, topic) => Encoding.ASCII.GetBytes(typeof(TMessage).Name));
    });

    return builder;
}

But a key selector on the consumer side is missing. It would be nice to extend the consumer configuration to be able to specify the partition key as a byte array, or as a delegate plus a key deserializer for other types.
I want to implement something like on this image: https://docs.datastax.com/en/kafka/doc/kafka/images/partitionsKafka.png

IConsumer as a message loop (Consumer without the IConsumer<T> interface)

Hi,

Currently IConsumer is used as an interface, and messages are received through the OnHandle function.

In scenarios like an infinite loop, it makes more sense to register a handler, like IReceiverClient from Microsoft.Azure.ServiceBus.Core does, which processes incoming messages in a local function. That would fit the scenario I mentioned.

receiverClient.RegisterMessageHandler<FooMessageType>((message, token) => 
{
    //process the message here
});

[Host.AzureServiceBus] Ability to auto generate topology (queue/topic/subscription) in Azure Service Bus

There should be an option (or another plugin) added to the Host.AzureServiceBus plugin that would generate a topic (with the relevant subscription) or queue if it does not exist yet.

The information about the topology could be read from the queue / topic / subscriptions registered within the SMB (on the producer and consumer side).

Ideally some additional options should be made available that would affect the topic/queue attributes (partitioning enabled, duplicate detection, etc).

It should be an opt-in feature.

MessageType filtering

I'm implementing SlimMessageBus with EventHubs and am targeting a single EventHubs instance (topic) for all messages. Using pub/sub so my registrations more or less look something like:

MessageBusBuilder.Create()
    .Consume<FooBar>(config => config
        .Topic(configuration["SharedTopic"])
        .Group(configuration["ServiceConsumerGroup"])
        .WithConsumer<FooBarConsumer>())

I'm noticing that each of my consumer groups picks up messages for all message types, i.e. the message processing does not filter on message type. Since the messages are not filtered by type, I'm wondering if your intention was that each message type would get its own topic.

NET 5 Support - Event Hubs

Are there any plans to support .NET 5 applications?

I have an application that I am upgrading from a netcoreapp3.1 to net5 and it seems like the EventHub code no longer works after I make the upgrade.

I get the following error when trying to publish a message:

Operation is not valid due to the current state of the object.

at Microsoft.Azure.Amqp.Transport.TransportStream.Flush()
at System.IO.Stream.<>c.b__39_0(Object state)
at System.Threading.Tasks.Task.InnerInvoke()
at System.Threading.Tasks.Task.<>c.<.cctor>b__277_0(Object obj)
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at System.Net.Security.SslStream.d__1711.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Microsoft.Azure.Amqp.TaskHelpers.EndAsyncResult(IAsyncResult asyncResult)
at Microsoft.Azure.Amqp.StreamExtensions.EndAuthenticateAsClient(SslStream sslStream, IAsyncResult asyncResult)
at Microsoft.Azure.Amqp.Transport.TlsTransport.HandleOpenComplete(IAsyncResult result, Boolean syncComplete)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.AmqpObject.OpenAsyncResult.End(IAsyncResult result)
at Microsoft.Azure.Amqp.AmqpObject.EndOpen(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.TlsTransportInitiator.HandleTransportOpened(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.TlsTransportInitiator.OnTransportOpened(IAsyncResult result)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult()
at Microsoft.Azure.EventHubs.Amqp.AmqpEventHubClient.d__32.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult()
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject1.d__6.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at Microsoft.Azure.Amqp.Singleton1.<GetOrCreateAsync>d__13.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at Microsoft.Azure.Amqp.Singleton1.d__13.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult()
at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.<CreateLinkAsync>d__12.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult()
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject1.<OnCreateAsync>d__6.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at Microsoft.Azure.Amqp.Singleton1.d__13.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at Microsoft.Azure.Amqp.Singleton`1.d__13.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.d__10.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.d__10.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at Microsoft.Azure.EventHubs.EventHubClient.d__25.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at SlimMessageBus.Host.AzureEventHub.EventHubMessageBus.d__9.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at SlimMessageBusTest.Program.d__4.MoveNext() in \SlimMessageBusTest\Program.cs:line 88

[Host.AzureEventHub] Support partition keys for producers

Azure Event Hub supports a partition key for messages. We need to support assigning partition keys during publishing as we do for other transport providers (Kafka / ASB).

mbb
	.Produce<CustomerUpdated>(x => 
	{
		x.DefaultTopic("topic1");
		// Message key could be set for the message
		x.KeyProvider((message) => message.CustomerId.ToString());
	})

Support for passing message types in the message headers (for supported transport providers)

It would be beneficial if the SMB transport providers sent the message type (e.g. the full name of the .NET type) in the native headers of a message when publishing. That way:

  • we would not have to rely on the serializer to add a message type discriminator (e.g. $type for Newtonsoft.Json)
  • we could introduce message filtering as discussed in #32
  • we could multiplex multiple message types (not part of the same class hierarchy) via one channel.

The consumer side of each transport implementation could then read that header to achieve the above.

This would only apply to transports that support message headers (Azure Service Bus, Kafka, Event Hubs, etc).
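
To make the idea concrete, here is a minimal sketch of a type header round trip. It is not SMB code: TransportMessage, TypeHeaderCodec and the header name "MessageType" are all hypothetical names made up for this illustration, and System.Text.Json is assumed as the serializer only to keep the sample free of extra packages. The consumer side looks the type up in a table of registered types rather than resolving whatever name arrives in the header.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Two unrelated message types that share one channel.
public sealed class CustomerCreated { public string CustomerId { get; set; } }
public sealed class OrderShipped { public int OrderNumber { get; set; } }

// Hypothetical transport message: payload bytes plus native string headers.
public sealed class TransportMessage
{
    public Dictionary<string, string> Headers { get; } = new Dictionary<string, string>();
    public byte[] Payload { get; set; }
}

// Hypothetical helper, not part of SlimMessageBus.
public sealed class TypeHeaderCodec
{
    // Header name chosen for this sketch only.
    public const string HeaderName = "MessageType";

    // Known message types keyed by full name, so that an incoming header
    // can only select a type the consumer has explicitly registered.
    private readonly Dictionary<string, Type> _knownTypes = new Dictionary<string, Type>();

    public TypeHeaderCodec Register<T>()
    {
        _knownTypes[typeof(T).FullName] = typeof(T);
        return this;
    }

    // Producer side: serialize the message and record its .NET type in a header.
    public TransportMessage Encode(object message)
    {
        var type = message.GetType();
        var result = new TransportMessage
        {
            Payload = JsonSerializer.SerializeToUtf8Bytes(message, type)
        };
        result.Headers[HeaderName] = type.FullName;
        return result;
    }

    // Consumer side: read the header, look up the type, deserialize into it.
    public object Decode(TransportMessage message)
    {
        if (!message.Headers.TryGetValue(HeaderName, out var typeName))
            throw new InvalidOperationException($"Header '{HeaderName}' is missing.");
        if (!_knownTypes.TryGetValue(typeName, out var type))
            throw new InvalidOperationException($"Unknown message type '{typeName}'.");
        return JsonSerializer.Deserialize(message.Payload, type);
    }
}
```

With both types registered, Decode returns an OrderShipped or a CustomerCreated instance depending only on the header, so the payload needs no $type discriminator. A consumer could also inspect the header to filter messages before deserializing.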

Support for configuring polymorphic messages (inheritance)

Consider the following message types and inheritance:

public abstract class BaseMessage
{
  public DateTime Created { get; set; }
}

public class CustomerEvent : BaseMessage
{
   public Guid CustomerId { get; set; }
}

public class CustomerCreatedEvent: CustomerEvent { }
public class CustomerChangedEvent: CustomerEvent { }

All of these messages are serialized with Newtonsoft.Json, so they can share the same topic. The JSON serializer is able to infer the proper message type from the $type property.

Now, using SMB I can send these messages:

await bus.Publish(new CustomerCreatedEvent { CustomerId = Guid.NewGuid() });
await bus.Publish(new CustomerChangedEvent { CustomerId = Guid.NewGuid() });

However, I need to configure each message separately:

   MessageBusBuilder mbb = ...;
   mbb
      .Produce<CustomerEvent>(x => x.DefaultTopic("customer-events"))
      .Produce<CustomerCreatedEvent>(x => x.DefaultTopic("customer-events"))
      .Produce<CustomerChangedEvent>(x => x.DefaultTopic("customer-events"))

It would be better if we could configure just CustomerEvent and have every subclass of that message follow the same config, e.g.:

   mbb
      // will apply to CustomerCreatedEvent and CustomerChangedEvent
      .Produce<CustomerEvent>(x => x.DefaultTopic("customer-events")) 
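
One way this lookup could work is to walk from the concrete message type up through its base types until a configured type is found. The sketch below only illustrates that idea under stated assumptions: ProducerTopicRegistry and ResolveTopic are hypothetical names, not the SMB builder or its internals, and the message classes are repeated from above so the sample stands alone.

```csharp
using System;
using System.Collections.Generic;

public abstract class BaseMessage { public DateTime Created { get; set; } }
public class CustomerEvent : BaseMessage { public Guid CustomerId { get; set; } }
public class CustomerCreatedEvent : CustomerEvent { }
public class CustomerChangedEvent : CustomerEvent { }

// Hypothetical registry of producer settings, keyed by declared message type.
public sealed class ProducerTopicRegistry
{
    private readonly Dictionary<Type, string> _topics = new Dictionary<Type, string>();

    // Mirrors the shape of Produce<T>(x => x.DefaultTopic(...)) in a simplified form.
    public ProducerTopicRegistry Produce<T>(string defaultTopic)
    {
        _topics[typeof(T)] = defaultTopic;
        return this;
    }

    // Walk from the concrete type up the inheritance chain;
    // the nearest configured ancestor wins.
    public string ResolveTopic(Type messageType)
    {
        for (var type = messageType; type != null; type = type.BaseType)
        {
            if (_topics.TryGetValue(type, out var topic))
            {
                return topic;
            }
        }
        throw new InvalidOperationException(
            $"No producer configured for {messageType.FullName} or any of its base types.");
    }
}
```

After new ProducerTopicRegistry().Produce<CustomerEvent>("customer-events"), resolving typeof(CustomerCreatedEvent) or typeof(CustomerChangedEvent) yields "customer-events", while a more specific registration for a subclass would take precedence because it is found first.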

Support for scoped Consumers

In some cases the consumer (IConsumer<T>) needs scoped dependencies injected (like the EF DbContext) and hence needs to be registered as scoped in the DI container. As a result, SMB should create a scope and resolve such consumers from that scope. The scope should be bound to each message, which represents a transaction or a unit of work.

Also related to #34.
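
A rough sketch of the per-message scope, assuming Microsoft.Extensions.DependencyInjection as the container. IDemoConsumer, UnitOfWork and ScopedDispatcher are hypothetical stand-ins invented for this example (UnitOfWork plays the role of a DbContext); none of this is SMB's actual implementation.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for a consumer interface.
public interface IDemoConsumer<in TMessage>
{
    Task OnHandle(TMessage message);
}

// Stand-in for a scoped dependency such as an EF DbContext.
public sealed class UnitOfWork : IDisposable
{
    public Guid Id { get; } = Guid.NewGuid();
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}

public sealed class CustomerCreated
{
    public Guid CustomerId { get; set; }
}

public sealed class CustomerCreatedConsumer : IDemoConsumer<CustomerCreated>
{
    private readonly UnitOfWork _unitOfWork;

    public CustomerCreatedConsumer(UnitOfWork unitOfWork) => _unitOfWork = unitOfWork;

    public Task OnHandle(CustomerCreated message)
    {
        Console.WriteLine($"Handling {message.CustomerId} in unit of work {_unitOfWork.Id}");
        return Task.CompletedTask;
    }
}

// Hypothetical dispatcher: a singleton that holds only the root provider.
public sealed class ScopedDispatcher
{
    private readonly IServiceProvider _rootProvider;

    public ScopedDispatcher(IServiceProvider rootProvider) => _rootProvider = rootProvider;

    public async Task Dispatch<TMessage>(TMessage message)
    {
        // One scope per message: scoped services are created here
        // and disposed when the message has been handled.
        using (var scope = _rootProvider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<IDemoConsumer<TMessage>>();
            await consumer.OnHandle(message);
        }
    }
}

public static class Demo
{
    public static async Task RunAsync()
    {
        var services = new ServiceCollection();
        services.AddScoped<UnitOfWork>();
        services.AddScoped<IDemoConsumer<CustomerCreated>, CustomerCreatedConsumer>();

        using (var provider = services.BuildServiceProvider())
        {
            var dispatcher = new ScopedDispatcher(provider);
            // Each call gets its own scope, hence its own UnitOfWork instance.
            await dispatcher.Dispatch(new CustomerCreated { CustomerId = Guid.NewGuid() });
            await dispatcher.Dispatch(new CustomerCreated { CustomerId = Guid.NewGuid() });
        }
    }
}
```

The dispatcher itself can stay a singleton because it never captures a scoped service. It only asks the scope for the consumer at the moment a message arrives.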

Can't use DbContext in message consumer

I'm following the approach in the WebApi sample for our web application. My message consumer needs a DbContext (or a class which needs a DbContext), which must be scoped to avoid concurrency errors. However, since the IMessageBus is a singleton, created when the application starts, it cannot have a scoped service as a dependency. How can the message consumer use a scoped dependency, such as a DbContext?

Exception swallowed for Publish-ed messages

On pub/sub, if a handler throws or anything else happens, the last error message in logs is:
SlimMessageBus.Host.Memory.MemoryMessageBus: Waiting on 1 consumer tasks

I believe some info about what went wrong might be expected.
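
For illustration, here is one way failures from consumer tasks could be surfaced instead of dropped. This is a sketch, not the MemoryMessageBus code: ConsumerTasks.WaitAndReport and the logError callback are hypothetical names, and how the bus actually tracks its consumer tasks is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper, not part of SlimMessageBus.
public static class ConsumerTasks
{
    // Waits for all consumer tasks, then reports every failure through
    // the supplied callback and returns the collected exceptions.
    public static async Task<IReadOnlyList<Exception>> WaitAndReport(
        IEnumerable<Task> consumerTasks,
        Action<Exception> logError)
    {
        var tasks = consumerTasks.ToList();
        try
        {
            await Task.WhenAll(tasks);
        }
        catch
        {
            // Awaiting rethrows only the first exception, so the faulted
            // tasks are inspected one by one below to collect all of them.
        }

        var errors = tasks
            .Where(task => task.IsFaulted)
            .SelectMany(task => task.Exception.InnerExceptions)
            .ToList();

        foreach (var error in errors)
        {
            logError(error);
        }

        return errors;
    }
}
```

A caller could pass something like e => logger.LogError(e, "Consumer failed") as the callback, so the log shows what went wrong rather than ending at "Waiting on 1 consumer tasks".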

[Host.ApachePulsar] Contributing provider Apache Pulsar

Hi,

I would like to contribute to the project by building a provider for Apache Pulsar.

I would like to hear what you think. I saw in the CONTRIBUTING.md file that we need a discussion about the high-level design.

I am new to open source projects. If you can guide me a little, I would appreciate it. Thank you so much
