i-e-b / diskqueue

A robust, thread-safe, and multi-process persistent queue library for dotnet

License: Other

C# 100.00%
transaction thread storage-location multi-process c-sharp mono disk queue fast net-standard-2

diskqueue's Introduction

DiskQueue

A robust, thread-safe, and multi-process persistent queue.

Based very heavily on http://ayende.com/blog/3479/rhino-queues-storage-disk

Requirements and Environment

Works on dotnet standard 2.0 platforms

Requires access to filesystem storage.

The file system is used to hold locks, so any bug in your file system may cause issues with DiskQueue -- although it tries to work around them.

Thanks to

These kind folks have helped in the development of DiskQueue

Basic Usage

  • PersistentQueue.WaitFor(...) is the main entry point. This will attempt to gain an exclusive lock on the given storage location. On first use, a directory will be created with the required files inside it.
  • This queue object can be shared among threads. Each thread should call OpenSession() to get its own session object.
  • Both IPersistentQueues and IPersistentQueueSessions should be wrapped in using() clauses, or otherwise disposed of properly. Failure to do this will result in lock contention -- you will get errors that the queue is still in use.

There is also a generic-typed PersistentQueue<T>(...), which handles serialization and deserialization of the elements in the queue, as long as the type is decorated with [Serializable]. You can also inject your own ISerializationStrategy<T> into your PersistentQueueSession<T> if you want finer control over serialization and deserialization, or if you want to use your own serializer (e.g. Json.NET).

Use new PersistentQueue<T>(...) in place of new PersistentQueue(...) or PersistentQueue.WaitFor<T>(...) in place of PersistentQueue.WaitFor(...) in any of the examples below.

Note: BinaryFormatter was removed from the default serializer. See https://learn.microsoft.com/en-us/dotnet/standard/serialization/binaryformatter-security-guide.

Example

Queue on one thread, consume on another; retry some exceptions.

Note this is one queue being shared between two sessions. You should not open two queue instances for one storage location at once.

IPersistentQueue queue = new PersistentQueue("queue_a");
var t1 = new Thread(() =>
{
	while (HaveWork())
	{
		using (var session = queue.OpenSession())
		{
			session.Enqueue(NextWorkItem());
			session.Flush();
		}
	}
});
var t2 = new Thread(()=> {
	while (true) {
		using (var session = queue.OpenSession()) {
			var data = session.Dequeue();
			if (data == null) {Thread.Sleep(100); continue;}
			
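			try {
				MaybeDoWork(data);
				session.Flush(); // commit the dequeue
			} catch (RetryException) {
				continue; // no flush: the dequeue is rolled back when the session is disposed
			} catch {
				session.Flush(); // give up on this item: commit the dequeue anyway
			}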
		}
	}
});

t1.Start();
t2.Start();

Example

Batch up a load of work and have another thread work through it.

IPersistentQueue queue = new PersistentQueue("batchQueue");
var worker = new Thread(()=> {
	using (var session = queue.OpenSession()) {
		byte[] data;
		while ((data = session.Dequeue()) != null) {
			MaybeDoWork(data);
			session.Flush();
		}
	}
});

using (var session = queue.OpenSession()) {
	foreach (var item in LoadsOfStuff()) {
		session.Enqueue(item);
	}
	session.Flush();
}

worker.IsBackground = true; // anything not complete when we close will be left on the queue for next time.
worker.Start();

Transactions

Each session is a transaction. Any Enqueues or Dequeues will be rolled back when the session is disposed unless you call session.Flush(). Data will only be visible between threads once it has been flushed. Each flush incurs a performance penalty. By default, each flush is persisted to disk before continuing. You can get more speed at a safety cost by setting queue.ParanoidFlushing = false;
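The rollback behaviour described above can be sketched as follows. This is a minimal illustration rather than a reference implementation: it assumes the DiskQueue package is referenced, that PersistentQueue lives in the DiskQueue namespace, and that "queue_tx_demo" (a made-up path) points at a fresh, empty storage location.

```csharp
using System;
using System.Text;
using DiskQueue; // assumed namespace of PersistentQueue

public static class TransactionDemo
{
    public static void Main()
    {
        // "queue_tx_demo" is a hypothetical storage path used only for this sketch.
        using (var queue = new PersistentQueue("queue_tx_demo"))
        {
            // Enqueue without Flush: the session is disposed, so the enqueue is rolled back.
            using (var session = queue.OpenSession())
            {
                session.Enqueue(Encoding.UTF8.GetBytes("lost"));
            }

            // Enqueue followed by Flush: the item is committed.
            using (var session = queue.OpenSession())
            {
                session.Enqueue(Encoding.UTF8.GetBytes("kept"));
                session.Flush();
            }

            // Read back whatever was committed.
            using (var session = queue.OpenSession())
            {
                var data = session.Dequeue();
                Console.WriteLine(data == null ? "(empty)" : Encoding.UTF8.GetString(data));
                session.Flush(); // commit the dequeue so the item is not reinstated
            }
        }
    }
}
```

Going by the description above, only the flushed item should be read back when the storage location starts out empty.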

Data loss and transaction truncation

By default, DiskQueue silently discards transaction blocks that have been truncated, and throws an InvalidOperationException when transaction block markers are overwritten. Overwritten markers occur when more than one process uses the queue by mistake, and can also result from some kinds of disk corruption. If you construct your queue with throwOnConflict: false, all recoverable transaction errors are silently truncated. Use this only when uptime is more important than data consistency.

using (var queue = new PersistentQueue(path, Constants._32Megabytes, throwOnConflict: false)) {
    . . .
}

Global default settings

Each instance of a PersistentQueue has its own settings for flush levels and corruption behaviour. You can set these individually after creating an instance, or globally with PersistentQueue.DefaultSettings. Default settings are applied to all queue instances in the same process created after the setting is changed.

For example, if performance is more important than crash safety:

PersistentQueue.DefaultSettings.ParanoidFlushing = false;
PersistentQueue.DefaultSettings.TrimTransactionLogOnDispose = false;

Or if up-time is more important than detecting corruption early (often the case for embedded systems):

PersistentQueue.DefaultSettings.AllowTruncatedEntries = true;
PersistentQueue.DefaultSettings.ParanoidFlushing = true;

Internal Logging

Some internal warnings and non-critical errors are logged through PersistentQueue.Log. This defaults to System.Console.WriteLine, but can be replaced with any Action<string>.
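As a small sketch of swapping the log sink, the snippet below sends messages to the debugger output instead of the console. It assumes PersistentQueue is in the DiskQueue namespace and that Log is a settable static member, as the text implies; the "[DiskQueue]" prefix is just an illustrative choice.

```csharp
using System.Diagnostics;
using DiskQueue; // assumed namespace of PersistentQueue

// Route DiskQueue's internal warnings to the debug output instead of the console.
PersistentQueue.Log = message => Debug.WriteLine("[DiskQueue] " + message);
```

Set this once at start-up, before any queues are opened, so that early warnings go to the new sink.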

Removing or resetting queues

Queues create a directory and set of files for storage. You can remove all files for a queue with the HardDelete method. If you give true as the reset parameter, the directory will be written again.

This WILL delete ANY AND ALL files inside the queue directory. You should not call this method in normal use. If you start a queue with the same path as an existing directory, this method will delete the entire directory, not just the queue files.

var subject = new PersistentQueue("queue_a");
subject.HardDelete(true); // wipe any existing data and start again

Multi-Process Usage

Each IPersistentQueue gives exclusive access to the storage until it is disposed. There is a static helper method PersistentQueue.WaitFor("path", TimeSpan...) which will wait to gain access until other processes release the lock or the timeout expires. If each process holds the lock for a short time and waits long enough, they can share a storage location.

E.g.

...
void AddToQueue(byte[] data) {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		session.Enqueue(data);
		session.Flush();
	}
}

byte[] ReadQueue() {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		var data = session.Dequeue();
		session.Flush();
		return data;
	}
}
...

Cross-process Locking

DiskQueue tries very hard to make sure the lock files are managed correctly. You can use this as an inter-process lock if required. Simply open a session to acquire the lock, and dispose of the session to release it.

If you need the transaction semantics of sessions across multiple processes, try a more robust solution like https://github.com/i-e-b/SevenDigital.Messaging

diskqueue's People

Contributors

i-e-b, mookid8000, skolima, stefandascalu64, thalter

diskqueue's Issues

[Feature request] Double ended queue functionality

Are you planning to implement double-ended queue functionality?
Otherwise, some functionality that allows dequeuing the last element without removing it from the stack would help.
I need to do some operations that could fail after dequeuing. In that case I would like to put the element back in the list, so that it is the first one dequeued in the next iteration.
Do you think it is possible to implement something like that?

data.0 file does not decrease in size when doing a dequeue?

Version: 1.6.7

Expected Behavior:
When I dequeue something from the queue, I expect the data.0 file to decrease in size or the item to be removed.

Observed Behavior:
The data.0 file remains the same size even though the queue is considered empty.

Description:
I would expect as the queue data is removed/dequeued from the queue that the data.0 file would decrease in size as well but it only grows.

Dequeue Code:
public void Dequeue()
{
    using var queue = PersistentQueue.WaitFor(_queueStorage, TimeSpan.FromSeconds(30));
    using var session = queue.OpenSession();
    data = session.Dequeue();
    if (data != null) session.Flush(); // remove old entry
}

Enqueue Code:
public void Enqueue(string entry)
{
    using var queue = PersistentQueue.WaitFor(_queueStorage, TimeSpan.FromSeconds(30));
    using var session = queue.OpenSession();
    session.Enqueue(Encoding.Unicode.GetBytes(entry));
    session.Flush();
}

FIFO versus LIFO

Thanks for this useful package!

As I understand it, this queue works like a FIFO (first in first out) buffer. Did you consider an option to make it a LIFO (last in first out) buffer? This would be a useful option in our application since we wish to send the most recent data in the queue first. In the event of a long network outage, the queue could become quite large and waiting to clear the queue before seeing the latest data is not ideal.

Persistence on app close?

If data is persisted on disk when the app is closed, the queue doesn't appear to read that data and fill itself when the app is restarted. Is that not a supported use case for this library?

Hard resets aren't recoverable

On a hard reset, it seems the queue’s transaction log often ends up being corrupted. Attempting to open it brings up this Exception (I can send the queue data and log/state files if you care to look at them):

Failed to open transmit buffer: System.InvalidOperationException: Unexpected data in transaction log. Expected to get transaction separator but got unknown data. Tx #1
  at DiskQueue.Implementation.PersistentQueueImpl.AssertTransactionSeperator (System.IO.BinaryReader binaryReader, Int32 txCount, Guid expectedValue, System.Action hasData) [0x00000] in <filename unknown>:0 
  at DiskQueue.Implementation.PersistentQueueImpl+<>c__DisplayClass11.<ReadTransactionLog>b__e (System.IO.Stream stream) [0x00000] in <filename unknown>:0 
  at DiskQueue.Implementation.Atomic.Read (System.String path, System.Action`1 action) [0x00000] in <filename unknown>:0 
  at DiskQueue.Implementation.PersistentQueueImpl.ReadTransactionLog () [0x00000] in <filename unknown>:0 
  at DiskQueue.Implementation.PersistentQueueImpl..ctor (System.String path, Int32 maxFileSize) [0x00000] in <filename unknown>:0

Possible threading issue

...Since the recovery can take a long time, this meant that the logger wasn’t actually logging while it was busy recovering old data. I figured it would be simple to work around that problem by having my recovery method run in a separate thread. Unfortunately, when my main thread is reading and writing to the queue as normal, my recovery thread will eventually throw an exception when trying to add to that same queue:

Unhandled Exception: System.NullReferenceException: Object reference not set to an instance of an object
  at System.Collections.Generic.LinkedList`1[DiskQueue.Implementation.Entry].CopyTo (DiskQueue.Implementation.Entry[] array, Int32 index) [0x00000] in <filename unknown>:0 
  at System.Linq.Enumerable.ToArray[Entry] (IEnumerable`1 source) [0x00000] in <filename unknown>:0 
  at DiskQueue.Implementation.PersistentQueueImpl.FlushTrimmedTransactionLog () [0x00000] in <filename unknown>:0 
  at DiskQueue.Implementation.PersistentQueueImpl.CommitTransaction (ICollection`1 operations) [0x00000] in <filename unknown>:0 
  at DiskQueue.Implementation.PersistentQueueSession.Flush () [0x00000] in <filename unknown>:0 
  at DataToDesktop.Program.recoverTransmitBuffer (System.String oldBufferLocation) [0x00000] in <filename unknown>:0 
  at DataToDesktop.Program+<>c__DisplayClass2.<Main>b__0 () [0x00000] in <filename unknown>:0 
  at System.Threading.Thread.StartInternal () [0x00000] in <filename unknown>:0

Based on the error, I do know that this is Line 444 of PersistentQueueImpl.cs, in FlushTrimmedTransactionLog() where it converts to an array.

using binaryformatter security risks

Hi
The BinaryFormatter is obsolete and considered dangerous, and should be replaced. DiskQueue is using it in DiskQueue.Implementation.DefaultSerializationStrategy

The [BinaryFormatter](https://learn.microsoft.com/en-us/dotnet/api/system.runtime.serialization.formatters.binary.binaryformatter) type is dangerous and is not recommended for data processing. Applications should stop using BinaryFormatter as soon as possible, even if they believe the data they're processing to be trustworthy. BinaryFormatter is insecure and can't be made secure.

https://learn.microsoft.com/en-us/dotnet/standard/serialization/binaryformatter-security-guide

Maybe you can use something like this instead. I have only made some limited tests, but it seems to work. This code will only work in dotnet 5+ projects, but you can replace it with Newtonsoft.Json.

using System.IO;
using System.Text.Json;
using System;

namespace DiskQueue.Implementation
{
    /// <summary>
    /// This class performs basic binary serialization from objects of Type T to byte arrays suitable for use in DiskQueue sessions.
    /// </summary>
    /// <remarks>
    /// You are free to implement your own <see cref="ISerializationStrategy{T}"/> and inject it into <see cref="PersistentQueue{T}"/>.
    /// </remarks>
    /// <typeparam name="T"></typeparam>
    internal class DefaultSerializationStrategy<T> : ISerializationStrategy<T>
    {
        /// <inheritdoc />
        public T? Deserialize(byte[]? bytes)
        {
            if (bytes == null)
            {
                return default;
            }

            using (MemoryStream ms = new MemoryStream(bytes))
            {
                return JsonSerializer.Deserialize<T>(ms);
            }
        }

        /// <inheritdoc />
        public byte[]? Serialize(T? obj)
        {
            if (obj == null)
            {
                return default;
            }
            using MemoryStream ms = new();
            JsonSerializer.Serialize(ms, obj);
            return ms.ToArray();
        }
    }
}

[Question] Should queue ever cleanup

I've got examples of my own running... one thing writing to the queue, and another thing reading from the queue at a decent rate.

What I'm noticing is that even though I'm dequeuing with a flush, the data.0 file in the queue folder continues to grow. I'm assuming meta.state keeps track of where the "starting position" is in data.0, so that stopping and restarting the app doesn't just start replaying all those old queue items. But I've noticed data.0 seems to grow indefinitely, even when the queued items are dequeued very shortly after they are queued.

Is there anything that will basically take whats left in data.0 to process and write that to a new file, and delete data.0?

Using .NET Standard.

Your library does indeed work with .NET Standard, but there are build warnings.

warning NU1701: Package 'DiskQueue 1.1.0' was restored using '.NETFramework,Version=v4.6.1' instead of the project target framework '.NETStandard,Version=v2.0'. This package may not be fully compatible with your project

If I submit a pull request, will you accept and push to NuGet?

Diskqueue.dll version still 1.0.0 ?

Hello.

I am not 100% sure, but it looks like the file version in the 1.3.1 NuGet package is still 1.0.0. This causes an error with our InstallShield installer when trying to upgrade our software.
Can this be fixed in a 1.3.2 package ?
Thanks. Stephen

a bug when run after 2 hours

using v 1.6.0

System.InvalidOperationException
HResult=0x80131509
Message=End of file reached while trying to read queue item. Exceeded retry count.
Source=DiskQueue
StackTrace:
at DiskQueue.Implementation.PersistentQueueImpl.ReadAhead() in C:\gits\DiskQueue\src\DiskQueue\Implementation\PersistentQueueImpl.cs:line 378
at DiskQueue.Implementation.PersistentQueueImpl.Dequeue() in C:\gits\DiskQueue\src\DiskQueue\Implementation\PersistentQueueImpl.cs:line 335
at DiskQueue.Implementation.PersistentQueueSession.Dequeue() in C:\gits\DiskQueue\src\DiskQueue\Implementation\PersistentQueueSession.cs:line 130
at MyQueueDisk`1.Peek() in MyQueueDisk.cs:line 54
at OJ.Worker.OJWorker.WorkThread() in IWorker.cs:line 131

This exception was originally thrown at this call stack:
DiskQueue.Implementation.PersistentQueueImpl.ReadAhead() (in PersistentQueueImpl.cs)
DiskQueue.Implementation.PersistentQueueImpl.Dequeue() (in PersistentQueueImpl.cs)
DiskQueue.Implementation.PersistentQueueSession.Dequeue() (in PersistentQueueSession.cs)
MyQueueDisk.Peek() (in MyQueueDisk.cs)
OJ.Worker.OJWorker.WorkThread() (in IWorker.cs)

Any thoughts to persist the file name as well?

First my apologies to create this as "ISSUE" since I couldn't find your contact details.
Is it possible to persist the file name as well along with file bytes? I couldn't see a way to do that. Do you have any alternate solutions to capture the file name with bytes?

Thanks in advance!

Data Files

The data files in my setup never seem to get deleted. Items are not delivered more than once, so that part is working correctly. I'm sure I'm just missing something small.

End of file reached while trying to read queue item

We are using DiskQueue version 1.3.2 in a C# Windows service application that is running on Windows 7 and Windows 10.
In this application, we are using DiskQueue as a buffer for communication protocols, where every item that needs to be sent will be queued, and another part of the code dequeues and sends these items. After running this application for a long time we received the "End of file reached while trying to read queue item" exception as shown below.

System.InvalidOperationException: End of file reached while trying to read queue item
at DiskQueue.Implementation.PersistentQueueImpl.ReadEntriesFromFile(Entry firstEntry, Int64 currentBufferSize)
at DiskQueue.Implementation.PersistentQueueImpl.ReadAhead()
at DiskQueue.Implementation.PersistentQueueImpl.Dequeue()
at DiskQueue.Implementation.PersistentQueueSession.Dequeue()

The enqueuing of items happens roughly every second, and dequeuing is done continuously while items are queued. Both of these actions run in their own Tasks and queue sessions are opened and closed with each operation.

Although the same exception was described in issue #13, our application buffer contains a data.663 that does not contain any data and was not manually altered.

Is there a way to avoid this issue and/or is there a way to just ignore files like the data.663 while continuing to use the buffer?

System.Security.AccessControl.PrivilegeNotHeldException

Running as a normal user, the library continuously outputs the said exception. When running as administrator, I do not see the exception. Is there a way to eliminate these exceptions? They are caught inside the library and do not seem to cause harm, but if exceptions are being thrown, it seems like something is not quite right.

Exception thrown: 'DiskQueue.Implementation.TruncatedStreamException'

I've built a DiskQueue.dll using today's code as .NET Standard 2.0 class library.
I'm calling the library from both a net472 and core6 console app -- same issue occurring in both.
I'm running on Win11 22H2.

Each call to:
using (IPersistentQueue queue = PersistentQueue.WaitFor(QueuePathName, TimeSpan.FromSeconds(5)))
results in
Exception thrown: 'DiskQueue.Implementation.TruncatedStreamException' in DiskQueue.dll

The code is working but the exception is troubling. How am I causing a truncated transaction?

If I manually delete the C:\TEMP\Test queue folder, there's no exception during the first write.

        public static void BasicTest()
        {
            bool bWrite = true;
            bool bRead = true;

            string SharedStoreage = @"C:\TEMP";
            string QueueName = "Test";
            string QueuePathName = Path.Combine(SharedStoreage, QueueName);

            PersistentQueue.DefaultSettings.AllowTruncatedEntries = true;   // Default false
            PersistentQueue.DefaultSettings.ParanoidFlushing = true;        // Default true

            //-----------------------------------------------------------------------------------------
            //  Write
            //-----------------------------------------------------------------------------------------
            string msg = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";

            if (bWrite)
            {
                try
                {
                    System.Diagnostics.Debug.WriteLine("--------------------------------------");
                    System.Diagnostics.Debug.WriteLine($"Write Waiting for {QueuePathName}");

                    for (int i = 0; i < 10; i++)    // Add Messages
                    {
                        System.Diagnostics.Debug.WriteLine("");

                        using (IPersistentQueue queue = PersistentQueue.WaitFor(QueuePathName, TimeSpan.FromSeconds(5)))
                        {
                            System.Diagnostics.Debug.WriteLine($"BEFORE - Queue {QueueName} contains #{queue.EstimatedCountOfItemsInQueue} messages.");
                            System.Diagnostics.Debug.WriteLine("Opening Session");

                            using (var session = queue.OpenSession())
                            {
                                System.Diagnostics.Debug.WriteLine($"Write {QueueName}: {msg}");

                                byte[] data = Encoding.ASCII.GetBytes(msg);

                                session.Enqueue(data);

                                session.Flush();
                            }

                            System.Diagnostics.Debug.WriteLine($"AFTER - Queue {QueueName} contains #{queue.EstimatedCountOfItemsInQueue} messages.");
                        }
                    }
                }
                catch (Exception ex)
                {
                    string sMsg = $"Write {QueueName} Exception: {ex}";

                    System.Diagnostics.Debug.WriteLine(sMsg);
                }
            }

I've played thru all 4 permutations of the following settings from an empty queue folder and there's no difference:

        PersistentQueue.DefaultSettings.AllowTruncatedEntries = false;   // Default false
        PersistentQueue.DefaultSettings.ParanoidFlushing = false;        // Default true

Lock issue after power failure.

Hello.

I'm running into a weird issue that seems to occur if there is a power failure. I'm going to describe the scenario and please tell me if there's something I'm missing and I'm using the queue wrong.

Scenario:

  1. app starts with PID 1000 and uses the queue.
  2. power failure occurs.
  3. another process starts with PID 1000 and my app starts with some other PID
  4. StandardFileDriver.IsRunning assumes that it's a valid lock even though the process in question has nothing to do with the file.

Since this is an issue that can occur with the app crashing I'm thinking that adding the process StartTime to the lock file could be a possible fix since it would allow to double check that PID refers to the same process (not only the same name) while also having a fixed byte length for the lock file.
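The proposal above could be sketched roughly as below. This is not DiskQueue's lock format or the code in StandardFileDriver.IsRunning; the LockStamp class, its 12-byte layout, and the one-second tolerance are all assumptions made for illustration. The tolerance is there because the start time reported for a process can differ slightly between readers on some platforms.

```csharp
using System;
using System.ComponentModel;
using System.Diagnostics;

// Hypothetical helper: a fixed-length lock payload of PID plus process start time.
public static class LockStamp
{
    // 4 bytes of PID followed by 8 bytes of start time (UTC ticks).
    public const int Length = 12;

    public static byte[] Encode(int pid, long startTicksUtc)
    {
        var bytes = new byte[Length];
        BitConverter.GetBytes(pid).CopyTo(bytes, 0);
        BitConverter.GetBytes(startTicksUtc).CopyTo(bytes, 4);
        return bytes;
    }

    public static byte[] ForCurrentProcess()
    {
        using var self = Process.GetCurrentProcess();
        return Encode(self.Id, self.StartTime.ToUniversalTime().Ticks);
    }

    // True only if a process with the recorded PID is running AND started at the recorded time.
    public static bool IsHeldByLiveProcess(byte[] stamp)
    {
        if (stamp == null || stamp.Length != Length) return false;
        int pid = BitConverter.ToInt32(stamp, 0);
        long recordedTicks = BitConverter.ToInt64(stamp, 4);
        try
        {
            using var owner = Process.GetProcessById(pid);
            long actualTicks = owner.StartTime.ToUniversalTime().Ticks;
            // Allow one second of slack; a recycled PID will differ by far more.
            return Math.Abs(actualTicks - recordedTicks) < TimeSpan.TicksPerSecond;
        }
        catch (ArgumentException) { return false; }          // no process with that PID
        catch (InvalidOperationException) { return false; }  // process exited while being inspected
        catch (Win32Exception) { return false; }             // start time unreadable: treated as stale in this sketch
        catch (NotSupportedException) { return false; }      // platform cannot report start time
    }

    public static void Main()
    {
        byte[] stamp = ForCurrentProcess();
        Console.WriteLine(IsHeldByLiveProcess(stamp)); // the current process is alive and matches its own stamp
    }
}
```

Treating an unreadable start time as a stale lock is a deliberate simplification. A real implementation may prefer to be conservative and keep the lock when it cannot inspect the owning process.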

App crash and recovery

Hello,

I'm encountering an issue with my app that uses DiskQueue. Everything works well when it closes properly because I've implemented proper disposal for my queue. However, if the app closes abruptly, the queue remains locked indefinitely. Do you have any suggestions on how to handle this situation?

Thank you, Orit

Intermittent AccessViolation Exceptions

I'm running DiskQueue in a .NET Framework 4.8.0 application built as a Windows service. I'm collecting hundreds of thousands of events per day (collecting streaming data from a PLC), but it seems that once a day or so the application will terminate with a low-level Win32 exception.

I'm running with the following settings:
ParanoidFlushing=true
AllowTruncatedEntries=true (I've also tried false and it didn't make any difference)
ThrowOnConflict=false (I've also tried true and it didn't make any difference)

I'm using separate threads (in the same process) for queuing and dequeuing. I have a single global PersistentQueue object, and each thread creates, flushes, and disposes of its own PersistentQueueSession object.

This isn't a super big deal, as I have the service configured to restart, but every time this happens (roughly once a day), I lose about 30 seconds of data capture.

Below is the stack trace from the Windows Application log.

Application: ScannerCollectorService.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.AccessViolationException
   at Microsoft.Win32.Win32Native.MoveFile(System.String, System.String)
   at System.IO.File.InternalMove(System.String, System.String, Boolean)
   at System.IO.File.Move(System.String, System.String)
   at DiskQueue.Implementation.StandardFileDriver.Move(System.String, System.String)
   at DiskQueue.Implementation.StandardFileDriver.PrepareDelete(System.String)
   at DiskQueue.Implementation.StandardFileDriver.WaitDelete(System.String)
   at DiskQueue.Implementation.StandardFileDriver.AtomicWriteInternal(System.String, System.Action`1<System.IO.FileStream>)
   at DiskQueue.Implementation.StandardFileDriver.AtomicWrite(System.String, System.Action`1<DiskQueue.IBinaryWriter>)
   at DiskQueue.Implementation.PersistentQueueImpl.FlushTrimmedTransactionLog()
   at DiskQueue.Implementation.PersistentQueueImpl.CommitTransaction(System.Collections.Generic.ICollection`1<DiskQueue.Implementation.Operation>)
   at DiskQueue.Implementation.PersistentQueueSession.Flush()
   at ScannerCollectorService.ScannerDataCollector.queueReader_DoWork(System.Object, System.ComponentModel.DoWorkEventArgs)
   at System.ComponentModel.BackgroundWorker.OnDoWork(System.ComponentModel.DoWorkEventArgs)
   at System.ComponentModel.BackgroundWorker.WorkerThreadStart(System.Object)
   at System.Runtime.Remoting.Messaging.StackBuilderSink._PrivateProcessMessage(IntPtr, System.Object[], System.Object, System.Object[] ByRef)
   at System.Runtime.Remoting.Messaging.StackBuilderSink.AsyncProcessMessage(System.Runtime.Remoting.Messaging.IMessage, System.Runtime.Remoting.Messaging.IMessageSink)
   at System.Runtime.Remoting.Proxies.AgileAsyncWorkerItem.DoAsyncCall()
   at System.Runtime.Remoting.Proxies.AgileAsyncWorkerItem.ThreadPoolCallBack(System.Object)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(System.Object)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

Dequeued items are reversed when queue is Disposed without Flush

When queue Dispose is run, the list of dequeue operations to reinstate is fetched and iterated "in order".
This causes the items to be put back into the queue in wrong order

//original queue
//[ 1, 2, 3 ]

q.Dequeue()
q.Dequeue()

//queue now
//[3]

q.Dispose()

//queue after dispose
// [2, 1, 3]

fix:
in PersistentQueueImpl.cs line 369, change
from entry in reinstatedOperations
to
from entry in reinstatedOperations.Reverse()

This will cause the items to be reinstated in the order they were dequeued.
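The ordering effect can be reproduced in isolation. The sketch below does not use DiskQueue's code: it assumes that reinstating an entry means pushing it back onto the front of a linked list, and the ReinstateOrderDemo class and its Reinstate method are made up for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReinstateOrderDemo
{
    // Puts dequeued-but-unflushed items back at the head of the queue, one at a time.
    // Modelling rollback as repeated AddFirst calls is an assumption about the internals.
    public static List<int> Reinstate(IEnumerable<int> remaining, IEnumerable<int> dequeued, bool reverseFirst)
    {
        var queue = new LinkedList<int>(remaining);
        IEnumerable<int> toRestore = reverseFirst ? dequeued.Reverse() : dequeued;
        foreach (var item in toRestore)
        {
            queue.AddFirst(item);
        }
        return queue.ToList();
    }

    public static void Main()
    {
        var remaining = new[] { 3 };
        var dequeued = new[] { 1, 2 }; // in the order they were dequeued

        // Iterating in dequeue order leaves the restored items swapped.
        Console.WriteLine(string.Join(", ", Reinstate(remaining, dequeued, reverseFirst: false))); // 2, 1, 3

        // Iterating in reverse restores the original order.
        Console.WriteLine(string.Join(", ", Reinstate(remaining, dequeued, reverseFirst: true)));  // 1, 2, 3
    }
}
```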

End of file reached while trying to read queue item

I am using DiskQueue v1.3.0 in a c# project on Windows Server [Win2K12R2, Win2K16, Win2K19] that runs as a native NT service. The interactions with the queue take place within the service's process, with one thread being the event provider and another thread being the event consumer. The event provider detects events of interest occurring on the server and places them in the on-disk queue as rapidly as possible while the event consumer is responsible for streaming events out to the network. All Enqueue() and Dequeue() method calls are immediately followed by a Flush() call out of a sense of paranoia about ensuring that the operations are committed to disk immediately.

After a couple of days [2 to 4] of continuous execution, I find that dequeue operations start to fail with the following exception being caught & logged:

System.InvalidOperationException: End of file reached while trying to read queue item
at DiskQueue.Implementation.PersistentQueueImpl.ReadEntriesFromFile(Entry firstEntry, Int64 currentBufferSize)
at DiskQueue.Implementation.PersistentQueueImpl.ReadAhead()
at DiskQueue.Implementation.PersistentQueueImpl.Dequeue()
at DiskQueue.Implementation.PersistentQueueSession.Dequeue()

The "while" loop in my code that dequeues events from the on-disk queue catches the exception, logs it, sleeps for 100ms and then iterates through the loop again. Once this problem starts occurring, the condition does not clear itself and my log files fill up with messages showing the same exception occurring repeatedly.

At the time that this happens, I find that the directory used by DiskQueue contains the following:

02/03/2021 06:45 AM <DIR> .
02/03/2021 06:45 AM <DIR> ..
02/03/2021 12:13 AM 0 data.158
02/03/2021 03:46 AM 33,557,302 data.159
02/03/2021 06:45 AM 33,557,302 data.160
01/31/2021 07:28 PM 0 lock
02/03/2021 06:45 AM 12 meta.state
02/03/2021 06:45 AM 96,834 transaction.log
6 File(s) 67,211,450 bytes
2 Dir(s) 7,109,398,528 bytes free

Nothing appears unusual about the number of data files, the sizes of the data files, or the size of the transaction log.

Are there any known issues with DiskQueue that would cause the Dequeue() method to attempt to read data for an event/message which appears to be present based on metadata but which was never actually committed to the data files on disk?

Is there any recommendation regarding a fallback/recovery strategy short of disposing of the DiskQueue class instance and then instantiating a new one?

Is there any test that can be performed to verify that the on-disk queue files are corrupted in some way so that they can be deleted?

The process cannot access the file in v 1.6.0

Hi
I am playing around with this project to see if I can use it in my application. I need a thread-safe and persistent way to queue data.

The problem I have now is that I am not able to catch the error below, and it seems to occur at random. I am probably using the library wrong, so any pointers are appreciated.

The error:

Add Test 144 thrd: 9
Add Test 145 thrd: 9
Add Test 146 thrd: 9
Read: 138 Test 138 thrd: 9  thrd: 10
Read: 139 Test 139 thrd: 9  thrd: 10
Read: 140 Test 140 thrd: 9  thrd: 10
Read: 141 Test 141 thrd: 9  thrd: 10
Read: 142 Test 142 thrd: 9  thrd: 10
Read: 143 Test 143 thrd: 9  thrd: 10
System.IO.IOException: The process cannot access the file 'C:\Playground\TestDiskQueue\TestDiskQueue\bin\Debug\net6.0\testqueue\transaction.log' because it is being used by another process.
   at Microsoft.Win32.SafeHandles.SafeFileHandle.CreateFile(String fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options)
   at Microsoft.Win32.SafeHandles.SafeFileHandle.Open(String fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize)
   at System.IO.Strategies.OSFileStreamStrategy..ctor(String path, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize)
   at System.IO.Strategies.FileStreamHelpers.ChooseStrategyCore(String path, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize)
   at System.IO.Strategies.FileStreamHelpers.ChooseStrategy(FileStream fileStream, String path, FileMode mode, FileAccess access, FileShare share, Int32 bufferSize, FileOptions options, Int64 preallocationSize)
   at System.IO.FileStream..ctor(String path, FileMode mode, FileAccess access, FileShare share, Int32 bufferSize, FileOptions options)
   at DiskQueue.Implementation.StandardFileDriver.OpenTransactionLog(String path, Int32 bufferLength)
   at DiskQueue.Implementation.PersistentQueueImpl.WaitForTransactionLog(Byte[] transactionBuffer)
Read: 144 Test 144 thrd: 9  thrd: 10
Read: 144 Test 144 thrd: 9  thrd: 10
Read: 145 Test 145 thrd: 9  thrd: 10
Read: 146 Test 146 thrd: 9  thrd: 10
Add Test 147 thrd: 9
Add Test 148 thrd: 9

The code:

using DiskQueue;

namespace TestDiskQueue
{
    public class RunTest : IDisposable
    {
        public static bool Pause { get; set; }
        private static IPersistentQueue<Test>? _persistentQueue = PersistentQueue.WaitFor<Test>(@".\testque", TimeSpan.FromSeconds(30));
        private bool disposedValue;

        public RunTest()
        {
        }

        public void Run()
        {
            var ran = new Random();

            var t1 = new Thread(() =>
            {
                var count = 0;
                while (true)
                {
                    if (ran.Next(0, 2000) > 1000 && !Pause)
                    {
                        var ts = new Test(++count, $"Test {count} thrd: {Thread.CurrentThread.ManagedThreadId}");
                        Console.WriteLine($"Add {ts.Name}");
                        try
                        {
                            using (var session = _persistentQueue?.OpenSession())
                            {
                                session?.Enqueue(ts);
                                session?.Flush();
                            }
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex);
                        }
                    }
                    Thread.Sleep(10);
                }
            });

            var t2 = new Thread(() =>
            {
                while (true)
                {
                    using (var session = _persistentQueue?.OpenSession())
                    {
                        Test? data;
                        try
                        {
                            Pause = true;
                            while ((data = session?.Dequeue()) != null)
                            {
                                Console.WriteLine($"Read: {data.Id} {data.Name}  thrd: {Thread.CurrentThread.ManagedThreadId}");
                                session?.Flush();
                            }
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex);
                            continue;
                        }
                        finally
                        {
                            Pause = false;
                            Thread.Sleep(500);
                        }
                    }
                }
            });

            t1.IsBackground = true;
            t2.IsBackground = true;
            t1.Start();
            t2.Start();
        }

        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    _persistentQueue?.Dispose();
                }

                disposedValue = true;
            }
        }

        public void Dispose()
        {
            // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }
    }

    [Serializable]
    public record Test(int Id, string Name) { }
}

Console.WriteLine

Thank you for the excellent package!
I have a request: in several places in the code, you use Console.WriteLine, which we can't override. Could this be addressed? Appreciate it!

Orit
