Comments (10)

mgravell commented on June 20, 2024

K; I'm going to close this down as I don't think it is really an "issues in this library" thing - happy to offer more guidance as needed though : stay in touch!

from pipelines.sockets.unofficial.

mgravell commented on June 20, 2024

can I ask some clarifying questions? it sounds like you have:

  • multiple writers
  • each writer might sometimes write multiple frames to represent a single payload (with some kind of complete/incomplete marker)
  • (the actual problem) currently, frames from different writers are becoming interleaved, making it hard to know which logical stream a second message is related to

is that right?

if so, there are two options:

  1. make sure each frame has some kind of sequence identifier in it, so that if the consumer sees [1, incomplete] [1, incomplete] [2, complete] [1, complete] it knows that all the "1s" go together, and that the "2" is by itself (or however it works out)
  2. use a locking primitive such that you retain a logical lock over the stream for the duration of a multi-frame write, so that you write [1, incomplete] [1, incomplete] [1, complete] [2, complete]

The first obviously requires support in your framing protocol, so may not be possible if your frame semantics are baked and don't support it. The second option could use something like the MutexSlim that is in the library.

Obviously also: 1 puts most of the work on the consumer to parse frame streams correctly and dispatch them appropriately; 2 puts most of the work on the producer to write messages as contiguous frame sequences. 2 is simpler, in every way - but has the disadvantage that large multi-frame messages may "block" other writers (quotes because "block" here doesn't necessarily mean "block a thread" - it could block on an async-acquire of a mutex).
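
To make option 1 concrete, here is a minimal consumer-side sketch. The Frame type, its fields, and the FrameDemultiplexer class are hypothetical names invented for illustration; they are not part of the library or of any particular framing protocol. It assumes each frame already carries a stream identifier and a complete/incomplete marker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical frame shape: stream identifier, payload bytes, and a "last frame" marker.
public readonly struct Frame
{
    public Frame(int streamId, byte[] payload, bool isComplete)
    {
        StreamId = streamId;
        Payload = payload;
        IsComplete = isComplete;
    }

    public int StreamId { get; }
    public byte[] Payload { get; }
    public bool IsComplete { get; }
}

public sealed class FrameDemultiplexer
{
    // Partial payloads, keyed by stream identifier, held until the completing frame arrives.
    private readonly Dictionary<int, List<byte>> _pending = new Dictionary<int, List<byte>>();

    // Returns true (and the reassembled payload) only when 'frame' completes its stream.
    public bool TryAccept(Frame frame, out byte[] message)
    {
        if (!_pending.TryGetValue(frame.StreamId, out List<byte> buffer))
        {
            buffer = new List<byte>();
            _pending[frame.StreamId] = buffer;
        }

        buffer.AddRange(frame.Payload);

        if (!frame.IsComplete)
        {
            message = Array.Empty<byte>();
            return false;
        }

        _pending.Remove(frame.StreamId);
        message = buffer.ToArray();
        return true;
    }
}
```

Fed the sequence [1, incomplete] [1, incomplete] [2, complete] [1, complete], this yields the "2" message first and then the three "1" frames joined together.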

Note that if you have multiple writers, you should already be using a locking primitive anyway, since you can only have a single pipe writer. So in many ways, the second option just extends the logical scope of that synchronization to be "entire message sequence" rather than "single frame".
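
A rough sketch of option 2, holding the lock for the whole message rather than a single frame. To stay on APIs that are certain to exist, this uses the BCL SemaphoreSlim as a stand-in for MutexSlim; the SerializedFrameWriter class and its method name are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

public sealed class SerializedFrameWriter
{
    private readonly PipeWriter _writer;

    // One permit: only one logical message may be in flight at a time.
    private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);

    public SerializedFrameWriter(PipeWriter writer)
    {
        _writer = writer;
    }

    // Writes all frames of one message contiguously; other writers wait asynchronously.
    public async Task WriteMessageAsync(
        IReadOnlyList<ReadOnlyMemory<byte>> frames,
        CancellationToken cancellationToken = default)
    {
        await _writeLock.WaitAsync(cancellationToken);
        try
        {
            foreach (ReadOnlyMemory<byte> frame in frames)
            {
                // WriteAsync copies the frame into the pipe and flushes it.
                await _writer.WriteAsync(frame, cancellationToken);
            }
        }
        finally
        {
            _writeLock.Release();
        }
    }
}
```

The trade-off is the one described above: a large multi-frame message keeps every other caller of WriteMessageAsync waiting on the semaphore until its last frame is written.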

If I've misunderstood the question, please let me know!

francisminu commented on June 20, 2024

Thank you so much for your prompt response. I am still struggling here.
If possible, could you please take a look at the code below?
This is what I have now. I use the same PipeWriter instance all the time.

public class SocketPipe: ISocketPipe
{
        private readonly ILoggerService _loggerService;
        
        public SocketPipe(ILoggerService loggerService)
        {
            _loggerService = loggerService;
            if(_loggerService == null)
            {
                throw new ArgumentNullException($"ILoggerService object is required.");
            }
            Pipe = new Pipe();
        }

        private Pipe Pipe { get; set; }

        public Pipe GetPipe()
        {
            return Pipe ?? (Pipe = new Pipe());
        }

        public PipeReader GetPipeReader()
        {
            if(Pipe == null)
            {
                Pipe = new Pipe();
            }
            return Pipe.Reader;
        }

        public PipeWriter GetPipeWriter()
        {
            if (Pipe == null)
            {
                Pipe = new Pipe();
            }
            return Pipe.Writer;
        }

        public async Task WriteToPipe(object msg)
        {
            var data = (byte[])msg;
            var pipeWriter = GetPipeWriter();
            try
            {
                
                Memory<byte> memory = pipeWriter.GetMemory(data.Length);
                Memory<byte> dataMemory = new Memory<byte>(data);
                dataMemory.CopyTo(memory);

                // Tell the PipeWriter how much was read
                pipeWriter.Advance(data.Length);
            }
            catch(Exception ex)
            {
                _loggerService.Error(ex, new LogMessage { MessageText = "Exception occurred in Socket Pipes." });
            }
            // Make the data available to the PipeReader
            FlushResult result = await pipeWriter.FlushAsync();
            if (result.IsCompleted)
            {
                pipeWriter.Complete();
            }
        }

        public Task<string> ReadFromPipe()
        {
            var reader = GetPipeReader();
            var pipeHasData = reader.TryRead(out ReadResult inputMessage);
            if (!pipeHasData || inputMessage.Buffer.IsEmpty)
                return Task.FromResult("");
            var inboundMessage = "";
            ReadOnlySequence<byte> buffer = inputMessage.Buffer;

            var stxPosition = buffer.PositionOf(Convert.ToByte(0x02));
            var etxPosition = buffer.PositionOf(Convert.ToByte(0x03));

            if (stxPosition == null && etxPosition == null)
            {
                reader.AdvanceTo(buffer.End);
            }
            else
            {
                var labelNames = new[] { "Server" };
                var labelValues = new[] { Environment.MachineName };
                var discardedMessagesCountGauge = Metrics.CreateGauge("discarded_messages_count", "Number of messages discarded while being read from Pipe", labelNames);
                if (stxPosition != null & etxPosition != null)
                {
                    if (CheckMessageFrameSequence(stxPosition, etxPosition))
                    {
                        _loggerService.Info(new LogMessage
                        {
                            ClassName = "SocketPipe",
                            MethodName = "ReadFromPipe",
                            MessageText = "Valid message obtained."
                        });
                        var nextPosition = buffer.GetPosition(1, etxPosition.Value);
                        buffer = buffer.Slice(buffer.GetPosition(1, stxPosition.Value), etxPosition.Value);
                        if (buffer.IsSingleSegment)
                        {
                            inboundMessage = Encoding.ASCII.GetString(buffer.First.Span);
                        }
                        else
                        {
                            foreach (var segment in buffer)
                            {
                                inboundMessage = inboundMessage + Encoding.ASCII.GetString(segment.Span);
                            }
                        }
                        reader.AdvanceTo(nextPosition);
                    }
                    else
                    {
                        
                        discardedMessagesCountGauge.WithLabels(labelValues).Inc();
                        if (discardedMessagesCountGauge.Value > 10000)
                        {
                            discardedMessagesCountGauge.WithLabels(labelValues).Set(0);
                        }

                        // For logging discarded message
                        
                        buffer = buffer.Slice(buffer.Start, stxPosition.Value);
                        var discardedMessage = GetDiscardedMessage(buffer);
                        _loggerService.Warning(new LogMessage
                        {
                            ClassName = "SocketPipe",
                            MethodName = "ReadFromPipe",
                            MessageText = $"Discarded message: Current data in Pipe has ETX before the first occurrence of STX. Data till STX is discarded. Discarded data: {discardedMessage}"
                        });
                        reader.AdvanceTo(stxPosition.Value);
                    }
                }
                else if (stxPosition == null)
                {
                    discardedMessagesCountGauge.WithLabels(labelValues).Inc();
                    if (discardedMessagesCountGauge.Value > 10000)
                    {
                        discardedMessagesCountGauge.WithLabels(labelValues).Set(0);
                    }
                    // For logging discarded message
                    var discardedMessage = GetDiscardedMessage(buffer);

                    _loggerService.Warning(new LogMessage
                    {
                        ClassName = "SocketPipe",
                        MethodName = "ReadFromPipe",
                        MessageText = $"Discarded message: Buffer has only ETX and no corresponding STX. Removing entire data from Pipe. Discarded data: {discardedMessage}"
                    });
                    reader.AdvanceTo(buffer.End);
                }
                else
                {
                    _loggerService.Warning(new LogMessage
                    {
                        MessageText = "Only STX found in buffer. Acceptable message not received. Hence not reading from the buffer."
                    });
                    reader.AdvanceTo(stxPosition.Value, buffer.End);
                }
            }

            if (inputMessage.IsCompleted)
            {
                reader.Complete();
            }
            return Task.FromResult(inboundMessage);
        }

        #region PrivateMethods

        private bool CheckMessageFrameSequence(SequencePosition? stxPos, SequencePosition? etxPos)
        {
            if (stxPos != null && etxPos != null)
                return stxPos.Value.GetInteger() < etxPos.Value.GetInteger();
            _loggerService.Warning(new LogMessage
            {
                ClassName = "SocketPipe",
                MethodName = "CheckMessageFrameSequence",
                MessageText = $"Null value for stxPos/etxPos. stxPos: {stxPos} etxPos: {etxPos}"
            });
            return false;
        }

        private string GetDiscardedMessage(ReadOnlySequence<byte> buffer)
        {
            var discardedMessage = "";
            if (buffer.IsSingleSegment)
            {
                discardedMessage = Encoding.ASCII.GetString(buffer.First.Span);
            }
            else
            {
                foreach (var segment in buffer)
                {
                    discardedMessage = discardedMessage + Encoding.ASCII.GetString(segment.Span);
                }
            }
            return discardedMessage;
        }


        #endregion

    }

The caller function:

private async Task<string> ProcessData(object msg)
        {
            try
            {
                await _pipe.WriteToPipe(msg);

                var dataReadFromPipe = await _pipe.ReadFromPipe();
                return dataReadFromPipe;
            }
            catch(Exception ex)
            {
                _loggerService.Error(ex, new LogMessage { MessageText = "Error occured in ProcessData" });
                return "";
            }
        }

ProcessData gets called whenever DotNetty receives data.
Could you please take a look to see if the implementation is correct at all? I have only just started using Pipe, and this is how I arrived at the implementation. Please let me know if it is not correct.

Thanks,
Minu

(edit by Marc: code formatting)

mgravell commented on June 20, 2024

right; looking at public async Task WriteToPipe(object msg) first; I'm guessing that whatever framing protocol you're using is already built into msg at this point? i.e. it already has some kind of delimiter to allow individual messages to be deframed?

It isn't clear to me how you are handling concurrency; the original question seemed to be talking about multiple writers, but this code doesn't protect against that at all. So: can you clarify what the expected behaviour is here? Note: pipelines itself provides zero protection against concurrent writes, so if you need to support multiple writers, you'll need to add that yourself. This would be easy for me to add with MutexSlim, so please let me know what you intend to happen here - I may be able to help.

You then do this:

                Memory<byte> memory = pipeWriter.GetMemory(data.Length);
                Memory<byte> dataMemory = new Memory<byte>(data);
                dataMemory.CopyTo(memory);

                // Tell the PipeWriter how much was read
                pipeWriter.Advance(data.Length);
...
            FlushResult result = await pipeWriter.FlushAsync();

The problem here is: sizes. GetMemory isn't guaranteed to support arbitrary sizes - the idea is that you ask for something, do what you can (exploiting what it gave you, which might be more than you asked for). Fortunately, the method to safely do everything here already exists. Everything above is simply:

var result = await pipeWriter.WriteAsync(data);
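
For comparison, this is roughly what a hand-written version has to do to respect "use whatever size you were given". It is only a sketch for illustration (PipeWriterSketch and WriteChunkedAsync are made-up names); in practice WriteAsync is the simpler choice.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeWriterSketch
{
    // Copies 'data' into the pipe in whatever chunk sizes GetMemory hands back, then flushes once.
    public static async ValueTask<FlushResult> WriteChunkedAsync(PipeWriter writer, ReadOnlyMemory<byte> data)
    {
        while (!data.IsEmpty)
        {
            // No size hint: take whatever non-empty buffer the writer offers.
            Memory<byte> target = writer.GetMemory();

            // Never copy more than the buffer we were actually given.
            int count = Math.Min(target.Length, data.Length);
            data.Slice(0, count).CopyTo(target);

            // Tell the PipeWriter how many bytes were written into 'target'.
            writer.Advance(count);
            data = data.Slice(count);
        }

        // Make the data available to the PipeReader.
        return await writer.FlushAsync();
    }
}
```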

You then do this:

            if (result.IsCompleted)
            {
                pipeWriter.Complete();
            }

It isn't clear to me what you're doing here, but: calling Complete() on a PipeWriter is saying "I'm all done, no more data will be forthcoming". Since the original question suggested multiple frames, I suspect this is just... incorrect and should be removed.

next up: ReadFromPipe

mgravell commented on June 20, 2024

You start off by doing:

            var pipeHasData = reader.TryRead(out ReadResult inputMessage);
            if (!pipeHasData || inputMessage.Buffer.IsEmpty)
                return Task.FromResult("");

but... we expect there to not be a response yet (sockets being async), so... I can't see any point in even testing this - just lose it? At the moment, it looks like it will always incorrectly return an empty string? You almost always want to write a loop here, that keeps doing an async read until either it has the data it needs, or there isn't enough data and the incoming stream is terminated.

You also want to be very careful about how you consume data; for example:

            if (stxPosition == null && etxPosition == null)
            {
                reader.AdvanceTo(buffer.End);

that looks very bad to me; that appears to say "if we haven't found our frame delimiters, then consume the data we've peeked at", when what you really want to say is "if we haven't found our frame delimiters, consume nothing, but record that we've inspected everything"; my IDE is updating right now, but I believe that is something like:

reader.AdvanceTo(buffer.Start, buffer.End);

(i.e. the first parameter is "consumed" - nothing, and the second parameter is "inspected" - everything)

Again, inside a loop, this means that your next ReadAsync should get more data including the data you've already looked at once, hopefully now as a complete frame. Again, this would typically be in a loop that runs until you have dequeued a message.
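
A sketch of what such a loop might look like for STX/ETX framing. The class and method names are invented for illustration. It assumes the frame body is ASCII and that bytes arriving before an STX are noise that may be dropped; if that second assumption doesn't hold for your protocol, the no-STX branch should consume nothing instead.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class StxEtxReader
{
    private const byte Stx = 0x02;
    private const byte Etx = 0x03;

    // Keeps reading until one complete STX...ETX frame is available.
    // Returns (false, "") if the writer completes before a full frame arrives.
    public static async Task<(bool Success, string Message)> ReadFrameAsync(
        PipeReader reader,
        CancellationToken cancellationToken = default)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            SequencePosition? stx = buffer.PositionOf(Stx);
            if (stx == null)
            {
                // Assumption: with no STX present, everything buffered is noise; discard it.
                reader.AdvanceTo(buffer.End);
            }
            else
            {
                ReadOnlySequence<byte> fromStx = buffer.Slice(stx.Value);
                SequencePosition? etx = fromStx.PositionOf(Etx);
                if (etx != null)
                {
                    // Body is everything after the STX byte and before the ETX byte.
                    ReadOnlySequence<byte> body = fromStx.Slice(fromStx.GetPosition(1), etx.Value);
                    string text = Encoding.ASCII.GetString(body.ToArray());

                    // Consume through the ETX byte; anything after it stays in the pipe.
                    reader.AdvanceTo(buffer.GetPosition(1, etx.Value));
                    return (true, text);
                }

                // Partial frame: consume only up to the STX, but mark everything as examined
                // so the next ReadAsync waits for more data instead of returning immediately.
                reader.AdvanceTo(stx.Value, buffer.End);
            }

            if (result.IsCompleted)
            {
                return (false, string.Empty);
            }
        }
    }
}
```

The second argument to AdvanceTo in the partial-frame branch is what stops the loop from spinning: the reader is only woken again once more bytes have been written.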

The same issue of concurrency arises. In terms of timing, note that you want to acquire the read lock before you release the write lock, otherwise you get into a race condition where multiple writers can compete and get out-of-order.

Finally, we have:

            if (inputMessage.IsCompleted)
            {
                reader.Complete();
            }

which like above, looks incorrect. reader.Complete means "I'm done, I won't be reading any more", but you mention multiple frames, so this is probably incorrect.


Has that been helpful at all??

francisminu commented on June 20, 2024

Thank you so much, Marc. I will try to clean up the implementation as per the suggestions you have provided.
The reason why I have
reader.AdvanceTo(buffer.End);
is this:
Let's say the pipe currently holds: testdatatrial123
There is no STX or ETX in it. When new (proper) data comes in, it will be STX test1232432 ETX, and I do not need testdatatrial123, as that data has no STX before it anyway.

Also, the reason I do not have a loop in ReadFromPipe is that ours is a kind of synchronous implementation: the client connects and expects a response on the same connection, so I need to return the data for a request in the same flow.

I have not worked with MutexSlim before, I will take a look.
Thank you once again for the inputs, they were really helpful.

Thanks,

francisminu commented on June 20, 2024

@mgravell
Hi Marc,

I know this is not the place to ask questions; please let me know if you would prefer that I send this question via some other medium.

I was just going through the samples once again and now I am a bit more confused. The owner doesn't seem to respond, so I thought you might be the best person next.
As per the sample,
https://github.com/davidfowl/TcpEcho/blob/master/src/PipelinesServer/Program.cs

Does it mean that a Pipe instance is created for every client that is connected to the server?

At present, my implementation is such that I have a global instance of Pipe that is registered via Dependency Injection and I use the same instance. All the clients write to the same Pipe instance. Could you verify if that approach is correct?

Thanks,
Minu Francis

mgravell commented on June 20, 2024

francisminu commented on June 20, 2024

@mgravell Thank you for the response Marc.
This is what our requirement is:

  1. We have a TCP Server running
  2. There are several TCP clients that connect to this server. Each one connects, sends a message, and then expects a response back (kind of a synchronous model).
  3. Once the response is sent, the client is disconnected.
  4. In other words, every message from a client opens a connection to our server; once that message is processed and the response is sent, the client disconnects, and this continues.
  5. For the socket implementation we have used DotNetty, so that it takes care of the connect and disconnect part.
  6. DotNetty has specific callbacks called ChannelActive (when connected) and ChannelInactive (when disconnected). I can make use of these to create an instance of Pipe each time a connection is made. (This is what I was not sure I should do, and it was my earlier question today.)

So, at present, what happens is:
Client A -> connects (message size: 2048)
DotNetty reads only 1024 at a time, so it reads 1024 and writes that to the Pipe (which is a global instance).
At about the same time another client, Client B -> connects (message size: 1024)
DotNetty reads that and writes it to the same Pipe.

Since it is the same global instance of Pipe that I am using, the Pipe now has data in the form:
PartOfMessage1_Message2_SecondPartOfMessage1
which is completely wrong, as the messages have now been mixed up.

So my question is: should I create a Pipe in each ChannelActive and call PipeWriter.Complete in ChannelInactive, or should I continue with the global Pipe instance?
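
For reference, the per-connection alternative could look something like the sketch below. ConnectionPipes and the string connection identifier are hypothetical; how an identifier is obtained from DotNetty, and how these methods are wired to ChannelActive and ChannelInactive, is not shown and would need to be checked against DotNetty itself.

```csharp
using System.Collections.Concurrent;
using System.IO.Pipelines;

public sealed class ConnectionPipes
{
    // One Pipe per connection, so bytes from different clients can never interleave.
    private readonly ConcurrentDictionary<string, Pipe> _pipes =
        new ConcurrentDictionary<string, Pipe>();

    // Intended to be called when a connection becomes active.
    public Pipe OnConnected(string connectionId)
    {
        return _pipes.GetOrAdd(connectionId, _ => new Pipe());
    }

    public PipeWriter GetWriter(string connectionId)
    {
        return _pipes[connectionId].Writer;
    }

    public PipeReader GetReader(string connectionId)
    {
        return _pipes[connectionId].Reader;
    }

    // Intended to be called when a connection goes inactive: no more data will be written.
    public void OnDisconnected(string connectionId)
    {
        if (_pipes.TryRemove(connectionId, out Pipe pipe))
        {
            pipe.Writer.Complete();
        }
    }
}
```

With this shape, the two 1024 chunks from Client A land in A's pipe and Client B's message lands in B's pipe, regardless of arrival order.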

If I can go ahead with the global Pipe instance: I came across something called Mutex. Is that the same as the MutexSlim that you mentioned earlier?

Thanks,
Minu Francis

mgravell commented on June 20, 2024
