
yamux's Introduction

Yamux

Yamux (Yet another Multiplexer) is a multiplexing library for Golang. It relies on an underlying connection, such as TCP or a Unix domain socket, to provide reliability and ordering, and provides stream-oriented multiplexing. It is inspired by SPDY but is not interoperable with it.

Yamux features include:

  • Bi-directional streams
    • Streams can be opened by either client or server
    • Useful for NAT traversal
    • Server-side push support
  • Flow control
    • Avoid starvation
    • Back-pressure to prevent overwhelming a receiver
  • Keep Alives
    • Enables persistent connections over a load balancer
  • Efficient
    • Enables thousands of logical streams with low overhead

Documentation

For complete documentation, see the associated Godoc.

Specification

The full specification for Yamux is provided in the spec.md file. It can be used as a guide to implementors of interoperable libraries.

Usage

Using Yamux is remarkably simple:

func client() {
    // Get a TCP connection
    conn, err := net.Dial(...)
    if err != nil {
        panic(err)
    }

    // Setup client side of yamux
    session, err := yamux.Client(conn, nil)
    if err != nil {
        panic(err)
    }

    // Open a new stream
    stream, err := session.Open()
    if err != nil {
        panic(err)
    }

    // Stream implements net.Conn
    stream.Write([]byte("ping"))
}

func server() {
    // Accept a TCP connection
    conn, err := listener.Accept()
    if err != nil {
        panic(err)
    }

    // Setup server side of yamux
    session, err := yamux.Server(conn, nil)
    if err != nil {
        panic(err)
    }

    // Accept a stream
    stream, err := session.Accept()
    if err != nil {
        panic(err)
    }

    // Listen for a message
    buf := make([]byte, 4)
    stream.Read(buf)
}
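
Both yamux.Client and yamux.Server take an optional *yamux.Config; passing nil selects yamux.DefaultConfig(). The sketch below tunes a few fields (the field names come from the library's config; the values are only illustrative):

func configuredClient(conn net.Conn) (*yamux.Session, error) {
    // Start from the defaults and override selectively
    cfg := yamux.DefaultConfig()
    cfg.EnableKeepAlive = true               // periodic pings detect dead peers
    cfg.KeepAliveInterval = 30 * time.Second // how often to ping
    cfg.MaxStreamWindowSize = 1024 * 1024    // bigger per-stream window, more data in flight
    return yamux.Client(conn, cfg)
}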


yamux's Issues

Licensing for language port

In relation to discussion with @mitchellh in hashicorp/go-plugin#27, I'm investigating the possibility of writing a NodeJS implementation of the protocol described in spec.md.

I'm not a licensing expert, but I get the basics. It's my understanding that the MPL applies to the implementation code only, and not necessarily to the protocol. However, I'm also aware that ports where the developer has access to the source code are in a bit of a fuzzy area with regard to licensing.

I'd like to publish my port under the MIT License, since that is somewhat customary in the NPM package community, but I will maintain the Mozilla Public License for it if that is the desire of this module's author.

I will provide attribution for the protocol design no matter what license I publish the code under.

Can I publish my port under the MIT License?

Yamux should not log errors

Having Yamux log errors means that software using this library has no real control over what messages are logged where. Since Yamux already returns named errors, which essentially contain the message being logged, it would be much nicer if those errors could be handled at a higher level and logged there.
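
For what it's worth, the config already exposes a LogOutput io.Writer, so today the output can at least be redirected or discarded rather than handled; a minimal sketch:

// Sketch: discard yamux's internal log output (io.Discard needs Go 1.16+;
// use ioutil.Discard on older toolchains).
cfg := yamux.DefaultConfig()
cfg.LogOutput = io.Discard
session, err := yamux.Server(conn, cfg)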

When MaxStreamWindowSize is larger than initialStreamWindow, it can lead to a short read

OpenStream calls sendWindowUpdate, which syncs the window from client to server.
incrSendWindow and write can race: if incrSendWindow runs first, all is well; otherwise sendWindow ends up with a different value than recvWindow and the two go out of sync.

// session_test.go
func TestSendData_Large(t *testing.T) {
    cfg := testConf()
    cfg.MaxStreamWindowSize = 4 * 1024 
    // ...
}
// const.go
const (
    // initialStreamWindow is the initial stream window size
    initialStreamWindow uint32 = 1 * 1024
)
=== RUN   TestSendData_Large
--- FAIL: TestSendData_Large (5.00s)
        session_test.go:415: short read: 1024
        session_test.go:441: err: stream reset
panic: timeout [recovered]
        panic: timeout

Option for last stream close to close the session

I use yamux just for control flow, one stream per connection, and rely on the net.Conn interface to communicate.

net.Conn in this case is a yamux.Stream

Any code that relies on net.Conn.Close() to do the right thing can't really rely on it here, as the session, and hence the underlying connection, is not closed, so things like closing and reopening a connection do not work.

Usually I end up wrapping yamux.Stream with something that closes the session as you close the stream, but it'd be nice if there were an option in the config to deal with that. The wrapper I use is sketched below.
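
Roughly, the wrapper looks like this (a hypothetical type, not part of yamux):

// sessionClosingStream ties the session's lifetime to its single stream:
// closing the stream also closes the session and the underlying conn.
type sessionClosingStream struct {
	*yamux.Stream
	session *yamux.Session
}

func (s *sessionClosingStream) Close() error {
	streamErr := s.Stream.Close()
	if err := s.session.Close(); err != nil {
		return err
	}
	return streamErr
}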

http.Serve(Session, nil) hangs on abortPendingRead when using Hijack()

This minimal example hangs on go 1.10 when I add log.Printf to hijackLocked() (net/http/server.go:300). When I can get it to hang, it hangs on abortPendingRead() (net/http/server.go:692). What are some possible issues with hijacking?

package main

import (
	"context"
	"flag"
	"fmt"
	"github.com/hashicorp/yamux"
	"net"
	"net/http"
	"time"
)

var endpoint = flag.String("ep", "127.0.0.1:8000", "endpoint")

func client() {
	// Get a TCP connection
	conn, err := net.Dial("tcp", *endpoint)
	if err != nil {
		panic(err)
	}

	// Setup client side of yamux
	session, err := yamux.Client(conn, nil)
	if err != nil {
		panic(err)
	}
	http.Serve(session, nil)

}

func server() {
	// Accept a TCP connection
	listener, err := net.Listen("tcp", *endpoint)
	if err != nil {
		panic(err)
	}
	conn, err := listener.Accept()
	if err != nil {
		panic(err)
	}

	// Setup server side of yamux
	session, err := yamux.Server(conn, nil)
	if err != nil {
		panic(err)
	}
	tr := &http.Transport{
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			return session.Open()
		},
	}
	client := &http.Client{Transport: tr}
	resp, err := client.Get("http://127.0.0.1:8000/works")
	if err != nil {
		panic(err)
	}
	fmt.Println(resp)
	fmt.Println("hanging in abortPendingRead()...")
	resp, err = client.Get("http://127.0.0.1:8000/bug")
	if err != nil {
		fmt.Println("we should not see this because it should hang in abortPendingRead()")
		fmt.Println(err)
	}
	fmt.Println(resp)

}

func main() {
	flag.Parse()
	http.HandleFunc("/works", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "this works")
	})
	http.HandleFunc("/bug", func(w http.ResponseWriter, r *http.Request) {
		h, ok := w.(http.Hijacker)
		if !ok {
			fmt.Printf("does not support hijack")
		}
		c, _, err := h.Hijack()
		if err != nil {
			panic(err)
		}
		fmt.Fprintf(c, "this works")
		c.Close()
	})

	go server()
	time.Sleep(90 * time.Millisecond)
	client()
}

Wrong doc of func (*Session) Accept

On the godoc page of yamux, the descriptions of the Accept and AcceptStream functions are as follows:

func (s *Session) Accept() (net.Conn, error)
Accept is used to block until the next available stream is ready to be accepted.

func (s *Session) AcceptStream() (*Stream, error)
AcceptStream is used to block until the next available stream is ready to be accepted.

Both are the same. Perhaps the right description of Accept would be:
Accept is used to block until the next available connection is ready to be accepted.

Is Session.Accept Result Really Compatible With net.Conn?

Hi
A Session returns a Stream from its Accept call as a net.Conn here

Yet net.Conn, among other methods, defines

	// LocalAddr returns the local network address, if known.
	LocalAddr() Addr

	// RemoteAddr returns the remote network address, if known.
	RemoteAddr() Addr

which Stream does not appear to implement.

So I'm not really sure how this compiles at all (though it clearly does)?

But my main concern is: how can the accepted connection be passed to other consumers that really expect a pure net.Conn?

Should I wrap the connections with some stubs myself? Or is there an intended solution for that?

Thx.

Named connections (routing)

Hi, I've started Bon to be able to Open and Accept connections by their route names. With named connections, you can register and handle them like you do with http handlers. This makes it easier to group your connections by purpose.
So please check it out if you feel you need them too!

Need some documentation on how to use NAT traversal feature

I am trying to build a small client/server where the server is reachable from the client, but the client is not reachable from the server. We need duplex streams both ways, where either side can initiate a connection and send messages. Looking at the documentation, it seems yamux fits this use case, but I don't see how it could be achieved. Any guidance will be helpful.
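
For what it's worth, the pattern I would expect from the feature list ("streams can be opened by either client or server") looks like the sketch below, where the unreachable peer dials out once and the server then opens streams back over that connection (hypothetical address, error handling elided):

// On the NAT'd peer: dial out, then accept streams the server initiates.
conn, _ := net.Dial("tcp", "server.example.com:4444")
session, _ := yamux.Client(conn, nil)
inbound, _ := session.Accept() // a server-initiated stream arrives here
_ = inbound

// On the public server, after accepting the TCP connection:
//   session, _ := yamux.Server(conn, nil)
//   outbound, _ := session.Open() // initiate toward the NAT'd peer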

Missing commit in history

This is in relation to hashicorp/nomad#4455

I had a look at this today while I was on the hunt for dependency issues in our products.

Turns out the commit in question does exist and govendor fetch github.com/hashicorp/yamux@2658be15c5f05e76244154714161f17e3e77de2e works fine: 2658be1

It came from this PR (#53), which was merged, with the parent shown in the first screenshot (screenshots omitted).

However, the tree for master (second screenshot) and the tree for https://github.com/hashicorp/yamux/commits/2658be15c5f05e76244154714161f17e3e77de2e (third screenshot) are different. Not sure what happened here, but 683f491 and 3fc4056 are swapped in the two trees, so all the hashes of the commits before them differ.

Hope this helps but I don't know the code base well enough to do surgery on it so I will leave it to the pros :). Looks like it might just be a rebase error.

cc: @preetapan @chelseakomlo @mkeeler

yamux: Invalid protocol version: 123

How do I fix this issue? A plain io.Copy with net.Listen and net.Dial works, but when I try net.Listen with streams from session.Open(), I get the error yamux: Invalid protocol version: 123.

io.Copy(conn, remote)
go io.Copy(remote, conn)

Works

io.Copy(conn, stream)
go io.Copy(stream, conn)

yamux: Invalid protocol version: 123

Protocol Version Error

I don't think I fully understand something. I telnet to the code below, but I get the error [ERR] yamux: Invalid protocol version: 102. What exactly is wrong?

Here is my code


package main

import (
	"net"
	"github.com/hashicorp/yamux"
)

func main() {
	server()
}

func server() {

	listener, _ := net.Listen("tcp", "0.0.0.0:1234")
	for {

		// Accept a TCP connection
		conn, err := listener.Accept()
		if err != nil {
			panic(err)
		}

		// Setup server side of yamux
		session, err := yamux.Server(conn, nil)
		if err != nil {
			panic(err)
		}

		// Accept a stream
		stream, err := session.Accept()
		if err != nil {
			panic(err)
		}

		// Listen for a message
		buf := make([]byte, 4)
		stream.Read(buf)
		stream.Close()
		session.Close()
	}
}

Send timeout might corrupt outgoing messages

Hi everyone,

I just reviewed the library a little, since I'm also interested in stream multiplexing (I actually want something for websockets instead of plain TCP, but I guess yamux would work there too).
Overall it looks really well written, and I like the fact that you have a spec :-)

One thing I stumbled upon during review is that there may be situations where the send timeout for the session corrupts outgoing data: if we look at the waitForSendErr function, it starts the timeout and tries to send the data in parallel by handing it over to the send channel.

The select/timeout logic looks like it introduces a data race to me: if the timeout expires here, the send will still be in progress. That means the data (header and body) will still be in use by the send routine. However, the waitForSendErr call returns with an error, and the calling stream could think the data is no longer required and reuse it. It probably won't do so for sendHdr, as it sees that writes fail and no further writes or sendHdr mutations follow, but the payload data slice might be reused by the application and mutated while the send is still in progress. As an end result, you could send data to the wire that was not desired.

Fixing this is probably not that easy. I guess there could be some cancellation or sendInProgress flag in the enqueued sendReady struct. If the message has not yet been fetched by the writing proc, it can be marked as cancelled and will then no longer be processed. However, as soon as the writer goroutine picks it up, the safest thing is to let it write to completion, as there is no kind of CancellationToken that can be passed to the Write or Copy methods.

Thread-safety properties

Is this library thread-safe? If I Accept() multiple streams from a single session, can I access different streams from different goroutines concurrently?

KeepAlive is not working

Even if Config -> EnableKeepAlive and Config -> KeepAliveInterval are set and connectivity is lost (i.e. the internet goes down), Session.Accept() returns no error.
I'm wondering if this is by design or a bug. If Session.Accept doesn't return any error, you can't implement a fallback mechanism (e.g. redialing the connection), can you?

stream recvWindow decremented incorrectly

At stream.readData():

	if _, err := io.Copy(s.recvBuf, conn); err != nil {
		s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
		s.recvLock.Unlock()
		return err
	}

	// Decrement the receive window
	s.recvWindow -= length // <-- this should be the number of bytes actually read
	s.recvLock.Unlock()

When the connection breaks, io.Copy() will return a partial read.

Data lost when closing Session

Hi all, I ran into an issue, and I wanted to get your opinion on what the correct behavior should be.

I have a yamux client that writes data to a Stream, and then closes the Stream, and also the Session.
The problem is that sometimes all the data arrives at the other end, and other times some data remains in the send queue when the underlying connection actually closes.

First, this causes this warning message:
[ERR] yamux: Failed to write header: tls: use of closed connection

Second, the data does not arrive at the other end.

This commit contains a unit test that shows what happens; the failure happens only sometimes because it is a race condition: yipal@a5eed1e

I noticed that the existing code finishes sending the data when the client calls Stream.Close(), which makes me think the correct behavior is for Session.Close() to also finish draining the send queue before actually closing the underlying connection.
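
Until that happens, the workaround this suggests is to close the stream first (which does drain its data) and only then close the session; a sketch:

// Close the stream first so its queued data is flushed to the peer...
if err := stream.Close(); err != nil {
	log.Printf("stream close: %v", err)
}
// ...and only then tear down the session and the underlying connection.
if err := session.Close(); err != nil {
	log.Printf("session close: %v", err)
}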

Authentication

I'm wondering whether authentication is something yamux should/can support and, if so, how it should work. I'm willing to submit a PR if guidance is provided.

Weird question

The server returns a ping ack of [0 2 0 2 0 0 0 0 0 0 0 1],

but the client receives [0 2 0 2 0 0 0 0 0 0 0 0].

Is the ping ID not correct?

`go test ./...` fails

go version : go version go1.11.1 linux/amd64
go test ./... output:

2018/10/29 06:57:50 [ERR] yamux: keepalive failed: i/o deadline reached
2018/10/29 06:57:50 [ERR] yamux: keepalive failed: i/o deadline reached
2018/10/29 06:57:50 [WARN] yamux: failed to send ping reply: session shutdown
--- FAIL: TestSendData_Large (0.58s)
    session_test.go:396: err: EOF
    session_test.go:427: err: session shutdown
2018/10/29 06:58:06 [WARN] yamux: failed to send ping reply: connection write timeout
FAIL
FAIL	github.com/hashicorp/yamux	17.765s

Stream receive buffer grows continually

The library currently uses a bytes.Buffer for buffering data received from the network connection until it is fetched by the user through a Read() call.
The problem with that is that all writes from the network append to the end of the buffer. Reads read from the current position and shift the read pointer toward the end of the buffer, but the already-read data stays at the beginning of the buffer until the stream is closed.

This means that, for example, transferring a 100MB file through yamux grows the stream's receive buffer to 100MB.

This could be fixed by using a ring buffer big enough to hold the maximum window size, or a buffer of window size where each read copies the remaining data to the front, resets the read pointer to 0, and adjusts the write pointer accordingly; the latter is sketched below.
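
A sketch of that second approach (a hypothetical compacting buffer, assuming the errors and io imports; not current yamux code):

// windowBuffer is a hypothetical fixed-capacity buffer: each read compacts
// the remaining bytes to the front, so memory stays bounded by the window
// size instead of growing with the total amount transferred.
type windowBuffer struct {
	buf []byte // capacity fixed at the maximum window size
	n   int    // number of bytes currently buffered
}

func (w *windowBuffer) Write(p []byte) (int, error) {
	if w.n+len(p) > len(w.buf) {
		return 0, errors.New("receive window exceeded")
	}
	copy(w.buf[w.n:], p)
	w.n += len(p)
	return len(p), nil
}

func (w *windowBuffer) Read(p []byte) (int, error) {
	if w.n == 0 {
		return 0, io.EOF
	}
	n := copy(p, w.buf[:w.n])
	copy(w.buf, w.buf[n:w.n]) // shift the remainder to the front
	w.n -= n
	return n, nil
}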

Can't send a message, only connect

I'm trying out yamux; here's my yamux client:

package main

import (
	"log"
	"net"
	"strconv"
	"time"

	"github.com/hashicorp/yamux"
)

func main() {
	addr := "127.0.0.1:3333"
	conn, _ := net.Dial("tcp", addr)
	// Setup client side of yamux
	session, err := yamux.Client(conn, nil)
	if err != nil {
		panic(err)
	}

	for i := 1; i <= 100; i++ {
		go testC(session, strconv.Itoa(i))
	}

	time.Sleep(60 * time.Second)

	conn.Close()

}

func testC(session *yamux.Session, st string) {
	// Open a new stream
	stream, err := session.Open()
	if err != nil {
		panic(err)
	}

	// stream.SetDeadline(time.Now().Add(3))

	_, err = stream.Write([]byte(st))
	if err != nil {
		panic(err)
	}
	reply := make([]byte, 7000)
	_, err = stream.Read(reply)
	if err != nil {
		panic(err)
	}

	log.Println("STRING " + st + ":" + string(reply))
}

and here's my server

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
)

func handleConnection(conn net.Conn) {
	fmt.Println("Handling new connection...")

	// Close connection when this function ends
	defer func() {
		fmt.Println("Closing connection...")
		conn.Close()
	}()

	// timeoutDuration := 5 * time.Second
	bufReader := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))

	for {
		// Set a deadline for reading. Read operation will fail if no data
		// is received after deadline.
		// conn.SetWriteDeadline(time.Now().Add(timeoutDuration))

		// Read tokens delimited by newline
		bytes, err := bufReader.ReadString('\n')
		if err != nil {
			fmt.Println(err)
			return
		}
		_, err = bufReader.WriteString(bytes)
		if err != nil {
			log.Println(err)
			return
		}
		err = bufReader.Flush()
		if err != nil {
			log.Println("Flush failed.", err)
		}
		if string(bytes) == "exit" {
			log.Println("should be exit")
			conn.Close()
		}

		fmt.Printf("%s", bytes)
	}
}

func main() {
	// Start listening to port 3333 for TCP connection
	listener, err := net.Listen("tcp", ":3333")
	if err != nil {
		fmt.Println(err)
		return
	}

	defer func() {
		listener.Close()
		fmt.Println("Listener closed")
	}()

	for {
		// Get net.TCPConn object
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println(err)
			break
		}

		go handleConnection(conn)
	}
}

But why can't it send a message to the server?
Do both server and client need to implement yamux?
Or is there an issue in my code? Thanks

YaMUX accept rate slower than I need

I have been testing yamux and it does just what I need with 5 to around 100 streams. However, the application use case will require 10k or more streams per connection. When I simulate this by connecting a client that just opens streams and starts sending as fast as it can on each stream (with an accept backlog of 5k), the accept rate for yamux is painfully slow, taking around 3m to accept 10k streams. From what I have profiled, the locking interplay between data and accept seems to be the bottleneck. Any plans to refactor, or perhaps avoid some or all of the locking? I watched your video on the plugin arch and the pain you went through to build yamux, so I don't want to have to re-implement it myself ;)

Any pointers would be helpful

Thx
Leif

long hang in stream write

I finally got around to updating ipfs to use the fixes from #29, and now I'm seeing hangs on the select in stream.go:199. It's reproducible on the live network, but it takes a little while to happen.

cc @slackpad

How can I change the "initialStreamWindow" gracefully?

The initialStreamWindow const in yamux is 256 * 1024.
In some cases the initial 256K window is too small and you have to wait for the window update, which results in a longer wait for buffering.
So how can I enlarge initialStreamWindow gracefully, other than by modifying the yamux code?

Question about the deadline options...

Hi @armon, @mitchellh, I was looking at this package in a little more detail and was wondering what the reasoning behind the deadline implementation is. I noticed that when setting a deadline it expects a certain point in time instead of a duration, and I fail to understand why it is implemented this way.

Looking at this part of the code, for example, I see that readDeadline is checked to see if it has been set, and if so the delay is calculated by subtracting the current time from the configured deadline time.

So when I open a new stream and then call stream.SetDeadline(time.Now().Add(30 * time.Second)), the readDeadline is set to 30 seconds in the future. So if my program starts sending and receiving data within 30 seconds from the time I opened the stream, all should be good, right?

But say I communicate over the newly opened stream for about 60 seconds, after which some local processing is done before sending another request over the stream. It will then reach that same piece of code again, but this time it subtracts the current time from a time in the past, making the duration negative, so time.After(delay) fires immediately and gives me a timeout error.

So is it expected and designed to behave this way? Or do I misread/misunderstand how this is meant to be used? I would think that by changing the deadlines to durations, this could function as a generic timeout that works not only when the stream is opened and waiting for its first communication, but also between communications. Additionally, it could be used to make sure streams are cleaned up after not being used for the configured duration (when parallelism or the program layout makes it hard to determine when it is safe to call Close() on the stream).
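
For reference, the pattern the current absolute-deadline semantics push me toward is re-arming the deadline before every operation; a sketch (process is a hypothetical handler):

buf := make([]byte, 4096)
for {
	// Deadlines are absolute points in time, so re-arm before each Read
	// to get per-operation timeout behavior.
	if err := stream.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
		return err
	}
	n, err := stream.Read(buf)
	if err != nil {
		return err
	}
	process(buf[:n])
}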

Thanks,

Sander

"There must be at most one ...-side connection"

Client() and Server() indicate "There must be at most one ...-side connection." Is this just per-conn, or does it mean something different? I don't see anything that would prevent there being multiple sessions on different conns, but I hate to miss something basic and critical.

The server should handle GoAway frames

Currently GoAway is handled only in Session.Open. However, there is also the case where the client sends a GoAway request. In that case the server should handle it in Accept(). A common use case is when the client acts as a "server" (also known as a back connection / reverse connection).

Should we close the session when a stream's receive window is exceeded?

After reading the stream receive window code, I found that when a stream's receive window is exceeded, the error causes the session's recvLoop() to return and the session to shut down.

If the above is correct: since streams are independent, why should one stream's receive window being exceeded (usually caused by that stream's consumer being stuck) shut down the session and stop all other streams? Can we instead close only the problem stream and return the error message to its consumer?

Config should take a Logger interface

At present, yamux.Config.Logger is a pointer to a concrete struct. This makes it impossible to use popular libraries such as https://github.com/sirupsen/logrus.

Fortunately, most logging libraries implicitly satisfy the following interface:

type Logger interface {
    Fatal(v ...interface{})
    Fatalf(format string, v ...interface{})
    Fatalln(v ...interface{})
    Panic(v ...interface{})
    Panicf(format string, v ...interface{})
    Panicln(v ...interface{})
    Prefix() string
    Print(v ...interface{})
    Printf(format string, v ...interface{})
    Println(v ...interface{})
}

Config should take an interface like the one above in lieu of *log.Logger.
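
In the meantime, one workaround is to bridge through an io.Writer, since *log.Logger can wrap any writer; a sketch using logrus's Writer() adapter (with the caveat that level information is lost):

// Route yamux's *log.Logger output into logrus via an io.Writer.
lr := logrus.New()
cfg := yamux.DefaultConfig()
cfg.Logger = log.New(lr.Writer(), "", 0) // yamux writes, logrus prints
session, err := yamux.Client(conn, cfg)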

What is the disconnect flow?

Please, what is the intended graceful disconnection flow?

  • Should a client itself initiate disconnection, or should it be controlled at a higher level, with both server and client being closed?
  • Is closing the client and server enough, or should their streams be closed first?
  • Likewise, who should be responsible for closing the streams (client/server/both, controlled)?

Trying to close just the client side leads to EOFs and keepalive timeouts. So is there a scenario the design supposes, or do I have to manage it in my app explicitly?

Thx

OpenStream hanging on synCh

Over on the ipfs project we updated the version of yamux we were using to the latest master from 9feabe6 about a month ago. Since then we started noticing random hanging issues, which I got around to debugging today, and it looks like the culprit is the synCh channel on the session. Commits that don't contain that logic work just fine, but every commit I've tried with it has hung.

It looks to be an issue with not replacing tokens in the semaphore in all needed cases; maybe a call to Close isn't actually releasing a stream the way it should?

Increasing the AcceptBacklog in the config 'fixes' the problem, but that just seems like it's buying me more time until it hangs again.

At stream read/write, wrap the underlying error into the returned error

I need to recover from errors when the connection is broken, so there should be some way to check why the session was closed on the client side. E.g.:

[ERR] yamux: Failed to read header: read tcp 10.0.1.10:55190->10.0.3.20:443: read: connection timed out

Currently, read() and write() just return io.EOF.

What about the performance of yamux? It seems yamux costs a lot of bandwidth.

I wrote a simple TCP tunnel with yamux and tested the speed:

  • Direct link (client --> server): 64mbps ~ 75mbps
  • yamux (default config) as a tunnel with 1 stream (client --> yamux client --stream--> yamux server --> server) : 24mbps ~ 28mbps

From the rough test above, we may conclude that yamux costs a lot of bandwidth. Are there any methods that can improve yamux throughput?
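
One knob that may help (results will depend on RTT and the underlying transport) is enlarging the per-stream flow-control window in the config, since the default window limits how much data can be in flight per stream; a sketch with an illustrative value:

cfg := yamux.DefaultConfig()
// Allow more unacknowledged data in flight before a window update is
// required; 16MB is illustrative, tune to your bandwidth-delay product.
cfg.MaxStreamWindowSize = 16 * 1024 * 1024
session, err := yamux.Client(conn, cfg)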

potential race condition in writing while connection drops

I've been having some issues with yamux writes hanging lately. See goroutine 14008165 in: https://ipfs.io/ipfs/QmQn8YDTHWJzF7GVKUKanBzoxbzd5jQpwsgM2PfFVTKuGR

I think it's also correlated with the internet connection of the machine it's running on dropping during a write. Here's what I suspect is happening:

  • A write on a stream goes through and consumes the entire sendWindow (resulting in s.sendWindow now being zero)
  • Another write comes in and is delegated to the WAIT code (where we see the write being stuck in the linked stack trace)
  • At this point the connection drops, resulting in us not receiving the window update
  • Now, since the writer is waiting on either a timeout (which by default is not set) or a notification on the recvNotifyCh, we have to hope we receive that notification

I'm not sure if the above is exactly what's happening, but I'm quite confident that if we somehow end up in the write wait loop after the stream has been closed, it's possible that the sendNotifyCh signal was missed and we will block forever. To address that possibility, I think we should close the notify channels when the streams get closed, so that they are always ready to receive on.

cc @slackpad

http.Serve(session, nil) hangs in abortPendingRead() when hijack is used

This may be a misuse of yamux rather than a yamux issue, but I just wanted to let you know that the following code hangs for me in hijack() (really abortPendingRead() in go/src/net/http/server.go:3341) on go version devel +3067376 Mon Feb 26 22:10:51 2018 +0000 linux/amd64.

package main

import (
	"context"
	"flag"
	"fmt"
	"github.com/hashicorp/yamux"
	"net"
	"net/http"
	"time"
)

var endpoint = flag.String("ep", "127.0.0.1:8000", "endpoint")

func client() {
	// Get a TCP connection
	conn, err := net.Dial("tcp", *endpoint)
	if err != nil {
		panic(err)
	}

	// Setup client side of yamux
	session, err := yamux.Client(conn, nil)
	if err != nil {
		panic(err)
	}
	http.Serve(session, nil)

}

func server() {
	// Accept a TCP connection
	listener, err := net.Listen("tcp", *endpoint)
	if err != nil {
		panic(err)
	}
	conn, err := listener.Accept()
	if err != nil {
		panic(err)
	}

	// Setup server side of yamux
	session, err := yamux.Server(conn, nil)
	if err != nil {
		panic(err)
	}
	tr := &http.Transport{
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			return session.Open()
		},
	}
	client := &http.Client{Transport: tr}
	resp, err := client.Get("http://127.0.0.1:8000/works")
	if err != nil {
		panic(err)
	}
	fmt.Println(resp)
	fmt.Println("hanging in abortPendingRead()...")
	resp, err = client.Get("http://127.0.0.1:8000/bug")
	if err != nil {
		fmt.Println("we should not see this because it should hang in abortPendingRead()")
		fmt.Println(err)
	}
	fmt.Println(resp)

}

func main() {
	flag.Parse()
	http.HandleFunc("/works", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "this works")
	})
	http.HandleFunc("/bug", func(w http.ResponseWriter, r *http.Request) {
		h, ok := w.(http.Hijacker)
		if !ok {
			fmt.Printf("does not support hijack")
		}
		c, _, err := h.Hijack()
		if err != nil {
			panic(err)
		}
		fmt.Fprintf(c, "this works")
		c.Close()
	})

	go server()
	time.Sleep(90 * time.Millisecond)
	client()
}

Using Session with http.Server can result in an infinite error loop in Accept

Consider a situation where we have a client-server connection and a keepalive on the client side fails. The underlying connection gets closed without sending a GoAway message.

Now, on the server side, there's a race over who notices first that the connection got closed and calls (*Session).exitErr(). If (*Session).recv wins (I think it also applies to (*Session).send) while (*Session).recvLoop is in the io.ReadFull call, (*Session).exitErr() is called with a "connection reset by peer" net.Error. That error is written to Session.shutdownErr and will be returned by (*Session).Accept on all subsequent calls.

And here's the problem. If that server Session is used as a net.Listener with http.Server it will cause an infinite error loop. That's because "connection reset by peer" is a temporary error and those are retried indefinitely, see here.

Ideally, all yamux errors should implement net.Error and set Temporary() and Timeout() appropriately, but that would be a breaking API change.
The backward compatible solution is to never return errors that implement net.Error because the context where they originally happen and the context where they are returned by yamux differ and lead to erroneous situations like this one.
In practical terms, it simply means wrapping errors passed to (*Session).exitErr() if they are not yamux errors.

I can submit a PR for this but I would first like to know if that's something you would accept.
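
Concretely, the wrapping I have in mind is roughly this (a hypothetical sketch, not existing yamux code):

// wrappedErr preserves the message but, being a plain error, no longer
// satisfies net.Error, so callers like http.Server cannot classify it
// as temporary and retry forever.
type wrappedErr struct {
	inner error
}

func (w *wrappedErr) Error() string { return w.inner.Error() }

// sanitizeErr would be applied to errors handed to (*Session).exitErr().
func sanitizeErr(err error) error {
	if _, ok := err.(net.Error); ok {
		return &wrappedErr{inner: err}
	}
	return err
}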

[bug] TestGoAway failed

=== RUN   TestGoAway
    session_test.go:571: err: <nil>
2023/01/15 18:56:28 [WARN] yamux: frame for missing stream: Vsn:0 Type:1 Flags:1 StreamID:1 Length:0
--- FAIL: TestGoAway (0.00s)
FAIL
FAIL    github.com/hashicorp/yamux      0.005s
FAIL

"context canceled" error when using with http.ReverseProxy

Hi, I'm having this weird problem when using yamux with http.ReverseProxy. The thing I want to achieve is:

  1. A server listening on :8888 for a client to connect
  2. When a client is connected, the connection is handed over to yamux, and
    • the client becomes the real server, which is just a simple proxy to example.org
    • the server starts listening on :8080, which would proxy to the client

The weird part is that the first curl localhost:8080 succeeds (with a 404 from example.org), but then all the following curl localhost:8080 calls return 502 Bad Gateway, with the client printing http: proxy error: context canceled for each request sent.

Here are some clues:

  1. The error message is from the http roundtripper, where it would error out if ctx is done
  2. If I avoid yamux, and do server -> client -> example.org proxy directly with http.ReverseProxy, the problem would just go away
  3. If I set DisableKeepAlives: true on the server's transport, the problem goes away too, because each request then Dials a new connection

Below is the detailed code and steps to reproduce the problem:

server code:

package main

import (
	"log"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"time"

	"github.com/hashicorp/yamux"
)

func main() {
	lis, err := net.Listen("tcp", ":8888")
	if err != nil {
		panic(err)
	}

	conn, err := lis.Accept()
	if err != nil {
		panic(err)
	}

	sess, err := yamux.Server(conn, nil)
	if err != nil {
		panic(err)
	}

	go func() {
		u, err := url.Parse("http://whatever")
		if err != nil {
			panic(err)
		}

		rp := httputil.NewSingleHostReverseProxy(u)
		rp.Transport = &http.Transport{
			Dial: func(network string, addr string) (net.Conn, error) {
				return sess.Open()
			},
		}

		log.Println("serving at :8080")
		if err := http.ListenAndServe(":8080", rp); err != nil {
			panic(err)
		}
	}()

	for {
		time.Sleep(time.Second)
	}
}

client code:

package main

import (
	"log"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"

	"github.com/hashicorp/yamux"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:8888")
	if err != nil {
		panic(err)
	}

	sess, err := yamux.Client(conn, nil)
	if err != nil {
		panic(err)
	}

	u, err := url.Parse("http://example.org")
	if err != nil {
		panic(err)
	}

	log.Println("serving")
	if err := http.Serve(sess, httputil.NewSingleHostReverseProxy(u)); err != nil {
		panic(err)
	}
}

Run the following in a terminal to see the problem:

curl localhost:8080   # success
curl localhost:8080   # 502 bad gateway

How to deal with ErrRecvWindowExceeded

I am working on a port-forwarding service to replace an existing sshd-based service, as that one encrypts and decrypts data, which is not needed in my case. I started working with yamux, and it looks like I have it basically working. My test scenario so far is putting a forwarded HTTP server under siege, and it fails after a couple of transfers. The number itself varies; usually after 20 to 100 MB yamux returns ErrRecvWindowExceeded. I looked into this, and in that case atomic.LoadUint32(&s.recvWindow) is always 0.

If I just disable the check below for ErrRecvWindowExceeded, everything works perfectly. What could I be doing wrong here?

if length > atomic.LoadUint32(&s.recvWindow) {
    s.session.logger.Printf("[ERR] yamux: receive window exceeded")
    return ErrRecvWindowExceeded
}

Limited number of streams?

If I read and understand this code correctly, it seems to me that it imposes a hard limit here.

So what are the options if you would like to use Yamux in a long-running process that could use more than math.MaxUint32-1 streams? Most likely not all at the same time, of course, but as the preferred way to manage stream IDs seems to be to increment the ID by 1, you will hit the limit sooner or later.

So could this logic be updated/changed without breaking existing stuff? And/or are there other ways to (safely) reuse the old IDs of streams that are no longer used?

TestPing failed on Windows

run TestPing on Linux

$ go test . -run '^TestPing$' -v 
=== RUN   TestPing
--- PASS: TestPing (0.00s)
PASS
ok      github.com/hashicorp/yamux      0.002s

run TestPing on Windows

PS C:\Users\jinmiaoluo\repo\yamux> go test . -run '^TestPing$' -v 
=== RUN   TestPing
    session_test.go:98: bad: 0s
--- FAIL: TestPing (0.00s)
FAIL
FAIL    github.com/hashicorp/yamux      0.178s
FAIL
PS C:\Users\jinmiaoluo\repo\yamux>

debug on Linux

$ dlv test .
Type 'help' for list of commands.
(dlv) b ./session_test.go:94
Breakpoint 1 set at 0x56eb44 for github.com/hashicorp/yamux.TestPing() ./session_test.go:94
(dlv) c
> github.com/hashicorp/yamux.TestPing() ./session_test.go:94 (hits goroutine(8):1 total:1) (PC: 0x56eb44)
    89:         client, server := testClientServer()
    90:         defer client.Close()
    91:         defer server.Close()
    92: 
    93:         rtt, err := client.Ping()
=>  94:         if err != nil {
    95:                 t.Fatalf("err: %v", err)
    96:         }
    97:         if rtt == 0 {
    98:                 t.Fatalf("bad: %v", rtt)
    99:         }
(dlv) p rtt
196033
(dlv) 

debug on Windows

(dlv) b c:/users/jinmiaoluo/repo/yamux/session_test.go:94
Breakpoint 1 set at 0x5b8544 for github.com/hashicorp/yamux.TestPing() c:/users/jinmiaoluo/repo/yamux/session_test.go:94
(dlv) c
> github.com/hashicorp/yamux.TestPing() c:/users/jinmiaoluo/repo/yamux/session_test.go:94 (hits goroutine(8):1 total:1) (PC: 0x5b8544)
    89:         client, server := testClientServer()
    90:         defer client.Close()
    91:         defer server.Close()
    92:
    93:         rtt, err := client.Ping()
=>  94:         if err != nil {
    95:                 t.Fatalf("err: %v", err)
    96:         }
    97:         if rtt == 0 {
    98:                 t.Fatalf("bad: %v", rtt)
    99:         }
(dlv) p rtt
0
(dlv)

Panic when sending large data

I'm working on a reverse proxy server over websocket, and this great library is used as a multiplexer over a single ws connection. It works fine when proxying small HTTP requests but panics if the body is large.

panic: runtime error: index out of range [29934] with length 7680

goroutine 13 [running]:
bufio.(*Reader).Read(0xc000443920, {0xc0005fa000, 0x1e00, 0xc00014bcc0})
        /usr/local/Cellar/go/1.17/libexec/src/bufio/bufio.go:218 +0x319
io.(*LimitedReader).Read(0xc00048e2a0, {0xc0005fa000, 0x153bb86, 0xc00014bd00})
        /usr/local/Cellar/go/1.17/libexec/src/io/io.go:473 +0x45
bytes.(*Buffer).ReadFrom(0xc0000a11d0, {0x18eb320, 0xc00048e2a0})
        /usr/local/Cellar/go/1.17/libexec/src/bytes/buffer.go:204 +0x98
io.copyBuffer({0x18ea9a0, 0xc0000a11d0}, {0x18eb320, 0xc00048e2a0}, {0x0, 0x0, 0x0})
        /usr/local/Cellar/go/1.17/libexec/src/io/io.go:409 +0x14b
io.Copy(...)
        /usr/local/Cellar/go/1.17/libexec/src/io/io.go:382
github.com/hashicorp/yamux.(*Stream).readData(0xc0000a20d0, {0xc00011c040, 0xc000000002, 0xc}, 0xc998, {0x18ea940, 0xc000443920})
        /Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/stream.go:482 +0x21a
github.com/hashicorp/yamux.(*Session).handleStreamMessage(0xc0000d9080, {0xc00011c040, 0xc00011c040, 0xc})
        /Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/session.go:550 +0x285
github.com/hashicorp/yamux.(*Session).recvLoop(0xc0000d9080)
        /Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/session.go:501 +0x114
github.com/hashicorp/yamux.(*Session).recv(0xc0005148a0)
        /Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/session.go:462 +0x1e
created by github.com/hashicorp/yamux.newSession
        /Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/session.go:113 +0x49b
exit status 2

yamux/stream.go, lines 471 to 475 at commit 26ff87c:

if s.recvBuf == nil {
	// Allocate the receive buffer just-in-time to fit the full data frame.
	// This way we can read in the whole packet without further allocations.
	s.recvBuf = bytes.NewBuffer(make([]byte, 0, length))
}

Maybe we should grow its receive buffer large enough to avoid the panic? I have tried the modification below and it never panics again, but I don't know whether it is the right fix.

if s.recvBuf.Cap() < int(length) {
	s.recvBuf.Grow(int(length) - s.recvBuf.Cap())
}
