sdroege / async-tungstenite
Async binding for Tungstenite, the Lightweight stream-based WebSocket implementation
License: MIT License
Hi,
I have to store the WebSocket that's returned by async_tungstenite::tokio::connect_async in a struct without adding a type parameter to it.
The return value of the above function is Result<(WebSocketStream<StreamSwitcher<TokioAdapter<TcpStream>, TokioAdapter<TlsStream<TokioAdapter<TokioAdapter<TcpStream>>>>>>, Response), Error>.
So a WebSocket is represented by:
WebSocketStream<StreamSwitcher<TokioAdapter<TcpStream>, TokioAdapter<TlsStream<TokioAdapter<TokioAdapter<TcpStream>>>>>>
Which is pretty complex.
I tried to construct my own type alias, but I failed because I don't have a TLS feature activated, so I can't rebuild the type.
I guess the problem is connected to the use of certain features and the limitations of cargo doc, but at that point I gave up and switched to tokio-tungstenite (which doesn't seem to have this problem).
I think type aliases would be very useful here (or at least examples that show how to store a WebSocket somewhere).
PS: I'm pretty new to Rust, so sorry if the right way to do this is obvious.
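One general way to keep a deeply nested stream type out of a struct signature is to erase it behind a boxed trait object. The following is a minimal std-only sketch of that idea, not async-tungstenite's API: the Transport trait and the Client struct are hypothetical names, and the synchronous std::io::Read / Write traits stand in for the async ones. The ConnectStream alias used in a later report in this list may be the more direct answer where it is available.

```rust
use std::io::{self, Read, Write};

/// Hypothetical marker trait: anything readable, writable and sendable
/// across threads can act as a transport.
trait Transport: Read + Write + Send {}
impl<T: Read + Write + Send> Transport for T {}

/// Owns "some connection" without carrying a type parameter.
struct Client {
    conn: Box<dyn Transport>,
}

impl Client {
    /// Accepts any concrete transport and erases its type.
    fn new<T: Transport + 'static>(conn: T) -> Self {
        Client { conn: Box::new(conn) }
    }

    /// Writes the whole buffer to the underlying transport.
    fn send(&mut self, data: &[u8]) -> io::Result<()> {
        self.conn.write_all(data)
    }

    /// Reads everything that is currently available until EOF.
    fn read_to_end(&mut self) -> io::Result<Vec<u8>> {
        let mut buf = Vec::new();
        self.conn.read_to_end(&mut buf)?;
        Ok(buf)
    }
}
```

The cost of this design is one heap allocation and dynamic dispatch per call, which is usually negligible next to network I/O.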
I just ran into this: https://crates.io/crates/async-stdio
It's only in an alpha release and I haven't scrutinized it, but if it's good, it's probably worthwhile for tokio-tungstenite, async-tungstenite and potentially others (I thought tokio had the same issue somewhere) to reuse a generic solution to this problem rather than hand-rolling code for it in several places.
For URLs like ws://localhost:8080/?a=3, how could we get a=3 on the server?
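The query string is part of the request target sent during the handshake, so a server that can inspect the handshake request (for instance through the callback of accept_hdr_async, which is mentioned further down) should be able to read it from the request URI. As a rough, std-only sketch of the parsing step itself (query_pairs is a hypothetical helper, and no percent-decoding is attempted):

```rust
/// Splits a request target such as "/?a=3&b=4" into (key, value) pairs.
/// Keys without "=" get an empty value. Percent-encoding is left untouched.
fn query_pairs(target: &str) -> Vec<(String, String)> {
    // Everything after the first '?' is the query string.
    let query = match target.split_once('?') {
        Some((_, q)) => q,
        None => return Vec::new(),
    };
    query
        .split('&')
        .filter(|part| !part.is_empty())
        .map(|part| match part.split_once('=') {
            Some((k, v)) => (k.to_string(), v.to_string()),
            None => (part.to_string(), String::new()),
        })
        .collect()
}
```

For the URL in the question, the target "/?a=3" yields a single pair with key "a" and value "3".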
Hi, I'm trying to install async-tungstenite and use it in my project.
I started by adding the following to my Cargo.toml:
async-tungstenite = "*"
futures = "*"
Then I tried copying examples/async-std-echo.rs over to my main.rs and running it; however, I get the following error.
error[E0432]: unresolved import `async_tungstenite::async_std`
--> src/main.rs:15:25
|
15 | use async_tungstenite::{async_std::connect_async, tungstenite::Message};
| ^^^^^^^^^ could not find `async_std` in `async_tungstenite`
error[E0432]: unresolved import `async_std`
--> src/main.rs:18:5
|
18 | use async_std::task;
| ^^^^^^^^^ use of undeclared type or module `async_std`
warning: unused import: `futures::prelude`
Do I have to enable some features in my Cargo.toml to use this?
Hi! Wonderful crate. Everything I've tried so far works great, but there is one rough edge. I'm getting an error when trying to use accept_async with Tokio's TcpStream:
error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: futures_io::if_std::AsyncRead` is not satisfied
--> src/main.rs:8:34
|
8 | let ws_stream = accept_async(stream).await.unwrap();
| ^^^^^^ the trait `futures_io::if_std::AsyncRead` is not implemented for `tokio::net::tcp::stream::TcpStream`
|
::: /Users/igor/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.2/src/lib.rs:142:8
|
142 | S: AsyncRead + AsyncWrite + Unpin,
| --------- required by this bound in `async_tungstenite::accept_async`
... and the same error with AsyncWrite. I suspect using Tokio has something to do with this. All uses of this function in examples rely on async-std's implementation.
Here is a minimal example to reproduce the error:
main.rs
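use async_tungstenite::accept_async;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    // `let Ok(..) = ..` is a refutable pattern and would fail to compile on its own, so propagate with `?`.
    let (stream, _) = listener.accept().await?;
    // This is the call that triggers the trait-bound error shown above.
    let ws_stream = accept_async(stream).await.unwrap();
    Ok(())
}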
Cargo.toml
[package]
name = "async_fail"
version = "0.0.1"
edition = "2018"
[dependencies]
tokio = { version = "0.2.17", features = ["macros", "tcp", "dns", "io-util"] }
async-tungstenite = { version = "0.4.2", features = ["tokio-tls", "tokio-runtime"] }
use std::time::Duration;
use async_tungstenite::tokio::connect_async;
use futures::StreamExt;
use tokio::time::timeout;
#[tokio::main]
async fn main() {
    let (ws_stream, _) = connect_async("test").await.unwrap();
    let (_write, mut read) = ws_stream.split();
    // First read. Safe ??
    if let Err(_) = timeout(Duration::from_millis(10), read.next()).await {
        println!("did not receive message within 10 ms");
    }
    // Second read
    let message = read.next().await;
}
Let's say the first read.next() reads a few bytes but not the whole message within 10 ms. What happens to the bytes that were read? Are they discarded or kept in a buffer for the next read? If the remaining bytes of the message arrive later, does the second read.next() read the message properly? I wonder whether cancelling a read before completion can cause any problem for the next read.
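Whether a timed-out read loses data depends on where the partially received bytes live. Assuming the stream object itself (rather than the future returned by next()) owns the read buffer, dropping the future leaves those bytes in place for the next call. The sketch below illustrates that ownership pattern with a hypothetical length-prefixed Framer; it is not the framing that tungstenite uses.

```rust
/// Hypothetical framer: each message is one length byte followed by the payload.
/// The buffer lives in the struct, so an abandoned read attempt loses nothing.
struct Framer {
    buf: Vec<u8>,
}

impl Framer {
    fn new() -> Self {
        Framer { buf: Vec::new() }
    }

    /// Appends newly arrived bytes and returns a message once it is complete.
    fn feed(&mut self, bytes: &[u8]) -> Option<Vec<u8>> {
        self.buf.extend_from_slice(bytes);
        // The first byte announces the payload length; None if nothing arrived yet.
        let len = *self.buf.first()? as usize;
        if self.buf.len() < 1 + len {
            return None; // Partial message: keep the bytes for the next call.
        }
        let msg = self.buf[1..1 + len].to_vec();
        self.buf.drain(..1 + len);
        Some(msg)
    }
}
```

Feeding the first three bytes of a five-byte message returns None, and feeding the remainder afterwards returns the full payload, which mirrors a first read that timed out followed by a second read that succeeds.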
Seems to me that d4eea72 broke the build:
error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
--> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/compat.rs:6:18
|
6 | use futures_io::{AsyncRead, AsyncWrite};
| ^^^^^^^^^ ^^^^^^^^^^ no `AsyncWrite` in the root
| |
| no `AsyncRead` in the root
error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
--> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/handshake.rs:3:18
|
3 | use futures_io::{AsyncRead, AsyncWrite};
| ^^^^^^^^^ ^^^^^^^^^^ no `AsyncWrite` in the root
| |
| no `AsyncRead` in the root
error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
--> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/stream.rs:9:18
|
9 | use futures_io::{AsyncRead, AsyncWrite};
| ^^^^^^^^^ ^^^^^^^^^^ no `AsyncWrite` in the root
| |
| no `AsyncRead` in the root
error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
--> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/lib.rs:51:18
|
51 | use futures_io::{AsyncRead, AsyncWrite};
| ^^^^^^^^^ ^^^^^^^^^^ no `AsyncWrite` in the root
| |
| no `AsyncRead` in the root
error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
--> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/tokio.rs:11:18
|
11 | use futures_io::{AsyncRead, AsyncWrite};
| ^^^^^^^^^ ^^^^^^^^^^ no `AsyncWrite` in the root
| |
| no `AsyncRead` in the root
error[E0432]: unresolved imports `futures_io::AsyncRead`, `futures_io::AsyncWrite`
--> .cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.4.1/src/tokio.rs:23:22
|
23 | use futures_io::{AsyncRead, AsyncWrite};
| ^^^^^^^^^ ^^^^^^^^^^ no `AsyncWrite` in the root
| |
| no `AsyncRead` in the root
error: aborting due to 6 previous errors
Pretty much anything of relevance in futures-io is behind the std feature from what I can tell, so you may want to enable it. If I enable it, the compilation goes through smoothly.
I'm sorry to open an issue here. Could you perhaps turn on the GitHub Discussions feature on this repo?
Is this a good way to handle read & write (with match select(...))? https://github.com/l2ust/substrater/blob/main/src/websocket.rs#L58-L110
And how do I know if a connection is lost?
For example, if the connection is lost, this will cause a deadlock. What kind of check should I do within this function? https://github.com/l2ust/substrater/blob/main/src/websocket.rs#L317
My rough idea is to call read() one more time to check the connection state.
I don't get a valid WSS connection when trying the example. Not sure if I am doing something wrong.
Error: Io(Custom { kind: UnexpectedEof, error: "tls handshake eof" })
This is my Cargo.toml
[package]
name = "async_maker"
version = "0.1.0"
authors = ["Fredrik Park <[email protected]>"]
edition = "2018"
[dependencies]
async-std = "1.6.3"
futures = "0.3"
async-tungstenite = { version = "0.8", default-features = false, features = ["async-tls", "async-std-runtime"] }
For completeness I am including the code that I use (it's the example with the ws:// url removed).
use async_tungstenite::async_std::connect_async;
use async_tungstenite::tungstenite::Message;
use futures::prelude::*;
use async_std::task;
async fn run() -> Result<(), Box<dyn std::error::Error>> {
let url = "wss://echo.websocket.org";
let (mut ws_stream, _) = connect_async(url).await?;
let text = "Hello, World!";
println!("Sending: \"{}\"", text);
ws_stream.send(Message::text(text)).await?;
let msg = ws_stream
.next()
.await
.ok_or_else(|| "didn't receive anything")??;
println!("Received: {:?}", msg);
Ok(())
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
task::block_on(run())
}
I am trying to change server.rs so that a customized message is sent instead of a clone of a received one.
Line 75 in "examples/server.rs" changed to
for recp in broadcast_recipients {
    // recp.unbounded_send(msg.clone()).unwrap();
    recp.unbounded_send(Message::text("abc".to_string())).unwrap();
}
But when I open 2 client connections and send messages from one connection, nothing is received on the other connection.
I would appreciate any help.
From a brief glance, it appears that TLS support in async-tungstenite requires intentional, custom integration for each TLS crate. Servo is considering switching to tungstenite in servo/servo#27043; would a PR for openssl support in this crate be desirable?
Help how to fix
This is a question and not an issue so I'm sorry for putting it here. If there's somewhere else to put it, I'll gladly close this and move it there!
I copied the chat server example to set up a multiplayer game where all player clients report their current position and the server broadcasts that to its peers so they can draw each other. One thing I noticed was, when one client closed, all clients were disconnected. I believe this is because the websocket close message that is sent by one client is then broadcast by the server to the other clients. In my application, I updated the code to just not broadcast the message if it was a close message.
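The workaround described above, not forwarding Close messages to the other peers, can be sketched roughly as follows. Msg is a hypothetical stand-in for the crate's message type, and std::sync::mpsc senders stand in for the per-peer channels of the chat server example.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical, reduced message type for illustration only.
#[derive(Clone, Debug, PartialEq)]
enum Msg {
    Text(String),
    Binary(Vec<u8>),
    Close,
}

/// Close frames concern only the connection they arrived on.
fn should_forward(msg: &Msg) -> bool {
    !matches!(msg, Msg::Close)
}

/// Sends `msg` to every peer unless it is a Close message.
/// Returns the number of peers that accepted the message.
fn broadcast(msg: &Msg, peers: &[Sender<Msg>]) -> usize {
    if !should_forward(msg) {
        return 0;
    }
    peers
        .iter()
        .filter(|peer| peer.send(msg.clone()).is_ok())
        .count()
}
```

With two peers, a Text message reaches both receivers while a Close message reaches neither, so one client leaving no longer disconnects the rest.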
I'm wondering two things:
tokio docs but I honestly have a very hard time understanding how this all fits together so far :-)
I've been stuck with the following error for the past few days and I am not sure how to overcome it.
thread 'tokio-runtime-worker' panicked at 'Failed to create shard: Tungstenite(Io(Custom { kind: Other, error: "task was cancelled" }))', gateway/src/manager.rs:141:49
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
I've managed to trace it back to the following piece of code:
let (wstream, _) = async_tungstenite::tokio::connect_async_with_config(ws,Some(WebSocketConfig {
max_message_size: Some(usize::max_value()),
max_frame_size: Some(usize::max_value()),
..Default::default()
})).await?;
ws is defined as &str.
I am extremely confused as to what is going on here. The WS URL is valid. It's a discord gateway for a discord library. Any ideas as to why this IO Custom error is being thrown here?
Hi, I'm using tokio_tls and my program still uses openssl; maybe I'm doing something wrong. How can I fix that?
Forgive me if this is a bad place to ask, but I cannot find where the async_tungstenite::WebSocketStream object inherits the .split() method from. It's used on line 40 of the echo-server.rs example.
Cargo.toml
[dependencies]
async-tungstenite = { version = "0.13.0", features = ["tokio-runtime", "tokio-native-tls"] }
main.rs
use async_tungstenite::tokio::ConnectStream;
#[derive(Debug)]
struct Foo(ConnectStream);
fn main() {
println!("Hello, world!");
}
This produces a compile error: error[E0277]: async_tungstenite::stream::Stream<TokioAdapter<tokio::net::tcp::stream::TcpStream>, TokioAdapter<tokio_native_tls::TlsStream<tokio::net::tcp::stream::TcpStream>>> doesn't implement Debug
I hope this type can implement Debug.
We are building a ws api using async-tungstenite and used the close frames of websockets to provide error hints for clients.
The situation which occurs quite often is the following:
async fn pseudo_handler(ws: &mut WebSocketStream<TlsStream>) -> anyhow::Result<()> {
    ws.send(Message::Binary(...)).await?;
    ws.close(None).await?;
    Ok(())
}
The client needs to rely on receiving the message before any close sequence.
We know that this is expected behavior, but how can we prevent it, i.e. flush before sending the close sequence?
Sadly, flush() as defined in futures::SinkExt and similar approaches didn't work.
Thanks!
I suggest that when the user enables tokio-rustls, no default ClientConfig for Rustls be used, and in particular that webpki-roots not be used. webpki-roots is rarely the right choice on most platforms, and there are also users (such as the private projects I work on) that want to ensure that webpki-roots isn't used; ideally it wouldn't appear in our Cargo.lock files at all.
Concretely, I suggest the following change, which isn't backward-compatible: either remove the tokio::connect_async implementation used when the tokio-rustls feature is enabled, or only expose it when a tokio-rustls-webpki-roots feature is enabled. In the latter case, the webpki-roots dependency should be conditioned on the tokio-rustls-webpki-roots flag.
Hi,
thanks for this library! I'm wondering if there's any existing example of how to couple this with e.g. static file serving over the same port? I'd want to serve static files and have a /ws route handled by async-tungstenite. Are there any examples of how to do this, e.g. with tide? Thanks!
0.13 allows rustls backend, would be nice to get a 0.13 version bump if possible. Thanks!
url = "2.2.1"
http = "0.2.3"
libc = "0.2.88"
regex = "1"
lazy_static = "1.4.0"
futures-util = "0.3.13"
tokio = { version = "1.3.0", features = ["full"] }
async-tungstenite ={ version = "0.13.0", features = ["tokio-native-tls"] }
tokio-native-tls = { version = "^0.3.0", optional = true}
When the number of connections exceeds 50, a new connection can only be established after a socket that is already connected has been disconnected.
Hi, I am trying to compare tungstenite with async_tungstenite to see if there is a huge performance difference between connecting to multiple websockets using multiple threads in tungstenite versus using tokio::spawn in async_tungstenite.
I would have expected there to be a huge difference (I come from Python, where these implementations have a huge performance difference), but what I notice is that the async implementation seems to use fewer threads but more CPU. Is this expected? Or am I using the library incorrectly by spawning the async futures in the wrong way? I can't seem to get my async implementation to run faster than the sync one.
In my example I connect to the Binance market data websockets to compare the performance on my machine. I run cargo build --release and then run the target/release/myproject file to compare on my Mac. I've attached code snippets below.
Sync Implementation
use std::thread;
use tungstenite::client::AutoStream;
use tungstenite::protocol::Message;
use tungstenite::{connect, WebSocket};
use url::Url;
const WS_BASE_URL: &str = "wss://stream.binance.com:9443";
// Connect to the given URL and return the socket; the handshake response is not needed.
fn connect_to_markets(url: &str) -> WebSocket<AutoStream> {
    let (socket, _response) = connect(Url::parse(url).unwrap()).expect("Cant Connect");
    socket
}
fn subscribe_to_market(pair: &'static str) {
    let ws_url = format!("{}/ws/{}@depth20", WS_BASE_URL, pair);
    println!("{}", &ws_url);
    let mut socket = connect_to_markets(&ws_url);
    loop {
        let msg = socket.read_message().expect("Error reading message");
        match msg {
            Message::Text(msgtxt) => println!("Message {:?}", msgtxt),
            Message::Ping(_) | Message::Binary(_) | Message::Pong(_) => continue,
            Message::Close(msg) => {
                println!("Close {:?}", msg);
                // re-open connection here: assign to the outer `socket` instead of shadowing it
                socket = connect_to_markets(&ws_url);
            }
        }
    }
}
fn main() {
    let markets = vec!["btcusdt", "ethusdt", "etcusdt", "bchusdt","bnbusdt", "eosusdt","xrpusdt" ,"ltcusdt","busdusdt","zecusdt","adausdt", "btcpax","atomusdt", "trxusdt", "xlmusdt", "bttusdt", "vetusdt", "tusdusdt"];
    let mut children = vec![];
    // one OS thread per market
    for pair in markets {
        children.push(thread::spawn(move || {
            println!("Market started for {}", pair);
            subscribe_to_market(pair);
        }))
    }
    for child in children {
        let _ = child.join();
    }
}
Async Implementation
use async_tungstenite::tokio::connect_async;
use futures::prelude::*;
use tungstenite::protocol::Message;
// Base url for ws
const WS_BASE_URL: &str = "wss://stream.binance.com:9443";
// function that subscribes to markets and currently just prints out the ob
async fn subscribe_to_market(pair: &str) {
    // format the ws_url
    let ws_url: String = format!("{}/ws/{}@depth20", WS_BASE_URL, pair);
    // build the ws url connection request
    let url = url::Url::parse(&ws_url).unwrap();
    // connect to the ws
    let (mut ws_stream, _) = connect_async(url.clone()).await.unwrap();
    // while connected
    loop {
        // wait for messages
        let msg = ws_stream
            .next()
            .await
            .expect("didn't receive anything")
            .expect("Error reading message");
        match msg {
            // print the payload; `msg` has been moved into the pattern, so it cannot be printed here
            Message::Text(msgtxt) => println!("Message {:?}", msgtxt),
            Message::Ping(_) | Message::Binary(_) | Message::Pong(_) => continue,
            Message::Close(msg) => {
                println!("Close {:?}", msg);
                // re-open connection here: assign to the outer `ws_stream` instead of shadowing it
                ws_stream = connect_async(url.clone()).await.unwrap().0;
            }
        }
    }
}
#[tokio::main]
async fn main() {
    let markets = vec!["btcusdt", "ethusdt", "etcusdt", "bchusdt","bnbusdt", "eosusdt","xrpusdt" ,"ltcusdt","busdusdt","zecusdt","adausdt", "btcpax","atomusdt", "trxusdt", "xlmusdt", "bttusdt", "vetusdt", "tusdusdt"];
    let mut children = vec![];
    for pair in markets {
        children.push(
            tokio::spawn(subscribe_to_market(pair))
            //subscribe_to_market(pair)
        );
    }
    let all_futures = futures::future::join_all(children);
    all_futures.await;
}
When upgrading to a new version it would be nice to know what changed. I'm suggesting that for future releases a changelog is kept in a file like CHANGELOG.md or similar.
I'd love to have a link to the ws_stream_tungstenite crate in the README, as a suggestion for how to treat a WebSocket as a continuous stream of binary data rather than as discrete messages. That crate was hard to find, and a cross-reference would really help.
Currently pointing to 0.7; latest is 0.9.0
I recently had problems with incompatible structs for rustls 0.17 and 0.18.
I have a wasm client app (using websocket support provided by the ws_stream_wasm crate) that connects to a websocket server (using websocket support provided by the async-tungstenite crate). The non-secure connection works well. However, I now want to add a secure (wss) connection between them.
This is what I am trying to do on the server side using async-tls:
fn load_config(options: &Options) -> io::Result<ServerConfig> {
let certs = load_certs(&options.cert)?;
let mut keys = load_keys(&options.key)?;
// we don't use client authentication
let mut config = ServerConfig::new(NoClientAuth::new());
config
// set this server to use one cert together with the loaded private key
.set_single_cert(certs, keys.remove(0))
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
Ok(config)
}
...
async fn handle_connection(acceptor: &TlsAcceptor, stream: TcpStream) -> Result<(), Error> {
// Calling `acceptor.accept` will start the TLS handshake
let handshake = acceptor.accept(stream);
// The handshake is a future we can await to get an encrypted
// stream back.
let mut tls_stream = handshake.await?;
let ws_stream = async_tungstenite::accept_async(tls_stream)
.await
.expect("Error during the websocket handshake occurred");
...
}
The above was just copied and modified from here.
When my wasm client app now connects using a wss URL, I get a Custom { kind: InvalidData, error: AlertReceived(DecryptError) } on this line: let mut tls_stream = handshake.await?;
The wasm client app isn't passing any certificates or other information specific to TLS. All I did was replace ws with wss in the URL.
Is this how I should be adding support for wss on the server?
EDIT: Is there a function in async-tungstenite that I can use directly to accept a client connection over wss automatically instead of using async-tls?
I have the use case where I have to send a message of X bytes, and I know X in advance, but it is fairly large so I'd like to not have to have the entire message in memory beforehand. Instead, I'd like to lazily generate it in small chunks as they are being sent. (Of course there will need to be a check afterwards that exactly as many bytes as advertised were sent)
Would such a feature be feasible?
Similarly, but also less importantly, what about that feature for the reading direction?
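The bookkeeping part of this request, producing the payload lazily and verifying the advertised length, can be sketched independently of any WebSocket API. stream_exact and its sink callback are hypothetical names; in a real implementation the sink would write frame fragments, and whether the crate exposes that is not established here.

```rust
/// Pulls chunks lazily from `chunks`, hands each one to `sink`, and checks
/// that exactly `advertised` bytes were produced in total.
fn stream_exact<I, F>(advertised: usize, chunks: I, mut sink: F) -> Result<(), String>
where
    I: IntoIterator<Item = Vec<u8>>,
    F: FnMut(&[u8]),
{
    let mut sent = 0usize;
    for chunk in chunks {
        // Refuse to send more than was promised up front.
        if sent + chunk.len() > advertised {
            return Err(format!(
                "chunk would exceed the advertised {} bytes (already sent {})",
                advertised, sent
            ));
        }
        sink(&chunk);
        sent += chunk.len();
    }
    if sent == advertised {
        Ok(())
    } else {
        Err(format!("sent {} bytes, advertised {}", sent, advertised))
    }
}
```

Because the chunks come from an iterator, only one chunk needs to be in memory at a time; for example, `(0..4u8).map(|i| vec![i; 256])` produces 1024 bytes in four pieces.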
Per the comments I ran
cargo run --features="async-std-runtime" --example echo-server 127.0.0.1:12345
and made two connections using the "websocat" utility crate.
websocat ws://127.0.0.1:12345
Everything works as expected, I can send/receive from both clients, but at some point I see a panic in the server output.
This is at f5025ed / "0.17.2"
Here is my rustup show output:
Default host: x86_64-apple-darwin
rustup home: /Users/mburr/.rustup
installed toolchains
--------------------
stable-x86_64-apple-darwin (default)
nightly-x86_64-apple-darwin
1.52.1-x86_64-apple-darwin
1.60.0-x86_64-apple-darwin
installed targets for active toolchain
--------------------------------------
thumbv7m-none-eabi
wasm32-unknown-unknown
x86_64-apple-darwin
active toolchain
----------------
stable-x86_64-apple-darwin (default)
rustc 1.61.0 (fe5b13d68 2022-05-18)
Here is the ugly panic output on the terminal.
Compiling async-tungstenite v0.17.2 (/Users/mburr/git/async-tungstenite)
Finished dev [unoptimized + debuginfo] target(s) in 18.31s
Running `target/debug/examples/echo-server '127.0.0.1:12345'`
thread 'async-std/runtime' panicked at 'Failed to forward messages: Protocol(ResetWithoutClosingHandshake)', examples/echo-server.rs:53:10
stack backtrace:
0: rust_begin_unwind
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panicking.rs:584:5
1: core::panicking::panic_fmt
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/panicking.rs:143:14
2: core::result::unwrap_failed
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/result.rs:1785:5
3: core::result::Result<T,E>::expect
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/result.rs:1035:23
4: echo_server::accept_connection::{{closure}}
at ./examples/echo-server.rs:50:5
5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/mod.rs:91:19
6: <async_std::task::builder::SupportTaskLocals<F> as core::future::future::Future>::poll::{{closure}}
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/task/builder.rs:199:17
7: async_std::task::task_locals_wrapper::TaskLocalsWrapper::set_current::{{closure}}
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/task/task_locals_wrapper.rs:60:13
8: std::thread::local::LocalKey<T>::try_with
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/thread/local.rs:442:16
9: std::thread::local::LocalKey<T>::with
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/thread/local.rs:418:9
10: async_std::task::task_locals_wrapper::TaskLocalsWrapper::set_current
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/task/task_locals_wrapper.rs:55:9
11: <async_std::task::builder::SupportTaskLocals<F> as core::future::future::Future>::poll
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/task/builder.rs:197:13
12: async_executor::Executor::spawn::{{closure}}
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:144:19
13: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/mod.rs:91:19
14: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/future.rs:124:9
15: async_task::raw::RawTask<F,T,S>::run
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-task-4.3.0/src/raw.rs:511:20
16: async_task::runnable::Runnable::run
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-task-4.3.0/src/runnable.rs:309:18
17: async_executor::Executor::run::{{closure}}::{{closure}}
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:235:21
18: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/mod.rs:91:19
19: <futures_lite::future::Or<F1,F2> as core::future::future::Future>::poll
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-lite-1.12.0/src/future.rs:529:33
20: async_executor::Executor::run::{{closure}}
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:242:31
21: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/future/mod.rs:91:19
22: <futures_lite::future::Or<F1,F2> as core::future::future::Future>::poll
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-lite-1.12.0/src/future.rs:529:33
23: async_io::driver::block_on
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-io-1.7.0/src/driver.rs:142:33
24: async_global_executor::reactor::block_on::{{closure}}
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/reactor.rs:3:18
25: async_global_executor::reactor::block_on
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/reactor.rs:12:5
26: async_global_executor::threading::thread_main_loop::{{closure}}::{{closure}}
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/threading.rs:95:17
27: std::thread::local::LocalKey<T>::try_with
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/thread/local.rs:442:16
28: std::thread::local::LocalKey<T>::with
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/thread/local.rs:418:9
29: async_global_executor::threading::thread_main_loop::{{closure}}
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/threading.rs:89:13
30: std::panicking::try::do_call
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panicking.rs:492:40
31: ___rust_try
32: std::panicking::try
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panicking.rs:456:19
33: std::panic::catch_unwind
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panic.rs:137:14
34: async_global_executor::threading::thread_main_loop
at /Users/mburr/.cargo/registry/src/github.com-1ecc6299db9ec823/async-global-executor-2.2.0/src/threading.rs:88:12
35: core::ops::function::FnOnce::call_once
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
I'm unclear on how to accept clients wishing to communicate over TLS. From looking at the code, I don't see a definition of any accept function under the TLS modules, and there aren't examples for this use case either.
This code uses cargo features in a non-additive way.
Imagine a library A that enables the tokio-rustls feature and uses rustls types (e.g. a rustls-based Connector).
Also, imagine a library B that enables the tokio-openssl feature and uses openssl types (e.g. an openssl-based Connector).
In isolation they will work.
But when they appear in one crate graph, async-tungstenite will select only the rustls-based implementation, and library B will fail to compile.
Disclaimer: I am not convinced this is a problem with async-tungstenite itself, rather than just being the consequence of the in-flux state of the async ecosystem, but I figured I might as well file it at the "entry point" of my problem, since I could not think of a better alternative :-)
I am trying to connect to a websocket server with a self-signed certificate.
Digging into things a little, I figured that I need to use the client API instead of the high-level connect_async, and use my own TlsConnector; see for instance snapview/tungstenite-rs#84.
Since this is the async version of tungstenite, I figured I would need an async TLS connector.
I also saw that tungstenite uses native-tls.
I found that https://github.com/dbcfd/tls-async is an async version of native-tls, but it does not seem active and currently does not compile.
On the other hand, there is https://github.com/async-rs/async-tls, but that uses rustls, so I do not think I can use it with async-tungstenite.
Any suggestions?
Hi! I'm still trying to understand this, but the following code works with 0.14 and stack overflows on windows with 0.15:
[package]
name = "async-tungstenite-stream-overflow-windows"
version = "0.1.0"
edition = "2018"
[dependencies]
async-global-executor = "2.0.2"
async-net = "1.6.1"
async-tungstenite = "0.15.0"
futures-util = "0.3.17"
fn main() {
async_global_executor::block_on(async {
let socket = async_net::TcpStream::connect("localhost:8080").await.unwrap();
let (mut client, _) = async_tungstenite::client_async("ws://localhost/some/route", socket).await.unwrap();
use futures_util::{SinkExt, StreamExt};
client.send(async_tungstenite::tungstenite::Message::text("hello")).await.unwrap();
dbg!(&client.next().await); // commenting this line out avoids the stack overflow
});
}
This expects a websocket echo server of some sort running at :8080, but I believe as long as the server responds with a message, this will overflow the stack on 0.15. Expected output (and 0.14 output) is something like:
&client.next().await = Some(
Ok(
Text(
"received your message: hello",
),
),
)
Instead, the output is
thread 'main' has overflowed its stack
error: process didn't exit successfully: `target\debug\tungstenite-stream-overflow-windows.exe` (exit code: 0xc00000fd, STATUS_STACK_OVERFLOW)
I know almost nothing about Windows and have been trying to investigate this on a cloud instance, but someone with a local Windows machine will likely have more success. I don't understand why this works fine on my local Mac.
Let me know if there's any more information I can provide to help investigate this, and thanks for async tungstenite!
Hi!
Would you maybe release a new version to crates.io, containing the added tokio-rustls PR?
Is it possible to share multiple owned clones of one stream?
In the simple case, the close() method takes a CloseFrame:
let close_frame = CloseFrame {code: CloseCode::Normal, reason: Cow::from("")};
ws_stream.close(Some(close_frame)).await;
But if I call split() on ws_stream to hand the halves to separate tasks, the close() method no longer takes arguments, which raises an exception on the echo server side, like this:
websockets.exceptions.ConnectionClosedError: received 1005 (no status code [internal]); then sent 1005 (no status code [internal])
The server continues to run, but its console always shows an exception when the connection is terminated this way.
Hi,
have you seen Nuclei from the Bastion project?
Wouldn't it be possible with Nuclei to create an executor-agnostic version of this crate? Or at least, wouldn't it be possible for users of this crate to create a crate that doesn't depend on a specific executor?
Nuclei supports (via Agnostik): Bastion, Async-Std, Tokio, and Smol, so basically all major executors, it seems.
I'm still trying to wrap my head around all this, but wouldn't it be possible at the moment to create a TCP socket to the WebSocket server using Nuclei, and then pass that socket/stream to client_async?
Are there any disadvantages to this approach?
I think it would make it much easier to create executor-agnostic crates.
At the moment, it seems, I have two options with async-tungstenite:
A) Let the user create the WebSocket, or require a function as an argument that creates the WebSocket (and use Agnostik or async_executors as an abstraction over the executors).
B) Use feature flags to implement executor-specific code.
With Nuclei, it seems, it would be possible to just request a supported executor and use a WebSocket that uses a Nuclei TCP stream.
What do you think?
Nice crate! But I just need your help.
When I establish a TLS connection to a particular server, the client fails to verify the server's certificate.
The following is the error message:
tungstenite error IO error: invalid certificate: UnknownIssuer
Is there any way to disable certificate verification?
Hello, thanks for this nice crate. Sometimes a server needs to accept only handshakes that pass an asynchronous I/O validation based on values from the request headers or URI. But currently, we cannot perform such operations in the callback of the accept_hdr_async function.
Any suggestions?
It was possible to disable certificate validation in 0.2.1 using the following code:
async fn connect_tls_disable_cert_verification<R>(
    req: R,
) -> std::result::Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), tungstenite::Error>
where
    R: Into<Request<'static>> + Unpin,
{
    use async_tls::TlsConnector;
    use rustls::ClientConfig;
    use std::sync::Arc;
    mod danger {
        use webpki;
        pub struct NoCertificateVerification {}
        impl rustls::ServerCertVerifier for NoCertificateVerification {
            fn verify_server_cert(
                &self,
                _roots: &rustls::RootCertStore,
                _presented_certs: &[rustls::Certificate],
                _dns_name: webpki::DNSNameRef<'_>,
                _ocsp: &[u8],
            ) -> Result<rustls::ServerCertVerified, rustls::TLSError> {
                Ok(rustls::ServerCertVerified::assertion())
            }
        }
    }
    let mut request: Request = req.into();
    let mut config = ClientConfig::new();
    config
        .dangerous()
        .set_certificate_verifier(Arc::new(danger::NoCertificateVerification {}));
    let connector = TlsConnector::from(Arc::new(config));
    async_tungstenite::connect_async_with_tls_connector(request, Some(connector)).await
}
This code fails in the latest version. Is it still possible to do this?
These tracing messages are hardly useful in my case. They only show that my program is running, and they fill my disk space very quickly. Is there a way to disable them?
May 07 16:09:17.848 TRACE main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:301 Stream.poll_next
May 07 16:09:17.848 TRACE main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:256 WebSocketStream.with_context
May 07 16:09:17.848 TRACE main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:306 Stream.with_context poll_next -> read_message()
May 07 16:09:17.848 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:182 Write.flush
May 07 16:09:17.848 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context
May 07 16:09:17.848 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:187 Write.with_context flush -> poll_flush
May 07 16:09:17.849 TRACE main tungstenite::protocol: Frames still in queue: 0
May 07 16:09:17.849 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:147 Read.read
May 07 16:09:17.849 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context
May 07 16:09:17.849 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:152 Read.with_context read -> poll_read
May 07 16:09:17.849 TRACE main async_tungstenite::compat: WouldBlock
May 07 16:09:17.851 TRACE main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:256 WebSocketStream.with_context
May 07 16:09:17.851 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:182 Write.flush
May 07 16:09:17.851 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context
May 07 16:09:17.851 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:187 Write.with_context flush -> poll_flush
May 07 16:09:17.851 TRACE main tungstenite::protocol: Frames still in queue: 0
May 07 16:09:17.851 TRACE main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:301 Stream.poll_next
May 07 16:09:17.852 TRACE main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:256 WebSocketStream.with_context
May 07 16:09:17.852 TRACE main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:306 Stream.with_context poll_next -> read_message()
May 07 16:09:17.852 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:182 Write.flush
May 07 16:09:17.852 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context
May 07 16:09:17.852 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:187 Write.with_context flush -> poll_flush
May 07 16:09:17.852 TRACE main tungstenite::protocol: Frames still in queue: 0
May 07 16:09:17.852 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:147 Read.read
May 07 16:09:17.852 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context
May 07 16:09:17.852 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:152 Read.with_context read -> poll_read
May 07 16:09:17.852 TRACE main async_tungstenite::compat: WouldBlock
May 07 16:09:17.852 TRACE main async_tungstenite: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/lib.rs:256 WebSocketStream.with_context
May 07 16:09:17.852 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:182 Write.flush
May 07 16:09:17.853 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:124 AllowStd.with_context
May 07 16:09:17.853 TRACE main async_tungstenite::compat: /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/async-tungstenite-0.13.1/src/compat.rs:187 Write.with_context flush -> poll_flush
May 07 16:09:17.853 TRACE main tungstenite::protocol: Frames still in queue: 0
Hi
I looked through the examples and the API for a way to add headers to the initial HTTP request, but could not find anything.
How would I add them?
I'm making a client-side connection that will almost certainly receive messages faster than it can process them and so I was looking to inspect the buffered set of messages, or perhaps quickly push the messages into a buffer of my own.
I managed to find size_hint, but I suspect async-tungstenite (or perhaps one of the layers below) is using the default implementation, as the method keeps returning (0, None), which is what the default implementation would do.
Any suggestions on how to get some control over the buffering? In my situation, it's acceptable to close the connection or send a "please stop" message to the server, but that still requires a trigger.
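To make the "buffer of my own" idea concrete, here is a minimal sketch using only the standard library: a bounded channel whose non-blocking `try_send` reports when the buffer is full, which could serve as the trigger. Plain `String` values stand in for WebSocket messages, and `Offer`, `bounded_buffer`, and `offer` are hypothetical names; in an async program a bounded async channel would play the same role.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// What the reader side learns after offering a message to the buffer.
#[derive(Debug, PartialEq)]
enum Offer {
    /// The message was stored for the consumer.
    Queued,
    /// The buffer is full: the trigger to send "please stop" or close.
    Overflow,
    /// The consumer side has been dropped.
    Closed,
}

/// Create a buffer that holds at most `capacity` pending messages.
fn bounded_buffer(capacity: usize) -> (SyncSender<String>, Receiver<String>) {
    sync_channel(capacity)
}

/// Try to queue a message without blocking the reader.
fn offer(tx: &SyncSender<String>, msg: String) -> Offer {
    match tx.try_send(msg) {
        Ok(()) => Offer::Queued,
        Err(TrySendError::Full(_)) => Offer::Overflow,
        Err(TrySendError::Disconnected(_)) => Offer::Closed,
    }
}
```

The read loop would call `offer` for every incoming message and react to `Offer::Overflow`, while a separate worker drains the `Receiver` at its own pace. Note that the rejected message is discarded in this sketch.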
The AsyncRead/AsyncWrite traits aren't implemented for WebSocketStream. Is there any plan for this?
futures-rs just switched, and tokio has been using the -lite one since 0.2.0. I haven't used either, so I don't actually know how hard switching is, but it seems like a good idea since it greatly improves compile times for people who don't otherwise depend on syn & quote.
My projects are waiting for this release so I can upgrade to tokio 0.3. The release is already in the master branch. Is there anything holding back the release? If not, I would appreciate it if 0.10.0 could be published on crates.io.
The latest release (0.4.0) seems to error when you attempt to create a websocket.
You can reproduce it using:
use async_std::task;
use async_std::net::{TcpListener, TcpStream, SocketAddr};
fn main() {
    task::block_on(server());
}
async fn server() {
    let server = TcpListener::bind("127.0.0.1:3012").await.unwrap();
    while let Ok((stream, addr)) = server.accept().await {
        task::spawn(handle_connection(stream, addr));
    }
}
async fn handle_connection(raw_stream: TcpStream, _addr: SocketAddr) {
    println!("{:?}", _addr);
    async_tungstenite::accept_async(raw_stream).await.unwrap();
}
Then create a websocket using new WebSocket("ws://localhost:3012").
Weirdly enough, it's still possible to create a websocket connection using websocat.
Hello,
I have a small program serving long-lasting websockets, and I'd like to close those cleanly when my program exits.
Right now, I just drop the stream and it causes the client to exit with Error 1006/EOF.
How would you suggest handling that?
It's not the end of the world if it remains that way, since the server isn't exposed to the public. I'm just curious about what would be good practice.
Thank you