Giter Site home page Giter Site logo

blehnen / dotnetworkqueue Goto Github PK

View Code? Open in Web Editor NEW
89.0 8.0 14.0 7.94 MB

A work queue for dot.net with SQL server, SQLite, Redis and PostGreSQL backends

License: Other

C# 99.54% Smalltalk 0.46%
dotnet dotnet-core workqueue redis-queue job-scheduler

dotnetworkqueue's Introduction

DotNetWorkQueue

License LGPLv2.1 Build status Coverity status codecov

A producer / distributed consumer library for dot net applications. Dot net 4.6.2, 4.7.2, 4.8, 6.0, 8.0 and Dot net standard 2.0 are supported

High level features

  • Queue / De-queue POCO for distributed processing
  • Queue / Process LINQ statements; compiled or dyanamic (expressed as string)
  • Re-occurring job scheduler

See the Wiki for more indepth documention

Installation

Base

Transports

Metrics

Differences between versions

Dot net standard 2.0 / 6.0 / 8.0 are missing the following features from the full framework versions

  • No support for aborting threads when stopping the consumer queues
  • No support for dynamic linq statements

Usage - POCO

[Producer - Sql server]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/SQLServer/SQLServerProducer/Program.cs

[Producer - SQLite]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/SQLite/SQLiteProducer/Program.cs

[Producer - Redis]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/Redis/RedisProducer/Program.cs

[Producer - PostgreSQL]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/PostgreSQL/PostgreSQLProducer/Program.cs

[Producer - LiteDb]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/LiteDb/LiteDbProducer/Program.cs

[Consumer - Sql server]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/SQLServer/SQLServerConsumer/Program.cs

[Consumer - SQLite]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/SQLite/SQLiteConsumer/Program.cs

[Consumer - Redis]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/Redis/RedisConsumer/Program.cs

[Consumer - PostgreSQL]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/PostgreSQL/PostGreSQLConsumer/Program.cs

[Consumer - LiteDb]

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/LiteDb/LiteDbConsumer/Program.cs

Usage - Linq Expression

You can choose to send Linq expressions to be executed instead. This has some advantages, as the producers and consumers are generic; they no longer need to be message specific. The below examples are not transport specifc and assume that any queue creation steps have already been performed.

NOTE: It's possbile for a producer to queue up work that a consumer cannot process. In order for a consumer to execute the Linq statement, all types must be resolvable. For dynamic statements, it's also possible to queue up work that doesn't compile due to syntax errors. That won't be discovered until the consumer dequeues the work.

####Example#####

[Producer] NOTE - if passing in the message or worker notifications as arguments to dynamic linq, you must cast them. The internal compiler treats those as objects. You can see this syntax in the examples below. That's not nessasry if using standard Linq expressions.

Message
(IReceivedMessage<MessageExpression>)

WorkerNotification
(IWorkerNotification)

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/SQLite/SQLiteProducerLinq/Program.cs

If you are passing value types, you will need to parse them. Here is an example. The Guid and the int are both inside string literials and parsed via the built in dot.net methods.

var id = Guid.NewGuid();
var runTime = 200;
$"(message, workerNotification) => StandardTesting.Run(new Guid(\"{id}\"), int.Parse(\"{runTime}\"))"

This will produce a linq expression that can be compiled and executed by the consumer, assuming that it can resolve all of the types.

[Consumer] The consumer is generic; it can process any linq expression. However, it must be able to resolve all types that the linq expression uses. You may need to wire up an assembly resolver if your DLL's cannot be located.

https://msdn.microsoft.com/en-us/library/system.appdomain.assemblyresolve(v=vs.110).aspx

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/SQLite/SQLiteConsumerLinq/Program.cs

The above queue will process all Linq statements sent to the specified connection / queue.

[Considerations]

No sandboxing or checking for risky commands is performed. For instance, the below statement will cause your consumer host to exit.

"(message, workerNotification) => Environment.Exit(0)"

If you decide to allow configuration files to define dyanmic Linq statements (or if you cannot trust the producer), you should consider running the consumer in an application domain sandbox. Otherwise, the only thing stopping a command like the following from executing would be O/S user permissions.

"(message, workerNotification) => System.IO.Directory.Delete(@"C:\Windows\, true)"

Usage - Job Scheduler

Jobs may be scheduled using Schyntax format. The scheduler and consumers are seperate; schedulers don't process any work, they queue it for processing by a consumer. The standard LINQ consumers are used to process work enqueued by a scheduler / schedule.

Any LINQ statement that a linq producer supports can be scheduled using the scheduler.

Multiple schedulers with the same schedule may be ran if needed for redundancy. However, it's important that the clocks on the machines are in sync, or that the same time provider is injected into the schedulers and consumers. See the WIKI for more information on this.

Generally speaking, you may get funny results if you are using multiple machines and the clocks are not in sync. The server based transports tend to provide solutions for this if you can't sync the clocks of the local machines; see the WIKI.

See Schyntax for event scheduling format.

[Scheduler]

The scheduler and container must be kept in scope until you are done scheduling work or shutting down. No work will be queued if the scheduler is disposed or falls out of scope.

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/SQLite/SQliteScheduler/Program.cs

To consume / process scheduled jobs, a Linq Consumer is used

https://github.com/blehnen/DotNetWorkQueue.Samples/blob/master/Source/Samples/SQLite/SQLiteSchedulerConsumer/Program.cs


Samples

Building the source

You'll need VS2022 (any version) and you'll also need to install the dot net core 2.0/6.0 SDKs

All references are either in NuGet or the \lib folder - building from Visual studio should restore any needed files.

License

Copyright � 2015-2024 Brian Lehnen

This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 2.1 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along with this program. If not, see http://www.gnu.org/licenses/.

3rd party Libraries

This library uses multiple 3rd party libaries, listed below.

[DotNetWorkQueue]

[DotNetWorkQueue.Transport.Redis]

[DotNetWorkQueue.Transport.SqlServer]

  • None

[DotNetWorkQueue.Transport.SQLite]

[DotNetWorkQueue.Transport.SQLite.Microsoft]

[DotNetWorkQueue.Transport.PostgreSQL]

[DotNetWorkQueue.Transport.LiteDb]

[DotNetWorkQueue.AppMetrics]

[Unit / Integration Tests]

Developed with:

dotnetworkqueue's People

Contributors

blehnen avatar brian-lehnen avatar dependabot[bot] avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

dotnetworkqueue's Issues

Performance of enqueue and dequeue with sqlite

Hi,

The performance of en-queuing and de-queuing seems to be very slow as compared to msmq when using sqlite based persistance. Can you pls comment on the same? I can share the results what I observe for the time taken for such enqueue/dequeue if needed.

How to create tables beforehand?

Hi! For security reasons, my app doesn't have access to make schema changes, therefore something I'd like to do is create the required database tables as part of my schema migration process, rather than using CreateQueue().

What would be the recommended way to obtain the required DLL please?

How to clear the queue?

When my application starts, I'd like to check one of the queues to see if it has anything in it and clear it out if necessary.

Redis - Command timeout on MoveDelayed

Hi
I have recently got a lot of problems with timeouts.
This is for all of my applications where I use Redis Workqueues
There are about 10 of this at the same second in my log.

StackExchange.Redis.RedisTimeoutException: Timeout performing EVAL (5000ms), next: EVAL, inst: 2, qu: 0, qs: 1, aw: False, bw: SpinningDown, rs: ReadAsync, ws: Idle, in: 4, in-pipe: 0, out-pipe: 0, last-in: 314, cur-in: 0, sync-ops: 2864, async-ops: 1, serverEndpoint: redis-qa.ad.impleoit.se:6379, conn-sec: 3087.67, aoc: 0, mc: 1/1/0, mgr: 10 of 10 available, clientName: adm-server-5488bf97c5-tzsz2(SE.Redis-v2.6.122.38350), IOCP: (Busy=0,Free=1000,Min=1,Max=1000), WORKER: (Busy=13,Free=32754,Min=4,Max=32767), POOL: (Threads=14,QueuedItems=51,CompletedItems=150671,Timers=57), v: 2.6.122.38350 (Please take a look at this article for some common client-side issues that can cause timeouts: https://stackexchange.github.io/StackExchange.Redis/Timeouts)
at StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T](Message message, ResultProcessor`1 processor, ServerEndPoint server, T defaultValue) in //src/StackExchange.Redis/ConnectionMultiplexer.cs:line 2099
at StackExchange.Redis.RedisDatabase.ScriptEvaluate(String script, RedisKey[] keys, RedisValue[] values, CommandFlags flags) in /
/src/StackExchange.Redis/RedisDatabase.cs:line 1527
at StackExchange.Redis.RedisDatabase.ScriptEvaluate(LoadedLuaScript script, Object parameters, CommandFlags flags) in /_/src/StackExchange.Redis/RedisDatabase.cs:line 1542
at DotNetWorkQueue.Transport.Redis.Basic.Lua.BaseLua.TryExecute(Object parameters)
at DotNetWorkQueue.Transport.Redis.Basic.Lua.MoveDelayedToPendingLua.Execute(Int64 unixTime, Int32 count)
at DotNetWorkQueue.Transport.Redis.Basic.CommandHandler.MoveDelayedRecordsCommandHandler.Handle(MoveDelayedRecordsCommand command)
at DotNetWorkQueue.Transport.Redis.Basic.DelayedProcessingAction.Run(CancellationToken token)
at DotNetWorkQueue.Transport.Redis.Basic.Metrics.Decorator.DelayedProcessingActionDecorator.Run(CancellationToken token)
at DotNetWorkQueue.Transport.Redis.Basic.Logging.Decorator.DelayedProcessingActionDecorator.Run(CancellationToken token)
at DotNetWorkQueue.Queue.BaseMonitor.RunMonitor()

Is there a way to retrieve queue Count?

I am facing some troubles with the SQL Consumer, since when I call the start method, it just run once. I am trying to implement a logic to run until the queue is empty but i was not able to find a count property in queue.

using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>())
                       {

                            var queueConnection = new QueueConnection("QUEUENAME", Settings.ConnectionString);
                            using (var queue = queueContainer.CreateConsumer(queueConnection))
                            {
                                //queue.Configuration.Worker.WorkerCount = 1; //lets run 4 worker threads
                                //queue.Configuration.Worker.TimeToWaitForWorkersToStop = TimeSpan.FromSeconds(300);
                                //queue.Configuration.Worker.TimeToWaitForWorkersToCancel = TimeSpan.FromSeconds(300);
                                //queue.Configuration.HeartBeat.UpdateTime = "sec(*%10)"; //set a heartbeat every 10 seconds
                                //queue.Configuration.HeartBeat.MonitorTime = TimeSpan.FromSeconds(15); //check for dead records every 15 seconds
                                //queue.Configuration.HeartBeat.Time = TimeSpan.FromSeconds(300); //records with no heartbeat after 35 seconds are considered dead
                                
                                //an invalid data exception will be re-tried 3 times, with delays of 3, 6 and then finally 9 seconds
                                //queue.Configuration.TransportConfiguration.RetryDelayBehavior.Add(typeof(Exception), new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1) });
                                queue.Start<SimpleMessage>(HandleMessages);
                                queue.ToString();
                            }
                        }
                    }
                //}
                
            //}
        }

        private void HandleMessages(IReceivedMessage<SimpleMessage> message, IWorkerNotification notifications)
        {
            _fileLogger.Trace($"Test {message.Body.Message}");
        }

Unhanded system exception Newtonsoft.Json.JsonSerializationException: Error resolving type specified in JSON 'DotNetWorkQueue.ValueTypeWrapper`1[[System.DateTime, System.Private.CoreLib]]

I am trying to use the Framework with both .NET Core 3.1 and .NET Framework 4.7.2 consumers where each component has a different queue.

The producer that generates work is in written in .NET Core 3.1.

When consuming the messages in a .NET Framework 4.7.2 process that have been generated by the .NET Core process, I get the following exception:

18:46:51 ERR] Unhanded system exception
Newtonsoft.Json.JsonSerializationException: Error resolving type specified in JSON 'DotNetWorkQueue.ValueTypeWrapper1[[System.DateTime, System.Private.CoreLib]], DotNetWorkQueue'. Path 'Queue-FirstPossibleDeliveryDate.$type', line 1, position 140. ---> Newtonsoft.Json.JsonSerializationException: Could not find type 'DotNetWorkQueue.ValueTypeWrapper1[[System.DateTime, System.Private.CoreLib]]' in assembly 'DotNetWorkQueue, Version=0.4.5.0, Culture=neutral, PublicKeyToken=null'. ---> Newtonsoft.Json.JsonSerializationException: Could not load assembly 'System.Private.CoreLib'.
at Newtonsoft.Json.Serialization.DefaultSerializationBinder.GetTypeFromTypeNameKey(StructMultiKey`2 typeNameKey)

This exception is caused by a difference in the value type for the "Queue-FirstPossibleDeliveryDate" that is stored in the message Header, that is different between runtimes:

Namely:

"Queue-FirstPossibleDeliveryDate": {
"$type": "DotNetWorkQueue.ValueTypeWrapper`1[[System.DateTime, System.Private.CoreLib]], DotNetWorkQueue",
"Value": "2020-08-11T15:39:19.5178914Z"
},

"Queue-FirstPossibleDeliveryDate": {
"$type": "DotNetWorkQueue.ValueTypeWrapper`1[[System.DateTime, mscorlib]], DotNetWorkQueue",
"Value": "2020-08-12T07:52:19.3955732Z"
},

I doubt that this mode of operation is supported, however we're migrating a set of code from .NET Framework to Core where we're using the Queue as a decoupling pattern and we're not yet in a place to migrate the whole system into Core in one go.. Is it possible to customise the serialisation of the header?

I've had a couple of questions. Please answer, thank you.

1, After the consumer initializes the instance, if the REDIS is disconnected, the consumer can not reconnect to the REDIS when the REDIS is reopened.
2, The consumer once Start(), unless the withdrawal process, it has been running, can not find any way to terminate, can you consider joining Stop()?
3. In QueueContainer, only Create has no Remove. @@how can I remove the created producer or comsumer?

Redis Transport - NOSCRIPT Error when Redis Server is restarted

Hi.
If the Redis server gets restarted, (quite normal with Azure Redis cache) my application (that was not restarted) is experiencing errors like
An exception has occurred in the monitor delegate, StackExchange.Redis.RedisServerException: NOSCRIPT No matching script. Please use EVAL. at StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T](Message message, ResultProcessor1 processor, ServerEndPoint server) in C:\projects\stackexchange-redis\src\StackExchange.Redis\ConnectionMultiplexer.cs:line 2258 at StackExchange.Redis.RedisBase.ExecuteSync[T](Message message, ResultProcessor1 processor, ServerEndPoint server) in C:\projects\stackexchange-redis\src\StackExchange.Redis\RedisBase.cs:line 54 at StackExchange.Redis.RedisDatabase.ScriptEvaluate(LoadedLuaScript script, Object parameters, CommandFlags flags) in C:\projects\stackexchange-redis\src\StackExchange.Redis\RedisDatabase.cs:line 1173 at DotNetWorkQueue.Transport.Redis.Basic.Lua.TimeLua.Execute() at DotNetWorkQueue.Transport.Redis.Basic.Time.RedisServerUnixTime.GetUnixTime() at DotNetWorkQueue.Transport.Redis.Basic.CommandHandler.MoveDelayedRecordsCommandHandler.Handle(MoveDelayedRecordsCommand command) at DotNetWorkQueue.Transport.Redis.Basic.DelayedProcessingAction.Run(CancellationToken token) at DotNetWorkQueue.Transport.Redis.Basic.Metrics.Decorator.DelayedProcessingActionDecorator.Run(CancellationToken token) at DotNetWorkQueue.Transport.Redis.Basic.Logging.Decorator.DelayedProcessingActionDecorator.Run(CancellationToken token) at DotNetWorkQueue.Queue.BaseMonitor.RunMonitor()

Is this something that can be fixed or can you point me in right direction to try to do a PR.

Best Regards, Peter

Issue with redis on netcore

Hi, i've tried to run this on netcore (linux and all) using a redis docker container, i can write to the queue (i do have to change your code, since it is simplemessage and not simplemessage.simplemessage) but for some reason, my consumer does not trigger and never consume the item.

Producer :

public void Run()
        {
            var queueName = "OSO";
            var connectionString = "localhost:6379";        
            using (var queueContainer = new QueueContainer<RedisQueueInit>())
            {
                using (var queue = queueContainer.CreateProducer<SimpleMessage>(queueName, connectionString))
                {
                    queue.Send(new SimpleMessage{Message = "hello world"});
                }
            }
        }

and consumer :

public void Run()
        {     
            using (var queueContainer = new QueueContainer<RedisQueueInit>())
            {
                using (var queue = queueContainer.CreateConsumer("OSO", "localhost:6379"))
                {
                    queue.Start<SimpleMessage>(HandleMessages);
                    Console.WriteLine("Processing messages - press any key to stop");
                    Console.ReadKey((true));
                }
            }
        }
        private void HandleMessages(IReceivedMessage<SimpleMessage> message, IWorkerNotification notifications)
        {
            Console.WriteLine(message.Body.Message);
        }

A few questions

Hi. I haven't been able to find answers to the following in the Wiki, and I hope you can help me:

How do I configure Heartbeat frequency?
How do I configure Heartbeat timeout?
How do I configure Heartbeat retries?
How do I configure retries on exceptions?
How do I (is it possible to?) configure actions to take if the last retry also fails?

Simple Producer and Consumer with Redis

Hi,

I'm trying to create a simple Producer and Consumer with Redis. The producer is creating the messages but the consumer cannot see those messages.

using DotNetWorkQueue;
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.Transport.Redis.Basic;
using System;

namespace RedisProducer
{
    public class SimpleMessage
    {
        public string Message { get; set; }
        public int ProcessingTime { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var queueName = "imedical_options";
            var connectionString = "localhost";
            var queueConnection = new QueueConnection(queueName, connectionString);
            using var queueContainer = new QueueContainer<RedisQueueInit>();

            using var queue = queueContainer.CreateProducer<SimpleMessage>(queueConnection);
            queue.Send(new SimpleMessage { Message = "hello queue world" });


            Console.WriteLine("Hello Queue World!");
        }
    }
}

using DotNetWorkQueue;
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.Transport.Redis.Basic;
using System;
using System.Collections.Generic;
using System.IO;

namespace RedisConsumer
{
    public class SimpleMessage
    {
        public string Message { get; set; }
        public int ProcessingTime { get; set; }
    }

    internal class Program
    {
        static void Main(string[] args)
        {
            var queueName = "imedical_options";
            var connectionString = "localhost";

            static void HandleMessages(IReceivedMessage<SimpleMessage> m, IWorkerNotification n)
            {
                Console.WriteLine($"Processing Message {m.Body.Message}");
            }

            var queueConnection = new QueueConnection(queueName, connectionString);
            using (var queueContainer = new QueueContainer<RedisQueueInit>())
            {
                using var queue = queueContainer.CreateConsumer(queueConnection);
                //set some processing options and start looking for work
                queue.Configuration.Worker.WorkerCount = 1; //lets run 1 worker threads

                queue.Configuration.MessageExpiration.Enabled = false;                
                queue.Start<SimpleMessage>(HandleMessages);
                Console.WriteLine("Processing messages - press any key to stop");
                Console.ReadKey((true));
            }

            Console.WriteLine("Hello Queue Consumer World!");
        }
    }
}

I attached my proof of concept

DotNetRedisQueue.zip

What am I doing wrong?

Thanks in advance

[SqlLite, .NET5]: RuntimeBinderException: 'System.MulticastDelegate' does not contain a definition for 'Invoke'

I tried a simple example by following the guide for SqlLite. However, it throws an exception:

Debug [Local] server difference is -0,0006 MS Debug [Local] server difference is -0,0001 MS Debug Worker 1 created Information Initializing with 1 workers Information Queue started Processing messages - press any key to stop Information Scheduler time is 2021-09-21 11:35:16 Information Local time is 2021-09-21 11:35:16 Debug Worker 1 created Information Initializing with 1 workers Information Queue started Error Message with ID 8 has failed and has been moved to the error queue Microsoft.CSharp.RuntimeBinder.RuntimeBinderException: 'System.MulticastDelegate' does not contain a definition for 'Invoke' at CallSite.Target(Closure , CallSite , Object , Object , IWorkerNotification ) at System.Dynamic.UpdateDelegates.UpdateAndExecuteVoid3[T0,T1,T2](CallSite site, T0 arg0, T1 arg1, T2 arg2) at DotNetWorkQueue.Messages.MessageHandler.Handle(IReceivedMessageInternal message, IWorkerNotification workerNotification) in C:\Git\DotNetWorkQueue\Source\DotNetWorkQueue\Messages\MessageHandler.cs:line 48 at DotNetWorkQueue.Metrics.Decorator.MessageHandlerDecorator.Handle(IReceivedMessageInternal message, IWorkerNotification workerNotification) in C:\Git\DotNetWorkQueue\Source\DotNetWorkQueue\Metrics\Decorator\IMessageHandlerDecorator.cs:line 50 at DotNetWorkQueue.Trace.Decorator.MessageHandlerDecorator.Handle(IReceivedMessageInternal message, IWorkerNotification workerNotification) in C:\Git\DotNetWorkQueue\Source\DotNetWorkQueue\Trace\Decorator\MessageHandlerDecorator.cs:line 54 at DotNetWorkQueue.Messages.HandleMessage.Handle(IReceivedMessageInternal message, IWorkerNotification workerNotification) in C:\Git\DotNetWorkQueue\Source\DotNetWorkQueue\Messages\HandleMessage.cs:line 57 at DotNetWorkQueue.Queue.ProcessMessage.Handle(IMessageContext context, IReceivedMessageInternal transportMessage) in C:\Git\DotNetWorkQueue\Source\DotNetWorkQueue\Queue\ProcessMessage.cs:line 69 Debug Stopping worker thread Worker 1 Debug Stopping worker thread Worker 1

Here's the code sample to reproduce:

using System;
using DotNetWorkQueue;
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.Logging;
using DotNetWorkQueue.Transport.SQLite.Basic;

namespace ConsoleTest
{
    internal class Program
    {
        private static readonly string connectionString = @"Data Source=c:\temp\queue.db;Version=3;";
        private static readonly string queueName = "testing";
        private static readonly QueueConnection queueConnection = new(queueName, connectionString);

        private static void Main()
        {
            try
            {
                CreateQueue();
                Send();

                using QueueContainer<SqLiteMessageQueueInit> queueContainer = new();
                using IConsumerQueue queue = queueContainer.CreateConsumer(queueConnection);
                queue.Start<SimpleMessage>(HandleMessages);
                Console.WriteLine("Processing messages - press any key to stop");
                _ = Console.ReadKey(true);

                static void HandleMessages(
                    IReceivedMessage<SimpleMessage> message,
                    IWorkerNotification notifications)
                {
                    notifications.Log.LogInformation($"start transaction {message.CorrelationId}");
                    notifications.Log.LogInformation($"received {message.MessageId}");
                    LogEntry entry = new(
                        LoggingEventType.Debug,
                        $"Processing Message {message.Body.Message}");
                    notifications.Log.Log(entry);
                    notifications.Log.LogInformation($"end transaction {message.CorrelationId}");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }

        private static void Send()
        {
            using QueueContainer<SqLiteMessageQueueInit> queueContainer = new();
            using IProducerQueue<SimpleMessage> queue = queueContainer.CreateProducer<SimpleMessage>(queueConnection);
            IQueueOutputMessage output = queue.Send(new SimpleMessage { Message = "Hello World" });
        }

        private static void CreateQueue()
        {
            // Create the queue if it doesn't exist
            QueueCreationContainer<SqLiteMessageQueueInit> createQueueContainer = new();
            SqLiteMessageQueueCreation queueCreation = createQueueContainer.GetQueueCreation<SqLiteMessageQueueCreation>(queueConnection);
            if (!queueCreation.QueueExists)
            {
                _ = queueCreation.CreateQueue();
            }
        }
    }
}

No way to create Named Logs anymore??

Hi

I have updated to 0.6.2 from 0.5.
As the interface ILogProvider has been removed, is there any new way to create "named" loggers??
The ILogProvider interface worked very nice together with Microsoft.Extensions.Logging.

Am I just missing something, or is the functionality removed in 0.6.2??

Question: Transactions

Hi,

I was looking into a scenario where we want to save a record to our sql database, but then trigger a bunch of plugins that are interested into responding when that event has taken place.

My thoughts are to save the record to the database, and then write a message to the queue in a single transaction. I can then write the logic to dequeue the message and fire the plugins, and only remove the event from the queue once all the plugins have responded. Plugins will be written to handle the message idempotently.

This idea behind this is to avoid a scenario in which we have saved the record to the database but then the application dies, before a message can be written to the queue, and therefore plugins are never notified of the event.

With this in mind, is it possible to use the sql server transport with an existing transaction? For example if I was using EFCore to save a record I'd want to save that and push the message to the queue using the same DbConnection and a transaction for this to work.

Thanks

Multiple Instances

Hi!

I'm not sure if this library can handle my requirements and i haven't found an answer in the wiki.
So i hope you can answer my Question...

I have two (or more) Windows Services which are running on different Servers.
If i configure all those Services the same, will the Task be executed on only one of those services, or on every service/server?

I want to build a distributed Webservice, which uses a Loadbalancer. So, all Services can create "Messages" but only one of them should execute the Task. At best the one which is idle or is the fastest...

Br
Bernhard

OpenTelemetry 1.1.0 dependency issues with NET6

Hi, is it possible you could update OpenTelemetry to the latest version.
The current version has some issues with Microsoft.Extensions.Logging 6.0.0 as it's dependency is < 6.0.0.

We managed to work around the issue, but it would be nice if the dep of this project gets updated.

Issue when consumer run

I use redis db to save it. Queue job is oke job is saved in db, but when i start queue in consumer, it doesn't go to my function. Job in db still disappear in pending but it is appear in Error db. Here is my code.

    class Program
    {
        static void Main(string[] args)
        {
            using (var queueContainer = new QueueContainer<RedisQueueInit>())
            {
                string connectionString = "localhost:1234";
                using (var queue = queueContainer.CreateConsumer("tracking", connectionString))
                {
                    queue.Configuration.Worker.WorkerCount = 1;
                    Console.WriteLine(DateTime.Now);
                    queue.Start<dynamic>(Handle);
                    Console.WriteLine(DateTime.Now);
                    Console.WriteLine("Processing messages - press any key to stop");
                    Console.ReadKey((true));
                }
            }
        }

        public static void Handle(IReceivedMessage<dynamic> arg1, IWorkerNotification arg2)
        {
            // Not run to here
            Log.Logger.Information($"{DateTime.Now} Processing message {arg1.MessageId.Id.Value.ToString()}");
            Thread.Sleep(20000);
            Log.Logger.Information($"{DateTime.Now} Message {arg1.MessageId.Id.Value.ToString()} complete");
        }
    }

And here is log

20/08/2019 1:47:19 PM
20/08/2019 1:47:19 PM
Processing messages - press any key to stop

Please help me, thank you very much!

.Net Standard support?

This looks like a promising library!

Are there any plans to support .Net standard / .Net core?

Messages not re-queuing after expired Heartbeat

The transport is MSSQL.
I have set up a consumer as follows:

HeartBeat.UpdateTime = "sec(*%10)";
HeartBeat.MonitorTime = TimeSpan.FromSeconds(15);
HeartBeat.Time = TimeSpan.FromSeconds(35);
MessageExpiration.Enabled = false;

I have an item in queue whose Heartbeat is expired:

SELECT 
	  A.[QueueID]
	  ,B.[Status] AS StatusStatus
	  ,C.HeartBeat
	  ,C.QueuedDateTime
	  ,C.[Status] AS MetaDataStatus
	  ,GETDATE() AS CurrentTime
  FROM [MessageQueue] A
LEFT JOIN
	[MessageQueueStatus] B
ON A.[QueueID] = B.[QueueID]
LEFT JOIN
	[MessageQueueMetaData] C
ON A.[QueueID] = C.[QueueID]
QueueID	StatusStatus	HeartBeat			QueuedDateTime		MetaDataStatus	CurrentTime
1	1		2019-12-19 16:40:31.153		2019-12-19 16:40:30.857	1		2019-12-20 13:06:14.140

All other MessageQueue tables are empty.

But when I start the consumer nothing happens with this item. If I enqueue a new item, the new item gets handled...

Access to the Error Queue

Hi. Just started to use DNWQ and find it very suitable to my use case with the Redis Transport.
I have one question though, I would like to intercept when a message is moved to the error queue, either direct or after the defined number of retries.

Also, is there any automatic handling (clearing) of the error queue?

Is there any samples regarding the error queue that I've missed?

Btw, is this the right place for this kind of questions?

Tnx for a nice Library

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.