sdroege / async-tungstenite

Async binding for Tungstenite, the Lightweight stream-based WebSocket implementation

License: MIT License

Shell 2.26% Rust 97.74%
rust websocket websockets async tungstenite async-std tokio glib gio

async-tungstenite's People

Contributors

agalakhov alexheretic bluetech carlosmn cbenoit complexspaces daniel-abramov dbcfd deedasmi dnaka91 flosse gdesmott ibaryshnikov jakkusakura jeffesquivels kellytk leo1003 mathiaskoch mehcode mpajkowski najamelan nelsonjchen nemasu nickelc philn randers00 remalloc sdroege spinda yusdacra


async-tungstenite's Issues

Type aliases for `WebSocketStream` would be useful.

Hi,

I have to store the WebSocket that's returned by async_tungstenite::tokio::connect_async in a struct without adding a type parameter to it.

The return value of the above function is Result<(WebSocketStream<StreamSwitcher<TokioAdapter<TcpStream>, TokioAdapter<TlsStream<TokioAdapter<TokioAdapter<TcpStream>>>>>>, Response), Error>.

So a WebSocket is represented by:

WebSocketStream<StreamSwitcher<TokioAdapter<TcpStream>, TokioAdapter<TlsStream<TokioAdapter<TokioAdapter<TcpStream>>>>>>

Which is pretty complex.

I tried to construct my own type alias, but I failed because I don't have a TLS feature activated, so I can't rebuild the type.

I guess the problem is connected to the use of certain features and the limitations of cargo doc, but at that point I gave up and switched to tokio-tungstenite (which doesn't seem to have this problem).

I think type aliases would be very useful here (or at least examples that show how to store a WebSocket somewhere).

PS: I'm pretty new to Rust, so sorry if the right way to do this is obvious.
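For reference, newer versions of the crate expose a `ConnectStream` alias in the tokio module (it also appears in the Debug issue further down), which can serve as exactly this kind of alias when the `tokio-runtime` feature is enabled. A minimal sketch, assuming that feature and a recent release:

use async_tungstenite::{tokio::ConnectStream, WebSocketStream};

// `tokio::connect_async` resolves to this stream type, so the socket can be
// stored without adding a type parameter to the struct.
struct Client {
    ws: WebSocketStream<ConnectStream>,
}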

Generic solution to AsyncRead as std::io::Read

I just ran into this: https://crates.io/crates/async-stdio

It's only in an alpha release and I haven't scrutinized it, but if it's good, it's probably worth it for tokio-tungstenite, async-tungstenite, and potentially others (I thought tokio had the same issue somewhere) to reuse a generic solution to this problem rather than hand-rolling code for it in several places.

could not find `async_std` in `async_tungstenite`

Hi, I'm trying to install async-tungstenite and use it in my project.

I started by adding the following to my Cargo.toml:

async-tungstenite = "*"
futures = "*"

Then I tried copying examples/async-std-echo.rs over to my main.rs and running it; however, I get the following error.

error[E0432]: unresolved import `async_tungstenite::async_std`
  --> src/main.rs:15:25
   |
15 | use async_tungstenite::{async_std::connect_async, tungstenite::Message};
   |                         ^^^^^^^^^ could not find `async_std` in `async_tungstenite`

error[E0432]: unresolved import `async_std`
  --> src/main.rs:18:5
   |
18 | use async_std::task;
   |     ^^^^^^^^^ use of undeclared type or module `async_std`

warning: unused import: `futures::prelude`

Do I have to enable some features in my Cargo.toml to use this?
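Most likely yes: the runtime-specific submodules are behind cargo features. A sketch of the Cargo.toml fix (an assumption based on the feature names used elsewhere in this tracker, e.g. `async-std-runtime`), including async-std itself since `use async_std::task` also fails:

[dependencies]
async-tungstenite = { version = "*", features = ["async-std-runtime"] }
async-std = "*"
futures = "*"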

Error using accept_async() with Tokio

Hi! Wonderful crate. Everything I've tried so far works great, but there is one rough edge. I'm getting an error when trying to use accept_async with Tokio's TcpStream:

error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: futures_io::if_std::AsyncRead` is not satisfied
   --> src/main.rs:8:34
    |
8   |     let ws_stream = accept_async(stream).await.unwrap();
    |                                  ^^^^^^ the trait `futures_io::if_std::AsyncRead` is not implemented for `tokio::net::tcp::stream::TcpStream`
    | 
   ::: /Users/igor/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.2/src/lib.rs:142:8
    |
142 |     S: AsyncRead + AsyncWrite + Unpin,
    |        --------- required by this bound in `async_tungstenite::accept_async`

... and the same error with AsyncWrite. I suspect using Tokio has something to do with this. All uses of this function in examples rely on async-std's implementation.


Here is a minimal example to reproduce the error:

main.rs

use async_tungstenite::accept_async;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let (stream, _) = listener.accept().await?;
    let ws_stream = accept_async(stream).await.unwrap();
    Ok(())
}

Cargo.toml

[package]
name = "async_fail"
version = "0.0.1"
edition = "2018"

[dependencies]
tokio = { version = "0.2.17", features = ["macros", "tcp", "dns", "io-util"] }
async-tungstenite = { version = "0.4.2", features = ["tokio-tls", "tokio-runtime"] }
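For what it's worth, the `tokio` submodule has its own accept/connect helpers that take Tokio's native IO types; a sketch of the same server using them (assuming the `tokio-runtime` feature, which the Cargo.toml above already enables):

use async_tungstenite::tokio::accept_async;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
    let (stream, _) = listener.accept().await?;
    // The tokio module wraps the stream in an adapter so the futures-io
    // bounds on the handshake are satisfied.
    let _ws_stream = accept_async(stream).await?;
    Ok(())
}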

[Question] Cancellation of read from websocket stream before completion

use std::time::Duration;
use async_tungstenite::tokio::connect_async;
use futures::StreamExt;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let (ws_stream, _) = connect_async("test").await.unwrap();
    let (_write, mut read) = ws_stream.split();

    // First read. Safe ?? 
    if let Err(_) = timeout(Duration::from_millis(10), read.next()).await {
        println!("did not receive message within 10 ms");
    }

    // Second read
    let message = read.next().await;
}

Let's say the first read.next() read a few bytes but not the whole message within 10 ms. What happens to the bytes that were already read? Are they discarded or kept in a buffer for the next read? If the remaining bytes of the message arrive later, does the second read.next() read the message properly? I wonder whether cancelling a read before completion can cause any problems for the next read.

error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`

Seems to me that d4eea72 broke the build:

error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
 --> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/compat.rs:6:18
  |
6 | use futures_io::{AsyncRead, AsyncWrite};
  |                  ^^^^^^^^^  ^^^^^^^^^^ no `AsyncWrite` in the root
  |                  |
  |                  no `AsyncRead` in the root

error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
 --> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/handshake.rs:3:18
  |
3 | use futures_io::{AsyncRead, AsyncWrite};
  |                  ^^^^^^^^^  ^^^^^^^^^^ no `AsyncWrite` in the root
  |                  |
  |                  no `AsyncRead` in the root

error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
 --> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/stream.rs:9:18
  |
9 | use futures_io::{AsyncRead, AsyncWrite};
  |                  ^^^^^^^^^  ^^^^^^^^^^ no `AsyncWrite` in the root
  |                  |
  |                  no `AsyncRead` in the root

error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
  --> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/lib.rs:51:18
   |
51 | use futures_io::{AsyncRead, AsyncWrite};
   |                  ^^^^^^^^^  ^^^^^^^^^^ no `AsyncWrite` in the root
   |                  |
   |                  no `AsyncRead` in the root

error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
  --> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/tokio.rs:11:18
   |
11 | use futures_io::{AsyncRead, AsyncWrite};
   |                  ^^^^^^^^^  ^^^^^^^^^^ no `AsyncWrite` in the root
   |                  |
   |                  no `AsyncRead` in the root

error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
  --> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/tokio.rs:23:22
   |
23 |     use futures_io::{AsyncRead, AsyncWrite};
   |                      ^^^^^^^^^  ^^^^^^^^^^ no `AsyncWrite` in the root
   |                      |
   |                      no `AsyncRead` in the root

error: aborting due to 6 previous errors

Pretty much anything of relevance in futures-io is behind the std feature from what I can tell, so you may want to enable it. If I enable it, the compilation goes through smoothly.
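A sketch of the corresponding change in async-tungstenite's own Cargo.toml (assuming the dependency is currently declared without the std feature):

futures-io = { version = "0.3", default-features = false, features = ["std"] }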

[Question] What's the best practice to implement a simple websocket client with this crate?

I'm sorry to open an issue here. Maybe you could turn on the GitHub Discussions feature on this repo?

Is this a good way to handle read&write (with match select(...))? https://github.com/l2ust/substrater/blob/main/src/websocket.rs#L58-L110

And how do I know if a connection is lost?
For example, if the connection is lost, this will cause a deadlock. What kind of check should I do within this function? https://github.com/l2ust/substrater/blob/main/src/websocket.rs#L317
My rough idea is to do one more read() to check the connection state.
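A minimal client sketch for comparison (assuming the tokio-runtime feature; the URL is a placeholder, and this does not use select! and is not an official best practice, just one way to structure the loop). Connection loss surfaces as an Err item, or as the stream ending (read.next() returning None):

use async_tungstenite::{tokio::connect_async, tungstenite::Message};
use futures::{SinkExt, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (ws_stream, _) = connect_async("wss://example.com/ws").await?;
    let (mut write, mut read) = ws_stream.split();

    write.send(Message::text("hello")).await?;

    while let Some(msg) = read.next().await {
        match msg? {
            Message::Text(txt) => println!("got: {}", txt),
            Message::Close(frame) => {
                println!("server closed the connection: {:?}", frame);
                break;
            }
            _ => {}
        }
    }
    Ok(())
}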

TLS handshake eof in async-std-echo.rs example when forcing WSS

I don't get a valid WSS connection when trying the example. Not sure if I am doing something wrong.

Error: Io(Custom { kind: UnexpectedEof, error: "tls handshake eof" })

This is my Cargo.toml

[package]
name = "async_maker"
version = "0.1.0"
authors = ["Fredrik Park <[email protected]>"]
edition = "2018"

[dependencies]

async-std = "1.6.3"
futures = "0.3"
async-tungstenite = { version = "0.8", default-features = false, features = ["async-tls", "async-std-runtime"] }

For completeness I am including the code that I use (it's the example with the ws:// url removed).

use async_tungstenite::async_std::connect_async;
use async_tungstenite::tungstenite::Message;
use futures::prelude::*;
use async_std::task;

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    let url = "wss://echo.websocket.org";

    let (mut ws_stream, _) = connect_async(url).await?;

    let text = "Hello, World!";

    println!("Sending: \"{}\"", text);
    ws_stream.send(Message::text(text)).await?;

    let msg = ws_stream
        .next()
        .await
        .ok_or_else(|| "didn't receive anything")??;

    println!("Received: {:?}", msg);

    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    task::block_on(run())
}

Cannot send customized message in example "server"

I am trying to change server.rs so that a customized message is sent instead of a clone of a received one.

Line 75 in "examples/server.rs" changed to
for recp in broadcast_recipients {
    // recp.unbounded_send(msg.clone()).unwrap();
    recp.unbounded_send(Message::text("abc".to_string())).unwrap();
}

But when I open 2 client connections and send messages from one connection, nothing is received on the other connection.

I'd appreciate any help.

Integrating openssl support?

From a brief glance, it appears that TLS support in async-tungstenite requires intentional, custom integration for each TLS crate. Servo is considering switching to tungstenite in servo/servo#27043; would a PR for openssl support in this crate be desirable?

Question about chat server example

This is a question and not an issue so I'm sorry for putting it here. If there's somewhere else to put it, I'll gladly close this and move it there!

I copied the chat server example to set up a multiplayer game where all player clients report their current position and the server broadcasts that to its peers so they can draw each other. One thing I noticed was, when one client closed, all clients were disconnected. I believe this is because the websocket close message that is sent by one client is then broadcast by the server to the other clients. In my application, I updated the code to just not broadcast the message if it was a close message.

I'm wondering two things:

  1. Is my workaround just a hack? Is it best practice to broadcast the close message so other clients know one client has closed? Or should the server consume the close message from one client and broadcast a non-close message to the other clients alerting them of the close? In my experience, the other clients just got the close message and went, "Oh, the server closed. Bye!".
  2. Should the example be updated? I'd be happy to do this so I could get PR feedback on the correct way to do it. I'm currently working my way through the tokio docs but I honestly have a very hard time understanding how this all fits together so far :-)

Error "Task was cancelled" When trying to connect to a websocket

I've been stuck with the following error for the past few days and I am not sure as to how I should overcome it.

thread 'tokio-runtime-worker' panicked at 'Failed to create shard: Tungstenite(Io(Custom { kind: Other, error: "task was cancelled" }))', gateway/src/manager.rs:141:49
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

I've managed to trace it back to the following piece of code:

let (wstream, _) = async_tungstenite::tokio::connect_async_with_config(ws,Some(WebSocketConfig {
    max_message_size: Some(usize::max_value()),
    max_frame_size: Some(usize::max_value()),
    ..Default::default()
})).await?;

ws is defined as &str.

I am extremely confused as to what is going on here. The WS URL is valid. It's a discord gateway for a discord library. Any ideas as to why this IO Custom error is being thrown here?

Where does the split() method come from?

Forgive me if this is a bad place to ask, but I cannot find where the async_tungstenite::WebSocketStream object gets the .split() method from.

It's used on line 40 of the echo-server.rs example.
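It isn't defined on WebSocketStream itself; split() comes from the futures StreamExt extension trait, which provides it for anything implementing both Stream and Sink (as WebSocketStream does). A minimal sketch showing where the method resolves from:

use async_tungstenite::{tungstenite::Message, WebSocketStream};
use futures::stream::{SplitSink, SplitStream};
use futures::{AsyncRead, AsyncWrite, StreamExt};

// `split()` only becomes available once StreamExt is in scope.
fn split_ws<S>(
    ws: WebSocketStream<S>,
) -> (SplitSink<WebSocketStream<S>, Message>, SplitStream<WebSocketStream<S>>)
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    ws.split()
}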

async_tungstenite::tokio::ConnectStream doesn't implement Debug when feature "tokio-native-tls" is enabled.

Cargo.toml

[dependencies]
async-tungstenite = { version = "0.13.0", features = ["tokio-runtime", "tokio-native-tls"] }

main.rs

use async_tungstenite::tokio::ConnectStream;

#[derive(Debug)]
struct Foo(ConnectStream);

fn main() {
    println!("Hello, world!");
}

This produces the compile error: error[E0277]: async_tungstenite::stream::Stream<TokioAdapter<tokio::net::tcp::stream::TcpStream>, TokioAdapter<tokio_native_tls::TlsStream<tokio::net::tcp::stream::TcpStream>>> doesn't implement Debug

I hope this type can implement Debug.

Is it possible to flush all buffered messages before closing the stream?

We are building a ws API using async-tungstenite and use the close frames of websockets to provide error hints for clients.
The situation, which occurs quite often, is the following:

async fn pseudo_handler(ws: &mut WebSocketStream<TlsStream>) -> anyhow::Result<()> {
    ws.send(Message::Binary(...)).await?;
    ws.close(None).await?;
    Ok(())
}

And the client needs/wants to rely on receiving the message before any close sequence.
We know that this is expected behavior, but how can we prevent this / flush before sending the close sequence?

Sadly, flush() as defined in futures::SinkExt and similar didn't work.
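For concreteness, this is roughly the shape of what was attempted (a sketch with a hypothetical helper, not the crate's recommended API):

use async_tungstenite::{tungstenite::Message, WebSocketStream};
use futures::{AsyncRead, AsyncWrite, SinkExt};

async fn send_then_close<S>(ws: &mut WebSocketStream<S>, payload: Vec<u8>) -> anyhow::Result<()>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    ws.send(Message::Binary(payload)).await?;
    // Explicit flush before the close frame; per the report above, this still
    // did not guarantee the client sees the message before the close sequence.
    ws.flush().await?;
    ws.close(None).await?;
    Ok(())
}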

Thanks!

Avoid webpki-roots as a default dependency when tokio-rustls is used

I suggest that when the user enables tokio-rustls, no default ClientConfig for Rustls should be used, and in particular that webpki-roots should not be used. webpki-roots is rarely the right choice on most platforms, and there are also users (the private projects I work on) that want to ensure webpki-roots isn't used; ideally it wouldn't appear in our Cargo.lock files at all.

Concretely, I suggest the following change, which isn't backward-compatible: Either remove the tokio::connect_async implementation used when the tokio-rustls feature is enabled, or only expose it when a tokio-rustls-webpki-roots feature is enabled. In the latter case, the webpki-roots dependency should be conditioned on the tokio-rustls-webpki-roots flag.

Example how to use with HTTP server

Hi,
thanks for this library! I'm wondering if there's any existing example of how to couple this with, for example, static file serving over the same port? I'd want to serve static files and have a /ws route handled by async-tungstenite. Are there any examples of how to do this, e.g. with tide? Thanks!

Max connections limited to just 50

url = "2.2.1"
http = "0.2.3"
libc = "0.2.88"
regex = "1"
lazy_static = "1.4.0"
futures-util = "0.3.13"
tokio = { version = "1.3.0", features = ["full"] }
async-tungstenite ={ version = "0.13.0", features = ["tokio-native-tls"] }


tokio-native-tls = { version = "^0.3.0", optional = true}

code

When the number of connections exceeds 50, a new connection can only be made after disconnecting a socket that is already connected.

Performance Question

Hi, I am trying to compare tungstenite with async_tungstenite to see if there is a huge performance difference between connecting to multiple websockets using multiple threads in tungstenite vs. using tokio::spawn in async_tungstenite.

I would have expected there to be a huge difference (I come from Python, and in Python these implementations have a huge performance difference), but what I notice is that the async implementation seems to use fewer threads but more CPU. Is this expected? Or am I using the library incorrectly by spawning the async futures in the wrong way? I can't seem to get my async implementation to run faster than the sync one.

In my example I connect to the Binance market data websockets to compare the performance on my machine. I run cargo build --release and run the target/release/myproject file to compare on my Mac. I've attached code snippets below.

Sync Implementation

use std::thread;

use tungstenite::client::AutoStream;
use tungstenite::connect;
use tungstenite::protocol::Message;
use tungstenite::WebSocket;
use url::Url;

const WS_BASE_URL: &'static str = "wss://stream.binance.com:9443";

fn connect_to_markets(url: &String) -> WebSocket<AutoStream> {
    let (socket, _response) = connect(Url::parse(url).unwrap()).expect("Cant Connect");
    socket
}

fn subscribe_to_market(pair: &'static str) {
    let ws_url = format!("{}/ws/{}@depth20", WS_BASE_URL, pair);
    println!("{}", &ws_url);
    let mut socket = connect_to_markets(&ws_url);
    loop {
        let msg = socket.read_message().expect("Error reading message");
        match msg {
            Message::Text(msgtxt) => println!("Message {:?}", msgtxt),
            Message::Ping(_msg) => continue,
            Message::Binary(_msg) => continue,
            Message::Pong(_msg) => continue,
            Message::Close(msg) => {
                println!("Close {:?}", msg);
                // re-open connection here
                socket = connect_to_markets(&ws_url);
            }
        };
    }
}

fn main() {
    let markets = vec![
        "btcusdt", "ethusdt", "etcusdt", "bchusdt", "bnbusdt", "eosusdt", "xrpusdt", "ltcusdt",
        "busdusdt", "zecusdt", "adausdt", "btcpax", "atomusdt", "trxusdt", "xlmusdt", "bttusdt",
        "vetusdt", "tusdusdt",
    ];
    let mut children = vec![];
    for pair in markets {
        children.push(thread::spawn(move || {
            println!("Market started for {}", pair);
            subscribe_to_market(pair);
        }));
    }
    for child in children {
        let _ = child.join();
    }
}

Async Implementation

use async_tungstenite::tokio::connect_async;
use futures::prelude::*;
use tungstenite::protocol::Message;

// Base url for ws
const WS_BASE_URL: &'static str = "wss://stream.binance.com:9443";

// function that subscribes to markets and currently just prints out the ob
async fn subscribe_to_market(pair: &str) {
    // format the ws_url
    let ws_url: String = format!("{}/ws/{}@depth20", WS_BASE_URL, pair);
    // build the ws url connection request (tls support for wss comes from the enabled feature)
    let url = url::Url::parse(&ws_url).unwrap();
    // connect to the ws
    let (mut ws_stream, _) = connect_async(url.clone()).await.unwrap();
    // while connected
    loop {
        // wait for messages
        let msg = ws_stream
            .next()
            .await
            .ok_or_else(|| "didn't receive anything")
            .unwrap()
            .unwrap();

        match msg {
            Message::Text(msgtxt) => println!("Message {:?}", msgtxt),
            Message::Ping(_msg) => continue,
            Message::Binary(_msg) => continue,
            Message::Pong(_msg) => continue,
            Message::Close(msg) => {
                println!("Close {:?}", msg);
                // re-open connection here
                let (new_stream, _) = connect_async(url.clone()).await.unwrap();
                ws_stream = new_stream;
            }
        };
    }
}

#[tokio::main]
async fn main() {
    let markets = vec![
        "btcusdt", "ethusdt", "etcusdt", "bchusdt", "bnbusdt", "eosusdt", "xrpusdt", "ltcusdt",
        "busdusdt", "zecusdt", "adausdt", "btcpax", "atomusdt", "trxusdt", "xlmusdt", "bttusdt",
        "vetusdt", "tusdusdt",
    ];
    let mut children = vec![];
    for pair in markets {
        children.push(tokio::spawn(subscribe_to_market(pair)));
    }
    let all_futures = futures::future::join_all(children);
    all_futures.await;
}

Keep a changelog

When upgrading to a new version it would be nice to know what changed. I'm suggesting that for future releases a changelog is kept in a file like CHANGELOG.md or similar.

Please consider linking to ws_stream_tungstenite in the README

I'd love to have a link to the ws_stream_tungstenite crate in the README, as a suggestion for how to treat a WebSocket as a continuous stream of binary data rather than as discrete messages. That crate was hard to find, and a cross-reference would really help.

async-tls version upgrade

Currently pointing to 0.7; latest is 0.9.0

I recently had problems with incompatible structs for rustls 0.17 and 0.18.

How to add TLS support to a websocket server for wss to work?

I have a wasm client app (using websocket support provided by the ws_stream_wasm crate) that connects to a websocket server (using websocket support provided by the async-tungstenite crate). A non-secure connection works fine. However, now I want to add a secure (wss) connection between them.
This is what I am trying to do on the server side using async-tls:

fn load_config(options: &Options) -> io::Result<ServerConfig> {
    let certs = load_certs(&options.cert)?;
    let mut keys = load_keys(&options.key)?;

    // we don't use client authentication
    let mut config = ServerConfig::new(NoClientAuth::new());
    config
          // set this server to use one cert together with the loaded private key
          .set_single_cert(certs, keys.remove(0))
          .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;

    Ok(config)
}
...

async fn handle_connection(acceptor: &TlsAcceptor, stream: TcpStream) -> Result<(), Error> {
    // Calling `acceptor.accept` will start the TLS handshake
    let handshake = acceptor.accept(stream);
    // The handshake is a future we can await to get an encrypted
    // stream back.
    let mut tls_stream = handshake.await?;

    let ws_stream = async_tungstenite::accept_async(tls_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    ...   
}

The above was just copied and modified from here.

When my wasm client app now connects using a wss url, I get a Custom { kind: InvalidData, error: AlertReceived(DecryptError) } on this line: let mut tls_stream = handshake.await?;.
The wasm client app isn't passing any certificates or other information specific to TLS. All I did was just replaced ws with wss in the url part.

Is this how I should be adding support for wss on the server?

EDIT: Is there a function in async-tungstenite that I can use directly to accept a client connection over wss automatically instead of using async-tls?

Support fragmented messages

I have the use case where I have to send a message of X bytes, and I know X in advance, but it is fairly large so I'd like to not have to have the entire message in memory beforehand. Instead, I'd like to lazily generate it in small chunks as they are being sent. (Of course there will need to be a check afterwards that exactly as many bytes as advertised were sent)

Would such a feature be feasible?

Similarly, but also less importantly, what about that feature for the reading direction?

"thread 'async-std/runtime' panicked" when running "echo-server" example

Per the comments I run

cargo run --features="async-std-runtime" --example echo-server 127.0.0.1:12345

And I made two connections using the "websocat" utility.

websocat ws://127.0.0.1:12345

Everything works as expected, I can send/receive from both clients, but at some point I see a panic in the server output.

This is at f5025ed / "0.17.2"

Here is my rustup show output

Default host: x86_64-apple-darwin
rustup home:  /Users/mburr/.rustup

installed toolchains
--------------------

stable-x86_64-apple-darwin (default)
nightly-x86_64-apple-darwin
1.52.1-x86_64-apple-darwin
1.60.0-x86_64-apple-darwin

installed targets for active toolchain
--------------------------------------

thumbv7m-none-eabi
wasm32-unknown-unknown
x86_64-apple-darwin

active toolchain
----------------

stable-x86_64-apple-darwin (default)
rustc 1.61.0 (fe5b13d68 2022-05-18)

Here is the ugly panic output on the terminal.

   Compiling async-tungstenite v0.17.2 (/Users/mburr/git/async-tungstenite)
    Finished dev [unoptimized + debuginfo] target(s) in 18.31s
     Running `target/debug/examples/echo-server '127.0.0.1:12345'`
thread 'async-std/runtime' panicked at 'Failed to forward messages: Protocol(ResetWithoutClosingHandshake)', examples/echo-server.rs:53:10
stack backtrace:
   0: rust_begin_unwind
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panicking.rs:584:5
   1: core::panicking::panic_fmt
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/panicking.rs:143:14
   2: core::result::unwrap_failed
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/result.rs:1785:5
   3: core::result::Result<T,E>::expect
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/result.rs:1035:23
   4: echo_server::accept_connection::{{closure}}
             at ./examples/echo-server.rs:50:5
   5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/mod.rs:91:19
   6: <async_std::task::builder::SupportTaskLocals<F> as core::future::future::Future>::poll::{{closure}}
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/task/builder.rs:199:17
   7: async_std::task::task_locals_wrapper::TaskLocalsWrapper::set_current::{{closure}}
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/task/task_locals_wrapper.rs:60:13
   8: std::thread::local::LocalKey<T>::try_with
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/thread/local.rs:442:16
   9: std::thread::local::LocalKey<T>::with
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/thread/local.rs:418:9
  10: async_std::task::task_locals_wrapper::TaskLocalsWrapper::set_current
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/task/task_locals_wrapper.rs:55:9
  11: <async_std::task::builder::SupportTaskLocals<F> as core::future::future::Future>::poll
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/task/builder.rs:197:13
  12: async_executor::Executor::spawn::{{closure}}
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:144:19
  13: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/mod.rs:91:19
  14: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/future.rs:124:9
  15: async_task::raw::RawTask<F,T,S>::run
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-task-4.3.0/src/raw.rs:511:20
  16: async_task::runnable::Runnable::run
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-task-4.3.0/src/runnable.rs:309:18
  17: async_executor::Executor::run::{{closure}}::{{closure}}
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:235:21
  18: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/mod.rs:91:19
  19: <futures_lite::future::Or<F1,F2> as core::future::future::Future>::poll
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-lite-1.12.0/src/future.rs:529:33
  20: async_executor::Executor::run::{{closure}}
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:242:31
  21: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/mod.rs:91:19
  22: <futures_lite::future::Or<F1,F2> as core::future::future::Future>::poll
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-lite-1.12.0/src/future.rs:529:33
  23: async_io::driver::block_on
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-io-1.7.0/src/driver.rs:142:33
  24: async_global_executor::reactor::block_on::{{closure}}
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/reactor.rs:3:18
  25: async_global_executor::reactor::block_on
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/reactor.rs:12:5
  26: async_global_executor::threading::thread_main_loop::{{closure}}::{{closure}}
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/threading.rs:95:17
  27: std::thread::local::LocalKey<T>::try_with
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/thread/local.rs:442:16
  28: std::thread::local::LocalKey<T>::with
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/thread/local.rs:418:9
  29: async_global_executor::threading::thread_main_loop::{{closure}}
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/threading.rs:89:13
  30: std::panicking::try::do_call
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panicking.rs:492:40
  31: ___rust_try
  32: std::panicking::try
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panicking.rs:456:19
  33: std::panic::catch_unwind
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panic.rs:137:14
  34: async_global_executor::threading::thread_main_loop
             at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/threading.rs:88:12
  35: core::ops::function::FnOnce::call_once
             at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Examples for WSS server with Tungstenite

I'm unclear on how to accept clients wishing to communicate over TLS. From looking at the code, I don't see an accept function defined under any of the TLS modules, and there aren't examples for this use case either.

TLS Features are not additive

This code uses cargo features in a non-additive way.

Imagine a library A that enables tokio-rustls feature and uses rustls types (e.g. rustls-based Connector).

Also, imagine a library B that enables tokio-openssl features and uses openssl types (e.g. openssl-based Connector).

In isolation they will work.

But when they appear in one crate graph, async-tungstenite will select only the rustls-based implementation, and library B will fail to compile.

Using custom TlsConnector

Disclaimer: I am not convinced this is a problem with async-tungstenite itself, rather than just being the consequence of the in-flux state of the async ecosystem, but I figured I might as well file it at the "entry point" of my problem, since I could not think of a better alternative :-)

I am trying to connect to a websocket server with a self-signed certificate.

Digging into things a little bit, I figured that I need to use the client API instead of the high-level connect_async, and use my own TlsConnector; see for instance (snapview/tungstenite-rs#84).
Since this is the async version of tungstenite, I figured I would need an async TLS connector.
I also saw that tungstenite uses native-tls.

I found that https://github.com/dbcfd/tls-async is an async version of native-tls, but it does not seem active and currently does not compile.

On the other hand, there is https://github.com/async-rs/async-tls, but that uses rustls, so I do not think I can use that with async-tungstenite.

Any suggestion?

Initial report: stack overflow on windows with 0.15

Hi! I'm still trying to understand this, but the following code works with 0.14 and stack overflows on windows with 0.15:

Cargo.toml

[package]
name = "async-tungstenite-stream-overflow-windows"
version = "0.1.0"
edition = "2018"

[dependencies]
async-global-executor = "2.0.2"
async-net = "1.6.1"
async-tungstenite = "0.15.0"
futures-util = "0.3.17"

main.rs

fn main() {
    async_global_executor::block_on(async {
        let socket = async_net::TcpStream::connect("localhost:8080").await.unwrap();
        let (mut client, _) = async_tungstenite::client_async("ws://localhost/some/route", socket).await.unwrap();
        use futures_util::{SinkExt, StreamExt};
        client.send(async_tungstenite::tungstenite::Message::text("hello")).await.unwrap();
        dbg!(&client.next().await);  // commenting this line out avoids the stack overflow
    });
}

This expects a websocket echo server of some sort running at :8080, but I believe as long as the server responds with a message, this will overflow the stack on 0.15. Expected output (and 0.14 output) is something like:

&client.next().await = Some(
    Ok(
        Text(
            "received your message: hello",
        ),
    ),
)

Instead, the output is

thread 'main' has overflowed its stack
error: process didn't exit successfully: `target\debug\tungstenite-stream-overflow-windows.exe` (exit code: 0xc00000fd, STATUS_STACK_OVERFLOW)

I don't know nearly anything about windows and have been trying to investigate this on a cloud instance, but someone with a local windows machine will likely have more success. I have no understanding of why this is fine on my local mac.

Let me know if there's any more information I can provide to help investigate this, and thanks for async tungstenite!

How to close connection gracefully after WebSocketStream.split()?

For a simple situation, the close() method needs to be passed a CloseFrame:

let close_frame = CloseFrame { code: CloseCode::Normal, reason: Cow::from("") };
ws_stream.close(Some(close_frame)).await;

But if I call split() on ws_stream to create tasks that use the two halves, the close() method (now the one from SinkExt on the sink half) stops taking arguments, which throws an exception on the echo server side, like this:
websockets.exceptions.ConnectionClosedError: received 1005 (no status code [internal]); then sent 1005 (no status code [internal])
The server continues to run, but its console always shows an exception when the connection is terminated this way.
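One possible workaround (a sketch with a hypothetical helper, not confirmed by the maintainers): after split(), send an explicit Close message through the sink half so a normal status code still reaches the peer, then close the sink.

use std::borrow::Cow;

use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use async_tungstenite::tungstenite::protocol::CloseFrame;
use async_tungstenite::tungstenite::Message;
use futures::{Sink, SinkExt};

async fn close_sink_half<S>(write: &mut S) -> Result<(), S::Error>
where
    S: Sink<Message> + Unpin,
{
    let close_frame = CloseFrame {
        code: CloseCode::Normal,
        reason: Cow::from(""),
    };
    // Sending Message::Close starts the closing handshake with the given frame
    // (rather than the no-status-code close described in the report above).
    write.send(Message::Close(Some(close_frame))).await?;
    write.close().await
}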

Creating a executer agnostic crate via Nuclei?

Hi,

have you seen Nuclei from the Bastion project?

Wouldn't it be possible with Nuclei to create an executor-agnostic version of this crate? Or at least, wouldn't it be possible for users of this crate to create a crate that doesn't depend on a specific executor?

Nuclei supports (via Agnostik): Bastion, Async-Std, Tokio, and Smol – so basically all major executors, it seems.

I'm still trying to wrap my head around all this, but wouldn't it be possible at the moment to create a TCP socket to the WebSocket server using Nuclei, and then pass that socket/stream to client_async?

Are there any disadvantages to this approach?

I think it would make it much easier to create executor-agnostic crates.

At the moment, it seems, I have two options with async-tungstenite:

A) Let the user create the WebSocket / require a function as an argument that creates the WebSocket (and use Agnostik or async_executors as an abstraction over the executors).

B) Use feature flags to implement executor-specific code.

With Nuclei, it seems, it would be possible to just request a supported executor and use a WebSocket that uses a Nuclei TCP stream.

What do you think?

Disable certificate verification

Nice crate! But I just need your help.

There was a server with which the client failed to verify the certificate when I established a TLS connection.

The following is the error message:

tungstenite error IO error: invalid certificate: UnknownIssuer

Is there any way to disable certificate verification?

Disable certificate validation

It was possible to disable certificate validation in 0.2.1 using the following code:

async fn connect_tls_disable_cert_verification<R>(
    req: R,
) -> std::result::Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), tungstenite::Error>
where
    R: Into<Request<'static>> + Unpin,
{
    use async_tls::TlsConnector;
    use rustls::ClientConfig;
    use std::sync::Arc;

    mod danger {
        use webpki;

        pub struct NoCertificateVerification {}

        impl rustls::ServerCertVerifier for NoCertificateVerification {
            fn verify_server_cert(
                &self,
                _roots: &rustls::RootCertStore,
                _presented_certs: &[rustls::Certificate],
                _dns_name: webpki::DNSNameRef<'_>,
                _ocsp: &[u8],
            ) -> Result<rustls::ServerCertVerified, rustls::TLSError> {
                Ok(rustls::ServerCertVerified::assertion())
            }
        }
    }

    let mut request: Request = req.into();
    let mut config = ClientConfig::new();
    config
        .dangerous()
        .set_certificate_verifier(Arc::new(danger::NoCertificateVerification {}));
    let connector = TlsConnector::from(Arc::new(config));

    async_tungstenite::connect_async_with_tls_connector(request, Some(connector)).await
}

This code fails in the latest version. Is it still possible to do this?

How to disable logging for compat?

These tracing messages are hardly useful in my case. They only show that my program is running and they fill my disk space very quickly. Is there a way to disable this?

May 07 16:09:17.848 TRACE                 main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:301 Stream.poll_next    
May 07 16:09:17.848 TRACE                 main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:256 WebSocketStream.with_context    
May 07 16:09:17.848 TRACE                 main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:306 Stream.with_context poll_next -> read_message()    
May 07 16:09:17.848 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:182 Write.flush    
May 07 16:09:17.848 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context    
May 07 16:09:17.848 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:187 Write.with_context flush -> poll_flush    
May 07 16:09:17.849 TRACE                 main tungstenite::protocol: Frames still in queue: 0    
May 07 16:09:17.849 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:147 Read.read    
May 07 16:09:17.849 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context    
May 07 16:09:17.849 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:152 Read.with_context read -> poll_read    
May 07 16:09:17.849 TRACE                 main async_tungstenite::compat: WouldBlock    
May 07 16:09:17.851 TRACE                 main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:256 WebSocketStream.with_context    
May 07 16:09:17.851 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:182 Write.flush    
May 07 16:09:17.851 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context    
May 07 16:09:17.851 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:187 Write.with_context flush -> poll_flush    
May 07 16:09:17.851 TRACE                 main tungstenite::protocol: Frames still in queue: 0    
May 07 16:09:17.851 TRACE                 main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:301 Stream.poll_next    
May 07 16:09:17.852 TRACE                 main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:256 WebSocketStream.with_context    
May 07 16:09:17.852 TRACE                 main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:306 Stream.with_context poll_next -> read_message()    
May 07 16:09:17.852 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:182 Write.flush    
May 07 16:09:17.852 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context    
May 07 16:09:17.852 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:187 Write.with_context flush -> poll_flush    
May 07 16:09:17.852 TRACE                 main tungstenite::protocol: Frames still in queue: 0    
May 07 16:09:17.852 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:147 Read.read    
May 07 16:09:17.852 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context    
May 07 16:09:17.852 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:152 Read.with_context read -> poll_read    
May 07 16:09:17.852 TRACE                 main async_tungstenite::compat: WouldBlock    
May 07 16:09:17.852 TRACE                 main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:256 WebSocketStream.with_context    
May 07 16:09:17.852 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:182 Write.flush    
May 07 16:09:17.853 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context    
May 07 16:09:17.853 TRACE                 main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:187 Write.with_context flush -> poll_flush    
May 07 16:09:17.853 TRACE                 main tungstenite::protocol: Frames still in queue: 0    
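One possible mitigation (a sketch, assuming the output goes through a tracing subscriber and that tracing-subscriber's env-filter feature is available): raise the level for the noisy targets only.

use tracing_subscriber::EnvFilter;

fn init_logging() {
    // Keep the application's own logs at info, silence the per-poll TRACE
    // output from async_tungstenite and tungstenite.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("info,async_tungstenite=warn,tungstenite=warn"))
        .init();
}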

Inspecting the stream buffer

I'm making a client-side connection that will almost certainly receive messages faster than it can process them and so I was looking to inspect the buffered set of messages, or perhaps quickly push the messages into a buffer of my own.

I managed to find size_hint but I suspect async-tungstenite (or perhaps one of the layers below) is using the default implementation, as the method keeps returning (0, None) which is what the default implementation would do.

Any suggestions on how to get some control over the buffering? In my situation, it's acceptable to close the connection or send a "please stop" message to the server, but that still requires a trigger.

Release v0.10.0 to crates.io

My projects are waiting for this release so I can upgrade to tokio 0.3. The release is already in the master branch. Is there anything holding back the release? If not, I would appreciate it if 0.10.0 could be published on crates.io. 👍

Can't establish web socket

The latest release (0.4.0) seems to error when you attempt to create a websocket.

You can reproduce using:

use async_std::task;
use async_std::net::{TcpListener, TcpStream, SocketAddr};

fn main () {
    task::block_on(server());
}
async fn server() {
    let server = TcpListener::bind("127.0.0.1:3012").await.unwrap();
    while let Ok((stream, addr)) = server.accept().await {
        task::spawn(handle_connection(stream, addr));
    }
}

async fn handle_connection(raw_stream: TcpStream, _addr: SocketAddr) {
    println!("{:?}", _addr);
    async_tungstenite::accept_async(raw_stream).await.unwrap();
}

Then create a websocket using new WebSocket("ws://localhost:3012").

Weirdly enough, it's still possible to create a websocket connection using websocat

Question: How should I go about sending close frames when my program is exiting?

Hello,

I have a small program serving long-lasting websockets, and I'd like to cleanly close those when my program exits.
Right now I just drop the stream, which causes the client to exit with Error 1006/EOF.
How would you suggest handling that?
It's not the end of the world if it remains that way, since the server isn't exposed to the public; I'm just curious about what good practice would be.

Thank you
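Not an official recommendation, but a minimal sketch of one possible clean shutdown, assuming the stream has not been split: send a close frame, then drain the stream so the closing handshake can complete before the connection is dropped.

use async_tungstenite::WebSocketStream;
use futures::{AsyncRead, AsyncWrite, StreamExt};

async fn shutdown<S>(mut ws: WebSocketStream<S>)
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    // Passing None sends a normal close without an explicit close frame.
    let _ = ws.close(None).await;
    // Drain remaining messages until the peer acknowledges the close and the
    // stream ends.
    while let Some(_msg) = ws.next().await {}
}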
