Giter Site home page Giter Site logo

Comments (13)

cajuncoding avatar cajuncoding commented on June 25, 2024

Hey John... Thanks again for the Coffee! I truly appreciate it!

Great question, it's quite easy . . . the OutboxTableConfig class actually implements the interface ISqlTransactionalOutboxTableConfig which can be injected as an optional dependency when constructing the DefaultSqlServerOutboxRepository (or it's base class SqlServerOutboxRepository if you are customizing any other elements)...

Though the default OutboxTableConfig is readonly (on purpose as a good habit), all you have to do is implement your own class with the interface and pass it in, and you can inherit from the default OutboxTableConfig so that you don't have to implement all properties.... so here's an example:

public class CustomSchemaNameOutboxTableConfig : OutboxTableConfig, ISqlTransactionalOutboxTableConfig
{
	public CustomSchemaNameOutboxTableConfig(string customOutboxSchemaName)
	{
		TransactionalOutboxSchemaName = customOutboxSchemaName;
	}

	public new string TransactionalOutboxSchemaName { get; }
}

Then you can specify it as an optional dependency when you construct the DefaultSqlServerOutboxRepository:

var customOutboxTableConfig = new CustomSchemaOutboxTableConfig("sqloutbox");

var sqlTransaction = (SqlTransaction)await sqlConnection.BeginTransactionAsync().ConfigureAwait(false);

var sqlOutboxRepository = new DefaultSqlServerOutboxRepository<string>(
	sqlTransaction,
	outboxTableConfig: customOutboxTableConfig //<== Inject your Custom Configuration!
);

/// . . . Carry on with the Outbox processing . . .

from sqltransactionaloutbox.

cajuncoding avatar cajuncoding commented on June 25, 2024

I also just realized that the properties of OutboxTableConfig should be virtual to be a little cleaner, so I've updated that to be included in a future incremental update.

But that doesn't affect the above solution which should work just fine since the interface is key and there is no other polymorphic access to the base class.

To validate and illustrate, I added a Unit Test which illustrates the above example in the project:

from sqltransactionaloutbox.

jkears avatar jkears commented on June 25, 2024

Thank you @cajuncoding (Brandon).

I am implementing a background task within a microservice that pushes domain events to the service bus from the TransactionalOutBoxQueue at a defined interval using the code below.

It appears that I will have to create my own SQLServerOutboxRepository as there appears no other means to inject the ISqlTransactionalOutboxTableConfig using the OTB classes.

  protected virtual async Task ExecuteSqlTransactionalOutboxProcessingInternalAsync()
        {
            try{
          
                  //************************************************************
                  //*** Execute processing of the Transactional Outbox...
                  //************************************************************
                  await using var sqlConnection = new Microsoft.Data.SqlClient.SqlConnection(SqlConnectionString);
                  await sqlConnection.OpenAsync().ConfigureAwait(false);
                  await sqlConnection.ProcessPendingOutboxItemsAsync(OutboxPublisher, SqlTransactionalOutboxProcessingOptions).ConfigureAwait(false);
       
                  //************************************************************
                  //*** If specified then Execute Cleanup of Historical Outbox Data...
                  //************************************************************
                  if (OutboxHistoryToKeepTimeSpan > TimeSpan.Zero)
                  {
                       
                      await sqlConnection.CleanupHistoricalOutboxItemsAsync(OutboxHistoryToKeepTimeSpan).ConfigureAwait(false);
                  }
            }
            catch (Exception e)
            {
                _logger.LogError($"  ERROR => {e.GetMessagesRecursively()}");
                throw e;
            }
        }

Once again I want to thank you for providing this solution. I do have one other question that I am concerned about, which I am hoping you can help me to better understand if I will have an issue or not.

As mentioned, each microservice will have a background processor task, who's primary role is to process domain events from the `TransactionalOutBoxQueue table and where, each microservice will own it's own TransactionalOutBoxQueue table. I need to ensure that if there are multiple instances of a microservice and as such, competing consumers to those events, that the background task processor(s) will read and process each block of events to the Azure service bus in the order (and at most one) that they were added to the event table. It seems like that this will work using your library, but I just wanted to be absolutely certain it will.

Thanks for your support!
John

from sqltransactionaloutbox.

jkears avatar jkears commented on June 25, 2024

I am really confused as to how I would integrate with your library. I fully understand that I need to create a custom outbox config however what class am I placing the code below into?

var customOutboxTableConfig = new CustomSchemaOutboxTableConfig("sqloutbox");

var sqlTransaction = (SqlTransaction)await sqlConnection.BeginTransactionAsync().ConfigureAwait(false);

var sqlOutboxRepository = new DefaultSqlServerOutboxRepository<string>(
	sqlTransaction,
	outboxTableConfig: customOutboxTableConfig //<== Inject your Custom Configuration!
);

... such that I can easily call through the SqlClientOutboxProcessingCustomExtensions. ProcessPendingOutboxItemsAsync method from within my background task.

Can you please help me with the classes I need to implement.

from sqltransactionaloutbox.

cajuncoding avatar cajuncoding commented on June 25, 2024

Regarding customizing the OutboxTableConfig and using the Sql Connection/Transaction custom extensions.... yeah I see your point now. The custom extensions are very helpful and handle a bunch of logistics for transaction, rollback, etc. And ideally you wouldn't have to copy/paste that 🤔

Ok, I've got an idea or two.... but let me think on it and I'll take a closer look tomorrow.

Ideally I think it'd be good offer an easy to use way to change the OutboxTableConfig... in an unobtrusive way (globally) so that all Custom Extensions will use the values... Without having to add optional parameters to clutter up the custom extensions...

from sqltransactionaloutbox.

cajuncoding avatar cajuncoding commented on June 25, 2024

Regarding your more involved question above:

As mentioned, each microservice will have a background processor task, who's primary role is to process domain events from the `TransactionalOutBoxQueue table and where, each microservice will own it's own TransactionalOutBoxQueue table. I need to ensure that if there are multiple instances of a microservice and as such, competing consumers to those events, that the background task processor(s) will read and process each block of events to the Azure service bus in the order (and at most one) that they were added to the event table. It seems like that this will work using your library, but I just wanted to be absolutely certain it will.

Yes I'm pretty sure that the library will accomplish what you are looking to do... but just to make sure I understand the nuances of the question here are some thoughts off the top of my head...

  • Yes, by design many instances of the Outbox library can attempt to run at the same time -- as it was intentionally developed to work reliably in Azure Functions which could have dozens or hundreds of instances spin up and attempt ot process the outbox. It does this by either:
    • A) letting them all grab a batch and process, knowing that some messages could be sent multiple times (but guaranteeing at-least-once delivery). This will provide maximum throughput but does not guarantee order, or that a given message might not be processed multiple times due to race condition before SQL updates the record.
    • B) Using a distributed Mutex Lock (via Sql Server database lock) so that one and only one instance can process the outbox at any givent time. All other instances will attemp to acquire a lock and skip processing for that attempt if they are unable to get a lock... this is controlled by the Options that you can pass in OutboxProcessingOptions.FifoEnforcedPublishingEnabled = true
      • It sounds like you def. want to use option (B)... which is also the primary mode I use also.
      • The default time that a single instance will wait to acquire a lock is 1 second by default... but that is also a parameter that is customizeable . . . but not from the Custom Extensions, so this would also be included in what mentioned in my comment above 👆 .
  • You also called out "(and at most once)" in your question... in general that is very likely to hold true, but it cannot be guaranteed. This is an implication of the Outbox pattern in that it only guarantees 'at-least-once' delivery... though it will rarely if ever duplicate.
    • This is because once the event if fired off on an external message bus, then it will attempt to clear it in the Outbox Queue, but there is always a residual risk that a network glitch, Cloud DB outate, etc. could cause that followup update to fail. This means that the outbox item is left in a pending state and will be picked up again . . . eventually when the outage or network hiccup is no longer an issue. But, the item was never lost, and was safely left pending, so it will be processed again . . . proving out the guarantee of 'at-least-once' delivery (which is a fundamental element of the Outbox pattern).
      • Now in reality the likelihood of this will be closely related to your infrastructure, and other details, but in most cases will none-the-less be rare (if ever) in reality.

from sqltransactionaloutbox.

jkears avatar jkears commented on June 25, 2024

Hi and thank you @cajuncoding for your detailed response, most awesome!

From review of your code and stepping through of your console sample I was thinking it worked as described above, for which I am super happy to learn that it works that way.

To provide a bit more perspective as to who we are and what we are doing, my company is NextWare Group and we are a very small tech-start-up.

We have developed an in-house Domain modeling tool that is a VS Code extension that we use to capture Business Domains for some up-coming product ideas we are working on.

We are following the key principals of Domain Driven Design, in which we utilize Event Storming to capture any business domain and then use our Domain Tooling to graphically capture each related Bounded Context per the domain, including each related Aggregate schema(s), API Command(s) and Domain Event(s) in a drag-n-drop diagram editor as seen below.

We have a separate CLI tool that utilizes MS Roslyn to code-generate each domain model into the actual microservices.

A domain model contains 1 or more Bounded context(s) which code-generates to a ASP.Net Web API microservice and each Bounded Context has 1 or more Domain Aggregates. We are deploying to Azure Container Apps.

The domain model screen shot below is from our ERP platform we are currently working on. Here, I am showing just two aggregates (JournalConfiguration and JournalBatch) within the GeneralLedgerServices microservice.

On the left side in the VS Code Explorer you will see the code that is generated by the CLI tooling from this model. Currently we have 20 domain models in our ERP domain, consisting of 40+ microservices and 500+ aggregates.

image

Below is a test model and some code-generated ChangeFeedBackGroundService.cs task, we will use to process MS SQL domain events using your SQLTransactionalOutbox pattern.

image

Our tooling code-generates either to MS SQL (EF Core), or MS CosmosDB.

With CosmosDB we utilize the built-in change-feed mechanism to publish the domain events to the service bus as well as project to MongoDB, via a code-generated background task processor.

We generate REST, gRPC and GraphQL API integrations for each microservice (configurable).

We have an integrated Business Rules Engine (Code Effects), as such we rarely need to write any code, although we can if we need some special business logic not able to be captured via Business Rules.

I would love to hook up and provide you a demo of this if you have time, and then we can chat more about what we are doing with your library.

Please let me know if you have time to join a Zoom call, I can be reached via LinkedIn.

Cheers
John Kears
CTO NextWare Group LLC.

from sqltransactionaloutbox.

cajuncoding avatar cajuncoding commented on June 25, 2024

John,

That's alot of details to process 🤯😁

I've just merged in the PR to provide full configuration at initialization... added some details to the README for how to use it during app startup.... it's very straightforward bootstrapping/initialization feature now added. All unit tests are passing, so it's pushed to an updated Nuget Package v1.0.1 now 👍

For your case -- if all you need to change is the Schema name -- then you'll just add this to your Startup (application root):

    //This is the global SqlTransactionalOutbox initializer that allows configuring custom settings to be used...
    //NOTE: Not all values need to be specified, any values that are not specified (e.g. or are set to null)
    //      will retain the default values.
    SqlTransactionalOutboxInitializer.Configure(config =>
    {
        config.WithOutboxTableConfig(new OutboxTableConfig(
            transactionalOutboxSchemaName: "YourCustomSchemaName"
        ));
    });

from sqltransactionaloutbox.

jkears avatar jkears commented on June 25, 2024

Hi @cajuncoding (Brandon), that worked, however I am had to make one change, which I am certain I shouldn't have to do.

I changed the following line in the base initializers within the DefaultSqlServerOutboxRepository....

outboxTableConfig: outboxTableConfig ?? SqlTransactionalOutboxDefaults.OutboxTableConfig,

namespace SqlTransactionalOutbox.SqlServer.MicrosoftDataNS
{
    public class DefaultSqlServerOutboxRepository<TPayload> : SqlServerOutboxRepository<Guid, TPayload>
    {
        public DefaultSqlServerOutboxRepository(
            SqlTransaction sqlTransaction,
            ISqlTransactionalOutboxTableConfig outboxTableConfig = null,
            ISqlTransactionalOutboxItemFactory<Guid, TPayload> outboxItemFactory = null,
            int? distributedMutexAcquisitionTimeoutSeconds = null
        ) 
        : base (
            sqlTransaction: sqlTransaction,
            outboxTableConfig: outboxTableConfig ?? SqlTransactionalOutboxDefaults.OutboxTableConfig,
            outboxItemFactory: outboxItemFactory ?? new DefaultOutboxItemFactory<TPayload>(),
            distributedMutexAcquisitionTimeoutSeconds
        )
        {
            //Preconfigured defaults are used in base constructor above...
        }
    }
}

Is that the correct way to bring the new configuration in, or is there a proper way that I need to follow?

from sqltransactionaloutbox.

cajuncoding avatar cajuncoding commented on June 25, 2024

@jkears Yes, good catch, that is a bug that I missed in the DefaultSqlServerOutboxRepository().... I'm pushing a fix to Nuget now 👍

v1.0.2 should now resolve this as expected ✅

PR: #5

from sqltransactionaloutbox.

jkears avatar jkears commented on June 25, 2024

Awesome! I will try now.

from sqltransactionaloutbox.

jkears avatar jkears commented on June 25, 2024

Perfection! Many, many thanks!

Here is a quick test model that a created to test this. The UpdateAgg1Name command takes in a string and applies that to the name property of the the Aggregate.

image

GraphQL of that aggregate prior to the change...

image

Make a change...

image

This will generate the Agg1NameHasChanged domain event which is pushed into the TransactionalOutBoxQueue table for that service, which is subsequently pushed out by a background task running in the same service to Azure Service Bus (session based)...

image

Which arrives nicely as a topic in ASB ...

image

And the changes is applied to the aggregate...

image

from sqltransactionaloutbox.

jkears avatar jkears commented on June 25, 2024

Thank you so very much (@cajuncoding) for both this wonderful community contribution as well as helping me out. We will continue testing against real domain models and will let you know if we find any issues.

Cheers
John

from sqltransactionaloutbox.

Related Issues (3)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.