
rust-yamux's Introduction

Yamux

A stream multiplexer over reliable, ordered connections such as TCP/IP. Implements https://github.com/hashicorp/yamux/blob/master/spec.md

License

Licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

rust-yamux's People

Contributors

appaquet, aschran, demi-marie, dependabot[bot], dvdplm, gowee, koushiro, libp2p-mgmt-read-write[bot], mxinden, romanb, thomaseizinger, tomaka, twittner, web-flow


rust-yamux's Issues

Use type state pattern

I'm coming from the Go yamux, so I understand how the Go library works. However, I'm wondering whether it makes sense, given the Rust type system, to prevent server-side connections from calling client-only methods. Right now, the API lets me call client-only methods like open_stream even though I'm working with a server connection. Rust's type system seems capable of expressing the distinction between a server and a client connection. I'm not a Rust expert, so I don't know how to pull this off, but it does feel like the API could be better.
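
A minimal sketch of the type-state idea (illustrative only; these types are not part of yamux): the connection is parameterised over its mode, and client-only methods are only defined for Connection<Client>.

use std::marker::PhantomData;

pub struct Client;
pub struct Server;

pub struct Connection<Mode> {
    _mode: PhantomData<Mode>,
    // socket, config, ...
}

impl<Mode> Connection<Mode> {
    /// Methods valid in both modes live in the generic impl.
    pub fn next_inbound(&mut self) { /* ... */ }
}

impl Connection<Client> {
    /// open_stream only compiles for client connections.
    pub fn open_stream(&mut self) { /* ... */ }
}

fn main() {
    let mut server = Connection::<Server> { _mode: PhantomData };
    server.next_inbound();
    // server.open_stream(); // does not compile: no such method on Connection<Server>
}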

Dropped `Stream`s are never removed until connection closes?

I'm integrating yamux into my app, which creates many new streams on a single connection. When I run my hacked-together benchmark tool, new streams on the server side are continually accepted and never removed, even though the yamux::Stream objects used in my app are properly dropped, until the max_num_streams limit in yamux is hit and my benchmark consequently dies.

In looking at Active<T> in connection.rs, it seems that the max_num_streams limit is enforced based on the length of self.streams. The only times streams are removed from that map are:

  1. in on_drop_stream
  2. in drop_all_streams, when the connection is dropped

Setting aside (2), which works fine, I am having trouble seeing how (1) is supposed to work. It looks like on_drop_stream is called in Active<T>::poll when the corresponding channel in self.stream_receivers is closed (i.e. the receiver returns None). However, the clone of the Stream that's stored in self.streams itself includes a clone of the mpsc::Sender<StreamCommand>. Even when my app drops its Stream (and thus its Sender), the clone stored in the Active<T> persists, which looks like a circular reference that prevents the Stream from ever being removed until the whole connection is dropped.
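
A minimal sketch of the pattern described above, using futures::channel::mpsc directly (not the yamux internals): a receiver only yields None once every Sender clone is gone, so a clone kept next to the receiver's owner keeps the channel alive indefinitely.

use futures::channel::mpsc;
use futures::{executor::block_on, poll, StreamExt};

fn main() {
    block_on(async {
        // stand-in for the channel of StreamCommands between Stream and Connection
        let (app_sender, mut receiver) = mpsc::channel::<()>(8);

        // The connection side keeps its own clone of the sender, as Active<T>
        // effectively does via the Stream clone stored in self.streams.
        let _kept_sender = app_sender.clone();

        // The application drops its Stream (and with it its Sender) ...
        drop(app_sender);

        // ... but the receiver never yields None, because _kept_sender keeps the
        // channel open; a None is exactly what would trigger on_drop_stream.
        assert!(poll!(receiver.next()).is_pending());
        println!("receiver still pending after the app-side sender was dropped");
    });
}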

I feel like I must be missing something. How is this supposed to work, so that streams can be removed (and thus stop counting towards the max streams limit) before the connection is taken down?

Consider increasing `receive_window` default

To exhaust the bandwidth-delay product of a connection, i.e. to maximize the amount of data in flight (the amount of data temporarily stored on the wire), one needs a receive window sized according to the connection's properties.

Example:

  • Consider two nodes in two different datacenters, across the US (e.g. AWS us-east-1 and us-west-2).
  • Latency between the two nodes is 60 ms.
  • They each have a link of 25 Gbit/s.
  • The bandwidth delay product would be: 60ms * 25Gbit/s = 187.5 Mbyte

(Learn more about the setup on libp2p/test-plans#184.)

In other words, one needs a 187.5 Mbyte receive window to fully utilize the underlying network between the two machines.
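
For reference, a tiny calculation reproducing the number above (plain arithmetic, not yamux code):

/// Bandwidth-delay product: the receive window needed to keep the pipe full.
fn required_window_bytes(latency_ms: f64, bandwidth_gbit_per_s: f64) -> f64 {
    let bits_in_flight = bandwidth_gbit_per_s * 1e9 * (latency_ms / 1000.0);
    bits_in_flight / 8.0
}

fn main() {
    let bytes = required_window_bytes(60.0, 25.0);
    println!("{} Mbyte", bytes / 1e6); // prints 187.5 Mbyte
}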

Currently we set the default receive window per stream to 256 Kbyte. This is the default value according to the Yamux specification.

pub const DEFAULT_CREDIT: u32 = 256 * 1024; // as per yamux specification

rust-yamux/yamux/src/lib.rs, lines 116 to 127 at 34dcb97:

impl Default for Config {
    fn default() -> Self {
        Config {
            receive_window: DEFAULT_CREDIT,
            max_buffer_size: 1024 * 1024,
            max_num_streams: 8192,
            window_update_mode: WindowUpdateMode::OnRead,
            read_after_close: true,
            split_send_size: DEFAULT_SPLIT_SEND_SIZE,
        }
    }
}

At the same time we set the maximum buffer size per stream to 1 Mbyte. This capacity can never actually be used, since the buffer can never grow larger than the receive window, unless one uses WindowUpdateMode::OnReceive, which I consider a bad option in every case as it breaks backpressure. The default value probably dates from a time when WindowUpdateMode::OnReceive was the default, i.e. before ab4fe45.

max_buffer_size: 1024 * 1024,

In the above scenario, when using the default values, one can achieve roughly 20 Mbit/s on a single stream. Using a receive window of 16 Mbyte bumps this to 300 Mbit/s. Note that with the default one can still exhaust the bandwidth-delay product; one just has to use multiple streams.

There is a downside to choosing a large receive window. The larger the receive window, the more memory an adversary can make the local node allocate, potentially causing the local node to run out of memory.

So what value should we choose instead of the current 256 Kbyte? I don't know. Ideally one could configure a maximum per stream and a maximum per connection, such that one can exhaust the bandwidth-delay product on a single stream or across multiple streams. (See QUIC, and in particular quinn, as a role model.)

What does go-libp2p do? They set a 16 Mbyte maximum receive window, but given their resource manager they have more fine-grained control over how much memory an adversary can make them allocate.

https://github.com/libp2p/go-yamux/blob/e584a210c72651b67829477b4b4285cae25b1a1b/const.go#L115-L120

Refactor `stream::Shared` to use internal mutability

Currently, we hold one stream::Shared within the Stream and one in Active. It is shared by being wrapped in an Arc<Mutex>.

It would be slightly more ergonomic for both usage sites if the Arc<Mutex> were inside stream::Shared.
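
A rough sketch of the proposed shape (illustrative names, not the actual yamux types): the Arc<Mutex<..>> moves inside the handle, so both usage sites just clone it and lock through a method.

use std::sync::{Arc, Mutex, MutexGuard};

#[derive(Clone)]
pub struct Shared {
    inner: Arc<Mutex<SharedInner>>,
}

pub struct SharedInner {
    pub window: u32,
    // buffer, state, wakers, ...
}

impl Shared {
    pub fn new(window: u32) -> Self {
        Shared { inner: Arc::new(Mutex::new(SharedInner { window })) }
    }

    /// Callers no longer need to know about the Arc or the Mutex.
    pub fn lock(&self) -> MutexGuard<'_, SharedInner> {
        self.inner.lock().unwrap()
    }
}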

Send WindowUpdate message early

Flow control in Yamux and HTTP/2

The Yamux flow control mechanism is very similar to HTTP/2's flow control. This is no surprise, given that Yamux is inspired by the early SPDY efforts.

In both Yamux and HTTP/2, the WindowUpdate message is an integral part of the flow control mechanism.

To prevent the streams from stalling, window update frames should be sent regularly. Yamux can be configured to provide a larger limit for windows sizes.

https://github.com/hashicorp/yamux/blob/master/spec.md#flow-control

Flow control is based on WINDOW_UPDATE frames. Receivers advertise how many octets they are prepared to receive on a stream and for the entire connection. This is a credit-based scheme.

https://tools.ietf.org/html/rfc7540#section-5.2.1

In HTTP/2 it is up to the receiver when to send a WindowUpdate message. If I understand the short Yamux specification correctly, the same applies to Yamux.

HTTP/2 defines only the format and semantics of the WINDOW_UPDATE frame (Section 6.9). This document does not stipulate how a receiver decides when to send this frame or the value that it sends, nor does it specify how a sender chooses to send packets. Implementations are able to select any algorithm that suits their needs.

See https://tools.ietf.org/html/rfc7540#section-5.2.1

For a general overview of HTTP/2 flow control I can recommend "HTTP/2 in Action" [1]. Chapter 7.2 on the topic can be (pre-)viewed on the publisher's website.

WindowUpdate message strategies

HTTP/2 implementations can use WindowUpdate messages to implement various (possibly advanced) flow control strategies. One example of a simple strategy is the nghttp2 library, which sends a WindowUpdate message once it (the receiver) has received and consumed more than half of the flow control window.

Today with WindowUpdateMode::OnRead this Yamux implementation sends a WindowUpdate message once (a) the window credit has been fully depleted and (b) the read buffer is empty, thus all bytes have been consumed. See implementation for details.

Comparison

Imagine the following simplified scenario:

A sender S is communicating with a receiver R. S wants to send 1 MB in multiple chunks to R. R uses a receive window of 256 KB (Yamux default). S and R are connected on some network inducing both a delay and a bandwidth constraint.

Algorithm 1 (Yamux today): Send a WindowUpdate message once (a) the window has been fully depleted by the sender and (b) the receiver has consumed all buffered bytes.

Once S has depleted its window, having sent 256 KB to R, S is blocked and has to wait for a WindowUpdate message to be sent by R. This message is only sent by R once 256 KB have been received and all of them consumed. Thus every 256 KB the sender is blocked, waiting a whole round trip (for all its data to arrive as well as for the WindowUpdate to be received).

Algorithm 2 (nghttp2): Send a WindowUpdate message once half or more of the window has been received and consumed.

While S sends the first 256 KB of data, R receives and consumes chunks of that data in parallel. Instead of waiting for the window to be fully depleted by S and fully consumed by R, R already sends a small WindowUpdate message (# bytes consumed of the current window) once half or more of the window has been depleted and consumed. This WindowUpdate will likely arrive at S before it depletes the entire window and thus S never stalls.
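
A sketch of the receiver-side bookkeeping behind Algorithm 2 (names are illustrative, not yamux internals):

/// nghttp2-style strategy: send a WindowUpdate as soon as half (or more) of
/// the window has been received *and* consumed by the application.
struct ReceiveWindow {
    max: u32,      // configured receive window, e.g. 256 * 1024
    consumed: u32, // bytes read by the application since the last update
}

impl ReceiveWindow {
    /// Called after the application reads `n` bytes from the stream buffer.
    /// Returns the credit to put into a WindowUpdate frame, if one is due.
    fn on_consumed(&mut self, n: u32) -> Option<u32> {
        self.consumed += n;
        if self.consumed >= self.max / 2 {
            let credit = self.consumed;
            self.consumed = 0;
            Some(credit) // send WindowUpdate(credit) back to the sender
        } else {
            None
        }
    }
}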

Summary

Long story short, to prevent senders from being blocked every time they have sent RECEIVE_WINDOW (256 KB) bytes, I am suggesting Yamux adopt the same WindowUpdate strategy as nghttp2, namely to send a WindowUpdate message once half or more of the window has been received and consumed. An early benchmark with a patched Yamux using an adapted version of @twittner's bench tool looks promising. I still have to do more testing on a high-latency network.

What do people think? Let me know if I am missing something. Happy for any pointers to similar discussions in the past.

[1] Pollard, Barry. HTTP/2 in Action. Manning, 2019.

Provide an example command-line app

We should provide an example command-line tool that demonstrates using yamux. The first one that comes to mind is a socket forwarder: it listens on a socket (AF_UNIX or TCP) on one side, and multiplexes connections over stdio to its peer, which connects to a socket passed as a command-line argument.
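
The forwarding core of such a tool could look roughly like the sketch below (connection setup, argument parsing and the stdio transport are omitted; the function is illustrative, not existing code): each accepted connection is bridged to a fresh yamux stream by copying bytes in both directions.

use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite};

/// Copy bytes in both directions until either side reaches EOF.
async fn bridge<A, B>(a: A, b: B) -> std::io::Result<()>
where
    A: AsyncRead + AsyncWrite + Unpin,
    B: AsyncRead + AsyncWrite + Unpin,
{
    let (a_read, mut a_write) = a.split();
    let (b_read, mut b_write) = b.split();
    futures::future::try_join(
        futures::io::copy(a_read, &mut b_write),
        futures::io::copy(b_read, &mut a_write),
    )
    .await?;
    Ok(())
}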

Race condition receiving a window update and closing a stream

The following scenario reveals a race condition in the stream closing logic.

Imagine sender S sending multiple messages to receiver R with the latter expecting exactly as many messages as the former sent.

  1. After sending multiple messages S calls poll_close on the Stream eventually returning Poll::Ready(Ok(())). At this point there are multiple pending messages in the channel between Stream and Connection. In addition, the last element in the channel between Stream and Connection is a StreamCommand::CloseStream.
  2. With poll_close returning Poll::Ready(Ok(())) the user of the stream drops its reference, thus the overall reference count on the inner Shared drops to 1, with Connection holding the last reference.
  3. Connection::garbage_collect is called. It iterates through all streams, finding the one mentioned above with a reference count of 1 and thus garbage collecting it, i.e. removing it from Connection.streams. Note: At this point there are still multiple messages pending in the channel between Stream and Connection as well as a final StreamCommand::CloseStream command.
  4. Before flushing the above-mentioned pending items, S receives a window update message from R for the stream that S just closed. Given that S just garbage-collected the stream, S will send a stream reset to R. Note that the items in the channel between the garbage-collected Stream and the Connection are still pending.
  5. R receives the stream reset and marks the stream as closed. In the AsyncRead implementation, poll_read finds self.shared.buffer to be empty. It then checks whether it can expect to ever read more bytes from the stream, but given that it has been marked as closed, it returns Poll::Ready(Ok(0)). The user on the receiving side interprets this as an EOF while still expecting more data. Note that at this point the previously pending items might still be pending in the channel on S, or might be on their way to R.

I can deterministically reproduce this error when applying the following diff to df11758 and running cargo bench.

diff --git a/src/connection.rs b/src/connection.rs
index eb944f5..ad415ab 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -778,7 +780,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
                 w.wake()
             }
         } else if !is_finish {
-            log::debug!("{}/{}: window update for unknown stream, ignoring", self.id, stream_id);
+            log::debug!("{}/{}: window update for unknown stream", self.id, stream_id);
+            let mut header = Header::data(stream_id, 0);
+            header.rst();
+            return Action::Reset(Frame::new(header))
         }

I need to put more thoughts into a fix. For now I am just documenting my observation.

What is the idiomatic way to create multiple streams?

Hi! I recently tried to upgrade yamux to the latest version, and I want to provide a method to create a stream from a connection in my project.

In yamux 0.10.2, I could use:

pub struct YamuxCtrl<S> {
    ctrl: Control,
    _conn: PhantomData<S>,
}

impl<S> YamuxCtrl<S>
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    pub fn new_client(stream: S) -> Self {
        let conn = Connection::new(stream.compat(), Config::default(), Mode::Client);

        let ctrl = conn.control();

        tokio::spawn(yamux::into_stream(conn).try_for_each_concurrent(None, |_stream| future::ready(Ok(()))));

        Self {
            ctrl,
            _conn: marker::PhantomData,
        }
    }
    async fn open_stream(&mut self) -> Result<ClientStream<Compat<yamux::Stream>>, Error> {
        let stream = self.ctrl.open_stream().await?;
        Ok(ClientStream::new(stream.compat()))
    }
}

I can use open_stream to obtain a stream, but how can I do this in yamux 0.13.1, since Control has been removed and we need to call Connection::poll_next_inbound repeatedly in order to make progress?

I found some examples in the tests; they look like this:

let mut stream = future::poll_fn(|cx| client.poll_new_outbound(cx))
    .await
    .unwrap();
task::spawn(noop_server(stream::poll_fn(move |cx| {
    client.poll_next_inbound(cx)
})));

stream.write_all(&msg.0).await?;
stream.close().await?;

but since the ownership of client has already been moved into task::spawn, how do I then obtain a stream from this connection afterward?
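
One pattern that works with the new interface (sketched here for illustration, not an official recommendation; the driver and channel are made up, and the 0.13 signatures shown in the snippets above are assumed): keep ownership of the connection inside the task that drives it, and request new outbound streams through a channel.

use std::task::Poll;

use futures::channel::{mpsc, oneshot};
use futures::{future, AsyncRead, AsyncWrite, StreamExt};

type StreamRequest = oneshot::Sender<Result<yamux::Stream, yamux::ConnectionError>>;

/// Drives the connection and serves requests for new outbound streams.
async fn drive<T: AsyncRead + AsyncWrite + Unpin>(
    mut conn: yamux::Connection<T>,
    mut requests: mpsc::Receiver<StreamRequest>,
) {
    let mut pending: Option<StreamRequest> = None;
    future::poll_fn(move |cx| loop {
        // Pick up a request for a new outbound stream, if any.
        if pending.is_none() {
            if let Poll::Ready(Some(reply)) = requests.poll_next_unpin(cx) {
                pending = Some(reply);
            }
        }
        // Try to open it; keep the request around if the connection is not ready yet.
        if pending.is_some() {
            if let Poll::Ready(res) = conn.poll_new_outbound(cx) {
                let _ = pending.take().expect("just checked").send(res);
                continue;
            }
        }
        // Keep the connection itself moving; inbound streams are dropped here
        // for brevity (handle or spawn them in real code).
        match conn.poll_next_inbound(cx) {
            Poll::Ready(Some(Ok(_inbound))) => continue,
            Poll::Ready(_) => return Poll::Ready(()),
            Poll::Pending => return Poll::Pending,
        }
    })
    .await;
}

The application side would then keep the mpsc::Sender, send a oneshot::Sender over it, and await the corresponding oneshot::Receiver to obtain its Stream.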

refactor: replace `Chunks` with `BytesMut`

The entire Chunks module looks similar to what Bytes and BytesMut do, so we can probably throw it away and use the bytes crate instead.

It is likely more efficient, although that is not my main concern. We just want to maintain as little code as possible.
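
For what it's worth, BytesMut already covers the split-off-a-prefix use case without copying (a small illustration of the bytes API, unrelated to yamux's internals):

use bytes::{Buf, BytesMut};

fn main() {
    // Append incoming data, then hand out owned slices without copying.
    let mut buf = BytesMut::with_capacity(1024);
    buf.extend_from_slice(b"hello ");
    buf.extend_from_slice(b"world");

    let hello = buf.split_to(6); // O(1): both halves share the same allocation
    assert_eq!(&hello[..], b"hello ");
    assert_eq!(buf.remaining(), 5);
    assert_eq!(&buf[..], b"world");
}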

Detect if stream has ended on server side

When writing to a stream (in my case through a tokio Framed<Compat<yamux::Stream>, LengthDelimitedCodec>), I expect the stream to signal EOF once it has gone away. This does not seem to be happening.

Is this expected behaviour? If so, how would you go about detecting that the stream has gone away when only streaming data (i.e. write-only)?

Will the writing side have to read before it knows that a stream is closed?

Happy to contribute a PR, if needed.

Many thanks!

Simple client/server example blocks forever

It would be really nice to have an example of how to properly use this library (#61). I'm running into an issue where my code blocks forever, and I think I'm not quite understanding some invariant of the library, or maybe something in Rust itself. Here's what I'm trying to do:

  1. Client connects to server
  2. Client sends hello message to server
  3. Server responds with hello to client
  4. Server initiates a stream with client
  5. Server sends hello message to client
  6. Client responds with hello message to server

The code seems to block when the client attempts to open_stream while the server is also blocked on next_stream. It seems like those two functions should unblock each other, but as I said, I must be missing something.

Log output:

server: started listener
server: accepted connection
client: connected to server
client: opening stream on server
server: waiting for next stream

Code:

use anyhow::Result;
use async_compat::CompatExt;
use futures::{AsyncReadExt, AsyncWriteExt};
use tokio::{
    net::{TcpListener, TcpStream},
    task,
};
use yamux::{Config, Connection, Mode};

#[tokio::main]
async fn main() -> Result<()> {
    let handle1 = task::spawn(async { server().await.unwrap() });
    let handle2 = task::spawn(async { client().await.unwrap() });
    futures::future::join_all(vec![handle2, handle1]).await;
    Ok(())
}

async fn server() -> Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("server: started listener");
    let cfg = Config::default();
    loop {
        let (socket, _) = listener.accept().await?;
        println!("server: accepted connection");
        let mut conn = Connection::new(socket.compat(), cfg.clone(), Mode::Server);
        let mut ctrl = conn.control();

        // Wait for the first stream
        println!("server: waiting for next stream");
        let mut stream1 = conn.next_stream().await?.unwrap();
        println!("server: got next stream");

        // Read the client hello
        let mut msg = String::default();
        stream1.read_to_string(&mut msg).await?;
        println!("got message from client: {}", msg);

        // Respond with the server's hello
        stream1.write_all("hello from server".as_bytes()).await?;

        // Create a server-initiated stream in response
        let mut stream2 = ctrl.open_stream().await?;
        stream2
            .write_all("hello from server again".as_bytes())
            .await?;

        // Close streams and cleanup
        stream2.close().await?;
        stream1.close().await?;
        ctrl.close().await?;
    }
}

async fn client() -> Result<()> {
    let socket = TcpStream::connect("127.0.0.1:8080")
        .await
        .expect("connecting to server");
    println!("client: connected to server");

    let cfg = Config::default();
    let mut conn = Connection::new(socket.compat(), cfg, Mode::Client);
    let mut ctrl = conn.control();

    // Open the first stream and send a hello
    println!("client: opening stream on server");
    let mut stream1 = ctrl.open_stream().await?;
    stream1.write_all("hello from client".as_bytes()).await?;
    println!("client: sent hello to server");

    // Get a response
    let mut msg = String::default();
    stream1.read_to_string(&mut msg).await?;
    println!("got message from server: {}", msg);

    // Wait for the server's second stream and read the response
    let mut stream2 = conn
        .next_stream()
        .await?
        .expect("accepting server-initiated stream");
    let mut msg = String::default();
    stream2.read_to_string(&mut msg).await?;
    println!("got another message from server: {}", msg);

    Ok(())
}

`poll_new_outbound` without sending data results in no `inbound` stream

Hi, thanks for this crate!

I'm currently trying to write a higher-level wrapper around this library. As a starting point I wrapped the Connection into its own struct which also contains the receiving end of an mpsc::channel over commands. The corresponding spawn method selects on poll_new_inbound and the channel.
The idea was that a user can send a message over the command channel, which results in both a new outbound channel and a new inbound channel being created. The other party executes the same code and polls for an outbound and an inbound channel.

After establishing the channels, I wanted to send a message from both parties on their outbound channel and receive it on the other's inbound channel.
(Side note: I know that an established Stream is bidirectional, but in the future I want to start sending from both parties as soon as poll_new_outbound completes, without waiting for the ACK.)

Unfortunately, I hit a problem and my test deadlocked. After quite some time, I found the underlying issue:
A stream returned by poll_new_outbound, with no data sent on it, results in no inbound stream on the other side of the socket.

The simplified flow looked roughly like this:

+---------------------------------+         +---------------------------------+
|                                 |         |                                 |
|                P0               |         |                P1               |
|                                 |         |                                 |
|                                 |         |                                 |
| outbound =  poll_new_outbound() |         | outbound =  poll_new_outbound() |
|                                 |         |                                 |
| inbound  =  poll_new_inbound()  |         | inbound  =  poll_new_inbound()  |
|                                 |         |                                 |
|                                 |         |                                 |
| outbound.send(0)                |         | outbound.send(0)                |
|                                 |         |                                 |
| msg = inbound.recv()            |         | msg = inbound.recv()            |
|                                 |         |                                 |
+---------------------------------+         +---------------------------------+

(Note: poll_new_inbound was continuously polled in a background task)

Because poll_new_inbound only resolves once the first message on the stream is sent, my code never got to the outbound.send() and deadlocked.

While I understand that it might make sense to delay establishing the inbound stream, this should definitely be documented.
If the above is correct and the issue doesn't actually lie somewhere else, I could maybe send a documentation PR (but I'm not sure when I'll have the time for this).

add chunking in poll_write?

Since there is poll_flush for triggering a send syscall (which I consider to be expensive), how about buffering data in poll_write up to the chunking threshold?

As an example, the protocol I'm currently working on alternates between writing 4 bytes and some kB. While debugging some issues I saw that the 4 bytes were sent over the wire with a 12-byte header. In my send loop I deliberately call poll_write for as long as I have data, followed by poll_flush, and then wait for more input. With some chunking this would probably increase bandwidth by a large factor.

I know from testing that mplex requires flushing, but since yamux currently does not (which I don't think is documented), we'd probably need to review all protocols for proper use of poll_flush after such a change.
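
As a user-side workaround under the current behaviour (a sketch, not a change to yamux itself), the small writes can already be coalesced with futures::io::BufWriter, so the length prefix and the payload usually reach poll_write as a single buffer:

use futures::io::{AsyncWrite, AsyncWriteExt, BufWriter};

async fn send_frame<W: AsyncWrite + Unpin>(stream: W, payload: &[u8]) -> std::io::Result<()> {
    let mut w = BufWriter::new(stream);
    // The 4-byte length prefix and the payload are buffered together ...
    w.write_all(&(payload.len() as u32).to_be_bytes()).await?;
    w.write_all(payload).await?;
    // ... and typically hit poll_write/poll_flush as one chunk instead of two tiny ones.
    w.flush().await
}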

Yamux resets stream when both peers are in Client mode

Hello everyone,

I'm opening this issue because I have a problem with yamux. I'm using yamux as part of a wider libp2p-rust project. I'm not seeing this issue with mplex in the same project; on a side note, I don't see it with raw TCP communication either in a small PoC.

I have a scenario of three nodes. One node (A) is publicly reachable, two other nodes (B and C) are behind a NAT. To make nodes B and C communicate, I use the TCP Hole Punching NAT traversal solution I implemented myself.

B and C are connected to A and exchange information. At some point, C will need to connect to B. To do that, C sends a message to A to initiate the TCP hole punching. A tells B to start connecting to the public address of C. At the same time, A answers C with the publicly visible IP of B.

Both nodes create a socket, bind it to the same local address they used to contact node A, and then use this socket to connect. B and C start trying to connect to each other, punching a hole in the NAT.

Obviously, B and C try multiple times to connect before they successfully connect to each other. When it doesn't work, a simple Connection Refused message is received but we keep trying.

At some point, B and C are connected at the TCP level, but neither B nor C accept()ed a connection. I don't know if that's specific to Rust or just how the kernel handles things, but since both ends try to reach the other, no socket is accept()ed; the system simply ends up with a single TCP connection (a simultaneous open).

Then yamux enters the game. I guess because no socket is "seen" as the listener, both ends are put in Mode::Client. At some point yamux on one of the nodes throws the error invalid stream id X and ends the substream, making it impossible to exchange data. The other node sees the connection ending.

I have a working POC of this issue here: https://github.com/BlackYoup/poc-tcp-hole-punching/blob/master/src/raw_yamux.rs . Pardon the code which may be a bit "raw", it took a few hours to set this up and I went the very quick way.

It might not reproduce locally, as I couldn't trigger it there, but between NATed nodes it behaves as described. If you don't have a way to test and want to try, I think I can manage to give you access to such a setup. To run:

  • In src/raw_yamux.rs in the raw_yamux() function, change the SocketAddr with the IP / port of the publicly reachable node A.
  • To start node A: RUST_LOG=trace cargo run --release -- yamux SERVER
  • To start node B / C: RUST_LOG=trace cargo run --release -- yamux CLIENT

As for the logs, they only start once both nodes begin connecting to each other, which is when Init tcp hole punch is printed (don't mind the panic; I used unwrap() everywhere):

Node B:
Init tcp hole punch
 2020-08-04T19:14:10.211Z TRACE mio::poll                 > deregistering handle with poller
We are connected! Try to init yamux :) // We are connected at the TCP level
 2020-08-04T19:14:10.214Z TRACE mio::poll                 > registering with poller
 2020-08-04T19:14:10.214Z DEBUG yamux::connection         > new connection: 4e4b3a1c (Client)
 2020-08-04T19:14:10.214Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Init)
 2020-08-04T19:14:10.214Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Header 0)
 2020-08-04T19:14:10.214Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Header 0)
 2020-08-04T19:14:10.214Z TRACE yamux::connection         > 4e4b3a1c: creating new outbound stream
 2020-08-04T19:14:10.214Z TRACE yamux::connection         > 4e4b3a1c: sending initial (Header WindowUpdate 1 (len 262144) (flags 1))
 2020-08-04T19:14:10.214Z DEBUG yamux::connection         > 4e4b3a1c: new outbound (Stream 4e4b3a1c/1) of (Connection 4e4b3a1c Client (streams 0))
 2020-08-04T19:14:10.214Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Header 0)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Header 0)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Header 12)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 4e4b3a1c: read: (Header WindowUpdate 1 (len 262144) (flags 1))
 2020-08-04T19:14:10.215Z TRACE yamux::connection         > 4e4b3a1c: received: (Header WindowUpdate 1 (len 262144) (flags 1))
 2020-08-04T19:14:10.215Z TRACE yamux::connection         > is_valid_remote_id: id=StreamId(1), tag=WindowUpdate, mode=Client
 2020-08-04T19:14:10.215Z ERROR yamux::connection         > 4e4b3a1c: invalid stream id 1
 2020-08-04T19:14:10.215Z TRACE yamux::connection         > 4e4b3a1c: sending term
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Init)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Header 0)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Header 0)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 4e4b3a1c: read: (ReadState::Header 12)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 4e4b3a1c: read: (Header GoAway 0 (len 1) (flags 0))
 2020-08-04T19:14:10.215Z TRACE yamux::connection         > 4e4b3a1c: received: (Header GoAway 0 (len 1) (flags 0))
 2020-08-04T19:14:10.215Z TRACE yamux::connection::stream > 4e4b3a1c/1: update state: (Open Closed Closed)
 2020-08-04T19:14:10.215Z TRACE mio::poll                 > deregistering handle with poller
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Custom { kind: WriteZero, error: "4e4b3a1c/1: connection is closed" })', src/raw_yamux.rs:156:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Node C:
Init tcp hole punch
 2020-08-04T19:14:10.213Z TRACE mio::poll                 > deregistering handle with poller
We are connected! Try to init yamux :)  // We are connected at the TCP level
 2020-08-04T19:14:10.214Z TRACE mio::poll                 > registering with poller
 2020-08-04T19:14:10.214Z DEBUG yamux::connection         > new connection: 7ffc697e (Client)
 2020-08-04T19:14:10.214Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Init)
 2020-08-04T19:14:10.214Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Header 0)
 2020-08-04T19:14:10.214Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Header 0)
 2020-08-04T19:14:10.214Z TRACE yamux::connection         > 7ffc697e: creating new outbound stream
 2020-08-04T19:14:10.214Z TRACE yamux::connection         > 7ffc697e: sending initial (Header WindowUpdate 1 (len 262144) (flags 1))
 2020-08-04T19:14:10.214Z DEBUG yamux::connection         > 7ffc697e: new outbound (Stream 7ffc697e/1) of (Connection 7ffc697e Client (streams 0))
 2020-08-04T19:14:10.214Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Header 0)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Header 0)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Header 12)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 7ffc697e: read: (Header WindowUpdate 1 (len 262144) (flags 1))
 2020-08-04T19:14:10.215Z TRACE yamux::connection         > 7ffc697e: received: (Header WindowUpdate 1 (len 262144) (flags 1))
 2020-08-04T19:14:10.215Z TRACE yamux::connection         > is_valid_remote_id: id=StreamId(1), tag=WindowUpdate, mode=Client
 2020-08-04T19:14:10.215Z ERROR yamux::connection         > 7ffc697e: invalid stream id 1
 2020-08-04T19:14:10.215Z TRACE yamux::connection         > 7ffc697e: sending term
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Init)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Header 0)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Header 0)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 7ffc697e: read: (ReadState::Header 12)
 2020-08-04T19:14:10.215Z TRACE yamux::frame::io          > 7ffc697e: read: (Header GoAway 0 (len 1) (flags 0))
 2020-08-04T19:14:10.215Z TRACE yamux::connection         > 7ffc697e: received: (Header GoAway 0 (len 1) (flags 0))
 2020-08-04T19:14:10.215Z TRACE yamux::connection::stream > 7ffc697e/1: update state: (Open Closed Closed)
 2020-08-04T19:14:10.215Z TRACE mio::poll                 > deregistering handle with poller
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Custom { kind: WriteZero, error: "7ffc697e/1: connection is closed" })', src/raw_yamux.rs:156:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

You might also note that before writing/reading, I make a tokio::time::delay_for call to show that the stream gets stopped. Without it, some data goes through before the stream is ended, which might not show the issue.

Is all this expected behaviour?

Let me know if you need any other details or if I can be of any help here.

Thank you!

Interpret the initial WindowUpdate as additive

Issue #90 revealed a mismatch between go-yamux and this crate w.r.t. the initial window update: this crate understands the initial window update as the total receive window size, whereas go-yamux adds the window update to the implicit default of 256 KiB. The spec should probably be understood to mean the latter, and the differences need to be resolved anyway, so this issue outlines a way forward.
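
To make the mismatch concrete, a small illustration (not crate code): for a desired total receive window, the two interpretations put different values into the initial WindowUpdate.

const DEFAULT_CREDIT: u32 = 256 * 1024;

/// Credit carried by the initial WindowUpdate for a desired total receive window.
fn initial_update(receive_window: u32) -> (u32, u32) {
    let total = receive_window;                     // this crate's current reading
    let additive = receive_window - DEFAULT_CREDIT; // go-yamux's reading
    (total, additive)
}

fn main() {
    let (total, additive) = initial_update(1024 * 1024);
    println!("total = {total}, additive = {additive}"); // 1048576 vs. 786432
}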

We will publish four versions of this crate which move from the current interpretation to the additive one:

  1. yamux-0.5.0 checks the currently unused flag 0x8000. If present it interprets an initially received window update as additive.
  2. yamux-0.6.0 behaves like yamux-0.5.0 but also sets the flag 0x8000 when sending the initial window update. It deprecates the lazy_open flag and determines automatically if an initial window update needs to be sent based on the receive_window setting (i.e. if it is greater than the default window an initial window update will be sent, otherwise not).
  3. yamux-0.7.0 always interprets the initially received window update as additive and does not check for flag 0x8000. It still sets the flag itself to be compatible with yamux-0.6.0.
  4. yamux-0.8.0 behaves like yamux-0.7.0 but no longer sets the 0x8000 flag. It removes the lazy_open config setting.

Every released version is compatible only with its immediate predecessor.

Thanks to @romanb for suggesting the temporary use of unused flags.

CC: @pawanjay176, @AgeManning, @tomaka

How to detect disconnection?

Dear all,

I initialize a connection from my local machine (client) to a remote server using yamux.

Then I switch my computer's network connection off, and my local client remains connected forever, while yamux or the TcpStream should have triggered a disconnection.

How could I solve that?
Shouldn't yamux be capable of detecting that?

It would be better to detect without sending any data, cf. tokio-rs/tokio#2228

Is there any problem with my code, and why doesn't it work?

use futures::{future, stream, AsyncReadExt, AsyncWriteExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::TokioAsyncReadCompatExt;

#[tokio::main]
async fn main() {
    
    tokio::spawn(async move {
        let lis = TcpListener::bind("0.0.0.0:8000").await.unwrap();
        let (conn, _) = lis.accept().await.unwrap();
        let muxcfg = yamux::Config::default();
        let mut c = yamux::Connection::new(conn.compat(), muxcfg, yamux::Mode::Server);
        println!("111");
        let mut server = stream::poll_fn(move |cx| c.poll_next_inbound(cx));
        let mut stream = server.next().await.unwrap().unwrap();
        println!("xxxx");
        let mut buf = [0u8; 1024];
        let n = stream.read(&mut buf).await.unwrap();
        let s = String::from_utf8_lossy(&buf[..n]).to_string();
        println!("{}", s);
        stream.close().await.unwrap();
        println!("s2");
        let mut stream = server.next().await.unwrap().unwrap();
        let mut buf = [0u8; 1024];
        let n = stream.read(&mut buf).await.unwrap();
        let s = String::from_utf8_lossy(&buf[..n]).to_string();
        println!("{}", s);
        stream.close().await.unwrap();
        println!("999");
    });
    tokio::spawn(async move {
        let s = TcpStream::connect("127.0.0.1:8000").await.unwrap();
        let muxcfg = yamux::Config::default();
        let mut c = yamux::Connection::new(s.compat(), muxcfg, yamux::Mode::Client);
        let mut stream = future::poll_fn(|cx| c.poll_new_outbound(cx)).await.unwrap();
        stream
            .write_all("Hello World! 1\n".as_bytes())
            .await
            .unwrap();
        stream.flush().await.unwrap();
        stream.close().await.unwrap();
        println!("send 2");
        let mut stream = future::poll_fn(|cx| c.poll_new_outbound(cx)).await.unwrap();
        stream
            .write_all("Hello World! 2\n".as_bytes())
            .await
            .unwrap();
        stream.flush().await.unwrap();
        stream.close().await.unwrap();
        tokio::time::sleep(tokio::time::Duration::from_secs(100)).await;
    });
    tokio::time::sleep(tokio::time::Duration::from_secs(100)).await;
}

chore: remove `WindowUpdateMode::OnReceive`

I suggest we remove WindowUpdateMode::OnReceive in favor of always using (not just defaulting to) WindowUpdateMode::OnRead.

I cannot think of a use case where one would want OnReceive over OnRead. Anyone?

Removing OnReceive will simplify our codebase.

/// Specifies when window update frames are sent.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowUpdateMode {
    /// Send window updates as soon as a [`Stream`]'s receive window drops to 0.
    ///
    /// This ensures that the sender can resume sending more data as soon as possible
    /// but a slow reader on the receiving side may be overwhelmed, i.e. it accumulates
    /// data in its buffer which may reach its limit (see `set_max_buffer_size`).
    /// In this mode, window updates merely prevent head of line blocking but do not
    /// effectively exercise back pressure on senders.
    OnReceive,

Re-write interface to buffer streams internally

Once #150 is solved, we more or less have back-pressure in yamux. See libp2p/specs#471 (comment) for an explanation.

To make sure applications are compliant with this, we should buffer the inbound streams internally. This allows us to align the buffer size with the ACK backlog.

The interface I'd like to see is:

impl Connection<S> {
    /// Allow the connection to make progress by sending pending frames and reading data from the socket.
    ///
    /// This will buffer up to 256 new inbound streams and immediately reset any additional ones.
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;

    /// Attempt to open a new outbound stream.
    ///
    /// This will suspend in case we already have 256 unacknowledged outbound streams.
    pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>>;

    /// Return the next inbound stream.
    ///
    /// This will return streams from an internal buffer that gets populated as part of [`Connection::poll`].
    /// In case the buffer is empty, this will return [`Poll::Pending`].
    pub fn poll_next_inbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>>;
}

Waiting on new channel and channel read in the same task

I'm trying to use this Rust yamux implementation together with the original one in Go, and I have an issue when trying to wait for a new stream (server side) while reading from the first created stream at the same time.

This is my code in Rust:

extern crate yamux;

use async_std::os::unix::net::{UnixListener, UnixStream};
use async_std::task;
use futures::prelude::*;

fn main() {
    task::block_on(async {
        let listener = UnixListener::bind("/tmp/ipc.sock").await.unwrap();
        let mut incoming = listener.incoming();

        while let Some(socket) = incoming.next().await {
            handle_new_client(socket.unwrap()).await;
        }
    });
}

async fn handle_new_client(socket: UnixStream) {
    let mut conn = yamux::Connection::new(socket, yamux::Config::default(), yamux::Mode::Server);
    
    if let Some(mut stream) = conn.next_stream().await.unwrap() {
        task::spawn(async move {
            let mut buf = vec![0; 512];
            stream.read_exact(&mut buf[0..1]).await.unwrap();
            println!("recv value: {}", buf[0]);
        });
    }
}

As it is, it works but I would like to:

  1. Wait for a first channel
  2. Exchange some information on this channel
  3. Open more channels following information exchanged (containing next channel configurations)

In Go, this is really easy to achieve: simply accept a first stream, read/write on it, and accept more streams afterwards, all from the same goroutine.

I wanted to achieve the same in Rust as follows:

async fn handle_new_client(socket: UnixStream) {
    let mut conn = yamux::Connection::new(socket, yamux::Config::default(), yamux::Mode::Server);
    
    if let Some(mut stream) = conn.next_stream().await.unwrap() {
        let mut buf = vec![0; 512];
        stream.read_exact(&mut buf[0..1]).await.unwrap();
        println!("recv value: {}", buf[0]);
    }
}

It works with the following client in Rust:

extern crate yamux;

use async_std::os::unix::net::UnixStream;
use async_std::task;
use futures::prelude::*;

fn main() {
    task::block_on(async {
        let stream = UnixStream::connect("/tmp/ipc.sock").await.unwrap();
        
        let conn = yamux::Connection::new(stream, yamux::Config::default(), yamux::Mode::Client);
        let mut ctrl = conn.control();
        task::spawn(yamux::into_stream(conn).for_each(|r| {r.unwrap(); future::ready(())} ));

        let mut channel = ctrl.open_stream().await.unwrap();

        let mut buf = vec![0; 512];
        buf[0] = 0xAA;
        channel.write(&mut buf[0..1]).await.unwrap();
        println!("sent data: {}", buf[0]);
    });
}

But it doesn't work with the following client in Go (the server gets a new stream but gets stuck on the read call):

package main

import (
	"fmt"
	"net"

	"github.com/hashicorp/yamux"
)

func main() {
	conn, err := net.Dial("unix", "/tmp/ipc.sock")
	if err != nil {
		panic(err)
	}

	session, err := yamux.Client(conn, yamux.DefaultConfig())
	if err != nil {
		panic(err)
	}

	stream, err := session.Open()
	if err != nil {
		panic(err)
	}

	// send the channels config
	buf := []byte{0xAA}
	_, err = stream.Write(buf)
	fmt.Println("sent data: {}", buf[0])
}

I assumed this library follows the same protocol as the Go version, since it takes its name; am I correct?

Also, my understanding of async is that we should be able, on the server side, to wait on both a new stream (conn.next_stream().await) and a read call on an existing stream (stream.read_exact(..).await). Is that correct?
EDIT: My understanding is incorrect on this point, but that still doesn't explain why the server receives a new stream yet gets stuck on the read call.

GC Bug

First, consider this case:

    let data = b"hello world";
    stream.write_all(data).await.expect("C write msg 1");
    stream.write_all(data).await.expect("C write msg 2");
    // msg3
    stream.write_all(data).await.expect("C write msg 3");

    // drop it immediately
    drop(stream);

The stream is dropped immediately after writing the messages. As a result, msg3 can't be received by the reader.
After debugging, we found that GC sends the RST frame before msg3.

// garbage_collect()

    if let Some(f) = frame {
        log::trace!("{}: sending: {}", self.id, f.header());
        self.socket.get_mut().send(&f).await.or(Err(ConnectionError::Closed))?
    }

GC sends the RST frame directly to the reader, while msg3 is still queued in the stream receiver.
If GC instead sent the RST frame through the stream receiver, so it is processed in order after msg3, the bug would be fixed.

// garbage_collect()

    if let Some(f) = frame {
        log::trace!("{}/{}: sending: {}", self.id, stream_id, f.header());
        // send to the stream's command queue so the frame is processed (sent) in order
        let cmd = StreamCommand::SendFrame(f.left());
        if self.stream_sender.try_send(cmd).is_err() {
            continue;
        }
    }
