Giter Site home page Giter Site logo

lemunozm / message-io Goto Github PK

View Code? Open in Web Editor NEW
1.0K 15.0 67.0 707 KB

Fast and easy-to-use event-driven network library.

License: Apache License 2.0

Rust 100.00%
network network-programming message-queue asynchronous event-driven events event-manager tcp-server udp-server actor-model

message-io's Introduction

message-io is a fast and easy-to-use event-driven network library. The library handles the OS socket internally and offers a simple event message API to the user. It also allows you to make an adapter for your own transport protocol following some rules, delegating the tedious asynchrony and thread management to the library.

If you find a problem using the library or you have an idea to improve it, do not hesitate to open an issue. Any contribution is welcome! And remember: more caffeine, more productive!

Motivation

Managing sockets is hard because you need to fight with threads, concurrency, full duplex, encoding, IO errors that come from the OS (which are really difficult to understand in some situations), etc. If you make use of non-blocking sockets, it adds a new layer of complexity: synchronize the events that come asynchronously from the Operating System.

message-io offers an easy way to deal with all these aforementioned problems, making them transparent for you, the programmer that wants to make an application with its own problems. For that, the library gives you a simple API with two concepts to understand: messages (the data you send and receive), and endpoints (the recipients of that data). This abstraction also offers the possibility to use the same API independently of the transport protocol used. You could change the transport of your application in literally one line.

Features

  • Highly scalable: non-blocking sockets that allow for the management of thousands of active connections.
  • Multiplatform: see mio platform support.
  • Multiple transport protocols (docs):
    • TCP: stream and framed mode (to deal with messages instead of stream)
    • UDP, with multicast option
    • WebSocket: plain and secure#102 option using tungstenite-rs (wasm is not supported but planned).
  • Custom FIFO events with timers and priority.
  • Easy, intuitive and consistent API:
    • Follows KISS principle.
    • Abstraction from transport layer: don't think about sockets, think about messages and endpoints.
    • Only two main entities to use:
      • a NodeHandler to manage all connections (connect, listen, remove, send) and signals (timers, priority).
      • a NodeListener to process all signals and events from the network.
    • Forget concurrency problems: handle all connection and listeners from one thread: "One thread to rule them all".
    • Easy error handling: do not deal with dark internal std::io::Error when sending/receiving from the network.
  • High performance (see the benchmarks):
    • Write/read messages with zero-copy. You write and read directly from the internal OS socket buffer without any copy in the middle by the library.
    • Full duplex: simultaneous reading/writing operations over the same internal OS socket.
  • Customizable: message-io doesn't have the transport you need? Easily add an adapter.

Documentation

Getting started

Add to your Cargo.toml (all transports included by default):

[dependencies]
message-io = "0.18"

If you only want to use a subset of the available transport battery, you can select them by their associated features tcp, udp, and websocket. For example, in order to include only TCP and UDP, add to your Cargo.toml:

[dependencies]
message-io = { version = "0.18", default-features = false, features = ["tcp", "udp"] }

All in one: TCP, UDP and WebSocket echo server

The following example is the simplest server that reads messages from the clients and responds to them with the same message. It is able to offer the "service" for 3 differents protocols at the same time.

use message_io::node::{self};
use message_io::network::{NetEvent, Transport};

fn main() {
    // Create a node, the main message-io entity. It is divided in 2 parts:
    // The 'handler', used to make actions (connect, send messages, signals, stop the node...)
    // The 'listener', used to read events from the network or signals.
    let (handler, listener) = node::split::<()>();

    // Listen for TCP, UDP and WebSocket messages at the same time.
    handler.network().listen(Transport::FramedTcp, "0.0.0.0:3042").unwrap();
    handler.network().listen(Transport::Udp, "0.0.0.0:3043").unwrap();
    handler.network().listen(Transport::Ws, "0.0.0.0:3044").unwrap();

    // Read incoming network events.
    listener.for_each(move |event| match event.network() {
        NetEvent::Connected(_, _) => unreachable!(), // Used for explicit connections.
        NetEvent::Accepted(_endpoint, _listener) => println!("Client connected"), // Tcp or Ws
        NetEvent::Message(endpoint, data) => {
            println!("Received: {}", String::from_utf8_lossy(data));
            handler.network().send(endpoint, data);
        },
        NetEvent::Disconnected(_endpoint) => println!("Client disconnected"), //Tcp or Ws
    });
}

Echo client

The following example shows a client that can connect to the previous server. It sends a message each second to the server and listen its echo response. Changing the Transport::FramedTcp to Udp or Ws will change the underlying transport used.

use message_io::node::{self, NodeEvent};
use message_io::network::{NetEvent, Transport};
use std::time::Duration;

enum Signal {
    Greet,
    // Any other app event here.
}

fn main() {
    let (handler, listener) = node::split();

    // You can change the transport to Udp or Ws (WebSocket).
    let (server, _) = handler.network().connect(Transport::FramedTcp, "127.0.0.1:3042").unwrap();

    listener.for_each(move |event| match event {
        NodeEvent::Network(net_event) => match net_event {
            NetEvent::Connected(_endpoint, _ok) => handler.signals().send(Signal::Greet),
            NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening
            NetEvent::Message(_endpoint, data) => {
                println!("Received: {}", String::from_utf8_lossy(data));
            },
            NetEvent::Disconnected(_endpoint) => (),
        }
        NodeEvent::Signal(signal) => match signal {
            Signal::Greet => { // computed every second
                handler.network().send(server, "Hello server!".as_bytes());
                handler.signals().send_with_timer(Signal::Greet, Duration::from_secs(1));
            }
        }
    });
}

Test it yourself!

Clone the repository and test the Ping Pong example (similar to the README example but more vitaminized).

Run the server:

cargo run --example ping-pong server tcp 3456

Run the client:

cargo run --example ping-pong client tcp 127.0.0.1:3456

You can play with it by changing the transport, running several clients, disconnecting them, etc. See more here.

Do you need a transport protocol that message-io doesn't have? Add an adapter!

message-io offers two kinds of API. The user API that talks to message-io itself as a user of the library, and the internal adapter API for those who want to add their protocol adapters into the library.

If a transport protocol can be built in top of mio (most of the existing protocol libraries can), then you can add it to message-io really easily:

  1. Add your adapter file in src/adapters/<my-transport-protocol>.rs that implements the traits that you find here. It contains only 8 mandatory functions to implement (see the template), and it takes arround 150 lines to implement an adapter.

  2. Add a new field in the Transport enum found in src/network/transport.rs to register your new adapter.

That's all. You can use your new transport with the message-io API like any other.

Oops! one more step: make a Pull Request so everyone can use it :)

Open source projects using message-io

  • Termchat Terminal chat through the LAN with video streaming and file transfer.
  • Egregoria Contemplative society simulation.
  • Project-Midas Distributed network based parallel computing system.
  • AsciiArena Terminal multiplayer death match game (alpha).
  • LanChat LanChat flutter + rust demo.

Does your awesome project use message-io? Make a Pull Request and add it to the list!

Is message-io for me?

message-io has the main goal to keep things simple. This is great, but sometimes this point of view could make more complex the already complex things.

For instance, message-io allows handling asynchronous network events without using an async/await pattern. It reduces the complexity to handle income messages from the network, which is great. Nevertheless, the applications that read asynchronous messages tend to perform asynchronous tasks over these events too. This asynchronous inheritance can easily be propagated to your entire application being difficult to maintain or scale without an async/await pattern. In those cases, maybe tokio could be a better option. You need to deal with more low-level network stuff but you gain in organization and thread/resource management.

A similar issue can happen regarding the node usage of message-io. Because a node can be used independently as a client/server or both, you can easily start to make peer to peer applications. In fact, this is one of the intentions of message-io. Nevertheless, if your goal scales, will appear problems related to this patter to deal with, and libraries such as libp2p come with a huge battery of tools to help to archive that goal.

Of course, this is not a disclaiming about the library usage (I use it!), it is more about being honest about its capabilities, and to guide you to the right tool depending on what are you looking for.

To summarize:

  • If you have a medium complex network problem: make it simpler with message-io!
  • If you have a really complex network problem: use tokio, libp2p or others, to have more control over it.

message-io's People

Contributors

garin1000 avatar iamazy avatar jhg avatar kgraefe avatar kination avatar laiot avatar lemunozm avatar protowalker avatar robdavenport avatar sigmasd avatar tatref avatar uriopass avatar vars1ty avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

message-io's Issues

Future Enhancements / WIP / Roadmap?

As I mentioned in my previous issue, I'm really enjoying this crate.

I'd like to more about what upcoming enhancements or new features are on the way, and maybe contribute where I can. What is currently in development for the near future? Would it be possible to get these listed as issues on this board for others to contribute and discuss?

Game server

Hi,

I am looking into creating a game server for my classical RPG project at Eldiron.com.

Question: From your perspective what would be a good design implementation to have one thread on the server communicate with the clients and another thread performing the game ticks on the characters ? Is there some kind of best practice for this ?

Thanks!

cross compile for websocket

if i want to use ws,but do not open ssl,how to do it?
em....i mean,cross compile openssl from windows or mac to linux is so troublesome.so maybe there is a way use ws but not use wss?
for now,when i cross compile from mac to linux,it's always occur error,the errors are about openssl,if i open Ws feature.
if i mistake something,pls tell me.
by the way,thanks for your work,message-io is great!

Encryption

Are you planning on adding encrypted connections at some point?

Improving `Decoder` performance using `Read` trait.

The Decoder is used by the FramedTcp transport to transform a stream-based protocol (TCP) into a packet-based protocol that fits really well with the concept of message.

The Decoder collects data from the stream until it can be considered a message. In that process, each chunk of data received from the network is written in a temporal buffer. If that data is not yet a message, then, de Decoder copies from that buffer to its internal buffer in order to wait for more chunks.

This last copy can be avoided if we are able to read directly into the decoder. To get this, the decoder could expose its buffer in order to allow the stream.read() dumping its data directly into the decoder, or even better, the Decoder can receive a Read trait object (that would be the socket) from which extract the data. Something similar to:

Decoder::decode_from(&self mut, reader: &dyn Read, impl decoded_callback: impl FnMut(&[u8]) -> Result<()>

Note that since it works in a non-blocking way, several calls to read must be performed inside this function until receiving a WouldBlock io error.

non-blocking connections

Currently, the connect() method performs a connection in a blocking way. For many basic use cases, this is even preferred to an async connection, since it allows you to configure the returned Endpoint easily before listening to any event.

Currently, the blocking way:

let (node, listener) = node::split();
// This function waits until the connection is finished
let (endpoint, local_addr) = node.connect(Transport::FramedTcp, "123.4.5.6:1234")?; 
// The user could decide some logic in base of the connection result before call listener.for_each
listener.for_each(|event| match event.network() {
    NetEvent::Connected(endpoint, listener_id) => { /* */ }
    NetEvent::Message(endpoint, data) => { /* */ }
    NetEvent::Disconnected(endpoint) => { /* */ }
});

Nevertheless, the user could not want to wait for that connection and do other stuff in the meantime. In this case, we need an asynchronous connection. This also implies a new NetEvent to notify that the connection has been established or not. The propossed method:

let (node, listener) = node::split();
// This function NOT wait until the connection is finished
let local_addr = node.connect(Transport::FramedTcp, "123.4.5.6:1234")?; //Now works asynchonously
listener.for_each(|event| match event.network() {
    NetEvent::Connected(endpoint, established) => { /* connection succesful or not*/ }
    NetEvent::Accepted(endpoint, listener_id) => { /* used for listenings */ }
    NetEvent::Message(endpoint, data) => { /* */ }
    NetEvent::Disconnected(endpoint) => { /* */ }
});

The proposed way will split the NetEvent::Connected into Connected and Accepted. This is due to two reasons:

  1. The Accepted event (that is the previous Connected) contains the listener_id of the listen that accepted that connection, which does not exist when the user explicitly creates the connection. Both events offer different things.
  2. Adds clarity to the reader and the API is more symmetric: Connected for connect, Accepted for listen

Pending task after CTRL+C

In main I start two tasks via tokio. One is a graphql server and the other is a service running
node_listener.for_each(... to handle messages

    // Create and start discovery service
    // thread::spawn(move || {
    let t1 = tokio::task::spawn_blocking(move || {
        debug!("Creating Discovery Service");
        let dsserver = DiscoveryServer::new();
        if let Ok(discovery) = dsserver {
            debug!("Starting discovery service");
            let _ = discovery.run();
        }
    });

    tokio::join!(async move {
        let _ = graphql_server.run().await;
    });

When I want to terminate, the discovery service stays and can only be killed brutally sudo kill -9 PID
Is there a clean way to structure the code such that it can terminate gently?

Close connection

Let's say a client sends a bogus packet, or some error occurs, or the server wants to ban or rate-limit a client, would it be possible to add a way to close an endpoint? That is, cut the connection. The API could look like Network::close(Endpoint).
The NetEvent::Disconnected event would be sent to the server, and if a new connection attempt is done, the NetEvent::Connected would be sent again.

How to use endpoint

Hi,
I have a question , most likely is a simple/silly one but I'm new in Rust so I hope you can answer me:

I run the ping-pong example and it work. But in order to use it in my project I need to access to client/server endpoint from outside listener loop because I want to keep just one connection all the time between client and server.

Outside the ping-pong example I have my project (I use a game engine which use winit as crate) where I have my UI with a button that I made in order to send a message when I click on it. I was going to use handler.network().send(server_id, &output_data); But I have no access to the server_id / endpoint because the point where my button trigger is inside at winit crate loop (aka UI).

Because the UI loop consume(move) the data - it use a closure - it give an error when I try to save endpoint in a external struct because Endpoint doesn't have Clone trait.

But I'm not confident to make changes yet in your code :).

So my question is: which solution/option do you suggest? I tried my best to explain my problem but tell me if you need more info.

Compiling message-io with just `websocket` feature breaks.

Error message:

error[E0433]: failed to resolve: could not find `tcp` in `super`
   --> /home/redact/.cargo/registry/src/github.com-1ecc6299db9ec823/message-io-0.14.1/src/adapters/ws.rs:183:45
    |
183 |                     let tcp_status = super::tcp::check_stream_ready(&stream.0);
    |                                             ^^^ could not find `tcp` in `super`

This might be intended, but in that case there should be websocket = ["tcp"]. It also doesn't make sense for the web either. Maybe have two different websocket features? websocket and websocket-web.

Add a SMTP adapter

Add an adapter to send mails easily through message-io as if they were messages.

Things into consideration:

  • The messages should be sent in text. What happens if somebody sends binary data?
  • At connection phase, how to pass the remittent and server info? Encoded by RemoteAddr::Str or by #54
  • How to receive messages (based on mio poll)? The internal Waker should be exposed to the adapter API?

Node API: zero-copy write/read message

Problem

To read a message, you use:

let (mut network, mut events) = Network::split();
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
loop {
    match events.receive() {
        NetEvent::Message(endpoint, data) => { /* data has been copied here */ },
        _ => (),
    }
}

Although the current API is quite simple, it has a drawback: in order to pass messages into the EventQueue, you need to perform a copy of that message. This is why the signature of the NetEvent::Message<Endpoint, Vec<u8>> has an allocated vector instead of a reference like &[u8]. This copy is necessary because once you send data into the queue, the lifetime of the referenced data is lost. The internal socket buffer can be overwritten with a new incoming message before you read the previous one.

To avoid this issue you can avoid sending the data into EventQueue in order to process the message directly from the AdapterEvent which signature reference the internal input buffer: AdapterEvent::Data(Endpoint, &[u8]). You can archieve this using the Network::new() constructor:

let mut network = Network::new(|adapter_event| {
    match adapter_event { 
        AdapterEvent::Data(endpoint, data) => { /* data direclty from adapter*/ },
        _ => (),
    }
});
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();

Although this works with the desired performance, it reduces the API usage, for example:

  • How I can send a message just after read it?. I can not access to the Network instance in its own callback, so I need to push this "action" as an event and send it into an EventQueue to read it out of the callback, in the EventQueue loop, and then call Network::send() properly.
  • If I want to send some signal based on the message read, I need an EventQueue.

These problems forced you to divide your application logic, offuscating the code: some events will be processed in the Network callback and other events will be processed in the EventQueue loop:

let events = EventQueue::new();
let sender = events.sender().clone();
let mut network = Network::new(move |adapter_event| {
    match adapter_event { 
        AdapterEvent::Data(endpoint, data) => { 
           // data directly from adapter
          let response = process_data(data);
          // Here I can not send by the network, I need to perform this action out of the callback.
          sender.send(UserEvent::SendResponse(endpoint, response));
        },
        _ => (),
    }
});

network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();

loop {
    match events.receive() {
        UserEvent::SendResponse(endpoint, response) => { 
              network.send(endpoint, response);
        },
        _ => (),
    }
}

Solution

To solve this problem, and allow the user to process all their events only in the callback, it is needed some additions:

  • Add network in the own Network callback.
  • Add to the network the possibility to react to timer signals, to avoid completely the use of an EventQueue.
  • Allow natively two types of events, network events, and custom signal events.

Example 1

Signals as part of the network.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let node = Network::new(|network, event|{
   match event { 
        NetEvent::Data(endpoint, data) => { 
              // data direclty from adapter
              network.send(endpoint, data);
              network.self_signal(UserSignal::Tick(1), Duration::from_millis(50));
        },
       NetEvent::Signal(signal) => match signal {
             UserSignal::Tick => { /* send other signal, call network action, etc... */ }
             UserSignal::Close => network.stop(), // The callback never be called again.
        }
       NetEvent::Connected(..) => (),
       NetEvent::Disconnected(..) => (),
    }
});
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
network.self_signal(Close, Duration::from_secs(3));
network.wait_to_close();
// You still can make any network call and send signals outside the callback.

Example 2

Node concept: the node, contains network, signals and handles the internal thread . The node can be used inside and outside the callback.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let node = Node::new(|node, event| {
   match event { 
        Event::Network(net_event) => match net_event {
             NetEvent::Data(endpoint, data) => { 
                 // data direclty from adapter
                 node.network.send(endpoint, data);
                 node.signals.send(UserSignal::Tick(1), Duration::from_millis(50));
             },
            NetEvent::Connected(..) => (),
            NetEvent::Disconnected(..) => (),
        }
        Event::Signal(signal) => match signal {
             UserSignal::Tick => { /* send other signal, call network action, etc... */ }
             UserSIgnal::Close => node.stop()
        }
    }
});
// In this moment the node is already running.
node.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
node.signals.send(UserSignal::Close, Duration::from_secs(3));
node.await(); //waiting until the node be stoped.
// You still can make any network call and send signals outside the callback.
// ... 
let node = Node::new(...);
node.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
std::thread::sleep(Duration::from_secs(3));
node.stop();

Example 3 candidate

Split node into a NodeHandler and a NodeListener. The handler manages the network, signals and can stop the internal thread. The NodeListener dispatch received events. The NodeHandler can be used both, inside and outside the callback.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let (handler, listener) = Node::split();
node.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
node.signals.send(UserSignal::Close, Duration::from_secs(3));
let node_task = listener.fo_each(|event| {
   match event { 
        NodeEvent::Network(net_event) => match net_event {
             NetEvent::Data(endpoint, data) => { 
                 // data direclty from adapter
                 node.network.send(endpoint, data);
                 node.signals.send(UserSignal::Tick(1), Duration::from_millis(50));
             },
            NetEvent::Connected(..) => (),
            NetEvent::Disconnected(..) => (),
        }
        NodeEvent::Signal(signal) => match signal {
             UserSignal::Tick => { /* send other signal, call network action, etc... */ }
             UserSIgnal::Close => node.stop()
        }
    }
});
// In this moment the node is already running.
// You can still make any network call and send signals outside the callback.
node.network.listen(Transport::Udp, "0.0.0.0:0").unwrap();
node.signals.send(UserSignal::Tick, Duration::from_secs(1));
drop(node_task); //waits until node.stop() be called.

why do not support for x84_64-unknown-linux-musl?

i really like this crate,it's easy to understand and use,but why do not support for x84_64-unknown-linux-musl?
i mean,the cross compile for compile x84_64-unknown-linux-musl is easy than x84_64-unknown-linux-gnu right?
i can not find a good way to easy cross compile to x84_64-unknown-linux-gnu.
if u know,could u pls tell me?oh,by the way,i am coding on mac.

Player-hosted by p2p

Can message-io help facilitate peer-to-peer connections?

I would eventually want the default for my game to be an authoritative server, but in the start it would work just fine with p2p. I trust the people I’m play testing with to not cheat, so none of that needs to be mitigated. The most important thing is just to be able to set up hosted games easily and cost-efficiently.

Encoding with less bytes.

Currently, the FramedTcp adapter reaches its target of "transform" TCP from stream-based to packet-based adding an offset of 4 bytes before the packet to determine its size using the encoding module (this module could be used for other adapters but currently is only used for FramedTcp).

For most cases, 4 bytes is too many bytes (most of the messages could use 1 or 2 bytes). Fortunately, bincode has an option to make variadic int encoding. This should be relatively easy to implement.

EDIT: Other cool library to get this functionality instead bincode: integer_encoding

32-bit OS

message-io doesn't seem to work on 32-bit systems. Some of the constants in resource_id.rs (namely RESOURCE_TYPE_BIT and ADAPTER_ID_MASK_OVER_ID) overflow in 32-bit systems.

Perhaps the constants can be updated to make use of std::mem::size_of, or the type changed from usize to u64?

Adapter with configuration properties by connection/listening

Add an easy way to pass configuration properties to the adapter when you perform a connect()/listen():
You could want to configure a specific connection with some extra properties.

Currently:

network.connect(transport, addr);
network.listen(transport, addr);

Expected:

network.connect(transport, addr, <config info to the adapter>)
network.listen(transport, addr, <config info to the adapter>)

Things to consider:

  • Avoid that the user could mix configurations from one adapter with a transport that doesn't belong. Is it possible to maintain this safety at compile time?
  • The adapter implementation should specify a default configuration to configure the adapter in case of no configuration.

Modify thread name

I would like the ability to provide an alternate thread name for the threads in for_each() and for_each_async(). If this is a change you are open to incorporating, I can make the change and put up a PR. Any interest?

Support browser client connections (if not already?)

Hello, I read through the docs but am still unsure if my use case is supported

I'm trying to connect to a message-io websocket server from a Chrome extension (i.e. the standard browser Websocket object)

Reproduction steps

  1. Setup a minimal example of message-io according to the readme example
  2. Remove TCP and UDP transports from the example
  3. Configure message-io to listen at 127.0.0.1:<port>
  4. Install this chrome extension (source here)
  5. Open the extension page
  6. Connect to 127.0.0.1:<port> in the URL field
  7. Note that "Client connected" is correctly reported by message-io in rust terminal
  8. Send a test message via the Request field

Expected outcome

  • Received: <message> should be logged in the rust terminal
  • The message-io server should ping-pong the test message back to the chrome extension
  • The response message should be shown in black in the Message Log field

image

Actual outcome

  • No messages are logged in the rust terminal
  • Test message is shown in red (indicates that it was requested by client)
  • No response message (black) is shown

image

Why is no response being sent back?

Panicked at Result unwrap bug

Reproducibility

While running the discovery server (as in the examples), if I disconnect with some peers, I get a panic from

let addr = self.stream.peer_addr().unwrap();
in message-io-0.14.4/src/adapters/framed_tcp.rs:65:56

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 107, kind: NotConnected, message: "Transport endpoint is not connected" }', /home/frag/.cargo/registry/src/github.com-1ecc6299db9ec823/message-io-0.14.4/src/adapters/framed_tcp.rs:65:56

Improve `RemoteAddr` usage

RemoteAddr is very similar to SocketAddr (a kind of extended form of this). One of the biggest points of SocketAddr is that it is really comfortable to use because to the From and ToSocketAddrs trait implementations.

The aim is to give a similar level of support to RemoteAddr using both From and ToRemoteAddr, to make it as cool as SocketAddr.

Request for try_receive() or some variant of non-blocking receive for EventQueue

Awesome create! I'm really impressed with the ergonomics and flexibility of this so far!

It would be nice to have access to a non-blocking receive function inside of EventQueue.

Reasons

  • Working within a game-loop with specific times to handle events.
  • Not wanting to deal with multiple threads, inter-thread communication with channels etc.
  • (Maybe?) faster performance than the current solution listed below due to unnecessary creation of Duration & compares against times.

Current solution:
Calling receive_timeout with zero duration, such as:
events.receive_timeout(Duration::from_nanos(0))

I considered making a PR for this myself (and probably will if it doesn't get added by anyone else), unless it's deemed unnecessary or pointless compared to the current solution. The code looked a little more involved than I thought though!

About the new node API: How do I keep control ?

I'm using message-io as part of my game's networking. Using the previous API I was doing something like this (pseudo code)

let mut game_state = GameState::new();
let mut networking = Networking::new(); // connects to the server

loop {
    let new_inputs = networking.handle_packets();
    game_state.update(new_inputs);
    draw(&game_state);
}


// handle_packets being something like this
fn handle_packets(&mut self) -> Option<Inputs> {
    while let Ok(packet) = self.network.try_receive() {
         // handle packet
    }
    self.new_inputs
}

However, it looks like using the new API I cannot do this as the for_each loop completely takes control of the thread used.

My guess is that I now have to communicate between my game state and the networking code using channels.. I liked that it was abstracted away.

Blocking on for_each_async

I'm trying to implement message-io as a tcp client within the actix framework.
This is my current implementation:

use actix::Message;
use actix::{
    Actor, ActorContext, AsyncContext, Context, Handler, Running, StreamHandler, WrapFuture,
};
use log::info;
use message_io::network::{NetEvent, Transport};
use message_io::node::{
    self, NodeEvent, NodeHandler, NodeListener, NodeTask, StoredNetEvent, StoredNodeEvent,
};

pub struct TcpClientActor {
    handler: NodeHandler<String>,
    listener: Option<NodeListener<String>>,
}

impl TcpClientActor {
    pub fn new() -> Self {
        let (handler, listener) = node::split::<String>();
        TcpClientActor {
            handler,
            listener: Some(listener),
        }
    }
}

impl Actor for TcpClientActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        let message = serde_json::json!({
            "op": "subscribe",
            "topic": "/client_count"
        });
        // Listen for TCP, UDP and WebSocket messages at the same time.
        let (server, socket_address) = self
            .handler
            .network()
            .connect(Transport::Tcp, "192.168.43.217:9092")
            .unwrap();
    }

    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        ctx.stop();
        Running::Stop
    }
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct Listen {}

impl Handler<Listen> for TcpClientActor {
    type Result = ();

    fn handle(&mut self, msg: Listen, ctx: &mut Self::Context) -> Self::Result {
        let listener = self.listener.take().unwrap();
        listener
            .for_each_async(move |event| {
                match event {
                    NodeEvent::Network(net_event) => match net_event {
                        NetEvent::Connected(_endpoint, _ok) => {
                            info!("Connected");
                            // handler.signals().send(message.to_string());
                        }
                        NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening
                        NetEvent::Message(_endpoint, data) => {
                            println!("Received: {}", String::from_utf8_lossy(data));
                        }
                        NetEvent::Disconnected(_endpoint) => (),
                    },
                    NodeEvent::Signal(signal) => match signal {
                        _ => {
                            // computed every second
                            info!("Signal Received: {}", signal);
                            // handler.signals().send_with_timer(signal, Duration::from_secs(1));
                        }
                    },
                }
            })
            .wait();
    }
}

The problem with the current implementation is that whenever the listener.for_each_async() method runs, the thread gets blocked and the entire program freezes. I though for_each_async was non-blocking.
Am I misinterpreting something here?
Thank you

Ps: If you need any information regarding the actix framework or anything else, just let me know and I'll write it down. I didn't want to overwhelm the issue with information that might not be needed.

Question about Signal usage in ping-pong example

Hi,

In the client, a Signal is sent when the connection is established

if established {
println!("Connected to server at {} by {}", server_id.addr(), transport);
println!("Client identified by local port: {}", local_addr.port());
handler.signals().send(Signal::Greet);

Then the real message is sent later:

NodeEvent::Signal(signal) => match signal {
Signal::Greet => {
let message = FromClientMessage::Ping;
let output_data = bincode::serialize(&message).unwrap();
handler.network().send(server_id, &output_data);

Is there any reason for not sending the message directly in the NetEvent::Connect event? Maybe to prevent blocking the thread while sending the message?

How to dynamic switch discovery-server?

New message-io user, thanks for this useful crate.

I don't know how to dynamic switch discovery-server if it goes down, Is there a way to do it?

Like this:

                    DS1 ---------- DS2
                      /\ 
                     /   \
                   /       \
                 /           \
     participant1 participant2 ...

If DS1 goes down, i want to all the participants switch to DS2.

Response to HTTP messages over Tcp

After setting up

handler
    .network()
    .listen(Transport::Tcp, "0.0.0.0:6443")
    .unwrap();

and then sending

Received: GET / HTTP/1.1
Host: localhost:6443
User-Agent: insomnia/2021.5.3
Accept: */*

I think I should be able to close an HTTP 1.1 request with the following message.

NetEvent::Accepted(_endpoint, _listener) => {
    println!("Client connected")

    // attempt to close HTTP connection
    handler
        .network()
        .send(endpoint, b"HTTP/1.1 200 OK\r\nConnection: Close\r\n\r\n");
}

However the connection just hangs and fails to complete.

@lemunozm do you have any recommendations for handling HTTP requests over TCP? message-io is an awesome library and I'd love to replace a current server with it, but I need to be able to process HTTP requests. Thanks in advance!

Sendin big data, or multiple small data is not working correctly

original discussion lemunozm/termchat#11

Sending multiple small data:

Sending big data:

Unable to compile via webassembly.

I get a bunch of these errors when attempting to compile via webassembly:

error[E0599]: no method named `as_sock` found for reference `&TcpStream` in the current scope
   --> /home/blah/.cargo/registry/src/github.com-1ecc6299db9ec823/net2-0.2.37/src/ext.rs:690:22
    |
690 |         get_opt(self.as_sock(), SOL_SOCKET, SO_RCVBUF).map(int2usize)
    |                      ^^^^^^^ method not found in `&TcpStream`
    |
    = help: items from traits can only be used if the trait is implemented and in scope
note: `AsSock` defines an item `as_sock`, perhaps you need to implement it

It seems like net2 does not work on wasm.

Unable to connect with wss

I can't manage to establish wss connections using message-io version 0.14.2 on macOS Big Sur. Example:

use message_io::network::{NetEvent, RemoteAddr, Transport};
use message_io::node::{self, NodeEvent};

fn main() {
    connect("ws://echo.websocket.org".to_string());
    connect("wss://echo.websocket.org".to_string());
}

fn connect(url: String) {
    let (handler, listener) = node::split::<()>();
    handler
        .network()
        .connect(Transport::Ws, RemoteAddr::Str(url.clone()))
        .unwrap();
    listener.for_each(move |event| match event {
        NodeEvent::Network(NetEvent::Connected(e, success)) => {
            println!("{} success={} {:?}", url, success, e);
            handler.stop();
        }
        _ => panic!(),
    });
}

Output:

ws://echo.websocket.org success=true Endpoint { resource_id: [3.R.0], addr: 174.129.224.73:80 }
wss://echo.websocket.org success=false Endpoint { resource_id: [3.R.0], addr: 174.129.224.73:443 }

However, I can connect with wss using websocat from the same machine:

$ echo foo | websocat wss://echo.websocket.org
foo

It looks like it's hitting this code path in the ws adapter:

WS client handshake error: WebSocket protocol error: Handshake not finished

Would appreciate guidance if I'm doing something incorrectly. Thanks!

There is no reactor running

I am experiencing the error below
thread '' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime',

when I send a message to a NodeListener and then spawn a thread in a tokio runtime to process that message.

Is this error familiar to anyone?

Websocket web-sys implementation for wasm

Currently, WebSocket is working for client/server side. However, the browsers do not allow to create tcp connections direclty (which is the current implementation based). Instead, the web-sys must be used.

  • Use a different WebSocket implementation if the target is wasm.
  • wasm example of a client.

Bump-up crossbeam-channel

Hello~
I'm looking up the project currently, and have a question.

Seems it is using crossbeam-channel, but seems it has been moved to crossbeam, with 0.8 version.
Is there plan to bump-up?

Serve new events with multiple threads

Is there a way to put the loop node_listener.for_each(move |event| match event.network() {...
in a thread pool?

My application is already spawning threads for services and message-io would be one of such services.

multicast example panic (Mac)

hi @lemunozm ! excellent work on this crate, very exciting!

just wanted to share that while i was reviewing and running some of the examples, i found this issue where the second client to attempt running the multicast example panics after connecting, with an Address already in use error.

image

Cancelable send_with_timer

Hi,

It would be very nice to have a way to cancel messages scheduled with send_with_timer.

I also have a question. I need to encrypt the network traffic, do you recommend implementing an adapter for this or just encrypt the content of the messages before sending them?

Add a reliable & ordering over UDP adapter

Internal adapter API docs here.

List of candidates:

  • laminar: From https://amethyst.rs/. It would be blocked until non-blocking support: issue.
  • turbulence: Seems not support using underlying Mio non-blocking sockets 😢
  • rudp (candidate) Seems to work with Mio, but is it maintained? sirkibsirkib/rudp#1
  • Some brave rustacean wants to create a new cool and great library that can be agnostic about the underlying udp socket used (std, mio, ...). Similar to how tungstenite-rs manages the TCP stream in its websocket.

The only requirement is that the library must allow using non-blocking with mio support.

Since this kind of adapter is highly configurable (rates, packet size,...) probably it depends first on #54 but the first approach with default configuration can be already done.

Specify network interface

Hello,
Is there any way to specify which network interface to use (linux user)?
I want message-io to communicate over wired connection, but it uses radio by default.
I cannot find any mention to it... Is it "embedded" in mio?
thanks!

Windows Websocket latency optimizatión

There is an existing optimization for WebSocket here: #73 in order to avoid an extra call to read_message() from tungstenite. This optimization means a reduction of 40% in the latency.

It works fine in Linux and MacOS but in Windows seems like after the peek() the stream is no longer waked from the poll.

Search the way to get it working in Windows if possible

Sending HTTP Responses

Hi,

Thanks for the great library!

I just wanted to ask if it is possible to send HTML/JSON responses back to a client, or even host static files with this library?

Thanks!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.