gbaranski / ezsockets

High-level declarative API for building WebSocket Clients and Servers in Rust 🦀

Home Page: https://docs.rs/ezsockets

License: MIT License


ezsockets's Introduction

ezsockets

Creating a WebSocket server or client in Rust can be troublesome. This crate simplifies the process by providing:

  • Traits to allow declarative and event-based programming.
  • Easy concurrency with Tokio and async/await. Server sessions are Clone'able and can be shared between tasks.
  • Heartbeat mechanism to keep the connection alive.
  • Automatic reconnection of WebSocket Clients.
  • Support for arbitrary client back-ends, with built-in native and WASM client connectors.
  • Support for multiple server back-ends such as Axum or Tungstenite.
  • TLS support for servers with rustls and native-tls.

Documentation

View the full documentation at docs.rs/ezsockets

Examples

  • simple-client - the simplest WebSocket client, using stdin as input.
  • echo-server - server that echoes back every message it receives.
  • echo-server (native-tls) - same as echo-server, but secured with native-tls.
  • counter-server - server that increments a global value every second and shares it with clients
  • chat-client - chat client for the chat-server and chat-server-axum examples
  • wasm-client - chat client for the chat-server and chat-server-axum examples that runs in the browser (it only listens to the chat)
  • chat-server - chat server with support for rooms
  • chat-server-axum - same as above, but using axum as the back-end

Client

By default clients use tokio-tungstenite under the hood. Disable default features and enable wasm_client to run clients on WASM targets.

See examples/simple-client for basic usage and docs.rs/ezsockets/client for documentation.

Server

The WebSocket server can use one of the supported back-ends:

  • tokio-tungstenite - the simplest way to get started.
  • axum - ergonomic and modular web framework built with Tokio, Tower, and Hyper
  • actix-web - Work in progress at #22

See examples/echo-server for basic usage and docs.rs/ezsockets/server for documentation.

License

Licensed under MIT.

Contact

Reach out to me on Discord at gbaranski#5119, or email me at [email protected].

ezsockets's People

Contributors

gbaranski, jakkusakura, stanbar, stymaar, ukoehb


ezsockets's Issues

Client should reply to ping with pong containing the same data as the ping

Problem

The WebSocket spec states that:

"a Pong frame sent in response to a Ping frame must have identical 'Application data' as found in the message body of the Ping frame being replied to." - source.

This means every ping received by the client should result in a pong reply with the same bytes as the received ping.

I don't see anywhere in the codebase where this is implemented.

Potential solution

I am happy to fix this and make a PR, however I'd like to know where people think the best place to handle it is.

Should the pong reply happen at the Socket level so that it is abstracted away from the client? If so, should the user be able to specify a closure to handle the payload much like they can with the heartbeat?

Or should new Message::Ping(bytes) and Message::Pong(bytes) be created and on_ping and on_pong be added to the client (with default implementations) to allow the user to decide how they want to reply?

Open to all suggestions and ideas.
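For reference, here is the required behaviour expressed against tokio-tungstenite's Message type (the crate ezsockets wraps by default). This is only an illustration of the RFC 6455 rule, not ezsockets' internal code:

use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::Message;

// Illustration: whenever a Ping arrives, reply with a Pong carrying the
// exact same payload.
async fn echo_pings<S>(
    ws: &mut tokio_tungstenite::WebSocketStream<S>,
) -> tokio_tungstenite::tungstenite::Result<()>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    while let Some(msg) = ws.next().await {
        match msg? {
            // A Pong must carry the payload of the Ping it answers.
            Message::Ping(payload) => ws.send(Message::Pong(payload)).await?,
            Message::Close(_) => break,
            _ => {}
        }
    }
    Ok(())
}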

Reject connections before accepting them

Currently you can only reject a connection after the websockets backend has accepted the connection. This means clients will always see 'connected' -> 'disconnected' events even if their connection request was rejected by your custom ServerExt.

There should be an additional step in the connection-acceptance protocol for pre-validating requests before the websockets backend can accept the connection.

It should be possible to move that step into the axum Upgrade::from_request() method and the tungstenite Acceptor .. callback closure, although the optimal/correct approach isn't clear, since server actors run asynchronously from their servers (there is a risk of introducing race conditions between pre-validating requests and registering connections in the ServerExt, e.g. if you reject duplicate connections).
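For the tungstenite back-end, one place such a pre-validation hook could live is the accept_hdr_async callback (the same callback the acceptor snippet in the SocketConfig issue below already uses). A rough sketch, where is_authorized is a placeholder:

use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};

// Hypothetical pre-validation: reject the handshake before the connection
// is ever surfaced to the ServerExt. This function can be passed as the
// callback to tokio_tungstenite::accept_hdr_async.
fn check_request(req: &Request, resp: Response) -> Result<Response, ErrorResponse> {
    let authorized = req
        .headers()
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .map(is_authorized)
        .unwrap_or(false);

    if authorized {
        Ok(resp)
    } else {
        // Send a 403 instead of completing the WebSocket upgrade.
        let reject = http::Response::builder()
            .status(http::StatusCode::FORBIDDEN)
            .body(Some("connection rejected".to_string()))
            .expect("static response is valid");
        Err(reject)
    }
}

// Placeholder for whatever validation the application needs.
fn is_authorized(_token: &str) -> bool {
    false
}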

Question: can this be used to send json files back and forth?

I'm very new to Rust (I just recently finished my Hello World program), but I am looking for a basic server/client system that can send JSON files back and forth for compliance checks of endpoints.

Basically, the server will send the client a list of checks to perform (at the moment registry checks, since that's what I'm learning), which the client performs and then sends back the results. The server will then store those results in a (PostgreSQL) database, where a web client can grab the results and build pie charts from them.

Something like that :)

Race conditions

The calls to .unwrap() in the client, server, and session APIs all race with the dropping of the corresponding actors. In the session code, for example, you have PANIC_MESSAGE_UNHANDLED_CLOSE, but it is not possible to test liveness race-free before invoking one of those methods without some kind of lock (which would be hard to implement). Instead, I recommend returning Results from all of those methods. WebSocket servers and clients should never panic, because that is an attack vector for DoS.

Edit: this problem is exacerbated by the inability to call std::panic::catch_unwind() on any of the objects in this crate, since they are not 'unwind safe'.

Client reconnect behaviour (continuously updated)

It looks like if the first connection fails, the WebSocket client will not attempt to reconnect. Is this by design?
If I want to reconnect in that case, what is the recommended way to do it?

Secondly, it looks like the WebSocket client does not expose the reconnect interval?

use async_trait::async_trait;
use ezsockets::ClientConfig;
use std::io::BufRead;
use url::Url;

struct Client {}

#[async_trait]
impl ezsockets::ClientExt for Client {
    type Params = ();

    async fn text(&mut self, text: String) -> Result<(), ezsockets::Error> {
        println!("received message: {text}");
        Ok(())
    }

    async fn binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
        println!("received bytes: {bytes:?}");
        Ok(())
    }

    async fn call(&mut self, params: Self::Params) -> Result<(), ezsockets::Error> {
        let () = params;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let url = Url::parse("ws://localhost:8080").unwrap();
    let config = ClientConfig::new(url);
    let (handle, future) = ezsockets::connect(|_client| Client {}, config).await;
    tokio::spawn(async move {
        match future.await {
            Ok(_) => {
                println!("OK.")
            },
            Err(_) => {
                // First connection failed.
                // What should I do to perform reconnect?
                println!("Connection failed.")
            },
        }
        // future.await.unwrap();
    });
    let stdin = std::io::stdin();
    let lines = stdin.lock().lines();
    for line in lines {
        let line = line.unwrap();
        println!("sending {line}");
        handle.text(line);
    }
}

Random panic if client disconnects non-gracefully

thread 'tokio-runtime-worker' panicked at 'called Result::unwrap() on an Err value: SendError

session.rs:99:33

    /// Calls a method on the session
    pub async fn call(&self, params: P) {
        self.calls.send(params).unwrap();
    }

I assume that the .unwrap() calls in many places can cause panics, which leaves the WebSocket server unable to handle further connections.
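One direction, also suggested in the Race conditions issue above, is to make call fallible instead of unwrapping the channel send. A rough sketch; the SessionClosed error type is hypothetical, not part of the crate:

/// Hypothetical error for a call made after the session actor has shut down.
#[derive(Debug)]
pub struct SessionClosed;

impl std::fmt::Display for SessionClosed {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("session is closed")
    }
}

impl std::error::Error for SessionClosed {}

    /// Calls a method on the session, reporting a dropped actor instead of panicking.
    pub async fn call(&self, params: P) -> Result<(), SessionClosed> {
        self.calls.send(params).map_err(|_| SessionClosed)
    }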

Close bug

After fixing the handshake protocol error with PR #64, I am seeing another bug when closing clients. If I close a session from either the server OR the client interface, I get the following trace:

2023-07-31T16:58:13.956716Z TRACE ezsockets::socket: sending message: Close(Some(CloseFrame { code: Normal, reason: "test" }))
2023-07-31T16:58:13.956955Z TRACE ezsockets::socket: received message: Ok(Close(Some(CloseFrame { code: Normal, reason: "test" })))
2023-07-31T16:58:13.957047Z TRACE ezsockets::socket: received message: Ok(Close(Some(CloseFrame { code: Normal, reason: "test" })))
2023-07-31T16:58:13.957062Z ERROR ezsockets::socket: failed to send message. stream is closed

The last two messages are unexpected. I have no idea why this is happening, so I hope you can investigate and fix this. There should be a valid session-closing workflow for both server and client that causes no errors in the log.

Confusing API

I had trouble understanding this crate's API because a lot of the names overlap with each other. After lots of reading the source code...

Suggestions:

  • SessionExt -> SessionHandler
  • ClientExt -> ClientHandler
  • ServerExt -> ConnectionHandler
  • SessionExt::on_text()/on_binary()/on_call() -> SessionHandler::on_client_text()/on_client_binary()/on_session_call()
  • ClientExt::on_text()/on_binary()/on_call() -> ClientHandler::on_server_text()/on_server_binary()/on_client_call()
  • ClientExt::on_close() -> ClientHandler::on_server_close()
  • ServerExt::on_call() -> ConnectionHandler::on_server_call() (can also do on_client_connect()/on_client_disconnect() for consistency)
  • ServerExt::Session -> ConnectionHandler::SessionHandler (this is the worst offender here, since ServerExt::on_connect() returns an ezsockets::Session, NOT a ServerExt::Session)
  • Remove Args from SessionExt and just use a generic parameter on ServerExt::on_connect() (it is not at all obvious why SessionExt has Args)

Reconnect attempt is delayed

If a client wants to reconnect, it first waits for a reconnect interval to pass. Is there a specific reason it needs to be this way? I'd rather try to reconnect immediately, then if that fails wait for the reconnect interval to pass.

It was moved to the start of the loop in this commit without explanation.
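For illustration, a minimal sketch of the ordering this issue asks for, written around a generic connect closure and tokio's sleep; this is not the crate's actual reconnect loop:

use std::time::Duration;

// Attempt to connect first, and only sleep when an attempt fails.
async fn reconnect_loop<F, Fut, T, E>(mut connect: F, interval: Duration) -> T
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    loop {
        match connect().await {
            Ok(conn) => return conn,
            // Wait only after a failed attempt, not before the first one.
            Err(_) => tokio::time::sleep(interval).await,
        }
    }
}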

Wss support

Is WSS supported for client and server? And how would a simple example look?
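For the client side, a minimal sketch assuming the ClientConfig API shown in the reconnect snippet above. Whether wss:// works out of the box depends on which TLS features are enabled for the underlying connector, so treat this as a starting point rather than a confirmed recipe:

use ezsockets::ClientConfig;
use url::Url;

fn wss_config() -> ClientConfig {
    // wss:// is handled by the underlying connector when a TLS backend
    // (native-tls or rustls) is enabled; exact feature names may differ
    // between ezsockets versions, so check the crate's docs.
    let url = Url::parse("wss://example.com/socket").expect("valid URL");
    ClientConfig::new(url)
}

// The config is then used exactly as in the snippet above:
// let (handle, future) = ezsockets::connect(|_client| Client {}, config).await;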

Ability to parse headers and query parameters

In my usage, I extract a JWT from a query parameter and the real IP address from X-Forwarded-For.

But there's no way to do that now. I'd like to implement these methods and raise a PR later.
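Once the upgrade request is exposed to user code (for example, the http::Request that the tungstenite acceptor in the SocketConfig issue below already rebuilds), both extractions are plain http crate calls. A sketch, with token as a hypothetical query parameter name:

use http::Request;

// Pull a `token` query parameter and the client IP from X-Forwarded-For
// out of the upgrade request.
fn extract_auth<B>(req: &Request<B>) -> (Option<String>, Option<String>) {
    let jwt = req.uri().query().and_then(|q| {
        q.split('&')
            .find_map(|pair| pair.strip_prefix("token="))
            .map(str::to_owned)
    });

    let real_ip = req
        .headers()
        .get("x-forwarded-for")
        .and_then(|v| v.to_str().ok())
        // X-Forwarded-For may be a comma-separated list; the first entry
        // is the original client.
        .and_then(|v| v.split(',').next())
        .map(|s| s.trim().to_owned());

    (jwt, real_ip)
}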

Configurable SocketConfig for servers

Adding a configurable socket through SocketConfig would allow users to change the default heartbeat duration, timeout, and message. It is already implemented on the client side.

For tungstenite:

async fn accept(
    &self,
    stream: TcpStream,
    handle: &enfync::builtin::native::TokioHandle,
) -> Result<(Socket, Request), Error> {
    let mut req0 = None;
    let callback = |req: &http::Request<()>,
                    resp: http::Response<()>|
     -> Result<http::Response<()>, ErrorResponse> {
        let mut req1 = Request::builder()
            .method(req.method().clone())
            .uri(req.uri().clone())
            .version(req.version());
        for (k, v) in req.headers() {
            req1 = req1.header(k, v);
        }
        let Ok(body) = req1.body(()) else {
            return Err(ErrorResponse::default());
        };
        req0 = Some(body);
        Ok(resp)
    };
    let socket = match self {
        Acceptor::Plain => {
            let socket = tokio_tungstenite::accept_hdr_async(stream, callback).await?;
            Socket::new(socket, SocketConfig::default(), handle.clone())
        }
        #[cfg(feature = "native-tls")]
        Acceptor::NativeTls(acceptor) => {
            let tls_stream = acceptor.accept(stream).await?;
            let socket = tokio_tungstenite::accept_hdr_async(tls_stream, callback).await?;
            Socket::new(socket, SocketConfig::default(), handle.clone())
        }
        #[cfg(feature = "rustls")]
        Acceptor::Rustls(acceptor) => {
            let tls_stream = acceptor.accept(stream).await?;
            let socket = tokio_tungstenite::accept_hdr_async(tls_stream, callback).await?;
            Socket::new(socket, SocketConfig::default(), handle.clone())
        }
    };
    let Some(req_body) = req0 else {
        return Err("invalid request body".into());
    };
    Ok((socket, req_body))
}

and for Axum, the on_upgrade_with_config() function is not used anywhere:

pub fn on_upgrade_with_config<E: ServerExt + 'static>(
    self,
    server: Server<E>,
    socket_config: SocketConfig,
) -> Response {
    self.ws.on_upgrade(move |socket| async move {
        let handle = enfync::builtin::native::TokioHandle::try_adopt()
            .expect("axum server runner only works in a tokio runtime");
        let socket = Socket::new(socket, socket_config, handle);
        server.accept(socket, self.request, self.address);
    })
}

It comes down to three options:

  • Configuration through Server::create() - global options for all incoming sessions.
  • Configuration through Session::create() - this could be a little bit tricky, as the Socket has already been created by the time Session::create() is called.
  • Configuration through the on_upgrade_with_config() mentioned above - this would only work for Axum.

The first option doesn't seem bad. You would lose the ability to have a different configuration for each client connected to your server, but is that actually necessary?

Messages are unbounded size

Messages store buffers with unbounded size. Since messages will be logged at log level trace, log files have potentially unbounded size even with only one server/client pair. I'm not sure if the websockets backend has size constraints, but if not it may be nice to add them in ezsockets.

Client messages can silently fail

If you send a message into a client, it can silently fail if the underlying connection is broken while trying to send the message. Normally when working with a raw websocket API, the send method will return an error if sending failed. However, ezsockets defers sending to the internal async mechanisms, so success/failure of a message is hidden from the API.

One possible solution is for the binary/text methods to return a handle to a oneshot channel which will contain the eventual result of sending (i.e. Result<(), ezsockets::SendError> with a new SendError enum). Then if the channel breaks or returns Err, you will know that sending the message failed. Doing it this way obviously affects performance, but it would be a reliable mechanism for reporting async success/failure.
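A rough sketch of the proposed shape, using a tokio oneshot channel; SendError, the channel layout, and the method name are all hypothetical:

use tokio::sync::oneshot;

/// Hypothetical error for a message that never reached the wire.
#[derive(Debug)]
pub enum SendError {
    /// The connection closed before the message was written.
    ConnectionClosed,
}

/// Hypothetical fallible variant of `text`: the returned receiver resolves
/// once the internal socket task has actually written (or failed to write)
/// the frame.
pub fn text_with_ack(
    outgoing: &tokio::sync::mpsc::UnboundedSender<(String, oneshot::Sender<Result<(), SendError>>)>,
    msg: String,
) -> oneshot::Receiver<Result<(), SendError>> {
    let (ack_tx, ack_rx) = oneshot::channel();
    // If the socket task is already gone, report the failure immediately.
    if let Err(tokio::sync::mpsc::error::SendError((_, ack_tx))) = outgoing.send((msg, ack_tx)) {
        let _ = ack_tx.send(Err(SendError::ConnectionClosed));
    }
    ack_rx
}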

Disabling logs?

Is there a way to disable the default logs like

2024-03-13T15:49:57.809170Z INFO ezsockets::server: starting websocket server

I'm trying to create a server based on the echo-server example and it's working great but I want to print my own status messages if possible.
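Those lines appear to go through the tracing facade (the format matches tracing-subscriber's fmt output), so they can be filtered in whatever subscriber the application installs. A sketch using tracing-subscriber with its env-filter feature, assuming you initialize the subscriber yourself:

use tracing_subscriber::EnvFilter;

fn init_logging() {
    // Keep the application's own logs at `info`, but silence everything
    // emitted under the `ezsockets` target. The same directive also works
    // via the RUST_LOG environment variable.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("info,ezsockets=off"))
        .init();
}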

get_args is useless

run_on() takes get_args: impl Fn(&mut Socket) -> GetArgsFut, but Socket is just a pair of channels. It's not possible to do anything useful with get_args, even in demos

Client in browser

It would be great if ezsockets::Client could be used in the browser. There are a few things blocking that scenario:

  1. Ping/Pong are not supported in the browser.

See #76, #77.

  2. tokio is not supported in the browser.

I wrote a crate enfync that supports arbitrary async backends (including tokio and WASM).

  3. tokio-tungstenite is not supported in the browser (can't do tokio_tungstenite::connect()).

There is a nice crate tokio-tungstenite-wasm that provides a WASM client connector with a very similar interface to tokio_tungstenite::connect() (it actually uses tokio-tungstenite for the native backend, but we don't want that since that crate doesn't emit Ping/Pong messages).

I will write an experimental implementation using the above solutions, and follow up here.

Connection synchronization

When you use tokio-tungstenite to run a server, accepting a new connection blocks the acceptor loop until the WebSocket handshake is complete. This opens a DoS vector where malicious clients can prevent other clients from connecting by hogging the acceptor loop.

The solution here seems to be spawning a new tokio task that finishes accepting a connection after the TcpListener accepts it.
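For reference, a sketch of that approach using tokio and tokio-tungstenite directly (this is the general pattern, not ezsockets' internal code):

use tokio::net::TcpListener;

// Accept TCP connections in a tight loop and move the (potentially slow)
// WebSocket handshake into its own task, so one slow or malicious client
// cannot stall other clients from connecting.
async fn accept_loop(listener: TcpListener) -> std::io::Result<()> {
    loop {
        let (stream, peer) = listener.accept().await?;
        tokio::spawn(async move {
            match tokio_tungstenite::accept_async(stream).await {
                // Hand the finished WebSocket over to the session machinery here.
                Ok(_ws) => tracing::debug!("handshake finished for {}", peer),
                Err(err) => tracing::warn!("handshake with {} failed: {}", peer, err),
            }
        });
    }
}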

Client on_close() should get reason from server

ClientExt::on_close() should take the close frame as an input, so implementers can customize what on_close() does based on the close reason. For example, the client may not want to reconnect if the server commanded it to close (e.g. because the client was banned, which means future reconnect attempts will be rejected).

Question: How to talk to one of the clients/sessions from an external tokio:task?

As far as I can see, the session hashmap is encapsulated in the Server struct. I also need to send requests to the clients from the server side (and wait for the responses) based on a channel event from another tokio task (serialport). So I guess it is either:

  • make the session hashmap global by wrapping it in an Arc<Mutex<...>>, but the server doesn't accept the mutex type
  • somehow pass a channel to the server and the serialport task so they can communicate; is this possible?

Maybe you could point me to another way to communicate with the server instance, or would I need to write something custom myself?
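One pattern that fits the existing design, since session handles are cloneable (see the feature list at the top): keep your own shared map of Session handles, filled in on_connect, and let the serialport task call into it. A rough sketch; SessionId, SessionCall, and MyCall stand in for your own SessionExt types:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Sessions are cloneable handles, so you can keep your own shared map of
// them next to the one the server owns and talk to clients from any other
// tokio task.
type SharedSessions<SessionId, SessionCall> =
    Arc<Mutex<HashMap<SessionId, ezsockets::Session<SessionId, SessionCall>>>>;

// In ServerExt::on_connect: store the freshly created handle.
//     sessions.lock().unwrap().insert(id, session.clone());
// In ServerExt::on_disconnect: remove it again.
//
// In the serialport task: clone the Arc and call into a session directly;
// the call is handled by SessionExt::on_call, which can reply to you
// through a channel carried inside the call variant.
//     if let Some(session) = sessions.lock().unwrap().get(&id) {
//         session.call(MyCall::RunChecks(reply_tx));
//     }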

Connection callbacks

I need to respond to connection drops on the client. Can you add reconnect() and close() callback handlers to ClientExt? I could make a PR if you're busy.

Custom Ping/Pong handling

There is no way to customize the Ping/Pong messages sent to and read from the server.
The assumption is that the server is always sending timestamps in milliseconds, but this is not always the case.

A common scenario for a WSS server is to send a custom message (e.g. one containing a ping UUID) and expect that same message in the pong response, not necessarily a timestamp.

Could we add a mechanism that allows the user to provide their own logic for the Ping/Pong messaging process?

The way I see it, we could add an enum HeartbeatMode in Config, with variants like HeartbeatMode::TimeStamp, HeartbeatMode::Custom("always-send-this-string"), HeartbeatMode::None (disable heartbeat logic), or HeartbeatMode::Random...

There could be better ways to implement this customization, but I believe adding this feature would benefit the lib.

Thanks
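For illustration, a rough sketch of the proposed enum; this type does not exist in ezsockets today, and the variants simply mirror the suggestion above:

// Hypothetical heartbeat configuration, as proposed in this issue.
pub enum HeartbeatMode {
    /// Current behaviour: a timestamp in milliseconds.
    TimeStamp,
    /// Always send the same user-provided payload (e.g. a ping UUID).
    Custom(String),
    /// Send random bytes.
    Random,
    /// Disable the heartbeat logic entirely.
    None,
}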

Early returning from on_connect results in panic

Hi there, great work on this crate!
I'm using an on_connect implementation such as the following to have the server kill a connection if it doesn't pass some checks (specifically, using axum to extract headers and validate data passed within them):

async fn on_connect(
    &mut self,
    socket: Socket,
    address: SocketAddr,
    args: <Self::Session as SessionExt>::Args,
) -> Result<ezsockets::Session<<Self::Session as SessionExt>::ID, <Self::Session as SessionExt>::Call>, Error> {
    info!("New connection from {}, {:?}", &address, &args);
    return Err(anyhow!("Not implemented!").into());
}

which results in the following panic (removed full backtrace, will attach if relevant)

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: ()', /Users/roee/.cargo/registry/src/github.com-1ecc6299db9ec823/ezsockets-0.5.1/src/server.rs:248:14

which leads me to the following snippet in server.rs

pub async fn accept(
        &self,
        socket: Socket,
        address: SocketAddr,
        args: <E::Session as SessionExt>::Args,
    ) -> <E::Session as SessionExt>::ID {
        let (sender, receiver) = oneshot::channel();
        self.connections
            .send(NewConnection {
                socket,
                address,
                args,
                respond_to: sender,
            })
            .map_err(|_| ())
            .unwrap();
        receiver.await.unwrap()
    }

A potential solution would be to eliminate the unwrap from both expressions, or modify the return type of the 'accept' function to an enumeration that includes either a session or an exit code.
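A sketch of the first suggestion, keeping the structure of the snippet above but propagating failures instead of unwrapping; the error messages and the Result-based return type are illustrative, not the crate's actual code:

// Unwrap-free variant of the accept method shown above.
pub async fn accept(
    &self,
    socket: Socket,
    address: SocketAddr,
    args: <E::Session as SessionExt>::Args,
) -> Result<<E::Session as SessionExt>::ID, Error> {
    let (sender, receiver) = oneshot::channel();
    self.connections
        .send(NewConnection {
            socket,
            address,
            args,
            respond_to: sender,
        })
        .map_err(|_| Error::from("server is no longer running"))?;
    // If on_connect returns Err, the actor drops `sender` without replying,
    // so the receive error here surfaces the rejected connection instead of
    // panicking.
    receiver.await.map_err(|_| Error::from("connection was rejected"))
}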

Ping/Pong are unavailable in browser

Ping/Pong are not available in all browsers, and where they are available the handling always happens in the background. The canonical web_sys::WebSocket implementation offers no Ping/Pong functionality.

This is a problem because I want to enable browser-based websocket clients in ezsockets (e.g. this is the best contender), but ezsockets depends on Ping/Pong for the heartbeat. (I want to abstract out the client's tokio_tungstenite::connect_async() call.)

To get around this, I want to:
A) Move the Socket's last_alive to always update when receiving a message, instead of only when receiving a Pong.
B) In my downstream project, use Message::Text to manually implement a Ping/Pong keep-alive when compiling to WASM.
C) Disable the keep-alive mechanism entirely when compiling to WASM (do this in addition to A, since servers won't compile to WASM and still need the keep-alive mechanism).

Alternatively, we could inject the keep-alive Ping/Pong into Message::Text in ezsockets (pretty invasive).

Handle custom behaviour when disconnected

When my client gets disconnected, I'd like to perform some custom action (like printing a warning) as well as attempt to reconnect. Right now the only thing I can customize is how long it waits before reconnecting, right?
