
pipelines.sockets.unofficial's Introduction

Pipelines.Sockets.Unofficial

This is a managed sockets connector for the System.IO.Pipelines API, intended to act as a stop-gap while there is no official connector. Pipelines are pretty useless if you can't actually connect them to anything...

It draws inspiration from:

and aims to provide a high-performance implementation of the IDuplexPipe interface, offering both client and server APIs. At the moment the API is very preliminary.

Release Notes

Key APIs (a brief usage sketch follows the list):

  • SocketConnection - interacting with a Socket as a pipe
  • StreamConnection - interacting with a Stream as a pipe, or a pipe as a Stream
  • Arena / Arena<T> / Sequence<T> / Reference<T> - arena allocation APIs
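
As a rough usage sketch of the first two APIs (illustrative only; the endpoint is a placeholder and error handling is omitted):

using System.IO.Pipelines;
using System.Net;
using Pipelines.Sockets.Unofficial;

// connect a socket and expose it as an IDuplexPipe
var endpoint = new IPEndPoint(IPAddress.Loopback, 6379); // placeholder endpoint
SocketConnection connection = await SocketConnection.ConnectAsync(endpoint);

// write a request through the output pipe (WriteAsync copies the data and flushes)
await connection.Output.WriteAsync(new byte[] { (byte)'p', (byte)'i', (byte)'n', (byte)'g', (byte)'\n' });

// read whatever comes back through the input pipe
ReadResult result = await connection.Input.ReadAsync();
connection.Input.AdvanceTo(result.Buffer.End); // mark everything as consumed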

It is provided under the MIT license.

pipelines.sockets.unofficial's People

Contributors

adamsitnik, crocuis, eerhardt, jasper-d, maksimkim, mgravell, michalstrehovsky, nickcraver, patricksuo, vonzshik


pipelines.sockets.unofficial's Issues

Linux performance problem

echo program:
https://github.com/sillyousu/TryoutPipelineSockets

Linux

supei@sandbox-dev-hk:~/frame_echo/frame_echo_client$ dotnet EchoClient.dll  
5000 clients, payload 64 bytes, 2 rounds
Total Time Elapsed: 18469.8927 Milliseconds
0 error of 5000
connect	p90: 1,011.27ms  p95: 3,003.08ms    p99: 3,005.71ms  p99.9: 3,007.05ms
echo	p90: 113.15ms    p95: 288.96ms      p99: 3059.55ms   p99.9: 15394.67ms
total	p90: 1,257.19ms  p95: 3008.27ms     p99: 4809.88ms   p99.9: 18401.26ms

win:

~\source\dotnet\TryPipelineSockets\EchoClient> dotnet.exe .\bin\Release\netcoreapp2.2\EchoClient.dll
5000 clients, payload 64 bytes, 2 rounds
Total Time Elapsed: 1186.6827 Milliseconds
0 error of 5000
connect p90:9.31ms  p95:25.44ms p99:49.06ms     p99.9:500.18ms
echo    p90:61.87ms p95:70.42ms p99:95.84ms     p99.9:112.35ms
total   p90:75.67ms p95:95.45ms p99:137.40ms    p99.9:501.40ms

Update: cleaned up the percentile table a little.

Fix Thread Safety and other static code analysis issues reported by Infer#

#0
/_/src/Pipelines.Sockets.Unofficial/Helpers.cs:160: error: Null Dereference
%0.Pipelines.Sockets.Unofficial.Helpers$<>c__DisplayClass4_0.failures could be null (last assigned on line 146) and is dereferenced in the call to Helpers..

#1
/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.cs:100: warning: Thread Safety Violation
Unprotected write. Non-private method StreamConnection$AsyncPipeStream.Read(...) indirectly writes to field this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead outside of synchronization.
Reporting because another access to the same memory occurs on a background thread, although this access may not.

#2
/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.cs:342: warning: Thread Safety Violation
Unprotected write. Non-private method StreamConnection$AsyncPipeStream.BeginRead(...) indirectly writes to field this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead outside of synchronization.
Reporting because this access may occur on a background thread.

#3
/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.cs:382: warning: Thread Safety Violation
Unprotected write. Non-private method StreamConnection$AsyncPipeStream.EndRead(...) indirectly writes to field this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead outside of synchronization.
Reporting because this access may occur on a background thread.

#4
/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.cs:483: warning: Thread Safety Violation
Unprotected write. Non-private method StreamConnection$AsyncPipeStream.ProcessDataFromAwaiterImpl() indirectly writes to field this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead outside of synchronization.
Reporting because this access may occur on a background thread.

#5
/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.netcoreapp21.cs:32: warning: Thread Safety Violation
Unprotected write. Non-private method StreamConnection$AsyncPipeStream.Read(...) indirectly writes to field this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead outside of synchronization.
Reporting because this access may occur on a background thread.

#6
/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.netcoreapp21.cs:40: warning: Thread Safety Violation
Read/Write race. Non-private method StreamConnection$AsyncPipeStream.Read(...) indirectly reads with synchronization from this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead. Potentially races with unsynchronized write in method StreamConnection$AsyncPipeStream.Read(...).
Reporting because this access may occur on a background thread.

Protocol with length in front

The Tarantool database protocol requires you to write the length of a packet in front of it. That can be kinda tricky, since the protocol is msgpack and the user can use strings, so I don't know the length until I've done the full serialization.

I'm doing a simple test right now (sc is a SocketConnection):

var offset = 10;
var bytes = sc.Output.GetMemory(100);
var length = Write(bytes.Span.Slice(offset, bytes.Length - offset));

bytes = sc.Output.GetMemory(100); // <--- here we got same buffer as above
length += Write(bytes.Span);

length += AddCrLf(bytes.Span.Slice(bytes.Length - 2));

Encoding.ASCII.GetBytes(length.ToString(), bytes.Span.Slice(0, offset));

sc.Output.Advance(length + offset);

And I get this exception:

Unhandled Exception: System.InvalidOperationException: Can't advance past buffer size.
   at System.IO.Pipelines.ThrowHelper.ThrowInvalidOperationException_AdvancingPastBufferSize()
   at System.IO.Pipelines.Pipe.DefaultPipeWriter.Advance(Int32 bytes)
   at pipeline.client.Program.WorkWithSc(SocketConnection sc) in C:\Users\apopov\source\repos\github.com\aensidhe\pipeline.socket.unofficial.reproduction\pipeline.client\Program.cs:line 46
   at pipeline.client.Program.Main(String[] args) in C:\Users\apopov\source\repos\github.com\aensidhe\pipeline.socket.unofficial.reproduction\pipeline.client\Program.cs:line 23

How should I proceed when my packet is not yet fully written because it's too large and I don't know its length yet?

It seems that if I ask for more memory the second time than I did the first time, the pipe throws everything away and I need to redo the serialization from scratch. That's easier to write, but not as efficient as it could be.

var bytes = sc.Output.GetMemory(100);
var length = Write(bytes.Span.Slice(offset, bytes.Length - offset));

bytes = sc.Output.GetMemory(bytes.Length + 1); // here we get empty buffer with zeroes in it

PS: the protocol in the illustration is ASCII-based with \r\n as the packet separator, for clarity and to study pipelines.
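
A common general-purpose workaround (a sketch only, not specific to this library; Write, offset and the size estimate are the hypothetical pieces from the snippets above) is to serialize the body into an intermediate buffer first, so the length is known before anything is committed to the pipe:

using System.Buffers;
using System.Text;

// serialize the body into a scratch buffer so its length becomes known
var body = new ArrayBufferWriter<byte>();
int estimatedSize = 1024;                       // hypothetical upper bound for one message
int bodyLength = Write(body.GetSpan(estimatedSize));
body.Advance(bodyLength);

// now write the ASCII length header followed by the body to the pipe in one pass
var header = sc.Output.GetSpan(offset);
Encoding.ASCII.GetBytes(bodyLength.ToString(), header.Slice(0, offset));
sc.Output.Advance(offset);
sc.Output.Write(body.WrittenSpan);              // BuffersExtensions.Write copies the payload
await sc.Output.FlushAsync();

This costs one extra copy of the payload, but avoids ever calling GetMemory twice for the same message.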

SocketConnection pattern guidance

I'm using SocketConnection to handle all the unpleasantness of buffer reconstruction, which requires writing something to handle incoming data asynchronously, and I'm wondering whether the implementation I'm arriving at is safe and reasonably performant; I'm not particularly well versed in TAP subtleties.

This basic implementation needs to wrap a socket, keep receiving data, and feed it into a parser. Valid messages from the parser are dispatched through an event, and the data passed out through that event should remain alive until the handler returns so that large payloads don't end up being copied around. Could you take a look and comment on whether this is appropriate/correct/improvable?

Unnecessary _queue.Count check in DedicatedThreadPoolPipeScheduler.RunWorkLoop

Specifically, if (_queue.Count == 0):

while (_queue.Count == 0)
{
    if (_disposed) break;
    _availableCount++;
    Monitor.Wait(_queue);
    _availableCount--;
}
if (_queue.Count == 0)
{
    if (_disposed) break;
    else continue;
}

The only way it can be true is if the previous check (while (_queue.Count == 0)) exited because _disposed was true.
The if (_queue.Count == 0) block can be removed by changing if (_disposed) break; to if (_disposed) return;.
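
A sketch of the loop after the suggested change (an illustration of the proposal, not the actual source):

while (_queue.Count == 0)
{
    if (_disposed) return;   // was break; returning makes the trailing count check redundant
    _availableCount++;
    Monitor.Wait(_queue);
    _availableCount--;
}
// the following "if (_queue.Count == 0) { ... }" block can then be deleted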

Trouble interpretting Jetbrains DotTrace backtraces involving Pipelines.Sockets.Unofficial via StackExchange.Redis

Hi,
Firstly, StackExchange.Redis is an amazing library and I'm loving the improvements in stability to it since you've switched over to using piplines.

I think I may be doing something wrong, but I can't find a way forward myself and this seems to be originating in this library so thought I would ask some advice.

Have been playing with a toy project to get into .NET core development and pick up tools I haven't used in while (like dottrace) - and ran into trouble interpretting its output to try and find the slow/hot path in the app when SE.Redis is involved:

image

Specifically I'm trying to backtrace out to which parts of my code are initiating the calls in the first place - but dottrace can't see any 'higher' than:
System.Threading.ExecutionContext.RunInternal(ExecutionContext, ContextCallback, Object) which immediately calls into Pipelines.Sockets.Unofficial.DedicatedThreadPoolPipeScheduler.RunWorkLoop().

Given that I never used to have this problem I am making the assumption that through the DedicatedThreadPoolPipeScheduler tasks are being scheduled in a way that means dottrace can't piece together the stack and so my trail runs cold.

So really a few questions:

  1. Is my analysis right? Or way off the mark.
  2. Is there anything I can do about it? Or is this a bug with dottrace? Or am I being an idiot and I'm just missing something basic.
  3. If dottrace is blocked by this, are there other tools (perfview?) that might be worth learning to help diagnose issues?

TLS/SSL support

Hi! Can we expect SSL/TLS support? I've googled this SO question, and it looks like the proposed workaround could be made generic and included in this library.
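
One workaround that later issues here describe actually using is to wrap the socket's stream in an SslStream and turn that stream back into a pipe with StreamConnection.GetDuplex. A rough sketch (host name, port, and validation details are placeholders):

using System.IO.Pipelines;
using System.Net.Security;
using System.Net.Sockets;
using Pipelines.Sockets.Unofficial;

var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync("example.org", 443);        // placeholder endpoint

var ssl = new SslStream(new NetworkStream(socket, ownsSocket: true));
await ssl.AuthenticateAsClientAsync("example.org");   // TLS handshake

IDuplexPipe pipe = StreamConnection.GetDuplex(ssl);    // then use pipe.Input / pipe.Output as usual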

Does the implementation of this library have any relationship to RIO (Winsock registered I/O extensions) on Windows?

It seems that System.IO.Pipelines in .NET Core has some concepts that are easy to confuse with RIO. It is said that System.IO.Pipelines does not apply to the UDP protocol, but only to TCP.
However, RIO is not tied to a particular socket protocol, though RIO is only supported on Windows Server 2012+.

http://www.serverframework.com/asynchronousevents/2012/08/winsock-registered-io-io-completion-port-performance.html

UDP Sockets

One question:

Is this supposed to work with UDP sockets as well?

Did somebody try this?

Thanks,

softworkz

Thread Safety Violation Warnings found by InferSharp static analysis tool

I am using StackExchange.Redis v2.6.86 (which uses Pipelines.Sockets.Unofficial v2.2.2 as a dependency) in my .NET 7 API. I am using the new Microsoft Static Analysis Tool (InferSharp) to analyze my code as well as dependent packages and I found a few warnings about Thread Safety Violation (and one Null Dereference Warning):

/_/src/Pipelines.Sockets.Unofficial/Helpers.cs:160: error: Null Dereference
  `%0.Pipelines.Sockets.Unofficial.Helpers$<>c__DisplayClass4_0.failures` could be null (last assigned on line 146) and is dereferenced in the call to `Helpers.`.

/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.cs:100: warning: Thread Safety Violation
  Unprotected write. Non-private method `StreamConnection$AsyncPipeStream.Read(...)` indirectly writes to field `this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead` outside of synchronization.
 Reporting because another access to the same memory occurs on a background thread, although this access may not.

/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.cs:342: warning: Thread Safety Violation
  Unprotected write. Non-private method `StreamConnection$AsyncPipeStream.BeginRead(...)` indirectly writes to field `this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead` outside of synchronization.
 Reporting because this access may occur on a background thread.

/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.cs:382: warning: Thread Safety Violation
  Unprotected write. Non-private method `StreamConnection$AsyncPipeStream.EndRead(...)` indirectly writes to field `this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead` outside of synchronization.
 Reporting because this access may occur on a background thread.

/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.cs:483: warning: Thread Safety Violation
  Unprotected write. Non-private method `StreamConnection$AsyncPipeStream.ProcessDataFromAwaiterImpl()` indirectly writes to field `this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead` outside of synchronization.
 Reporting because this access may occur on a background thread.

/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.netcoreapp21.cs:32: warning: Thread Safety Violation
  Unprotected write. Non-private method `StreamConnection$AsyncPipeStream.Read(...)` indirectly writes to field `this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead` outside of synchronization.
 Reporting because this access may occur on a background thread.

/_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncPipeStream.netcoreapp21.cs:40: warning: Thread Safety Violation
  Read/Write race. Non-private method `StreamConnection$AsyncPipeStream.Read(...)` indirectly reads with synchronization from `this.Pipelines.Sockets.Unofficial.StreamConnection$AsyncPipeStream._pendingRead`. Potentially races with unsynchronized write in method `StreamConnection$AsyncPipeStream.Read(...)`.
 Reporting because this access may occur on a background thread.

Flush data to socket without OnReaderCompleted

I have a SocketConnection and push a fixed amount (say 10 MiB) of data into the PipeWriter to be sent to the client.

In the past, after I had written all my data, I would wait for the OnReaderCompleted "event" before disposing the socketConnection to make sure all bytes in the Pipe have been flushed to the socket. Otherwise, the client would not receive all bytes.

What can I do in .NET Core 3 between writing the last byte and closing the socket?
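
For reference, the general System.IO.Pipelines pattern here (a sketch under the assumption that flushing and then completing the writer before disposing is acceptable; it is not necessarily this library's official recommendation) looks like:

// push the remaining data into the pipe and wait for it to be accepted
await socketConnection.Output.FlushAsync();

// signal that no more data will be written, so the send loop can drain and finish
socketConnection.Output.Complete();

// exactly what to await before tearing the socket down is the open question in this issue
socketConnection.Dispose();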

Investigating SemaphoreSlim on net6.0+

Issue for tracking this discussion (so I can also put benchmarks somewhere).

Getting benchmarks running again for a discussion around going back to SemaphoreSlim now that the async issue has been fixed. Current benchmark run shows positive results there:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.22000
AMD Ryzen 9 5900HX with Radeon Graphics, 1 CPU, 16 logical and 8 physical cores
.NET SDK=6.0.101
  [Host]               : .NET 6.0.1 (6.0.121.56705), X64 RyuJIT
  Job-LZCCVU           : .NET 6.0.1 (6.0.121.56705), X64 RyuJIT
  Job-MZIHQK           : .NET Framework 4.8 (4.8.4420.0), X64 RyuJIT
  .NET 6.0             : .NET 6.0.1 (6.0.121.56705), X64 RyuJIT
  .NET Framework 4.7.2 : .NET Framework 4.8 (4.8.4420.0), X64 RyuJIT
Method Job Runtime Toolchain Mean Error StdDev Min Max Ratio RatioSD Gen 0 Gen 1 Gen 2 Allocated
Monitor_Sync Job-LZCCVU .NET 6.0 net6.0 5.526 ns 0.0613 ns 0.0573 ns 5.434 ns 5.615 ns 0.93 0.02 - - - -
Monitor_Sync Job-MZIHQK .NET Framework 4.7.2 net472 5.928 ns 0.1174 ns 0.1098 ns 5.753 ns 6.115 ns 1.00 0.00 - - - -
Monitor_Sync .NET 6.0 .NET 6.0 Default 5.337 ns 0.0380 ns 0.0317 ns 5.250 ns 5.378 ns 0.90 0.02 - - - -
Monitor_Sync .NET Framework 4.7.2 .NET Framework 4.7.2 Default 6.172 ns 0.0383 ns 0.0358 ns 6.110 ns 6.215 ns 1.04 0.02 - - - -
SemaphoreSlim_Sync Job-LZCCVU .NET 6.0 net6.0 18.246 ns 0.2540 ns 0.2375 ns 17.968 ns 18.590 ns 0.28 0.00 - - - -
SemaphoreSlim_Sync Job-MZIHQK .NET Framework 4.7.2 net472 65.291 ns 0.5218 ns 0.4357 ns 64.263 ns 66.220 ns 1.00 0.00 - - - -
SemaphoreSlim_Sync .NET 6.0 .NET 6.0 Default 18.510 ns 0.3072 ns 0.2874 ns 18.105 ns 19.075 ns 0.28 0.00 - - - -
SemaphoreSlim_Sync .NET Framework 4.7.2 .NET Framework 4.7.2 Default 63.148 ns 0.3578 ns 0.3347 ns 62.689 ns 63.714 ns 0.97 0.01 - - - -
SemaphoreSlim_Async Job-LZCCVU .NET 6.0 net6.0 20.920 ns 0.2461 ns 0.2302 ns 20.527 ns 21.220 ns 0.36 0.00 - - - -
SemaphoreSlim_Async Job-MZIHQK .NET Framework 4.7.2 net472 57.513 ns 0.5600 ns 0.5238 ns 56.611 ns 58.147 ns 1.00 0.00 - - - -
SemaphoreSlim_Async .NET 6.0 .NET 6.0 Default 21.221 ns 0.1525 ns 0.1426 ns 20.910 ns 21.382 ns 0.37 0.00 - - - -
SemaphoreSlim_Async .NET Framework 4.7.2 .NET Framework 4.7.2 Default 55.002 ns 0.0908 ns 0.0805 ns 54.828 ns 55.092 ns 0.96 0.01 - - - -
SemaphoreSlim_Async_HotPath Job-LZCCVU .NET 6.0 net6.0 15.182 ns 0.1708 ns 0.1598 ns 14.925 ns 15.495 ns 0.30 0.01 - - - -
SemaphoreSlim_Async_HotPath Job-MZIHQK .NET Framework 4.7.2 net472 50.912 ns 0.8782 ns 0.8215 ns 49.626 ns 52.584 ns 1.00 0.00 - - - -
SemaphoreSlim_Async_HotPath .NET 6.0 .NET 6.0 Default 14.724 ns 0.1402 ns 0.1311 ns 14.473 ns 14.935 ns 0.29 0.01 - - - -
SemaphoreSlim_Async_HotPath .NET Framework 4.7.2 .NET Framework 4.7.2 Default 47.183 ns 0.1631 ns 0.1525 ns 46.863 ns 47.457 ns 0.93 0.01 - - - -
MutexSlim_Sync Job-LZCCVU .NET 6.0 net6.0 22.292 ns 0.1948 ns 0.1627 ns 22.014 ns 22.564 ns 0.52 0.01 - - - -
MutexSlim_Sync Job-MZIHQK .NET Framework 4.7.2 net472 43.145 ns 0.3944 ns 0.3689 ns 42.555 ns 43.827 ns 1.00 0.00 - - - -
MutexSlim_Sync .NET 6.0 .NET 6.0 Default 22.173 ns 0.1043 ns 0.0975 ns 22.052 ns 22.354 ns 0.51 0.01 - - - -
MutexSlim_Sync .NET Framework 4.7.2 .NET Framework 4.7.2 Default 41.903 ns 0.2398 ns 0.2243 ns 41.381 ns 42.193 ns 0.97 0.01 - - - -
MutexSlim_Async Job-LZCCVU .NET 6.0 net6.0 42.810 ns 0.4583 ns 0.4287 ns 41.971 ns 43.376 ns 0.56 0.01 - - - -
MutexSlim_Async Job-MZIHQK .NET Framework 4.7.2 net472 76.444 ns 0.3811 ns 0.3379 ns 75.667 ns 76.857 ns 1.00 0.00 - - - -
MutexSlim_Async .NET 6.0 .NET 6.0 Default 42.568 ns 0.2998 ns 0.2804 ns 42.157 ns 42.997 ns 0.56 0.00 - - - -
MutexSlim_Async .NET Framework 4.7.2 .NET Framework 4.7.2 Default 72.216 ns 0.2122 ns 0.1985 ns 71.871 ns 72.586 ns 0.94 0.01 - - - -
MutexSlim_Async_HotPath Job-LZCCVU .NET 6.0 net6.0 29.913 ns 0.5884 ns 0.6776 ns 29.128 ns 30.955 ns 0.54 0.01 - - - -
MutexSlim_Async_HotPath Job-MZIHQK .NET Framework 4.7.2 net472 55.409 ns 0.7513 ns 0.6660 ns 54.379 ns 56.648 ns 1.00 0.00 - - - -
MutexSlim_Async_HotPath .NET 6.0 .NET 6.0 Default 29.443 ns 0.1544 ns 0.1444 ns 29.206 ns 29.638 ns 0.53 0.01 - - - -
MutexSlim_Async_HotPath .NET Framework 4.7.2 .NET Framework 4.7.2 Default 53.680 ns 0.3121 ns 0.2919 ns 53.190 ns 54.231 ns 0.97 0.01 - - - -
MutexSlim_ConcurrentLoadAsync Job-LZCCVU .NET 6.0 net6.0 2,120.714 ns 28.5866 ns 26.7399 ns 2,061.812 ns 2,149.688 ns 0.74 0.02 0.0125 - - 125 B
MutexSlim_ConcurrentLoadAsync Job-MZIHQK .NET Framework 4.7.2 net472 2,883.994 ns 46.5535 ns 41.2685 ns 2,762.735 ns 2,927.068 ns 1.00 0.00 0.0438 0.0063 0.0031 284 B
MutexSlim_ConcurrentLoadAsync .NET 6.0 .NET 6.0 Default 2,109.083 ns 17.1215 ns 16.0154 ns 2,072.544 ns 2,126.885 ns 0.73 0.01 0.0125 - - 125 B
MutexSlim_ConcurrentLoadAsync .NET Framework 4.7.2 .NET Framework 4.7.2 Default 2,982.906 ns 11.4533 ns 9.5640 ns 2,967.364 ns 2,995.070 ns 1.04 0.02 0.0406 0.0063 0.0031 279 B
MutexSlim_ConcurrentLoadAsync_DisableContext Job-LZCCVU .NET 6.0 net6.0 2,077.706 ns 16.0061 ns 14.9721 ns 2,041.016 ns 2,094.909 ns 0.74 0.02 0.0063 - - 61 B
MutexSlim_ConcurrentLoadAsync_DisableContext Job-MZIHQK .NET Framework 4.7.2 net472 2,830.493 ns 54.1615 ns 57.9522 ns 2,658.874 ns 2,864.015 ns 1.00 0.00 0.0313 0.0031 - 223 B
MutexSlim_ConcurrentLoadAsync_DisableContext .NET 6.0 .NET 6.0 Default 2,007.664 ns 14.7015 ns 13.7518 ns 1,980.703 ns 2,029.217 ns 0.71 0.02 0.0063 - - 61 B
MutexSlim_ConcurrentLoadAsync_DisableContext .NET Framework 4.7.2 .NET Framework 4.7.2 Default 2,863.572 ns 8.1600 ns 6.8140 ns 2,853.829 ns 2,875.984 ns 1.02 0.03 0.0344 0.0031 - 223 B
SemaphoreSlim_ConcurrentLoadAsync Job-LZCCVU .NET 6.0 net6.0 2,084.316 ns 4.5909 ns 4.0698 ns 2,072.476 ns 2,090.046 ns 0.55 0.01 0.0656 0.0063 - 547 B
SemaphoreSlim_ConcurrentLoadAsync Job-MZIHQK .NET Framework 4.7.2 net472 3,812.444 ns 42.4014 ns 37.5877 ns 3,760.819 ns 3,890.354 ns 1.00 0.00 0.2250 0.0125 - 1,449 B
SemaphoreSlim_ConcurrentLoadAsync .NET 6.0 .NET 6.0 Default 2,124.172 ns 15.9106 ns 14.1043 ns 2,077.397 ns 2,135.378 ns 0.56 0.01 0.0656 0.0063 - 547 B
SemaphoreSlim_ConcurrentLoadAsync .NET Framework 4.7.2 .NET Framework 4.7.2 Default 3,596.999 ns 31.8994 ns 29.8387 ns 3,540.641 ns 3,652.883 ns 0.94 0.01 0.2250 0.0125 - 1,448 B
AsyncSemaphore_ConcurrentLoadAsync Job-LZCCVU .NET 6.0 net6.0 2,027.560 ns 40.4162 ns 51.1136 ns 1,823.880 ns 2,072.102 ns 0.70 0.03 0.0188 - - 163 B
AsyncSemaphore_ConcurrentLoadAsync Job-MZIHQK .NET Framework 4.7.2 net472 2,911.152 ns 57.9191 ns 59.4786 ns 2,767.561 ns 2,985.664 ns 1.00 0.00 0.0500 0.0125 0.0031 328 B
AsyncSemaphore_ConcurrentLoadAsync .NET 6.0 .NET 6.0 Default 2,192.759 ns 42.9130 ns 60.1580 ns 1,910.755 ns 2,232.827 ns 0.75 0.03 0.0188 - - 163 B
AsyncSemaphore_ConcurrentLoadAsync .NET Framework 4.7.2 .NET Framework 4.7.2 Default 3,017.960 ns 13.2881 ns 11.7795 ns 3,001.102 ns 3,039.382 ns 1.04 0.02 0.0500 0.0094 0.0031 328 B
AsyncSemaphore_Sync Job-LZCCVU .NET 6.0 net6.0 15.422 ns 0.2894 ns 0.2565 ns 15.093 ns 15.865 ns 0.75 0.01 - - - -
AsyncSemaphore_Sync Job-MZIHQK .NET Framework 4.7.2 net472 20.470 ns 0.2239 ns 0.2094 ns 20.101 ns 20.818 ns 1.00 0.00 - - - -
AsyncSemaphore_Sync .NET 6.0 .NET 6.0 Default 15.330 ns 0.2029 ns 0.1898 ns 15.131 ns 15.654 ns 0.75 0.01 - - - -
AsyncSemaphore_Sync .NET Framework 4.7.2 .NET Framework 4.7.2 Default 29.478 ns 0.5677 ns 0.6074 ns 27.719 ns 30.165 ns 1.44 0.04 - - - -
AsyncSemaphore_Async Job-LZCCVU .NET 6.0 net6.0 19.468 ns 0.1271 ns 0.1189 ns 19.285 ns 19.687 ns 0.73 0.00 - - - -
AsyncSemaphore_Async Job-MZIHQK .NET Framework 4.7.2 net472 26.625 ns 0.1710 ns 0.1600 ns 26.295 ns 26.857 ns 1.00 0.00 - - - -
AsyncSemaphore_Async .NET 6.0 .NET 6.0 Default 18.151 ns 0.1799 ns 0.1682 ns 17.834 ns 18.381 ns 0.68 0.01 - - - -
AsyncSemaphore_Async .NET Framework 4.7.2 .NET Framework 4.7.2 Default 24.769 ns 0.1090 ns 0.0910 ns 24.530 ns 24.856 ns 0.93 0.01 - - - -
AsyncSemaphore_Async_HotPath Job-LZCCVU .NET 6.0 net6.0 16.201 ns 0.3189 ns 0.5058 ns 15.063 ns 17.121 ns 0.79 0.02 - - - -
AsyncSemaphore_Async_HotPath Job-MZIHQK .NET Framework 4.7.2 net472 20.112 ns 0.1511 ns 0.1340 ns 19.905 ns 20.335 ns 1.00 0.00 - - - -
AsyncSemaphore_Async_HotPath .NET 6.0 .NET 6.0 Default 14.742 ns 0.1243 ns 0.1163 ns 14.502 ns 14.923 ns 0.73 0.01 - - - -
AsyncSemaphore_Async_HotPath .NET Framework 4.7.2 .NET Framework 4.7.2 Default 20.343 ns 0.1257 ns 0.1115 ns 20.154 ns 20.541 ns 1.01 0.01 - - - -

cc @mgravell ^ I can take a poke at what a swap looks like this week; just wanted to post the numbers up!

[Question][SocketConnection] Why do I need to read to send data?

Disclaimer: probably, in production code this will not be a problem.

I'm playing with the library, trying to understand how to plug it into my code. I do basically this:

var connection = await SocketConnection.ConnectAsync(endpoint);
await Read();
await Send();
await Flush();
await Read(); // <--- if I don't do this read, the data from Send above will probably never be sent

Questions:

  1. Should I be worried about it?
  2. Do I use library in intended way?

Possible race condition

I'm writing a .net core library on top of Pipelines.Sockets.Unofficial.

I found that Pipelines.Sockets.Unofficial.SocketConnection.DoSend throws random exceptions (ArgumentNullException/ArgumentOutOfRangeException etc.) from
https://github.com/dotnet/corefx/blob/7e484cac1442b0c3e1dcb1f42f43c4cd7d8ba4e5/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.cs#L177

The BufferList was corrupted somehow. It seems like a race condition bug or heap corruption.
I have not tracked down the root cause, nor do I have any theory yet.

19:09:47 Error DoSend excpetion 2 System.IO.IOException: Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index ---> System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index
   at System.Collections.Generic.List`1.get_Item(Int32 index)
   at System.Net.Sockets.SocketAsyncEventArgs.set_BufferList(IList`1 value)
   at Pipelines.Sockets.Unofficial.SocketConnection.DoSend(Socket socket, SocketAwaitableEventArgs args, ReadOnlySequence`1& buffer, String name) in C:\code\Pipelines.Sockets.Unofficial\src\Pipelines.Sockets.Unofficial\SocketConnection.Send.cs:line 159
   at Pipelines.Sockets.Unofficial.SocketConnection.DoSendAsync() in C:\code\Pipelines.Sockets.Unofficial\src\Pipelines.Sockets.Unofficial\SocketConnection.Send.cs:line 64
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.PipeCompletion.ThrowLatchedException()
   at System.IO.Pipelines.Pipe.GetFlushResult(FlushResult& result)
   at System.IO.Pipelines.Pipe.GetFlushAsyncResult()
   at System.Threading.Tasks.ValueTask`1.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
--- End of stack trace from previous location where exception was thrown ---
   at Kaka.Clusters.RemoteMessageAgent.DoSend(CancellationTokenSource cancellation, FrameProtocol protocol) in C:\Users\supei\source\dotnet\Kaka\src\Kaka\Clusters\RemoteMessageAgent.cs:line 288
   at System.IO.Pipelines.PipeCompletion.ThrowLatchedException()
   at System.IO.Pipelines.Pipe.GetFlushResult(FlushResult& result)
   at System.IO.Pipelines.Pipe.GetFlushAsyncResult()
   at System.Threading.Tasks.ValueTask`1.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
--- End of stack trace from previous location where exception was thrown ---
   at Kaka.Clusters.RemoteMessageAgent.DoSend(CancellationTokenSource cancellation, FrameProtocol protocol) in C:\Users\supei\source\dotnet\Kaka\src\Kaka\Clusters\RemoteMessageAgent.cs:line 288
19:25:30 Error DoSend excpetion 2 System.IO.IOException: Value cannot be null.
Parameter name: segment ---> System.ArgumentNullException: Value cannot be null.
Parameter name: segment
   at System.Net.RangeValidationHelpers.ValidateSegment(ArraySegment`1 segment)
   at System.Net.Sockets.SocketAsyncEventArgs.set_BufferList(IList`1 value)
   at Pipelines.Sockets.Unofficial.SocketConnection.DoSend(Socket socket, SocketAwaitableEventArgs args, ReadOnlySequence`1& buffer, String name) in C:\code\Pipelines.Sockets.Unofficial\src\Pipelines.Sockets.Unofficial\SocketConnection.Send.cs:line 159
   at Pipelines.Sockets.Unofficial.SocketConnection.DoSendAsync() in C:\code\Pipelines.Sockets.Unofficial\src\Pipelines.Sockets.Unofficial\SocketConnection.Send.cs:line 64
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.PipeCompletion.ThrowLatchedException()
   at System.IO.Pipelines.Pipe.GetFlushResult(FlushResult& result)
   at System.IO.Pipelines.Pipe.GetFlushAsyncResult()
   at System.Threading.Tasks.ValueTask`1.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
--- End of stack trace from previous location where exception was thrown ---
   at Kaka.Clusters.RemoteMessageAgent.DoSend(CancellationTokenSource cancellation, FrameProtocol protocol) in C:\Users\supei\source\dotnet\Kaka\src\Kaka\Clusters\RemoteMessageAgent.cs:line 288
   at System.IO.Pipelines.PipeCompletion.ThrowLatchedException()
   at System.IO.Pipelines.Pipe.GetFlushResult(FlushResult& result)
   at System.IO.Pipelines.Pipe.GetFlushAsyncResult()
   at System.Threading.Tasks.ValueTask`1.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
--- End of stack trace from previous location where exception was thrown ---
   at Kaka.Clusters.RemoteMessageAgent.DoSend(CancellationTokenSource cancellation, FrameProtocol protocol) in C:\Users\supei\source\dotnet\Kaka\src\Kaka\Clusters\RemoteMessageAgent.cs:line 288

How do I flush synchronously?

TL;DR: I noticed I seemingly have to call SocketConnection.Output.FlushAsync to get messages sent to the client, but all of my code is synchronous. How do I flush synchronously, or can I just use _ = SocketConnection.Output.FlushAsync().Result?

Longer version:
I have a sample length prefixed protocol writing messages to a client like this:

class MyMessage
{
    public string Text { get; set; }
}

// somewhere create a SocketConnection from an accepted Socket:
SocketConnection connection = SocketConnection.Create(acceptedSocket);

// somewhere send a message:
WriteMessage(new MyMessage { Text = "Blablabla" }, connection.Output);

void WriteMessage(MyMessage message, IBufferWriter<byte> output)
{
    int textLength = message.Text.Length;
    int messageLength = sizeof(int) + textLength;
    Span<byte> span = output.GetSpan(messageLength);

    BinaryPrimitives.WriteInt32BigEndian(span, textLength);
    Encoding.ASCII.GetBytes(message.Text, span[sizeof(int)..]);

    output.Advance(messageLength);
}

Using this, the client never receives this message, despite my call to connection.Output.Advance(messageLength), which I thought would be enough to kick off sending data.

I experimented around a bit and noticed that if I append a _ = client.Connection.Output.FlushAsync().Result to the end of the SendMessage method, it starts to receive messages.

That looks like an ugly hack as I did not find a way to flush synchronously. Am I missing something here or can I just go with this?

Test: MutexSlim_ConcurrentLoadAsync_DisableContext - fails on my machine

Output of build.cmd

[xUnit.net 00:00:12.23]     Pipelines.Sockets.Unofficial.Tests.LockBenchmarkTests.MutexSlim_ConcurrentLoadAsync_DisableContext [FAIL]
  X Pipelines.Sockets.Unofficial.Tests.LockBenchmarkTests.MutexSlim_ConcurrentLoadAsync_DisableContext [2s 544ms]
  Error Message:
   System.InvalidOperationException : expected 2500 but was 2451
  Stack Trace:
     at Benchmark.Utils.AssertIs(Int32 actual, Int32 expected) in /_/tests/Benchmark/Utils.cs:line 13
   at Benchmark.LockBenchmarks.<MutexSlim_ConcurrentLoadAsync_DisableContext>d__14.MoveNext() in /_/tests/Benchmark/LockBenchmarks.cs:line 230
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Pipelines.Sockets.Unofficial.Tests.BenchmarkTests`1.<Run>d__12`1.MoveNext() in /_/tests/Pipelines.Sockets.Unofficial.Tests/BenchmarkTests.cs:line 63
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
  Standard Output Messages:
  Standard Output Messages:
 failed at i=1,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=2,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=3,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=4,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=5,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=6,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=7,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=8,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=9,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=10,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=11,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=12,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=13,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=14,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=15,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=16,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=17,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=18,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=19,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=20,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=21,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=22,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=23,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=24,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=25,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=26,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=27,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=28,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=29,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=30,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=31,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=32,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=33,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=34,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=35,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=36,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=37,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=38,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=39,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=40,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=41,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=42,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=43,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=44,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=45,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=46,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=47,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=48,t=0,MutexSlim, Succeeded, 0 pending
 failed at i=49,t=0,MutexSlim, Succeeded, 0 pending

How to find out when/if the reader/writer completed

Hello,
Firstly, thank you for this nice library 👍

My situation is that I need a way to find out whether a socket connection is closed, or some kind of event that is fired when the reader or writer has completed writing to the pipe. Is there currently any way to get this information? If not, could it be added?
Thank you

Pump a pair of PipeReader/PipeWriter

I'm working on adding TLS support to the RSocket.NET library. Currently, the library runs two tasks: one reading from the socket and writing to a PipeWriter, and one reading from a PipeReader and writing to the socket:

while (true)
{
	var result = await Back.Input.ReadAsync();
	var buffer = result.Buffer;
	var consumed = buffer.Start;        //RSOCKET Framing

	try
	{
		if (result.IsCanceled) { break; }
		if (!buffer.IsEmpty)
		{
			try
			{
				consumed = await socket.SendAsync(buffer, buffer.Start, SocketFlags.None);     //RSOCKET Framing
			}
			catch (Exception)
			{
				if (!Aborted) { /*Log.ErrorWritingFrame(_logger, ex);*/ }
				break;
			}
		}
		else if (result.IsCompleted) { break; }
	}
	finally
	{
		Back.Input.AdvanceTo(consumed, buffer.End);     //RSOCKET Framing
    }
}	
while (!token.IsCancellationRequested)
{
	var memory = Back.Output.GetMemory(out var memoryframe, haslength: true);    //RSOCKET Framing
	var isArray = MemoryMarshal.TryGetArray<byte>(memory, out var arraySegment); Debug.Assert(isArray);
	var received = await socket.ReceiveAsync(arraySegment, SocketFlags.None);   //TODO Cancellation?

	Back.Output.Advance(received);
	var flushResult = await Back.Output.FlushAsync();
	if (flushResult.IsCanceled || flushResult.IsCompleted) { break; }
}

I added support for secure connections by wrapping the socket in SslStream and using StreamConnection.GetDuplex from this project as it was advised in one of the issues.

Is it possible to go further and get rid of the "manual pumping" between the PipeReader/PipeWriter?

The blogpost at Pipe Dreams, part 2 mentions that PipeScheduler can be used to "start these two pumps" but doesn't have any concrete example. Does this project provide any such mechanism?

Question on StreamConnection.GetReader

The call to StreamConnection.GetReader creates an AsyncStreamPipe. In the constructor of AsyncStreamPipe, if read is true it checks that the stream is writable and throws an exception if it is not, and if write is true it checks that the stream is readable.
Because of this, trying to get a reader for a read-only file throws an exception.

How can we get a reader for a readonly stream?

Socket callbacks spill to the main thread pool on Linux

We are in the process of migrating our .NET project from Framework to Core (and from Windows to Linux) and, during peak hours and thread starvation, we are getting a lot of Redis timeouts.

It's best illustrated with an MRE.

Here's what the script does:

  1. Creates and connects a Redis multiplexer.
  2. Saturates the thread pool with lots of units of work that do nothing except sleeping.
  3. Runs and awaits a single GET from Redis.

Run it by installing dotnet-script (dotnet tool install -g dotnet-script) and then running dotnet script mre.csx.

#nullable enable

#r "nuget: StackExchange.Redis, 2.7.33"

using System.Runtime.CompilerServices;
using System.Threading;
using StackExchange.Redis;
using StackExchange.Redis.Profiling;
using static System.Console; // so the bare WriteLine calls below compile even without implicit script usings

const int sleepTimeMilliseconds = 7000;
const int extraSleepingThreads = 10;
const string redisConnectionString = "127.0.0.1:7200";

ThreadPool.GetMinThreads(out var workerThreads, out var completionPortThreads);
WriteLine($"Min worker threads: {workerThreads}, min completion port threads: {completionPortThreads}");

var sleepingThreadCount = workerThreads + extraSleepingThreads;

var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(redisConnectionString, options =>
{
    options.ClientName = "redis-timeout-test";
}, Error);
await connectionMultiplexer.GetDatabase(0).StringGetAsync("test");
var profilingSession = new ProfilingSession();
connectionMultiplexer.RegisterProfiler(() => profilingSession);


void PrintStats(string stage, int id, [CallerMemberName] string? caller = null)
{
    WriteLine($"{caller}\t{id}\t{stage}\t{Thread.CurrentThread.ManagedThreadId}");
}

void Sleep(object? state)
{
    var i = (int)state!;
    try
    {
        PrintStats("start", i);
        Thread.Sleep(sleepTimeMilliseconds);
        PrintStats("end", i);
    }
    catch
    {
        PrintStats("exception", i);
        throw;
    }
}

async Task Redis(int i)
{
    try
    {
        PrintStats("start", i);
        await connectionMultiplexer.GetDatabase().StringGetAsync("test");
        PrintStats("end", i);
    }
    catch
    {
        PrintStats("exception", i);
        throw;
    }
}

for (var i = 0; i < sleepingThreadCount; i++)
{
    ThreadPool.QueueUserWorkItem(Sleep, i);
}

await Redis(0);
foreach (var profiledCommand in profilingSession.FinishProfiling())
{
    WriteLine(profiledCommand);
}

Our expectation is that thread starvation should not affect the inner workings of Redis client, and that the request should succeed.

It works on Windows (both in .NET 4.7.2 and .NET 8), but not on Linux.

On Linux, it fails with a timeout:

Sleep   0       start   14
Sleep   3       start   8
Sleep   4       start   11
Sleep   5       start   7
Sleep   1       start   12
Redis   0       start   5
Sleep   6       start   13
Sleep   2       start   10
Sleep   7       start   9
Sleep   8       start   15
Sleep   9       start   29
Sleep   10      start   30
Sleep   11      start   31
Sleep   12      start   5
Sleep   13      start   32
Sleep   14      start   33
Sleep   15      start   34
Sleep   16      start   35
Sleep   17      start   36
Redis   0       exception       37
StackExchange.Redis.RedisTimeoutException: Timeout awaiting response (outbound=0KiB, inbound=0KiB, 5461ms elapsed, timeout is 5000ms), command=GET, next: GET test, inst: 0, qu: 0, qs: 1, aw: False, bw: Inactive, rs: ReadAsync, ws: Idle, in: 5, in-pipe: 0, out-pipe: 0, last-in: 0, cur-in: 0, sync-ops: 0, async-ops: 3, serverEndpoint: 127.0.0.1:7201, conn-sec: 5,49, aoc: 1, mc: 1/1/0, mgr: 10 of 10 available, clientName: redis-timeout-test, PerfCounterHelperkeyHashSlot: 6918, IOCP: (Busy=0,Free=1000,Min=1,Max=1000), WORKER: (Busy=19,Free=32748,Min=12,Max=32767), POOL: (Threads=19,QueuedItems=5,CompletedItems=101,Timers=1), v: 2.7.33.41805 (Please take a look at this article for some common client-side issues that can cause timeouts: https://stackexchange.github.io/StackExchange.Redis/Timeouts)
   at Submission#0.<Redis>d__11.MoveNext() in /mnt/c/Users/******/Projects/redis-test/main.csx:line 54
--- End of stack trace from previous location ---
   at Submission#0.<<Initialize>>d__0.MoveNext() in /mnt/c/Users/******/Projects/redis-test/main.csx:line 69
--- End of stack trace from previous location ---
   at Dotnet.Script.Core.ScriptRunner.Execute[TReturn](String dllPath, IEnumerable`1 commandLineArgs) in C:\Users\runneradmin\AppData\Local\Temp\tmp8BAF\Dotnet.Script.Core\ScriptRunner.cs:line 110

The reason for this is that even though the library creates a dedicated thread pool and does all the work inside this pool, some continuations still run on the main thread pool (which kinda defeats the purpose).

The problem seems to be here:

Breakpoint reached: /home/******/.config/JetBrains/Rider2024.1/resharper-host/SourcesCache/37cf93917ec38cc6a96176b4240fe2fe05f294df6d9955a4c023fd9b30aa58/SocketAwaitableEventArgs.cs:142,21 
   at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.OnCompleted(SocketAsyncEventArgs e) in /_/src/Pipelines.Sockets.Unofficial/SocketAwaitableEventArgs.cs:line 145
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Net.Sockets.SocketAsyncEventArgs.TransferCompletionCallbackCore(Int32 bytesTransferred, Memory`1 socketAddress, SocketFlags receivedFlags, SocketError socketError)
   at System.Net.Sockets.SocketAsyncEngine.System.Threading.IThreadPoolWorkItem.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()

The breakpoint above shows that there is a unit of work within Pipelines.Sockets.Unofficial that is being run on the main thread pool. This unit of work is being scheduled from here:

if (!socket.ReceiveAsync(args)) args.Complete();

Apparently, on Linux, Socket.ReceiveAsync(SocketAsyncEventArgs), when running asynchronously, always schedules the event callback on the main thread pool. If the main thread pool is backlogged, the continuation can't go through in time, and the Redis client throws a timeout error.

This particular problem can be fixed by having the socket process its continuations on the socket thread. It can be done by setting the environment variable DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS=1. However, this setting causes more problems with third-party libraries than it solves. Apparently, there is no way in .NET to set it for a particular socket (or I couldn't find one).

Is there any other way to have Pipelines.Sockets.Unofficial do all its work on the dedicated thread pool (and, ultimately, make Redis not time out if the process is thread starved)?

MemoryMappedPipeReader Feedback

I was playing around with Pipelines.Sockets.Unofficial.MemoryMappedPipeReader this morning, and I'd like to share some feedback from my experience:

  1. Nothing seems to have a finalizer, so those pointers we acquire won't go away unless the consumer explicitly disposes the MemoryMappedPipeReader (ideally, see the remarks and make sure that ReleasePointer is always called in a CER (see CriticalFinalizerObject), though just making sure it's released in a finalizer may be good enough... probably nobody's going to hunt you down if you assume that people won't be regularly aborting threads or unloading app domains).
  2. The return type of MemoryMappedPipeReader.Create does not implement IDisposable, so most people may not even realize that they're supposed to dispose this.
  3. CreateFromFile throws an exception if file.Length is 0... probably just fall back in this case too.
  4. We access the SafeBuffer on UnmanagedMemoryAccessor through reflection (and fall back all the way to the FileStream approach if that's impossible), but wouldn't it be equivalent to go through MemoryMappedViewAccessor.SafeMemoryMappedViewHandle? From my reading of the code, it looks like that would even be the same object instance.

High CPU due to lock contention on _queue in DedicatedThreadPoolPipeScheduler.Schedule

Under high load, our application is seeing very high CPU spikes due to lock contention in the StackExchange.Redis library. I tracked it down to a lock in this method: DedicatedThreadPoolPipeScheduler.Schedule

This is the stack trace:
LockContentionStackTrace.txt

--

  • pipelines.sockets.unofficial!DedicatedThreadPoolPipeScheduler.Schedule
    |+ system.io.pipelines!Pipe.FlushAsync
    ||+ system.io.pipelines!System.IO.Pipelines.Pipe+DefaultPipeWriter.FlushAsync(value class System.Threading.CancellationToken)
    || + stackexchange.redis!PhysicalConnection.FlushAsync
    || |+ stackexchange.redis!PhysicalBridge.WriteMessageTakingWriteLockAsync
    || | + stackexchange.redis!PhysicalBridge.TryWriteAsync
    || |  + stackexchange.redis!ServerEndPoint.TryWriteAsync
    || |   + stackexchange.redis!ConnectionMultiplexer.ExecuteAsyncImpl
    || |    + stackexchange.redis!RedisBase.ExecuteAsync
    || |     + OTHER <<clr!InstantiatingMethodStubWorker>>
    || |      + stackexchange.redis!RedisDatabase.HashGetAllAsync
    || |      + stackexchange.redis!RedisDatabase.ScriptEvaluateAsync

I am using several (20-30) ConnectionMultiplexer objects in a round-robin pool to accommodate the amount of data the service is requesting.

Does this project only support .NET Core, and not .NET Framework?

Hi, Marc

Firstly, thanks for your contributions to this project. When I used StackExchange.Redis, I found a problem: a property named "IsHardwareAccelerated" was not found under .NET Framework (it exists in .NET Core). The problem code line:

private static void CheckNumerics() => _ = System.Numerics.Vector.IsHardwareAccelerated;

See the documentation for the "Vector.IsHardwareAccelerated" property: https://docs.microsoft.com/en-us/dotnet/api/system.numerics.vector.ishardwareaccelerated?view=netcore-3.1

I want to know: does this project only support .NET Core, and not .NET Framework? 😉

pauseWriterThreshold and streaming data

Hello! I have discovered strange behavior on the client side when receiving large chunks of data (i.e. file parts): the data I am receiving is limited by pauseWriterThreshold.
Scenario: I have a simple protocol over TCP, where each message has a length (the first 4 bytes) and a body. I am stacking data in the pipe reader until the whole message is received, after which I process the message. In the case of a large message I never receive the whole message data in the pipe. Increasing pauseWriterThreshold solves my problem, but this looks strange.
Important: the server sends packets with the PSH flag on.
The question is: is this by design?
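
For context, pauseWriterThreshold is the PipeOptions setting that pauses the writing side once that many unconsumed bytes sit in the pipe, so a single message larger than the threshold can never be fully buffered before at least part of it is consumed. A sketch of raising it (the numbers are arbitrary; the options are then passed as the receive-side PipeOptions when the connection or server is created):

using System.IO.Pipelines;

var receiveOptions = new PipeOptions(
    pauseWriterThreshold: 1024 * 1024,   // pause the socket-reading side at 1 MiB unconsumed
    resumeWriterThreshold: 512 * 1024);  // resume once the parser has consumed back below this

// pass receiveOptions wherever the SocketConnection / SocketServer is created;
// the connect and listen APIs expose send/receive PipeOptions parameters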

Pooling/reuse of SAEA in SocketConnection

I've had a great time reading/analyzing/learning from this library. I came across a bit of code that had me wondering if I've been using SocketAsyncEventArgs wrong.

In SocketConnection.Receive.cs, you don't re-use (pool) the SAEA.

Specifically, this line is called on each loop of the DoReadAsync() function:

_readerArgs = new SocketAwaitableEventArgs(InlineReads ? null : _receiveOptions.WriterScheduler);

I'm curious to know if this is deliberate. Most of the information I've read about SAEA so far seems to suggest pooling them for re-use.

SocketConnection / Pipelines usage advice

Hello!

Trying to experiment pipelines and ended up on your blog and this package.

I have a case here where I'm experimenting with the library and pipelines for a socket client and for whatever reason the server is disconnecting me with read timeouts. This is basically the test class I'm using:

 public class PipeTestConnection
    {
        private const int TIMEOUT = 20000;
        private readonly string _endpoint;
        private readonly int _port;
        private SocketConnection _connection;

        public bool Connected => this._connection?.Socket != null && this._connection.Socket.Connected;

        public PipeTestConnection(string endpoint, int port)
        {
            this._endpoint = endpoint;
            this._port = port;
        }

        public async Task Connect()
        {
            var timeout = Task.Delay(TIMEOUT);
            var connect = SocketConnection.ConnectAsync(
                new IPEndPoint((Dns.GetHostAddresses(this._endpoint))[0], this._port),
                connectionOptions: SocketConnectionOptions.None);
            var first = await Task.WhenAny(timeout, connect);
            if (first == timeout) throw new TimeoutException();
            this._connection = connect.Result;
        }

        public async Task<byte[]> SendPacket(GPRequest packet)
        {
            if (packet == null) throw new ArgumentNullException(nameof(packet));
            if (!this.Connected) throw new InvalidOperationException("Not connected");

            var requestBuffer = packet.ToBuffer();

            await this._connection.Output.WriteAsync(requestBuffer).ConfigureAwait(false);
            var cancellationSource = new CancellationTokenSource();
            var timeout = Task.Delay(TIMEOUT);
            var receive = ReceivePacket(cancellationSource.Token);

            var first = await Task.WhenAny(timeout, receive).ConfigureAwait(false);
            if (first == timeout)
            {
                cancellationSource.Cancel();
                throw new TimeoutException();
            }
            return receive.Result;
        }

        private async Task<byte[]> ReceivePacket(CancellationToken token)
        {
            var readResult = await this._connection.Input.ReadAsync(token).ConfigureAwait(false);

            var buffer = readResult.Buffer;
            var responseBuffer = buffer.ToArray().Skip(2); // Theoretically I can discard the 2 bytes for the message size header when reading it as I'm only interested in the data.
            this._connection.Input.AdvanceTo(buffer.End);

            return responseBuffer.ToArray();
        }

        public void Disconnect() => this._connection.Dispose();
    }

The protocol the server expects is 2 bytes at the beginning of the message with the content size, followed by the data. It is basically request/response based, so the server will never send anything to the client other than replies to its requests.

Sometimes the server (which I don't control) disconnects me with a read timeout, and sometimes incomplete data arrives on the client and is returned by the ReceivePacket method.

Can you help pinpoint where I'm being dumb?

Thanks! Awesome blog post btw!

ConnectionResetException missing Serializable attribute

https://github.com/mgravell/Pipelines.Sockets.Unofficial/blob/master/src/Pipelines.Sockets.Unofficial/ConnectionResetException.cs

Type 'Pipelines.Sockets.Unofficial.ConnectionResetException' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. If the type is a collection, consider marking it with the CollectionDataContractAttribute. See the Microsoft .NET Framework documentation for other supported types.

https://docs.microsoft.com/en-us/visualstudio/code-quality/ca1032-implement-standard-exception-constructors?view=vs-2019

Seeing this when getting connection failures with the StackExchange.Redis assembly.

proposal: add option to enable tuning the socket listen backlog

--- a/src/Pipelines.Sockets.Unofficial/SocketServer.cs
+++ b/src/Pipelines.Sockets.Unofficial/SocketServer.cs
 using System.IO.Pipelines;
 using System.Net;
@@ -22,6 +22,7 @@ namespace Pipelines.Sockets.Unofficial
             AddressFamily addressFamily = AddressFamily.InterNetwork,
             SocketType socketType = SocketType.Stream,
             ProtocolType protocolType = ProtocolType.Tcp,
+            int listenBacklog = 20,
             PipeOptions sendOptions = null, PipeOptions receiveOptions = null)
         {
             if (_listener != null) Throw.InvalidOperation("Server is already running");

Test: BufferWriterDoesNotLeak also fails on my machine

Test Name: Pipelines.Sockets.Unofficial.Tests.BufferWriterTests.BufferWriterDoesNotLeak
Test Outcome: Failed
Result StackTrace: at Pipelines.Sockets.Unofficial.Tests.BufferWriterTests.BufferWriterDoesNotLeak() in C:\repo\Pipelines.Sockets.Unofficial\tests\Pipelines.Sockets.Unofficial.Tests\BufferWriterTests.cs:line 94
Result Message:
Assert.Equal() Failure
Expected: 4
Actual: 7
Result StandardOutput:
before flush, wrote 0... (nil)
after flush: (nil)
before flush, wrote 5... [0,5) - 11/11 available; counts: 1/
after flush: [5,5) - 11/11 available; counts: 2/
before flush, wrote 10... [5,15) - 1/1 available; counts: 2/
after flush: [15,15) - 1/1 available; counts: 3/
before flush, wrote 15... [15,30) - 1/1 available; counts: 1/
after flush: [30,30) - 1/1 available; counts: 2/
before flush, wrote 20... [30,50) - 11/11 available; counts: 1/1/
after flush: [50,50) - 11/11 available; counts: 2/

Improve package readme

Is your feature request related to a problem? Please describe.
The current package readme is not informative about how to use the package

Describe the solution you'd like

The nuget package should be using the same readme as the repo to make it as easy as possible for a user to get started with the package.

Describe alternatives you've considered
Accept needing to go to github to get information

Additional context
N/a

Loopback fast path is deprecated on Windows and can cause high CPU on Win Server 2019

See deprecation statement: https://docs.microsoft.com/en-us/windows-hardware/drivers/network/sio-loopback-fast-path
After upgrading from Server 2016 to Server 2019 we noticed an issue with high CPU usage by the system.exe process. Windows Server support engineers claim that it could be caused by a bug in the loopback fast path implementation in tcp.sys.
Given the feature is deprecated, does it make sense to remove it from the source code?

https://github.com/mgravell/Pipelines.Sockets.Unofficial/blob/master/src/Pipelines.Sockets.Unofficial/SocketConnection.Connect.cs#L67

How to get a Multisegment stream backed ReadOnlySequence

I'm writing something to parse a utf8-based network protocol, and I'm trying to set up multiple segments so I can ensure the parser handles the boundaries correctly without having to take the data off the wire each time. It may well be the case that I've misunderstood the pipelines API, but I can't figure out how to get a multi-segment sequence set up using a stream.

The code I'm using so far is this:

public static async Task TestParser()
{
	byte[] msg = new byte[] { 239, 187, 191, 76, 111, 103, 111, 110, 58, 58, 68, 73, 68, 35, 95, 123, 101, 97, 101, 102, 50, 50, 57, 102, 45, 52, 51, 56, 97, 45, 52, 52, 52, 52, 45, 97, 102, 55, 56, 45, 98, 56, 51, 101, 101, 49, 100, 52, 97, 56, 49, 101, 125, 126, 65, 112, 112, 78, 97, 109, 101, 35, 80, 68, 65, 126, 65, 112, 112, 86, 101, 114, 115, 105, 111, 110, 35, 49, 46, 51, 51, 126, 69, 110, 97, 98, 108, 101, 67, 111, 109, 112, 114, 101, 115, 115, 105, 111, 110, 35, 84, 114, 117, 101, 64, 64 };

	using (var memory = new MemoryStream())
	{
		int length = 5;
		var duplex = StreamConnection.GetDuplex(memory);
		for (int count = 0; count<msg.Length/length; count++)
		{
			duplex.Output.Write(msg.AsSpan(count*length, length));
		}
		if (msg.Length%length>0)
		{
			duplex.Output.Write(msg.AsSpan(msg.Length-(msg.Length%length)));
		}
		await duplex.Output.FlushAsync();

		duplex.Output.Complete();

		var reader = await duplex.Input.ReadAsync();

		ReadOnlySequence<byte> buffer = reader.Buffer; // always a zero length single segment, what am i getting wrong?
	}
}
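As an aside, one way to obtain a guaranteed multi-segment ReadOnlySequence<byte> for parser tests, without involving a stream or pipe at all, is to chain ReadOnlySequenceSegment<byte> instances by hand; this is a generic sketch, not an API of this library:

using System;
using System.Buffers;

// Minimal segment type used to stitch several memory blocks into one sequence.
sealed class ChunkSegment : ReadOnlySequenceSegment<byte>
{
    public ChunkSegment(ReadOnlyMemory<byte> memory) => Memory = memory;

    public ChunkSegment Append(ReadOnlyMemory<byte> memory)
    {
        var next = new ChunkSegment(memory) { RunningIndex = RunningIndex + Memory.Length };
        Next = next;
        return next;
    }
}

static class MultiSegment
{
    // Splits the payload into fixed-size chunks so the resulting sequence crosses
    // segment boundaries, which is exactly what a boundary-safe parser needs to see.
    public static ReadOnlySequence<byte> Create(byte[] data, int chunkSize)
    {
        var first = new ChunkSegment(data.AsMemory(0, Math.Min(chunkSize, data.Length)));
        var last = first;
        for (int offset = chunkSize; offset < data.Length; offset += chunkSize)
        {
            int len = Math.Min(chunkSize, data.Length - offset);
            last = last.Append(data.AsMemory(offset, len));
        }
        return new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);
    }
}

With that in place, MultiSegment.Create(msg, 5) yields a sequence whose segments break every 5 bytes, so the parser's boundary handling can be exercised deterministically.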

Trim and AOT warnings in Delegates and PerTypeHelpers

In trying to use Microsoft.Extensions.Caching.StackExchangeRedis in a Native AOT app (see dotnet/aspnetcore#45910), I'm getting the following warnings from these two places:

private static readonly Func<MulticastDelegate, object> s_getArr = GetGetter<object>("_invocationList");
private static readonly Func<MulticastDelegate, IntPtr> s_getCount = GetGetter<IntPtr>("_invocationCount");
private static readonly bool s_isAvailable = s_getArr != null & s_getCount != null;
internal static bool IsAvailable => s_isAvailable;

private static Func<MulticastDelegate, T> GetGetter<T>(string fieldName)
{
    try
    {
        var field = typeof(MulticastDelegate).GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
        if (field == null || field.FieldType != typeof(T)) return null;
#if !NETSTANDARD2_0
        try // we can try use ref-emit
        {
            var dm = new DynamicMethod(fieldName, typeof(T), new[] { typeof(MulticastDelegate) }, typeof(MulticastDelegate), true);
            var il = dm.GetILGenerator();
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Ldfld, field);
            il.Emit(OpCodes.Ret);
            return (Func<MulticastDelegate, T>)dm.CreateDelegate(typeof(Func<MulticastDelegate, T>));
        }

/_/src/Pipelines.Sockets.Unofficial/Delegates.cs(52): AOT analysis warning IL3050: Pipelines.Sockets.Unofficial.Delegates.GetGetter<T>(String): Using member 'System.Reflection.Emit.DynamicMethod.DynamicMethod(String,Type,Type[],Type,Boolean)' which has 'RequiresDynamicCodeAttribute' can break functionality when AOT compiling. Creating a DynamicMethod requires dynamic code. [C:\git\azure-activedirectory-identitymodel-extensions-for-dotnet\test\Microsoft.IdentityModel.AotCompatibility.TestApp\Microsoft.IdentityModel.AotCompatibility.TestApp.csproj]

This looks like we could simply check RuntimeFeature.IsDynamicCodeSupported and fall back to reflection when it isn't supported. But the bigger issue is that _invocationList and _invocationCount don't appear to be fields of MulticastDelegate in Native AOT: https://github.com/dotnet/runtime/blob/main/src/coreclr/nativeaot/System.Private.CoreLib/src/System/MulticastDelegate.cs. So I'm not sure how to fix this.
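For illustration, a sketch of what that guard might look like in GetGetter, mirroring the excerpt above (this assumes a target framework where RuntimeFeature.IsDynamicCodeSupported exists; the reflection fallback via FieldInfo.GetValue is my assumption, and on Native AOT the field lookup would still return null, so this only silences the dynamic-code warning rather than making the getter work there):

private static Func<MulticastDelegate, T> GetGetter<T>(string fieldName)
{
    var field = typeof(MulticastDelegate).GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
    if (field == null || field.FieldType != typeof(T)) return null;

    if (RuntimeFeature.IsDynamicCodeSupported)
    {
        // Ref-emit path: only valid when the runtime can generate code at run time.
        var dm = new DynamicMethod(fieldName, typeof(T), new[] { typeof(MulticastDelegate) }, typeof(MulticastDelegate), true);
        var il = dm.GetILGenerator();
        il.Emit(OpCodes.Ldarg_0);
        il.Emit(OpCodes.Ldfld, field);
        il.Emit(OpCodes.Ret);
        return (Func<MulticastDelegate, T>)dm.CreateDelegate(typeof(Func<MulticastDelegate, T>));
    }

    // Fallback: plain reflection. Slower, but avoids the RequiresDynamicCode path.
    return d => (T)field.GetValue(d);
}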

    static Allocator<T> Calculate()
    {
        if (IsBlittable)
        {
            try
            {
                typeof(UnmanagedAllocator<>).MakeGenericType(typeof(T))
                    .GetProperty(nameof(UnmanagedAllocator<int>.Shared))
                    .GetValue(null);
            }
            catch { }
        }
        return PreferPinned(); // safe fallback
    }
}

public static Allocator<T> PreferPinned()
{
    return _preferPinned ??= Calculate();

    static Allocator<T> Calculate()
    {
        if (IsBlittable)
        {
            try
            {
                typeof(PinnedArrayPoolAllocator<>).MakeGenericType(typeof(T))
                    .GetProperty(nameof(PinnedArrayPoolAllocator<int>.Shared))
                    .GetValue(null);
            }
            catch { }
        }
        return ArrayPoolAllocator<T>.Shared; // safe fallback
    }
}

/_/src/Pipelines.Sockets.Unofficial/Arenas/PerTypeHelpers.cs(39): Trim analysis warning IL2090: Pipelines.Sockets.Unofficial.Arenas.PerTypeHelpers`1.<PreferPinned>g__Calculate|3_0(): 'this' argument does not satisfy 'DynamicallyAccessedMemberTypes.PublicParameterlessConstructor' in call to 'System.Type.MakeGenericType(Type[])'. The generic parameter 'T' of 'Pipelines.Sockets.Unofficial.Arenas.PerTypeHelpers`1' does not have matching annotations. The source value must declare at least the same requirements as those declared on the target location it is assigned to. [C:\git\azure-activedirectory-identitymodel-extensions-for-dotnet\test\Microsoft.IdentityModel.AotCompatibility.TestApp\Microsoft.IdentityModel.AotCompatibility.TestApp.csproj]
/_/src/Pipelines.Sockets.Unofficial/Arenas/PerTypeHelpers.cs(39): AOT analysis warning IL3050: Pipelines.Sockets.Unofficial.Arenas.PerTypeHelpers`1.<PreferPinned>g__Calculate|3_0(): Using member 'System.Type.MakeGenericType(Type[])' which has 'RequiresDynamicCodeAttribute' can break functionality when AOT compiling. The native code for this instantiation might not be available at runtime. [C:\git\azure-activedirectory-identitymodel-extensions-for-dotnet\test\Microsoft.IdentityModel.AotCompatibility.TestApp\Microsoft.IdentityModel.AotCompatibility.TestApp.csproj]
/_/src/Pipelines.Sockets.Unofficial/Arenas/PerTypeHelpers.cs(19): Trim analysis warning IL2090: Pipelines.Sockets.Unofficial.Arenas.PerTypeHelpers`1.<PreferUnmanaged>g__Calculate|2_0(): 'this' argument does not satisfy 'DynamicallyAccessedMemberTypes.PublicParameterlessConstructor' in call to 'System.Type.MakeGenericType(Type[])'. The generic parameter 'T' of 'Pipelines.Sockets.Unofficial.Arenas.PerTypeHelpers`1' does not have matching annotations. The source value must declare at least the same requirements as those declared on the target location it is assigned to. [C:\git\azure-activedirectory-identitymodel-extensions-for-dotnet\test\Microsoft.IdentityModel.AotCompatibility.TestApp\Microsoft.IdentityModel.AotCompatibility.TestApp.csproj]
/_/src/Pipelines.Sockets.Unofficial/Arenas/PerTypeHelpers.cs(19): AOT analysis warning IL3050: Pipelines.Sockets.Unofficial.Arenas.PerTypeHelpers`1.<PreferUnmanaged>g__Calculate|2_0(): Using member 'System.Type.MakeGenericType(Type[])' which has 'RequiresDynamicCodeAttribute' can break functionality when AOT compiling. The native code for this instantiation might not be available at runtime. [C:\git\azure-activedirectory-identitymodel-extensions-for-dotnet\test\Microsoft.IdentityModel.AotCompatibility.TestApp\Microsoft.IdentityModel.AotCompatibility.TestApp.csproj]

Maybe these can be wrapped in RuntimeFeature.IsDynamicCodeSupported? I'm a bit confused by this code because it isn't actually returning the result of invoking those properties through reflection.

cc @mgravell

Xamarin.Android crash with native exception on SocketConnection.Create

Hi all, sorry if this is the wrong place; it's the first time I've published an issue on GitHub.

When using Pipelines.Sockets.Unofficial version 2.0.25 in a Xamarin.Android project and in a Xamarin.Forms (netstandard 2.1) shared library, the application crashes with a native exception when calling SocketConnection.Create (with any parameters). Version 2.0.22 works.
Here are the native error crash logs:
Native_Error_log.txt

Thank you!

Failing on Azure Worker role - BadImageFormatException

Hi, I'm using this in an Azure Worker Role project; .NET 4.7.2, PackageReference-style references. I get System.Buffers/System.IO.Pipelines assembly load errors unless both of those packages are explicitly referenced, and binding redirect errors for System.Buffers unless the same version (4.4.0) that Pipelines.Sockets.Unofficial uses is installed.

Now it's throwing the following, when await is called on DefaultPipeReader.ReadAsync():
System.BadImageFormatException: 'Could not load file or assembly 'System.Runtime.CompilerServices.Unsafe, Version=4.0.4.1, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. Reference assemblies should not be loaded for execution. They can only be loaded in the Reflection-only loader context. (Exception from HRESULT: 0x80131058)'

At the moment I'm just running locally in the Azure Compute Emulator (Azure SDK 2.9.6).

Any ideas?

Race?

We've been seeing intermittent problems with data getting lost and have finally narrowed it down to a simple test case that sometimes fails. It almost always fails on a Threadripper box, and roughly one in three attempts fails on a six-year-old laptop. Is this code reasonable and, if so, why does it sometimes fail with no data in the MemoryStream?

Trying to wait for the reader to complete doesn't help since it doesn't actually complete.

        [TestMethod]
        public async Task SingleByte()
        {
            var expected = new byte[] { 7 };
            using (var ms = new MemoryStream())
            {
                var pipeWriter = StreamConnection.GetWriter(ms);
                var memory = pipeWriter.GetMemory(expected.Length);
                expected.AsMemory().CopyTo(memory);
                pipeWriter.Advance(expected.Length);
                await pipeWriter.FlushAsync().ConfigureAwait(false);
                pipeWriter.Complete();
                ms.Position = 0;
                var actual = ms.ToArray();
                Assert.IsNotNull(actual);
                CollectionAssert.AreEqual(expected, actual);
            }
        }

        [TestMethod]
        public async Task SingleByteWithWait()
        {
            var expected = new byte[] { 7 };
            using (var ms = new MemoryStream())
            {
                var pipeWriter = StreamConnection.GetWriter(ms);
                var memory = pipeWriter.GetMemory(expected.Length);
                expected.AsMemory().CopyTo(memory);
                pipeWriter.Advance(expected.Length);
                await pipeWriter.FlushAsync().ConfigureAwait(false);
                pipeWriter.Complete();
                var doneTcs = new TaskCompletionSource<bool>();
                pipeWriter.OnReaderCompleted((ex, obj) =>
                {
                    var tcs = (TaskCompletionSource<bool>)obj;
                    if (null != ex) tcs.TrySetException(ex);
                    else tcs.TrySetResult(true);
                }, doneTcs);
                await doneTcs.Task.ConfigureAwait(false);
                ms.Position = 0;
                var actual = ms.ToArray();
                Assert.IsNotNull(actual);
                CollectionAssert.AreEqual(expected, actual);
            }
        }

Unable to wait for socket closure

As far as I can tell, there's no way to receive a callback when the socket used by SocketConnection is closed or disposed. I'd like to make a proxy server which accepts connections using a proprietary protocol, unpacks the messages from that protocol, and sends the content on to another server. I'll end up with two SocketConnections, and a class which is translating between them. When one SocketConnection is closed, I need to close the other one to ensure I'm not leaking connections.

  • I could include a 'disconnect' message in the protocol, but that doesn't handle network errors or application errors
  • I could poll the SocketConnection, but that seems too hacky

Ideally, I'd like to register a callback, or be able to await a task, so I can run some cleanup code once either of the two sockets has disconnected.
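One pattern that may cover this today, without a dedicated closure callback, is to treat completion of a connection's Input pipe as the disconnect signal: ReadAsync reports IsCompleted once the remote side has closed (and throws if the connection was reset), so a pump loop can run cleanup in a finally block. A sketch, with the relay and cleanup wiring purely illustrative:

using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

static class ProxyPump
{
    // Copies bytes from one side of the proxy to the other; when the source side
    // closes (or faults), the finally block runs the caller's cleanup, e.g. disposing
    // the opposite SocketConnection so it is not leaked.
    public static async Task PumpAsync(IDuplexPipe from, IDuplexPipe to, Action onClosed)
    {
        try
        {
            while (true)
            {
                var result = await from.Input.ReadAsync().ConfigureAwait(false);
                var buffer = result.Buffer;

                foreach (var segment in buffer)
                {
                    await to.Output.WriteAsync(segment).ConfigureAwait(false);
                }
                from.Input.AdvanceTo(buffer.End);

                if (result.IsCompleted) break; // remote side closed; nothing more will arrive
            }
        }
        finally
        {
            from.Input.Complete();
            to.Output.Complete();
            onClosed(); // close/dispose the other connection here
        }
    }
}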

Complete SocketConnection pipe without disposing the underlying socket

I'm working on a client for Apache Kudu, using both your protobuf-net and this library, thanks!

The simplified exchange goes something like this (when first connecting to Kudu):

  1. Send negotiate request PB
  2. Do TLS handshake over PB
  3. The remaining negotation happens over TLS
  4. At the end of negotiation, we may keep or drop TLS (depending on what the user decided)

What I attempted:

  • Create a Socket and SocketConnection
  • Do steps 1 and 2 above
  • Complete SocketConnection (I don't dispose the underlying socket)
  • Create a StreamConnection (SslStream -> InnerSslStream -> NetworkStream)
  • Do step 3

Unfortunately completing the SocketConnection disposes the underlying socket. If I don't complete the SocketConnection, I end up with 2 pipes trying to read from the same socket.

Not sure if this is relevant, but since I have to do the TLS handshake over protobuf messages, this is a snippet of what I do in the InnerSslStream I pass to SslStream,

public override Task WriteAsync(byte[] buffer, ...)
{
    if (_isAuthenticated)
    {
        _networkStream.WriteAsync(buffer, ...);
    }
    else
    {
        // Wrap TLS handshake in protobuf message
        _result = _negotiator.SendTlsHandshakeAsync(buffer);
    }
}

Do you have any advice for dealing with this scenario? The gist of it is I need to be able to switch to/from TLS at an arbitrary time.
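Not an answer from the library itself, but one workaround to consider is owning the Socket directly and only ever building pipes over streams, so that completing a pipe never disposes the socket: wrap the socket in a NetworkStream with ownsSocket: false, do the plain-text phase over StreamConnection.GetDuplex on that stream, then layer SslStream on top (in the Kudu case, on top of the protobuf-framing InnerSslStream wrapper rather than the raw NetworkStream) and create a second duplex once TLS starts. A rough sketch with illustrative endpoints; whether the first duplex leaves the stream usable after completion is something to verify:

using System.Net.Security;
using System.Net.Sockets;
using Pipelines.Sockets.Unofficial;

var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync("kudu-master.example", 7051); // endpoint is illustrative

// ownsSocket: false keeps the socket alive even if this stream is disposed.
var networkStream = new NetworkStream(socket, ownsSocket: false);

// Phase 1: plain-text negotiation (steps 1-2) over a stream-backed duplex.
var plain = StreamConnection.GetDuplex(networkStream);
// ... exchange the negotiate request/response here, then stop using 'plain' ...

// Phase 2: TLS over the same underlying connection (steps 3-4).
// In the scenario above, the inner stream would be the InnerSslStream wrapper.
var ssl = new SslStream(networkStream, leaveInnerStreamOpen: true);
await ssl.AuthenticateAsClientAsync("kudu-master.example");
var secure = StreamConnection.GetDuplex(ssl);
// ... finish negotiation over 'secure'; fall back to the plain stream if TLS is dropped ...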

Multiple threads writing to the same Pipe

Hi Marc,

I just started using Pipes and it works great.
I also make use of dotnetty for the connection part.
Basic implementation is:

  1. Dotnetty makes the connection, gets the data and I write the data directly to Pipe.
  2. Dotnetty reads 1024 bytes at a time and if the message is huge, it makes a second read and continues until the stream is empty.
  3. If I have 2 clients connected and a message comes in from each at the same time, then I am seeing issues at present.
    Message format:
    This is what is happening:
    At 10.03.30.000: (size of message: let's say 4000)
    Dotnetty reads at first :
    This is directly put into Pipe
    Dotnetty reads second bunch:
    This is directly put into pipe
    Now Pipe has
    Now, there is one more part of message1 that is to be read by Dotnetty -
    Now, at the exact same time, differing only by microseconds, another message comes in which is smaller in size.
    Dotnetty connects to client 2 and gets the message

    This is put into Pipe
    So, Pipe now looks like below:

    Now, the custom filter condition that I have is to check if the data in the pipe has STX and ETX... At this point in time the condition is met, as it sees an STX and an ETX. But as far as the messages are concerned, they are 2 different messages.

This happened because the clients sent their messages at the exact same time, and there is only one Pipe object throughout the application.
Could you please suggest a solution for the above?
-Thanks in advance!
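For reference, the usual way out of this is to stop sharing a single Pipe across clients and give each connection its own Pipe, so two clients' bytes can never interleave inside one frame. A small sketch (the connection-id keying is illustrative, not tied to DotNetty's API):

using System.Collections.Concurrent;
using System.IO.Pipelines;

// One Pipe per client connection, keyed by an id the transport layer provides.
sealed class ConnectionPipes
{
    private readonly ConcurrentDictionary<string, Pipe> _pipes = new ConcurrentDictionary<string, Pipe>();

    public PipeWriter GetWriter(string connectionId) =>
        _pipes.GetOrAdd(connectionId, _ => new Pipe()).Writer;

    public PipeReader GetReader(string connectionId) =>
        _pipes.GetOrAdd(connectionId, _ => new Pipe()).Reader;

    public void Remove(string connectionId) => _pipes.TryRemove(connectionId, out _);
}

Each DotNetty channel then writes only to its own writer, and the STX/ETX framing check only ever sees one client's data.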

High percentage of threads blocked on DedicatedThreadPoolPipeScheduler.RunWorkLoop

Hi,

we are having an issue in prod where we see a high percentage/number of threads in a waiting state, and when looking at the full .NET call stack, it points to "Pipelines.Sockets.Unofficial.DedicatedThreadPoolPipeScheduler.RunWorkLoop()+99".

We were using version 1.0.9, and I came across this article https://blog.marcgravell.com/2019/02/fun-with-spiral-of-death.html where you explain an issue you fixed in 1.1.*, which was very similar to what we were seeing in PROD, so I went ahead and raised a PR upgrading this NuGet to 2.0.7, and at the same time I updated StackExchange.Redis from 2.0.519 to 2.0.571.

However, after deploying this new version and doing more testing, we are still seeing a high number of blocked threads; far fewer than before, but it's still happening.

These are the call stack details; our app is running on .NET Framework 4.7.2. If you need further details, let me know.

[[GCFrame]]

[[HelperMethodFrame_1OBJ] (System.Threading.Monitor.ObjWait)] System.Threading.Monitor.ObjWait(Boolean, Int32, System.Object)
Pipelines.Sockets.Unofficial.DedicatedThreadPoolPipeScheduler.RunWorkLoop()+99
mscorlib_ni!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)+163
mscorlib_ni!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)+14
mscorlib_ni!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)+52
mscorlib_ni!System.Threading.ThreadHelper.ThreadStart(System.Object)+5c
[[GCFrame]]
[[DebuggerU2MCatchHandlerFrame]]
[[ContextTransitionFrame]]
[[DebuggerU2MCatchHandlerFrame]]

ntdll!NtWaitForMultipleObjects+14

KERNELBASE!WaitForMultipleObjectsEx+f9
clr!WaitForMultipleObjectsEx_SO_TOLERANT+62
clr!Thread::DoAppropriateWaitWorker+1e4
clr!Thread::DoAppropriateWait+7d
clr!CLREventBase::WaitEx+c4
clr!Thread::Block+27
clr!SyncBlock::Wait+19d
[[GCFrame]]
clr!ObjectNative::WaitTimeout+e1
[[HelperMethodFrame_1OBJ] (System.Threading.Monitor.ObjWait)] System.Threading.Monitor.ObjWait(Boolean, Int32, System.Object)
Pipelines.Sockets.Unofficial.DedicatedThreadPoolPipeScheduler.RunWorkLoop()+99
mscorlib_ni!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)+163
mscorlib_ni!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)+14
mscorlib_ni!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)+52
mscorlib_ni!System.Threading.ThreadHelper.ThreadStart(System.Object)+5c
clr!CallDescrWorkerInternal+83
clr!CallDescrWorkerWithHandler+4e
clr!MethodDescCallSite::CallTargetWorker+f8
clr!ThreadNative::KickOffThread_Worker+109
[[GCFrame]]
clr!ManagedThreadBase_DispatchInner+39
clr!ManagedThreadBase_DispatchMiddle+6c
clr!ManagedThreadBase_DispatchOuter+75
[[DebuggerU2MCatchHandlerFrame]]
clr!ManagedThreadBase_DispatchInCorrectAD+15
clr!Thread::DoADCallBack+278
[[ContextTransitionFrame]]
clr!ManagedThreadBase_DispatchInner+2fc3
clr!ManagedThreadBase_DispatchMiddle+6c
clr!ManagedThreadBase_DispatchOuter+75
[[DebuggerU2MCatchHandlerFrame]]
clr!ManagedThreadBase_FullTransitionWithAD+2f
clr!ThreadNative::KickOffThread+db
clr!Thread::intermediateThreadProc+86
kernel32!BaseThreadInitThunk+14
ntdll!RtlUserThreadStart+21


netstandard2.1 DoReceive fails on Zero Length Reads on Mono

It seems that the netstandard2.1 build fails on zero-length reads with an exception, due to a combination of the SOCKET_STREAM_BUFFERS path and the fact that Mono checks that the buffer and buffer list are non-empty or non-null here: https://github.com/mono/mono/blame/main/mcs/class/System/System.Net.Sockets/Socket.cs#L1508

I don't know why this issue has suddenly come up after about a year of use, but I guess it can only be because I've updated Unity3D and started targeting netstandard2.1.

I don't have a great solution other than removing the SOCKET_STREAM_BUFFERS path in SocketConnection.DoReceive and setting the null buffer to an empty array with size 1. This solution is probably not ideal, so hopefully you'll know what's best to change.

Exceptions.

Hi,

I am having a problem when clients disconnect from the server. I am using my own server, but the same thing happens in sillyousu's project.

Repo:
https://github.com/sillyousu/TryoutPipelineSockets

The client is basic; I just created a client that opens some connections (like 1000) and sends some packets.

When I just close the client or stop debugging, several exceptions occur and end up making the server quite slow (almost like a pause).

[22:08:08,840] Connection will be disconnected, because of an exception
Pipelines.Sockets.Unofficial.ConnectionResetException: An existing connection was forcibly closed by the remote host ---> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
   at Pipelines.Sockets.Unofficial.Internal.Throw.Socket(Int32 errorCode) in C:\********************\Internal\Throw.cs:line 59
   at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.<GetResult>g__ThrowSocketException|10_0(SocketError e) in C:\********************\SocketAwaitableEventArgs.cs:line 86
   at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.GetResult() in C:\********************\SocketAwaitableEventArgs.cs:line 79
   at Pipelines.Sockets.Unofficial.SocketConnection.DoReceiveAsync() in C:\********************\SocketConnection.Receive.cs:line 64
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.PipeCompletion.ThrowLatchedException()
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at TestPipe.Network.PacketPipeReaderBase.ReadBuffer() in C:\********************\PacketPipeReaderBase.cs:line 69
   at TestPipe.Network.PacketPipeReaderBase.ReadSource() in C:\********************\PacketPipeReaderBase.cs:line 48
[22:08:08,840] Connection will be disconnected, because of an exception
Pipelines.Sockets.Unofficial.ConnectionResetException: An existing connection was forcibly closed by the remote host ---> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
   at Pipelines.Sockets.Unofficial.Internal.Throw.Socket(Int32 errorCode) in C:\********************\Internal\Throw.cs:line 59
   at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.<GetResult>g__ThrowSocketException|10_0(SocketError e) in C:\********************\SocketAwaitableEventArgs.cs:line 86
   at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.GetResult() in C:\********************\SocketAwaitableEventArgs.cs:line 79
   at Pipelines.Sockets.Unofficial.SocketConnection.DoReceiveAsync() in C:\********************\SocketConnection.Receive.cs:line 64
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.PipeCompletion.ThrowLatchedException()
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at TestPipe.Network.PacketPipeReaderBase.ReadBuffer() in C:\********************\PacketPipeReaderBase.cs:line 69
   at TestPipe.Network.PacketPipeReaderBase.ReadSource() in C:\********************\PacketPipeReaderBase.cs:line 48

PacketPipeReaderBase original code can be found here (thanks @MUnique):
https://github.com/MUnique/OpenMU/blob/master/src/Network/PacketPipeReaderBase.cs

Am I forgetting something?

Thanks!
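For comparison, a minimal sketch of a read loop that treats ConnectionResetException as a normal disconnect rather than an error, so a burst of abrupt client closes only tears down those connections instead of flooding the log (the parsing and logging are placeholders):

using System.IO.Pipelines;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;

static class ReadLoop
{
    public static async Task RunAsync(IDuplexPipe connection)
    {
        try
        {
            while (true)
            {
                var result = await connection.Input.ReadAsync().ConfigureAwait(false);
                var buffer = result.Buffer;

                // ... parse complete packets out of 'buffer' here ...

                connection.Input.AdvanceTo(buffer.End);
                if (result.IsCompleted || result.IsCanceled) break;
            }
        }
        catch (ConnectionResetException)
        {
            // Expected when a client vanishes (process killed, debugger stopped);
            // treat it as a clean disconnect instead of logging a full stack trace.
        }
        finally
        {
            connection.Input.Complete();
        }
    }
}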

Test: ConnectTests.Connect - Fails on my machine

(Running on Windows)

[xUnit.net 00:00:14.88]     Pipelines.Sockets.Unofficial.Tests.ConnectTests.Connect [FAIL]
  X Pipelines.Sockets.Unofficial.Tests.ConnectTests.Connect [5s 77ms]
  Error Message:
   System.TimeoutException : The operation has timed out.
  Stack Trace:
     at Pipelines.Sockets.Unofficial.Internal.Throw.Timeout(String message) in /_/src/Pipelines.Sockets.Unofficial/Internal/Throw.cs:line 33
   at Pipelines.Sockets.Unofficial.Tests.ConnectTests.<Connect>d__8.MoveNext() in /_/tests/Pipelines.Sockets.Unofficial.Tests/ConnectTests.cs:line 38
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
