Giter Site home page Giter Site logo

dotnet / nerdbank.streams Goto Github PK

View Code? Open in Web Editor NEW
574.0 13.0 53.0 3.84 MB

Specialized .NET Streams and pipes for full duplex in-proc communication, web sockets, and multiplexing

License: MIT License

C# 85.89% TypeScript 8.89% PowerShell 5.09% Batchfile 0.06% Dockerfile 0.08%

nerdbank.streams's Introduction

Specialized .NET Stream classes

NuGet package Build Status codecov

Enhanced streams for communication in-proc or across the Internet.

Features

  1. SimplexStream is meant to allow two parties to communicate one direction. Anything written to the stream can subsequently be read from it. You can share this Stream with any two parties (in the same AppDomain) and one can send messages to the other.
  2. FullDuplexStream creates a pair of bidirectional streams for in-proc two-way communication; it also creates a single bidirectional stream from two unidirectional streams.
  3. MultiplexingStream allows you to split any bidirectional .NET Stream into many sub-streams (called channels). This allows two parties to establish just one transport stream (e.g. named pipe or web socket) and use it for many independent protocols. For example, one might set up JSON-RPC on one channel and use other channels for efficient binary transfers.
  4. AsStream() wraps a WebSocket, System.IO.Pipelines.PipeReader, System.IO.Pipelines.PipeWriter, or System.IO.Pipelines.IDuplexPipe with a System.IO.Stream for reading and/or writing.
  5. UsePipe() enables reading from and writing to a Stream or WebSocket using the PipeReader and PipeWriter APIs.
  6. Stream.ReadSlice(long) creates a sub-stream that ends after a given number of bytes.
  7. PipeReader.ReadSlice(long) creates a sub-PipeReader that ends after a given number of bytes.
  8. MonitoringStream wraps another Stream and raises events for all I/O calls so you can monitor and/or trace the data as it goes by.
  9. WriteSubstream and ReadSubstream allow you to serialize data of an unknown length as part of a larger stream, and later deserialize it such in reading the substream, you cannot read more bytes than were written to it.
  10. Sequence<T> is a builder for ReadOnlySequence<T>.
  11. PrefixingBufferWriter<T> wraps another IBufferWriter<T> to allow for prefixing some header to the next written buffer, which may be arbitrarily long.
  12. BufferTextWriter is a TextWriter-derived type that can write directly to any IBufferWriter<byte>, making it more reusable than StreamWriter and thus allows for alloc-free writing across many writers.
  13. SequenceTextReader is a TextReader-derived type that can read directly from any ReadOnlySequence<byte>, making it more reusable than StreamReader and thus allows for alloc-free reading across many sequences.
  14. DuplexPipe is a trivial implementation of IDuplexPipe.
  15. Stream.ReadBlockAsync guarantees to fill the supplied buffer except under certain documented conditions, instead of the regular ReadAsync guarantee of supplying at least 1 byte.

.NET Foundation

This project is supported by the .NET Foundation.

nerdbank.streams's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

nerdbank.streams's Issues

Question: How best to go about learning to create Azue pipeline and build processes

@AArnott I've been studying how you've setup azure pipeline and the overall build process for this project and it is elegant, and most importantly, necessary for repeatability, determinate high quality and availability. I was just wondering how does one go about learning to do it all; any suggestions on where to start? I've grown so dependent on VS builds and legacy deployment and Installer, but now I'm moving everything to azure pipelines and Azure App Center for WPF and other native apps via Xamarin. I'm also working on the Desktop samples for Nerdbank.Streams and wanted increase my domain knowledge around the build process.

BufferTextWriter doesn't flush on dispose

Using the following code:

using (var writer = new BufferTextWriter(buffer, Encoding))
{
    SchemaSerializer.Serialize(_queryExecutor.Schema, writer);
}

nothing gets written to the buffer if the writer can contain it in its buffer. This is fairly surprising behavior. I'm pretty sure the stream writer, for instance, flushes on dispose (which has its own set of issues in an async context - but the IBufferWriter is a sync API so those issues shouldn't be present here).

CI build's code coverage test run is broken

If you look at the Azure Pipelines build logs, the code coverage step of the Windows builds is merely a printout of proper usage because the command line we're using is malformed.

MultiplexingStream should apply backpressure to individual channels

To avoid a fast sender and slow receiver from plugging up the entire connection (when we're out of local buffers to receive for an individual channel), the MultiplexingStream should apply and respond to backpressure.

Design proposal

  1. Offer and accept channel frames should include allowed window size.
  2. Channel.Input should track how many bytes have been processed by the reader and send frames back to the remote party to inform them so they know when they can start transmitting.
    1. What is the minimum newly processed data size before a frame should be transmitted? For example if the reader only processes 3 bytes, we shouldn't send a frame back to the sender to let them know.
      1. One frame size processed is worth transmitting about.
        ii. When there is more than one frame to transfer and less than one frame of window remaining, consider waiting till more window space is available.
    2. What if the reader 'processes' 0 bytes but acknowledges all of them? Pipelines API allows this as a means to receive more data. Unless mxstream supports that too, it can result in deadlocks.
      1. The channel can send a special frame requesting the sender to transmit exactly one more frame. The Pipelines API will allow exactly one more Write call to complete, so this is safe. It will provide the receiver with the additional data they need. This process can repeat if the reader 'examines' the whole buffer again.
  3. Channel.Output should track how much data was transmitted and compare to how much data has been processed by the remote party so that it does not flush data that would exceed the allowed window size.
  4. Add tests for when window size is larger/smaller/exactly window size.
  5. Set default window size to be some multiple of the frame size.
  6. A default window size is communicated during the mxstream handshake for channels that are created and transmitted to even before acknowledgment is received.
    1. This default must be NO GREATER than the minimum allowed window size for any individual channel, since otherwise once the channel is accepted and its pipe is set up, it could still overflow with data transmitted from the channel's creator based on the previous, larger window size.
  7. If a channel requests a window size that is less than the mxstream's default window size, quietly just raise the window size for the channel to match the default value.
  8. How to detect window size when given an ExistingPipe? โ“
  9. Make protocol more flexible for versions.

Unstable test: MultiplexingStreamTests.CancelChannelOfferBeforeAcceptance

In this build, the test failed with this log:

2018-10-16T20:27:23.8679970Z Failed   MultiplexingStreamTests.CancelChannelOfferBeforeAcceptance(cancelFirst: False)
2018-10-16T20:27:23.8682090Z Error Message:
2018-10-16T20:27:23.8682500Z  System.InvalidOperationException : Channel is no longer available for acceptance.
2018-10-16T20:27:23.8682620Z Stack Trace:
2018-10-16T20:27:23.8682820Z    at Nerdbank.Streams.MultiplexingStream.AcceptChannelOrThrow(Channel channel, ChannelOptions options) in /Users/vsts/agent/2.140.2/work/1/s/src/Nerdbank.Streams/MultiplexingStream.cs:line 803
2018-10-16T20:27:23.8682980Z    at Nerdbank.Streams.MultiplexingStream.AcceptChannelAsync(String name, ChannelOptions options, CancellationToken cancellationToken) in /Users/vsts/agent/2.140.2/work/1/s/src/Nerdbank.Streams/MultiplexingStream.cs:line 469
2018-10-16T20:27:23.8683200Z    at MultiplexingStreamTests.CancelChannelOfferBeforeAcceptance(Boolean cancelFirst) in /Users/vsts/agent/2.140.2/work/1/s/src/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs:line 377
2018-10-16T20:27:23.8684250Z --- End of stack trace from previous location where exception was thrown ---
2018-10-16T20:27:23.8684460Z Standard Output Messages:
2018-10-16T20:27:23.8684550Z  mx2 Information: 0 : Multiplexing protocol established successfully.
2018-10-16T20:27:23.8684700Z  mx1 Information: 0 : Multiplexing protocol established successfully.
2018-10-16T20:27:23.8685170Z  mx1 Information: 12 : Sending Offer frame for channel 2, carrying 0 bytes of content.
2018-10-16T20:27:23.8685330Z  mx2 Information: 13 : Received Offer frame for channel 2 with 0 bytes of content.
2018-10-16T20:27:23.8685410Z  mx2 Information: 9 : Remote party offers channel 2 "" which has no pending AcceptChannelAsync
2018-10-16T20:27:23.8685550Z  mx1 Information: 11 : Offer of channel 2 ("") canceled.
2018-10-16T20:27:23.8685630Z  mx1 Information: 10 : Local channel "" stream disposed.
2018-10-16T20:27:23.8685760Z  mx1 Information: 12 : Sending ChannelTerminated frame for channel 2, carrying 0 bytes of content.
2018-10-16T20:27:23.8685850Z  mx2 Information: 13 : Received ChannelTerminated frame for channel 2 with 0 bytes of content.
2018-10-16T20:27:23.8685930Z  mx2 Information: 7 : Accepting channel 2 "" which is already offered by the other side.
2018-10-16T20:27:23.8686100Z  mx2 Information: 10 : Local channel "" stream disposed.
2018-10-16T20:27:23.8686170Z  mx2 Information: 12 : Sending ChannelTerminated frame for channel 2, carrying 0 bytes of content.
2018-10-16T20:27:23.8686340Z  mx1 Information: 13 : Received ChannelTerminated frame for channel 2 with 0 bytes of content.
2018-10-16T20:27:23.8686410Z  mx1 Information: 5 : Disposing.
2018-10-16T20:27:23.8686540Z  mx2 Information: 5 : Disposing.

Allow non-closing stream for pipes

Use case

I have an "interesting" problem, that I have to be able to "enable/disable" an SslStream on an open connection. The idea is to wrap an IDuplexPipe in a Stream which gets passed (if requested) to an SslStream. Now, the client should be able to return to a non-encrypted connection - for example to use a different certificate, etc...).

Possible solution

The ideal solution would be to tell the PipeStream to keep the underlying pipes open when Dispose gets called. Now I would be able create new streams on the same old pipes.

My workaround is to use the RawStream implementation from the Kestrel project.

Expose MultiplexingStream from its nested Channel class

It turns out that in some reasonable use cases that it's rather cumbersome to pass along a Channel and a MultiplexingStream to folks that need to use a channel and possibly create new ones. Hiding the ability to create new channels when given a Channel seemed initially like a good idea, but in retrospect this can and should be done simply by exposing an IDuplexPipe or Stream to the user that shouldn't know about the MultiplexingStream.

Add sub-stream for writing (and later reading) an unknown length

When writing to a stream, sometimes one needs to write a particular blob of an unknown length, and write more afterward. The reader, on the other hand, needs to be able to read that blob and know where the end is.

Today the options are:

  1. Write the blob to a MemoryStream first, then write out the length of the stream to the original stream followed by copying the content of the MemoryStream to the original stream. This requires that a large chunk of contiguous memory be allocated for the MemoryStream, which may not be reasonable.
  2. Find some special byte or byte sequence that won't appear in the blob to write after the blob, and have the reader carefully scan for that byte sequence and stop there. This is like HTTP chunking. But it has the nasty side-effect that the reader has to read just one byte at a time for fear of reading too much from the stream and making those subsequent bytes unavailable to the parent stream's reader, leading to hugely inefficient and/or complex code.

What I'm proposing is a special writing/reading stream that will buffer up data up to some fixed size (e.g. 4K) then write out to the parent stream with a length header for just that 4K frame. When the last frame comes, it will be less than 4K and that is the clue for the reader that it should read exactly x bytes and then quit. Is should be hugely efficient, and hidden for the stream user almost entirely aside from the fact that a sub-stream was created on reading and writing that portion of the parent stream.

Feature request: add a delegate to be called before writing to the stream

Something like this:

public delegate void BeforeWriteToFullDuplexStreamDelegate(FullDuplexStream stream, byte[] buffer, int offset, int count);

public class FullDuplexStream : Stream
{
       /// <summary>
       /// Gets or sets a delegate that is called just before writing to the stream,
       /// but after the caller's cancellation token has been checked.
       /// </summary>
       public BeforeWriteToFullDuplexStreamDelegate BeforeWrite { get; set; }

       /// <summary>
       /// Creates a pair of streams that can be passed to two parties
       /// to allow for interaction with each other.
       /// </summary>
       /// <returns>A pair of streams.</returns>
       public static Tuple<FullDuplexStream, FullDuplexStream> CreateStreams()
       {
           var stream1 = new FullDuplexStream();
           var stream2 = new FullDuplexStream();
           stream1.SetOtherStream(stream2);
           stream2.SetOtherStream(stream1);
           return Tuple.Create(stream1, stream2);
       }

       /// <inheritdoc />
       public override void Write(byte[] buffer, int offset, int count)
       {
           Requires.NotNull(buffer, nameof(buffer));
           Requires.Range(offset >= 0, nameof(offset));
           Requires.Range(count >= 0, nameof(count));
           Requires.Range(offset + count <= buffer.Length, nameof(count));
           this.BeforeWrite?.Invoke(this, buffer, offset, count);

           // Avoid sending an empty buffer because that is the signal of a closed stream.
           if (count > 0)
           {
               byte[] queuedBuffer = new byte[count];
               Array.Copy(buffer, offset, queuedBuffer, 0, count);
               this.other.PostMessage(new Message(queuedBuffer));
           }
    }
}

Where can we find a getting started reference

@AArnott Other than the unit testing have you had a chance to generating a getting started developers guide or code reference? Ive got time and space to generate artifacts as I learn the api for sample, example sources.

Add a Sequence<T> class

.NET offers a ReadOnlySequence<T> struct but no way to conveniently create or maintain one. Let's write one in this project:

Requirements

  1. It should facilitate acquiring arrays of requested sizes for the caller to fill. These arrays should come from ArrayPool<T>.
  2. Given a (possibly partially) filled array, append it to the internally maintained sequence.
  3. Offer an implicit operator to convert to ReadOnlySequence<T>
  4. As an AdvanceTo method is invoked, causing some head portion of the sequence to be discarded, find any backing arrays that are now completely unused and return them to ArrayPool<T>.

Sequence<T> perf improvements

  • Convert Memory to Span before Slicing where possible (e.g. GetSpan's call to TrailingSlack which slices Memory)
  • Fewer calculations and slices all around: store Memory in a field for frequent calls like GetSpan and GetMemory

Reduce channel overhead (GC pressure)

The MultiplexingStreamPerfTests reveal that communicating the same data over channels is a bit more expensive in CPU, and significantly more expensive in terms of GC pressure. We should analyze the allocations and optimize anything we can with a goal of reducing allocations by at least 50%.

To illustrate, running a simple JSON-RPC invocation over named pipes incurs this cost:

Test host launched with: "D:\git\Nerdbank.FullDuplexStream\bin\Nerdbank.Streams.Tests\Debug\net461\IsolatedTestHost.exe" "D:\git\Nerdbank.FullDuplexStream\bin\Nerdbank.Streams.Tests\Debug\net461\Nerdbank.Streams.Tests.dll" "MultiplexingStreamPerfTests" "JsonRpcPerf_Pipe" "False"
Bytes allocated during quiet wait period: 648
373112 bytes allocated (373 per iteration)
Elapsed time: 279ms (0.279ms per iteration)

While running the same test except using a single channel over that same named pipe incurs 3.3K more allocations (10X the amount of the named pipe)!

Test host launched with: "D:\git\Nerdbank.FullDuplexStream\bin\Nerdbank.Streams.Tests\Debug\net461\IsolatedTestHost.exe" "D:\git\Nerdbank.FullDuplexStream\bin\Nerdbank.Streams.Tests\Debug\net461\Nerdbank.Streams.Tests.dll" "MultiplexingStreamPerfTests" "JsonRpcPerf_Channel" "False"
Bytes allocated during quiet wait period: 2248
Bytes allocated during quiet wait period: 32
3749344 bytes allocated (3749 per iteration)
Elapsed time: 308ms (0.308ms per iteration)

Allow MultiplexingStream.Channel to accept PipeWriter/PipeReader from caller

Instead of always instantiating a new Pipe() instance for reading and another for writing in the Channel constructor, add ChannelOptions properties for the caller to pass in the PipeWriter and PipeReader to use. This can eliminate a link in a pipe-chain and thus all the memory copies that that entails.

Calling .AsStream() on a pipe that is an IPC stream adds another adapter

In the case where a Pipe is actually an IPC Stream, the fact that we see it as a Pipe already suggests that there's a stream-to-pipe adapter on it. Sticking on another adapter means we're handing back a stream->pipe->stream object. The underlying stream should be unwrapped when the pipe is just an adapter to just return the original stream instead of stacking adapters.

Channel.Dispose can race with previous write+flush leading to data loss

When writing to a Channel, flushing, then immediately Disposing, that channel can actually transmit its own termination before it gets around to writing the content that had been flushed to it.

This causes the MultiplexingStream to send a termination frame followed by content for that same channel, and the remote party will reject the content.

Trying to call ReadAsync twice on an SSL Stream PipeReader throws an Exception

I'm using the Extension: SSLStream.UsePipeReader();

When calling ReadAsync the first time is successful, however on the second call this exception is thrown:

System.NotSupportedException : The ReadAsync method cannot be called when another read operation is pending. at System.Net.Security.SslStreamInternal.ReadAsyncInternal[TReadAdapter](TReadAdapter adapter, Memory1 buffer)
at Nerdbank.Streams.PipeExtensions.<>c__DisplayClass5_0.<b__1>d.MoveNext() in D:\a\1\s\src\Nerdbank.Streams\PipeExtensions.cs:line 92
--- End of stack trace from previous location where exception was thrown ---
at System.IO.Pipelines.PipeCompletion.ThrowLatchedException()
at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
at System.IO.Pipelines.Pipe.GetReadAsyncResult()`

Switching to System.Pipelines.Unofficial implementation StreamConnection.GetDuplex(SSLStream) I was able to call ReadAsync the second time successfully.

Add NPM package with (at least) MultiplexingStream

We should also offer an NPM package that offers at least the MultiplexingStream (TypeScript style) for interop between .NET and Javascript.

So far, we already have one team that it would be a good fit for, and can't adopt it as a solution without a javascript side.

Sequence<T> does not release references to T upon disposal

When a Sequence<T> is initialized with many reference type T instances, and then the Sequence<T> is disposed or Reset() is called, the underlying arrays are recycled into a pool without first clearing them. This results in a (bounded?) memory leak because all those references will never be candidates for GC (till the arrays get reset in some other way).

We should clear arrays before recycling them, at least if they contain reference types.

Proposal for scope increase

Make this repo build a Nerdbank.Streams package that adds:

  • Stream that wraps a WebSocket
  • Double-ended stream (i.e. half of what FullDuplexStream gives you: just one stream that you can write on one end and read from the other, but not the other way around, so no two-way communication). Preferably one that utilizes a circular buffer rather than allocating a new buffer for every write operation.
  • Multiplexing stream

Creating a channel with an existing, half-completed pipe produces content message before offer

Creating a duplex pipe and completing its input so that the other side cannot transmit but may receive causes the channel based on it to send content before the offer, leading to the reverse side throwing an exception.

[Fact]
public async Task OfferReadOnlyPipe()
{
    // Prepare a readonly pipe that is already fully populated with data for the other end to read.
    var pipePair = FullDuplexStream.CreatePipePair();
    pipePair.Item1.Input.Complete(); // we don't read -- we only write.
    await pipePair.Item1.Output.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken);
    pipePair.Item1.Output.Complete();

    var ch1 = this.mx1.CreateChannel(new MultiplexingStream.ChannelOptions { ExistingPipe = pipePair.Item2 });
    await this.WaitForEphemeralChannelOfferToPropagateAsync();
    var ch2 = this.mx2.AcceptChannel(ch1.Id);
    var readResult = await ch2.Input.ReadAsync(this.TimeoutToken);
    Assert.Equal(3, readResult.Buffer.Length);
    ch2.Input.AdvanceTo(readResult.Buffer.End);
    readResult = await ch2.Input.ReadAsync(this.TimeoutToken);
    Assert.True(readResult.IsCompleted);
}

The trace output from this test is:

00:00:00.0000515 mx2 Information: 1 : Multiplexing protocol established successfully.
00:00:00.0000833 mx2 Verbose: 18 : Waiting for next frame
00:00:00.0001529 mx1 Information: 1 : Multiplexing protocol established successfully.
00:00:00.0001939 mx1 Verbose: 18 : Waiting for next frame
00:00:00.0069823 mx1 channel 2 Verbose: 0 : 0 of 0 bytes will be transmitted.
00:00:00.0070127 mx1 channel 2 Verbose: 0 : Transmission terminated because the writer completed.
00:00:00.0070442 mx1 channel 2 Information: 5 : Channel self-closing because both parties have completed transmission.
00:00:00.0000432 mx1 channel 1 Verbose: 0 : 3 of 3 bytes will be transmitted.
00:00:00.0011600 mx1 Information: 13 : Sending Content frame for channel 1, carrying 3 bytes of content.
0x010203
00:00:00.0001543 mx1 channel 1 Verbose: 0 : Transmission terminated because the writer completed.
00:00:00.0012648 mx1 Information: 13 : Sending Offer frame for channel 1, carrying 0 bytes of content.
00:00:00.0013175 mx1 Information: 13 : Sending Offer frame for channel 3, carrying 22 bytes of content.
0x457068656D6572616C4368616E6E656C576169746572
00:00:00.0013772 mx2 Information: 7 : Waiting to accept channel "EphemeralChannelWaiter", when offered by the other side.
00:00:00.0015335 mx2 Information: 14 : Received Content frame for channel 1 with 3 bytes of content.
00:00:00.0017257 mx2 Critical: 3 : Disposing self due to exception: System.Collections.Generic.KeyNotFoundException: The given key was not present in the dictionary.
   at System.Collections.Generic.Dictionary`2.get_Item(TKey key)
   at Nerdbank.Streams.MultiplexingStream.<OnContentAsync>d__58.MoveNext() in D:\git\Nerdbank.Streams\src\Nerdbank.Streams\MultiplexingStream.cs:line 791
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable.ConfiguredValueTaskAwaiter.GetResult()
   at Nerdbank.Streams.MultiplexingStream.<ReadStreamAsync>d__55.MoveNext() in D:\git\Nerdbank.Streams\src\Nerdbank.Streams\MultiplexingStream.cs:line 698
00:00:00.0017666 mx2 Information: 6 : Disposing.
00:00:00.0018367 mx1 Information: 6 : Disposing.
00:00:00.0006437 mx1 channel 3 Verbose: 0 : 0 of 0 bytes will be transmitted.
00:00:00.0006772 mx1 channel 3 Verbose: 0 : Transmission terminated because the writer completed.
00:00:00.0007077 mx1 channel 3 Information: 5 : Channel self-closing because both parties have completed transmission.
00:00:00.0010616 mx1 channel 1 Information: 5 : Channel self-closing because both parties have completed transmission.
00:00:00.0022073 mx1 Information: 6 : Disposing.
00:00:00.0022351 mx2 Information: 6 : Disposing.

Package Dependencies

The NuGet package Nerdbank.Streams doesn't have a (direct) dependency to System.Memory.

see Screenshot: NETStandard 2.0 Dependencies of Nerdbank.Streams

Interestingly the assembly has one.

see Screenshot: Dependency to System.Memory

I'm surprised to see this. Also, your codebase doesn't contain a reference to the System.Memory assembly (at a fist glance). Any ideas how that happened? Compiler magic by inlining of parts from System.Memory??

Because there is no direct dependency our package management has some issues to resolve the right version.
It might help in these circumstances, if the System.Memory package reference is listed inside the nuspec file.

Add extension methods to attach completion callbacks to pipes

IDuplexPipe and perhaps PipeReader and PipeWriter could use extension methods for OnReaderCompleted and OnWriterCompleted or OnCompleted so that callbacks can fire when they are completed.

Since these types don't support callbacks (those that exist are deprecated), the strategy would be to wrap the instances in order to monitor the Complete calls directly.

Also: do the same for Stream

Calling `AdvanceTo(Sequence.Start)` on empty sequence throws

Using the following code:

using var seq = new Sequence<byte>();
seq.AdvanceTo(seq.AsReadOnlySequence.Start);

I get the following exception:

Unable to cast object of type 'System.Byte[]' to type 'SequenceSegment[System.Byte]'.
Stack Trace: 
  Sequence`1.AdvanceTo(SequencePosition position)

PipeExtensions triggers UnobservedTaskException

Version: 2.2.5-alpha
SDK: 2.2.204

Repro project: Repro.zip

The code fragment below in PipeExtensions re-throws an exception from the underlying stream "unobserved", triggering UnobservedTaskException. Because the exception is properly propagated to the writer by pipe.Reader.Complete(ex), it does not need to be re-thrown.

This issue adds some noise to UnobservedTaskException, making debugging of async code slightly harder.

https://github.com/AArnott/Nerdbank.Streams/blob/master/src/Nerdbank.Streams/PipeExtensions.cs#L187

Task.Run(async delegate
{
    try
    {
// (snip)
    }
    catch (Exception ex)
    {
        pipe.Reader.Complete(ex);
        throw; // HERE
    }
}).Forget();

ArgumentException thrown from Sequence<T>.AdvanceTo

From this build, which consumed Nerdbank.Streams v2.0.165-beta, failed here:

https://github.com/AArnott/vs-streamjsonrpc/blob/80ff0009ff93116773c8279e7e3c8414a1f3927f/src/StreamJsonRpc/HeaderDelimitedMessageHandler.cs#L207

Failure log follows:

2018-09-28T21:21:57.3498266Z [xUnit.net 00:00:12.7193559]         Server Critical: 13 : Connection closing (StreamError: Reading JSON RPC from the stream failed with ArgumentException: Position does not represent a valid position in this sequence.
2018-09-28T21:21:57.3498551Z [xUnit.net 00:00:12.7193830]         Parameter name: position). System.ArgumentException: Position does not represent a valid position in this sequence.
2018-09-28T21:21:57.3498612Z [xUnit.net 00:00:12.7194027]         Parameter name: position
2018-09-28T21:21:57.3498705Z [xUnit.net 00:00:12.7194191]            at Nerdbank.Streams.Sequence`1.AdvanceTo(SequencePosition position)
2018-09-28T21:21:57.3498771Z [xUnit.net 00:00:12.7194353]            at Nerdbank.Streams.StreamPipeReader.AdvanceTo(SequencePosition consumed, SequencePosition examined)
2018-09-28T21:21:57.3498880Z [xUnit.net 00:00:12.7194498]            at Nerdbank.Streams.StreamPipeReader.AdvanceTo(SequencePosition consumed)
2018-09-28T21:21:57.3498952Z [xUnit.net 00:00:12.7194658]            at StreamJsonRpc.HeaderDelimitedMessageHandler.<ReadCoreAsync>d__25.MoveNext() in /__w/1/s/src/StreamJsonRpc/HeaderDelimitedMessageHandler.cs:line 207
2018-09-28T21:21:57.3499242Z [xUnit.net 00:00:12.7194890]         --- End of stack trace from previous location where exception was thrown ---
2018-09-28T21:21:57.3499349Z [xUnit.net 00:00:12.7195030]            at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
2018-09-28T21:21:57.3499415Z [xUnit.net 00:00:12.7195190]            at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
2018-09-28T21:21:57.3499526Z [xUnit.net 00:00:12.7195332]            at StreamJsonRpc.MessageHandlerBase.<ReadAsync>d__14.MoveNext() in /__w/1/s/src/StreamJsonRpc/MessageHandlerBase.cs:line 93
2018-09-28T21:21:57.3499879Z [xUnit.net 00:00:12.7195493]         --- End of stack trace from previous location where exception was thrown ---
2018-09-28T21:21:57.3499944Z [xUnit.net 00:00:12.7195653]            at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
2018-09-28T21:21:57.3500050Z [xUnit.net 00:00:12.7195792]            at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
2018-09-28T21:21:57.3500118Z [xUnit.net 00:00:12.7195952]            at StreamJsonRpc.JsonRpc.<ReadAndHandleRequestsAsync>d__93.MoveNext() in /__w/1/s/src/StreamJsonRpc/JsonRpc.cs:line 1519
2018-09-28T21:21:57.3500237Z [xUnit.net 00:00:12.7196095]         Server Critical: 13 : Connection closing (StreamError: Reading JSON RPC from the stream failed with BadRpcHeaderException: Header does not end with expected 
2018-09-28T21:21:57.3500643Z [xUnit.net 00:00:12.7196337]          character sequence.). StreamJsonRpc.BadRpcHeaderException: Header does not end with expected 
2018-09-28T21:21:57.3500888Z [xUnit.net 00:00:12.7196507]          character sequence.
2018-09-28T21:21:57.3500955Z [xUnit.net 00:00:12.7196647]            at StreamJsonRpc.HeaderDelimitedMessageHandler.<ReadHeadersAsync>d__35.MoveNext() in /__w/1/s/src/StreamJsonRpc/HeaderDelimitedMessageHandler.cs:line 441
2018-09-28T21:21:57.3501459Z [xUnit.net 00:00:12.7196882]         --- End of stack trace from previous location where exception was thrown ---
2018-09-28T21:21:57.3501765Z [xUnit.net 00:00:12.7197042]            at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
2018-09-28T21:21:57.3501955Z [xUnit.net 00:00:12.7197182]            at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
2018-09-28T21:21:57.3502661Z [xUnit.net 00:00:12.7197345]            at StreamJsonRpc.HeaderDelimitedMessageHandler.<ReadCoreAsync>d__25.MoveNext() in /__w/1/s/src/StreamJsonRpc/HeaderDelimitedMessageHandler.cs:line 171
2018-09-28T21:21:57.3503131Z [xUnit.net 00:00:12.7197560]         --- End of stack trace from previous location where exception was thrown ---
2018-09-28T21:21:57.3503317Z [xUnit.net 00:00:12.7197720]            at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
2018-09-28T21:21:57.3503414Z [xUnit.net 00:00:12.7197880]            at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
2018-09-28T21:21:57.3503565Z [xUnit.net 00:00:12.7198023]            at StreamJsonRpc.MessageHandlerBase.<ReadAsync>d__14.MoveNext() in /__w/1/s/src/StreamJsonRpc/MessageHandlerBase.cs:line 93
2018-09-28T21:21:57.3503966Z [xUnit.net 00:00:12.7198185]         --- End of stack trace from previous location where exception was thrown ---
2018-09-28T21:21:57.3504095Z [xUnit.net 00:00:12.7198327]            at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
2018-09-28T21:21:57.3504223Z [xUnit.net 00:00:12.7198485]            at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
2018-09-28T21:21:57.3504336Z [xUnit.net 00:00:12.7198647]            at StreamJsonRpc.JsonRpc.<ReadAndHandleRequestsAsync>d__93.MoveNext() in /__w/1/s/src/StreamJsonRpc/JsonRpc.cs:line 1519

SubstreamReader should drain on disposal

SubstreamReader may leaves bytes unread from the underlying stream even after the entire user-content of the substream has been read, leaving the original stream in a state where it will read unexpected bytes.

This should be protected against by ensuring that Substream disposal 'fast forwards' the substream. An async method should allow for this as well.

MultiplexingStream.Dispose races with itself and throws NRE

When calling MultiplexingStream.Dispose I witnessed an NRE with this callstack:

 	mscorlib.dll!System.IO.BufferedStream.Dispose(bool disposing) Line 275	C#
 	mscorlib.dll!System.IO.Stream.Close() Line 249	C#
 	mscorlib.dll!System.IO.Stream.Dispose() Line 261	C#
 	Microsoft.VisualStudio.LiveShare.Rpc.Json.dll!Microsoft.Cascade.Rpc.Json.SafeWriteBufferedStream.DisposeWritable() Line 46	C#
 	Microsoft.VisualStudio.LiveShare.Rpc.Json.dll!Microsoft.Cascade.Rpc.Json.WrappedStream.Dispose(bool disposing) Line 104	C#
 	mscorlib.dll!System.IO.Stream.Close() Line 249	C#
 	mscorlib.dll!System.IO.Stream.Dispose() Line 261	C#
 	Nerdbank.Streams.dll!Nerdbank.Streams.MultiplexingStream.Dispose(bool disposing)	Unknown
 	Nerdbank.Streams.dll!Nerdbank.Streams.MultiplexingStream.Dispose()	Unknown
>	Microsoft.VisualStudio.Shell.UI.Internal.dll!Microsoft.VisualStudio.Services.LiveShareServiceBrokerIntegration.ServiceBrokerHost.CreateServiceAsync.AnonymousMethod__1() Line 245	C#

The NRE would be caused by calling BufferedStream.Dispose concurrently on two threads. Could MultiplexingStream be responsible for calling Dispose on itself from another thread while we are calling Dispose on it? If so, we should ensure it's safe to call Dispose on it explicitly.

Allow for setting minimum backing array size for Sequence<T>

Some Sequence<T> instances may be used for large sequences but appended to in very small increments. This can mean many thousands of 32 byte arrays are allocated instead of just a few 4K arrays, for example, which would be much more efficient.

Sequence<T> should allow the owner to set the minimum size for any allocated backing array, so that if someone calls Sequence<T>.GetSpan(5) and a new array must be allocated, it will be that minimum size (e.g. 4K) instead of the ArrayPool minimum of 32 bytes.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.