Giter Site home page Giter Site logo

tokio-rs / tokio Goto Github PK

View Code? Open in Web Editor NEW
24.6K 311.0 2.2K 12.82 MB

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...

Home Page: https://tokio.rs

License: MIT License

Rust 100.00%
rust asynchronous networking

tokio's Introduction

Tokio

A runtime for writing reliable, asynchronous, and slim applications with the Rust programming language. It is:

  • Fast: Tokio's zero-cost abstractions give you bare-metal performance.

  • Reliable: Tokio leverages Rust's ownership, type system, and concurrency model to reduce bugs and ensure thread safety.

  • Scalable: Tokio has a minimal footprint, and handles backpressure and cancellation naturally.

Crates.io MIT licensed Build Status Discord chat

Website | Guides | API Docs | Chat

Overview

Tokio is an event-driven, non-blocking I/O platform for writing asynchronous applications with the Rust programming language. At a high level, it provides a few major components:

  • A multithreaded, work-stealing based task scheduler.
  • A reactor backed by the operating system's event queue (epoll, kqueue, IOCP, etc...).
  • Asynchronous TCP and UDP sockets.

These components provide the runtime components necessary for building an asynchronous application.

Example

A basic TCP echo server with Tokio.

Make sure you activated the full features of the tokio crate on Cargo.toml:

[dependencies]
tokio = { version = "1.37.0", features = ["full"] }

Then, on your main.rs:

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            // In a loop, read data from the socket and write the data back.
            loop {
                let n = match socket.read(&mut buf).await {
                    // socket closed
                    Ok(n) if n == 0 => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {:?}", e);
                        return;
                    }
                };

                // Write the data back
                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("failed to write to socket; err = {:?}", e);
                    return;
                }
            }
        });
    }
}

More examples can be found here. For a larger "real world" example, see the mini-redis repository.

To see a list of the available features flags that can be enabled, check our docs.

Getting Help

First, see if the answer to your question can be found in the Guides or the API documentation. If the answer is not there, there is an active community in the Tokio Discord server. We would be happy to try to answer your question. You can also ask your question on the discussions page.

Contributing

๐ŸŽˆ Thanks for your help improving the project! We are so happy to have you! We have a contributing guide to help you get involved in the Tokio project.

Related Projects

In addition to the crates in this repository, the Tokio project also maintains several other libraries, including:

  • axum: A web application framework that focuses on ergonomics and modularity.

  • hyper: A fast and correct HTTP/1.1 and HTTP/2 implementation for Rust.

  • tonic: A gRPC over HTTP/2 implementation focused on high performance, interoperability, and flexibility.

  • warp: A super-easy, composable, web server framework for warp speeds.

  • tower: A library of modular and reusable components for building robust networking clients and servers.

  • tracing (formerly tokio-trace): A framework for application-level tracing and async-aware diagnostics.

  • mio: A low-level, cross-platform abstraction over OS I/O APIs that powers tokio.

  • bytes: Utilities for working with bytes, including efficient byte buffers.

  • loom: A testing tool for concurrent Rust code.

Changelog

The Tokio repository contains multiple crates. Each crate has its own changelog.

Supported Rust Versions

Tokio will keep a rolling MSRV (minimum supported rust version) policy of at least 6 months. When increasing the MSRV, the new Rust version must have been released at least six months ago. The current MSRV is 1.63.

Note that the MSRV is not increased automatically, and only as part of a minor release. The MSRV history for past minor releases can be found below:

  • 1.30 to now - Rust 1.63
  • 1.27 to 1.29 - Rust 1.56
  • 1.17 to 1.26 - Rust 1.49
  • 1.15 to 1.16 - Rust 1.46
  • 1.0 to 1.14 - Rust 1.45

Note that although we try to avoid the situation where a dependency transitively increases the MSRV of Tokio, we do not guarantee that this does not happen. However, every minor release will have some set of versions of dependencies that works with the MSRV of that minor release.

Release schedule

Tokio doesn't follow a fixed release schedule, but we typically make one to two new minor releases each month. We make patch releases for bugfixes as necessary.

Bug patching policy

For the purposes of making patch releases with bugfixes, we have designated certain minor releases as LTS (long term support) releases. Whenever a bug warrants a patch release with a fix for the bug, it will be backported and released as a new patch release for each LTS minor version. Our current LTS releases are:

  • 1.32.x - LTS release until September 2024. (MSRV 1.63)
  • 1.36.x - LTS release until March 2025. (MSRV 1.63)

Each LTS release will continue to receive backported fixes for at least a year. If you wish to use a fixed minor release in your project, we recommend that you use an LTS release.

To use a fixed minor version, you can specify the version with a tilde. For example, to specify that you wish to use the newest 1.32.x patch release, you can use the following dependency specification:

tokio = { version = "~1.32", features = [...] }

Previous LTS releases

  • 1.8.x - LTS release until February 2022.
  • 1.14.x - LTS release until June 2022.
  • 1.18.x - LTS release until June 2023.
  • 1.20.x - LTS release until September 2023.
  • 1.25.x - LTS release until March 2024.

License

This project is licensed under the MIT license.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Tokio by you, shall be licensed as MIT, without any additional terms or conditions.

tokio's People

Contributors

alexcrichton avatar asomers avatar aturon avatar blasrodri avatar carllerche avatar darksonn avatar davidpdrsn avatar dekellum avatar hawkw avatar hds avatar ipetkov avatar jonhoo avatar kleimkuhler avatar kpp avatar luciofranco avatar maminrayej avatar marwes avatar mikailbag avatar mox692 avatar noah-kennedy avatar nylonicious avatar radicalzephyr avatar satakuma avatar seanmonstar avatar sfackler avatar taiki-e avatar udoprog avatar vorner avatar vorot93 avatar zaharidichev avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

tokio's Issues

Cannot build Example: serving database content using proto

The subject example will not build, no matter where I look for the crates. The example doesn't say where to get the crates from (some are in crates.io and some are not), nor what versions the dependencies should have.

Example problems I have had:

error: no matching package named `tokio-minihttp` found (required by `loda`)
location searched: registry https://github.com/rust-lang/crates.io-index

and then I resolve this with tokio-minihttp = { git = "https://github.com/tokio-rs/tokio-minihttp" }, and follow this by resolving r2d2, but fail with r2d2-postgres:

error: no matching package named `r2d2-postgres` found (required by `loda`)
location searched: registry https://github.com/rust-lang/crates.io-index

I find a specific github for this, but it doesn't seem to work:

error: no matching package named `r2d2-postgres` found (required by `loda`)
location searched: https://github.com/sfackler/r2d2-postgres

eventually, I gave up.

Please update the example so that it compiles, at least, and provide enough information to be able to reproduce it.

Setup benchmarking suite

Changes should be metrics driven. There are already some benchmarks in the repository, but a solid suite of benchmarks representative of real cases would be very useful.

Maintain a change log

The library really should be maintaining a change log. A new crate release is the right time to start.

Restructure documentation from the ground up

One of the most frequent pieces of feedback we get about futures and Tokio in Rust is "it's confusing!" While https://tokio.rs has some great content it doesn't seem to have stood well against the test of time. It's time we rewrite/restructure our documentation from the ground up. I hope to use this issue to track this work and provide a location to discuss it as well!

@aturon and I talked a bit today about how to start structuring docs, and we came initially to some high-level conclusions:

  • We should assume that async/await syntax will happen "relatively soon". This syntax is such a game changer for futures that we should make liberal use of it in documentation, even if it's unstable. Documentation will have a disclaimer about why it is unstable, links to tracking issues for stability progress, and links to all code examples which work on stable Rust as well. Our hope is that by the time this is done async/await is at least in the standard nightly compilers rather than having to work off a rustc fork.
  • We likely want to not mention tokio-proto/tokio-service at all in the first pass of documentation. This has led to quite some confusion with how the current documentation is so concretely centered around tokio-proto.
  • We need to inundate ourselves with examples. We cannot have enough examples. The current documentation suffers quite a bit in this respect.

Other than that, here's the thinking for an outline so far:

Compare and contrast sync and async

  • Goal: (1) async isnโ€™t scary! (2) async gives you super-powers (3) you are already empowered to kick the tires
  • Async programming is actually super easy! Looks just like sync
  • Async is really expressive
    • show off something super easy with futures thatโ€™d be a pain with std
      • timeouts
      • multiplexing heterogeneous events
      • incredibly cheap tasks
        • Rayon integration
      • not critical understand everything, just show some high-level things to get across the โ€œpowerโ€ point
  • Async is really performant
    • Killer memory usage comparison
    • Killer throughput comparison
    • Both should use relatively simple/easy to understand but quasi-realistic servers. Ideally people can download and play themselves

Behind the curtain: Futures

  • Kernel
    • Basic conceptual model of polling etc
    • The trait
      • fn poll only
    • Task model
  • Combinators
    • How does async/await map down
  • Streams/sinks

I/O with futures

  • Tokio!

There's definitely quite a bit to fill out here! We'll try to do that over time.

Graceful shutdown mechanism

I don't think (and please correct me if I'm wrong) there's currently a general approach to "shutting down" a reactor where it will cease to accept new jobs but continue to execute until all spawned jobs have been executed, even if the first requirement is dropped (i.e. so long as futures remain spawned against the reactor, new jobs can still be added).

I also don't think there's a mechanism by means of which a thread can be notified that a future it spawned against a reactor will never be run (i.e. the reactor has been destroyed with spawned jobs enqueued), so such a feature would at least mitigate that concern.

Is the current architecture of a tokio core/reactor amenable to such a feature, and would you be open to considering this feature request?

Interleaving CPU and IO work on the main reactor task

I'm experimenting with porting a Java-based server (which I'll describe below) to Tokio + Rust, and having some difficulty in getting adequate performance from it; the reason for the performance gap, I suspect, is me misusing the reactor, or rather, not playing along nicely with it. I wanted to confirm this, and see if there's a way to achieve what I'd like without introducing more threads into the mix. So, without further ado, here's how the server works:

Phase 1

  1. Runs a tokio_core::net::TcpListener on a Core and waits for clients to connect. The server knows a priori which clients to expect - a handshake is done upon accepting a connection, and this handshake identifies the client.
  2. Once all expected clients are connected, the main future completes and the TcpListener is dropped (no need to keep the listener up since we do not expect more connections after this point). This main future resolves to a Vec of structs representing a client, which is essentially a wrapper over the tokio_core::net::TcpStream.

This phase is pretty straighforward - no issues here. I wanted to outline it anyway in case someone finds it of relevance.

Phase 2

This is the phase that's slow. It runs a simulation of sorts, modeling D days and each day having N iterations of clients taking turns interacting with the simulation state. More details:

  1. For each day, we run N iterations. Each iteration consists of:
  • a client is picked from the list of clients
  • some simulation state is gathered for this client - this is purely in-memory calculations, and are very light on the CPU.
  • the simulation state is sent to the client (over the TCP connection using textual json as the encoding). This part can happen in the background, technically speaking
  • similar simulation state is gathered for the other clients, and sent to them in the same manner.
  • the server then needs to receive a request from the picked client from the first step, and update the simulation state based on the contents of that message. This part must complete before we move to the next iteration.
  • the next iteration is the same except a different client is picked.
  1. The above proceeds for all D days, and then the simulation ends.

The way I have Phase 2 implemented (which I suspect is less than ideal) is via one large chain of futures, with the "root" future being run() on the Core. For each client, however, I did spawn a future that handles its socket (and framed) rx/tx communication. Each client has a futures::unsync::mpsc::UnboundedSender that the simulation task uses to send messages to it; this sender is wired up to the FramedWrite (on top of the socket) with a send_all combinator (i.e. the socket Sink is writing all messages received from the channel). The rx portion of the tcp stream is given to the client directly, where the simulation treats it like a Stream when trying to read messages from it.

The root simulation future is implemented as a couple of LoopFns, one looping over the days and a nested one looping over the iterations.

The performance is pretty bad, and I suspect it's because I'm doing too much on the root task and not allowing enough concurrency (i.e. monopolizing the reactor thread, so to speak). The actual non-IO work in the simulation is pretty cheap, as mentioned, so I was trying to see if I could interleave IO and execution on the same reactor thread without taking a performance hit. However, I've thus far failed to do that :).

Am I just barking up the wrong tree here? I could try redesigning this with a dedicated thread for the reactor, a background thread for running the simulation, and communicating between the two via channels. But I wanted to see if there was a way to keep it single-threaded and delicately interleave the IO and CPU work.

Any suggestions/thoughts/tips? Also happy to elaborate on the current design more, or anything else that's unclear.

Metrics collection for Services

I know this will not be looked at for a while, and this might get moved to a different repo, but I wanted to get my thoughts written down.

I'd love to get more thoughts on two things:

  • We need a trait for Metrics, that a service(via middleware) can then use.
pub trait Metric {
    /// Name of service
    fn service_name(&self) -> &str;

    /// Name of api
    fn api_name(&self) -> &str;

    /// Request Id
    fn request_id(&self) -> &str; // Or `usize` or `T`

    /// Start time for the call; To calculate latency
    fn start(&self) -> Instant;

    /// End time for the call; To calculate latency
    fn end(&self) -> Instant;

    // There are some more here we can discuss later: [children_calls, error_type, or some generic "other_info"]
}
  • How would we end up returning such an object from the Future?
Future<(Response, Metrics), (Error, Metrics)>

This feels really awkward, and it breaks the fluency of Futures with .map and such. What other options do we have? Given the nature of Futures (i.e. long tasks), and the importance of metrics, it could make sense to bake in Metrics into the Future trait, but that may not be possible in a zero-cost way. Does it make sense for Tokio-Service to implement its own Future which has Metrics built-in?

Question: Is there a dependency between listener's socket stream and other tasks?

Hi,

in a minimal server (using hyper) I am seeing a behaviour that just cannot understand. It is likely that I am just missing a key point about Tokio internals, but I'd really like to understand the reason for what I am seeing.

The server uses a single thread, takes accepted connections from the incoming() stream and hands them to a hyper server for very simple processing (doing some string ops to simulate CPU work). Every connection's server is spawned on a new task by hyper.

The server can handle ~250 req/s (each request's work takes about 4ms, so that makes sense). I am load testing this with an external load generator and I am using a new connection for each request. I have metrics inside the service, and I can validate that the above indeed happens as described.

I also measure the number of connections taken from incoming() stream and I can see that ~250 connections/s are taken from the stream.

This completely goes against my expectations - I would have thought Tokio would just read from the stream as much as provided and I'd expect new connections to pile up, while waiting to be processed. Instead, they pile up before the incoming() stream - the are visible as not-served / long latency test requests outside my service.

When I reduce the work amount (eg from 5ms to 2ms duration), the server handles more requests according to this reduction - but it also shows more accepted connections. Again about the same rate as the req/s rate.

This relationship between the processing speed of incoming and the request/s rate confuses me completely - since they are on different tasks, what inside of Tokio makes them go a similar pace?

Can somebody help me figure out the reason for this behaviour?

The test server's incoming stream loop is here https://github.com/algermissen/web-rust/blob/708831056b92304426e00e3e2f529b16d229d2ea/src/bin/testserver.rs#L155

Jan

Missing event loop understanding documentation

Sorry if the description is vague, but I just keep hitting my head against the wall trying to understand how the bits and pieces of Tokio fit together and how to make it work with e g my dbus crate. I hope you don't see this as a rant; I'm trying to express my frustration as constructively as I can.

I've seen other main loops; those seem to all support a few crucial methods, which you can then build upon:

  • have a callback ASAP, but at the end of the queue, like node.js's process.nextTick.
  • have a callback after some time, like node.js's process.setTimeout and process.setInterval.
  • have a callback when a file descriptor is ready for reading or writing (or error/hangup/etc)
  • basic stuff like making the main loop run and quit, like glib's g_main_loop_quit.

Out of these, I've only found tokio-timer to deal with the second one. The other three I don't understand or can't find, and trying to read documentation only ends up with me asking myself things like "okay, so a Task is something to be executed in the future, hmm wait, that's what a Future is, so why do we need both concepts?", rather than finding the answers to my questions.

The first one is probably simple, I just don't understand the Tokio architecture to know how to write it.

The third one brson wrote some draft code for and it seems utterly complex to get right for someone new to Tokio, and even includes an extra call to libc::poll which I hope should not be necessary...?

The fourth one; there is Core::run which unlike other loops takes an argument, and instead has no quit method. I guess these are somewhat the same and either style can be used to emulate the other style, but nothing in the documentation about event loops (as far as I've found) indicates how to get a server to quit cleanly; they all end with a let server = listener.incoming().for_each(/*... */); core.run(server);, so it's not obvious how to do it.

I'm sure the documentation is great if you want to build your own network protocol like IMAP or NTP, but for us coming from a completely different angle, it's a different story.

Provide a way to listen for tcp disconnects

As I understand it, this wouldn't be possible with the way tokio is currently written because it doesn't distinguish between hup events and readable events. Assuming we can fix this, I'd like a way to detect when a tcp stream has closed without reading from it. The API could look something like this:

impl TcpStream {
    pub fn on_close(self) -> OnClose { .. }
}

impl OnClose {
    pub fn into_inner(self) -> TcpStream { .. }
}

impl Future for OnClose {
    type Item = ();
    type Error = io::Error;
    ..
}

That would let you put the stream into a state where you're just waiting for it to close, but you could convert that state back into a TcpStream if you later decide you want to read from the stream.

Question / Suggestion + example idea

Hi Folks, I have been poking around tokio (both the documentation and the repos here) trying to understand the new concepts; I think the documentation does a good job at explaining the basics of Futures, (I read it sequentially) Streams and Sink (although would be nice to see some medium complexity examples).

In the docs I couldn't find any explanation how to chain several futures together. I have seen I can use future_a.and_then(future_b); but what if I have a bunch of futures and writing them by hand like that is not an option? (see what I mean below).

I came up with a simple example where this use-case shows;
the idea is to fetch the stop stores from HN.

extern crate hyper;
extern crate hyper_tls;
extern crate futures;
extern crate tokio_core;
extern crate serde;
extern crate serde_json;

#[macro_use]
extern crate serde_derive;

use futures::{Future, Stream};
use hyper::{Client, Chunk};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;

use std::error::Error;
use std::io;
use std::fmt;

#[derive(Deserialize)]
struct Story {
    by: String,
    id: i32,
    score: i32,
    time: i32,
    title: String,
}

impl fmt::Display for Story {
    fn fmt (&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}, by {}", self.title, self.by)
    }
}

fn run () -> Result<(), Box<Error>> {

    let mut core = Core::new()?;

    let client = Client::configure()
                    .connector(HttpsConnector::new(8, &core.handle())?)
                    .build(&core.handle());

    let url = "https://hacker-news.firebaseio.com/v0/topstories.json".parse()?;
    let request = client.get(url).and_then(|response| {
        response.body().concat2().and_then(move |body: Chunk| {
            let v: Vec<i32> = serde_json::from_slice(&body).map_err(|e|{
                io::Error::new(io::ErrorKind::Other, e)
            })?;
            Ok(v)
        })
    });

    let ids: Vec<i32> = core.run(request)?;

    for id in ids {
        let url = format!("https://hacker-news.firebaseio.com/v0/item/{}.json", id).parse()?;
        let request = client.get(url).and_then(|response| {
            response.body().concat2().and_then(move |body: Chunk| {
                let v: Story = serde_json::from_slice(&body).map_err(|e|{
                    io::Error::new(io::ErrorKind::Other, e)
                })?;
                Ok(v)
            })
        });

        println!("{}", core.run(request)?);
    }


    Ok(())
}

fn main() {
    if let Err(e) = run () {
        eprintln!("Error: {}", e);
    }
}

At the moment this code is running extremely slow, I think I'm not handling futures properly.... or maybe because I'm calling core.run so many times; Thoughts?

PS: feel free to re-use the idea if you want to include something like that in the docs.

More Example how to write concurrent code.

i am loving rust and tokio and futures and stuff.

i am just so much confused right now because code examples given are too intelligent for beginner to understand, can you guys please wrote some beginner level example how concurrency works with tokio and if possible step by step stuff. i know its lot to do but this is rust's turning point may be, and i think it should be better. thank you.

No easy way to report errors that occur in UdpCodec::encode

I'm writing a UdpCodec implementation, and it looks like there's no way to (easily) report an error during my implementation's encode function.

Specifically, while decode(...) -> Result<In, ...>, encode(...) -> SocketAddr.

I'm wondering if it's possible to change this so encode(...) -> Result<SocketAddr, ...>

For now I'll lift the fallible chunk of code to somewhere else, but would like to know if this is possible.

Remove poll_read / poll_write from TCP / UDP types

These types are legacy from the original Io trait implementation. There isn't really much of a point to having them and the exact guarantees that these functions provide are undefined (specifically, do they permit false positives or not?).

Pre-RFC Tokio/Futures Cookbook

Intent

The deliverable of this ticket will be a fairly complete list of topics for v1 of a Tokio and Futures cookbook, as well as a document structure and design guideline for ongoing additions. The finished product is intended to address the cookbook part of #13.

In addition to fielding and refining topics, there is a question of the actual structure of the topics as well. This is because many of the topics have valid solutions as compositions of combinators, or as implementations as futures traits (or both).

Also, at the high level, I'd like to come up with heuristics for the size and scope of what is considered a 'recipe' as many of these might exceed those, and could be broken down into sub-recipes.

Format

The plan is to plagiarize the structure and format of the existing Rust Cookbook

Hosting

This effort does not propose an actual hosting location. It is hoped that it will live either as a subcategory of the existing tokio documentation site, or under a Futures topic of the Rust Cookbook.

Topics

Common Tasks

  1. Starting and stopping the reactor core.
  2. Interacting with async interfaces that don't have an FD
    1. Polling resources (e.g. hardware pins, etc)
    2. Callback APIs
    3. Non-blocking APIs (try_*, etc)
  3. Running multiple reactors/execs in multiple threads.
  4. Running 1 reactor/exec in multiple threads.
  5. Sending to futures from blocking/sync threads.
  6. Receiving from futures in blocking/sync threads.
  7. Aggregating from multiple streams into a single stream.

Managing Monomorphization (need a better name)

  1. Returning multiple error types from a function
  2. Returning multiple futures/streams types but with the same Item
    1. (from match/if let)
  3. A workflow that might produce one of several futures based on some conditional

Io Tasks

  1. Read/Write a file asynchronously, complete future when read/write completes. (futures_cpupool)
  2. Stream stdin, Sink stdout while not blocking (similar to Io Tasks #1)

Standard Protocols

  1. http 1 and 2.0 client basic request response
  2. http 1.1 and 2.0 server - basic RESTful style
  3. Database read/write
  4. TLS / APN
  5. ASIO? (not common, but interesting)

Advanced networking

  1. (client or server) streams of data that reconnect semi-transparently.
  2. Managing connections with out of band data (separating ping/heartbeats)

Audit UDP APIs

The UDP types are expected to match std's UDP types. The API should be audited to ensure that they still match up before we release 0.1 and commit to an API.

Future returned to Handle.spawn()/.execute() is never run

I'm experimenting with remote reactors and task spawning, but I'm having trouble with a particular scenario where the Future passed to .spawn()/.execute() is not resolved.

This is the self-contained example: https://git.neosmart.net/mqudsi/futuretest/src/execute-no-future-unwrap/src/main.rs

I create a reactor and then call a function that queues a task with the event pump:

    let f = future::result::<(), ()>(Ok(())).map(|_| {
        println!("Starting first worker");
        match RUNNER.lock() {
            Ok(guard) => {
                match *guard {
                    Some(ref x) => {
                        println!("Starting runner");
                        x.run()
                    }
                    None => eprintln!("RUNNER not initialized!"),
                }
            }
            Err(_) => eprintln!("Could not lock mutex!"),
        }
    });

and the code that does the queuing:

impl TaskRunner {
    pub fn run(&self) {
        self.remote_handle
            .spawn({
                println!("Running!");
                future::ok(())
                .map(|()| println!("Running followup!"))
            })
            .unwrap()
    }
}

The closure passed to .spawn() is evaluated ("Running!" is printed) but the future returned to .spawn() from the closure is never executed ("Running followup!" does not appear).

Am I missing something? .spawn() and .execute() take a future but return void, so whose job is it to run the future if not theirs?

More realistic demos in tokio-core

As someone new to tokio learning has been fairly difficult as I'm sure a lot of people are aware. Something specifically that has been difficult for me was trying to move from the examples in tokio-core to writing real software partly because the examples are so different from how real software would be written.

For instance, the udp-codec example shows how to use the UdpCodec trait to build a server that can send and receive messages that are managed by a defined codec. Unfortunately the way this example is written is unlike how any real software would be written and avoids dealing with some of the issues I ran into while trying to figure out the correct pattern to use for building something that uses this. Specifically it's unlikely you'll ever have both sockets on the same machine sending data back and forth. One of the first strategies I tried was using for_each on the stream and in the block passed attempting to write to the sink the recipient, message pair. This failed due to an issue closure taking ownership of the sink binding.

Ultimately the first solution I ended up getting working was switching to map and having the stream return a new stream of responses. This personally seemed like a non-ergonomic solution and not how I would think about writing the code coming from other using async languages/platforms where a callback with non-blocking I/O would be scheduled inside the event loop.

The next solution was writing to a channel inside the loop and having another task that is reading from the channel and sending the message pairs from it. This was more like the solution in the example but having a more realistic solution would have definitely helped getting something working significantly faster.

Echo server grinds to a crawl when handling many sequential requests

Not sure what the best repository is to file this.

I implemented the code from the echo server example (pasted below for your convenience)

Throughput decreases dramatically as the number of requests submitted increases:

Number of requests Requests/second
10,000 94,900
500,000 1,170
10,000,000 150

The server process is spinning at 100% CPU while doing this.

Benchmark:

yes|dd bs=2 count=10M|nc localhost 12345|dd bs=2 count=10M|pv -i 10 > /dev/null 

Code:

// [dependencies]
// bytes = "0.4"
// futures = "0.1"
// tokio-io = "0.1"
// tokio-core = "0.1"
// tokio-proto = "0.1"
// tokio-service = "0.1"

extern crate bytes;
extern crate futures;
extern crate tokio_io;
extern crate tokio_proto;
extern crate tokio_service;

use std::io;
use std::str;

use bytes::BytesMut;
use futures::{future, Future, BoxFuture};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::{Encoder, Decoder, Framed};
use tokio_proto::pipeline::ServerProto;
use tokio_proto::TcpServer;
use tokio_service::Service;

pub struct LineCodec;

impl Decoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<String>> {
        if let Some(i) = buf.iter().position(|&b| b == b'\n') {
            // remove the serialized frame from the buffer.
            let line = buf.split_to(i);

            // Also remove the '\n'
            buf.split_to(1);

            // Turn this data into a UTF string and return it in a Frame.
            match str::from_utf8(&line) {
                Ok(s) => Ok(Some(s.to_string())),
                Err(_) => Err(io::Error::new(io::ErrorKind::Other,
                                             "invalid UTF-8")),
            }
        } else {
            Ok(None)
        }
    }
}

impl Encoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
        buf.extend(msg.as_bytes());
        buf.extend(b"\n");
        Ok(())
    }
}

pub struct LineProto;

impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for LineProto {
    /// For this protocol style, `Request` matches the `Item` type of the codec's `Decoder`
    type Request = String;

    /// For this protocol style, `Response` matches the `Item` type of the codec's `Encoder`
    type Response = String;

    /// A bit of boilerplate to hook in the codec:
    type Transport = Framed<T, LineCodec>;
    type BindTransport = Result<Self::Transport, io::Error>;
    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(LineCodec))
    }
}

pub struct Echo;

impl Service for Echo {
    // These types must match the corresponding protocol types:
    type Request = String;
    type Response = String;

    // For non-streaming protocols, service errors are always io::Error
    type Error = io::Error;

    // The future for computing the response; box it for simplicity.
    type Future = BoxFuture<Self::Response, Self::Error>;

    // Produce a future for computing a response from a request.
    fn call(&self, req: Self::Request) -> Self::Future {
        // In this case, the response is immediate.
        future::ok(req).boxed()
    }
}

fn main() {
    // Specify the localhost address
    let addr = "0.0.0.0:12345".parse().unwrap();

    // The builder requires a protocol and an address
    let server = TcpServer::new(LineProto, addr);

    // We provide a way to *instantiate* the service for each new
    // connection; here, we just immediately return a new instance.
    server.serve(|| Ok(Echo));
}

Consider re-exporting types to the root

Currently, the tokio crate has nothing exported at the root. The root only contains two modules: net and reactor.

Odds are, we should export common types at the root.

'Example: serving database content using proto' isn't returning InvalidPermissionsError from rust-postgres

I've made a few alterations to the default code specified in: 'Example: serving database content using photo', most notably the query.

#[macro_use]
extern crate serde_derive;

extern crate futures;
extern crate futures_cpupool;
extern crate r2d2;
extern crate r2d2_postgres;
extern crate rand;
extern crate serde;
extern crate serde_json;
extern crate tokio_minihttp;
extern crate tokio_proto;
extern crate tokio_service;

use std::io;
use std::ops::Deref;

use futures::{BoxFuture, Future};
use futures_cpupool::CpuPool;
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use tokio_minihttp::{Request, Response};
use tokio_proto::TcpServer;
use tokio_service::Service;

struct Server {
    thread_pool: CpuPool,
    db_pool: r2d2::Pool<r2d2_postgres::PostgresConnectionManager>,
}

#[derive(Serialize)]
struct Message {
    id: i32,
}

impl Service for Server {
    type Request = Request;
    type Response = Response;
    type Error = io::Error;
    type Future = BoxFuture<Response, io::Error>;

    fn call(&self, req: Request) -> Self::Future {
        assert_eq!(req.path(), "/db");

        let db = self.db_pool.clone();
        let msg = self.thread_pool.spawn_fn(move || {
            let conn = db.get().map_err(|e| {
                io::Error::new(io::ErrorKind::Other, format!("timeout: {}", e))
            })?;

            println!("Sending query...");
            let stmt = &conn.query("SELECT * FROM users", &[])?;
            println!("Got response!...");
            let row = stmt.get(0);
            Ok(Message {
                id: row.get("id"),
            })
        });

        msg.map(|msg| {
            let json = serde_json::to_string(&msg).unwrap();
            let mut response = Response::new();
            response.header("Content-Type", "application/json");
            response.body(&json);
            response
        }).boxed()
    }
}

fn main() {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let thread_pool = CpuPool::new(10);

    let db_url = "postgres://username:password@localhost";
    let db_config = r2d2::Config::default();
    let db_manager = PostgresConnectionManager::new(db_url, TlsMode::None).unwrap();
    let db_pool = r2d2::Pool::new(db_config, db_manager).unwrap();

    println!("Starting TCP server...");
    TcpServer::new(tokio_minihttp::Http, addr).serve(move || {
        Ok(Server {
            thread_pool: thread_pool.clone(),
            db_pool: db_pool.clone(),
        })
    });
    println!("TCP server running...");
}

The problem that I'm having is the &conn.query("SELECT * FROM users", &[])?; doesn't seem to return. I've isolated the rust-postgres and r2d2-postgres crates, and the error I was getting was an "invalid permissions error" because my user didn't have admin privileges, both these crates panic.

The error seems to be consumed(?), and the connection still thinks it's fine. I checked via the is_valid() function, and can't seem to find a solution as to what's happening.

"Example: an echo server using proto" does not work for me.

It accepts connections, but does not send back any data to socket.

$ rustc --version
rustc 1.19.0-nightly (2d4ed8e0c 2017-05-03)

$ cat Cargo.toml 
[package]
name = "tokiotest"
version = "0.1.0"
authors = ["Vitaly _Vi Shukela <[email protected]>"]

[dependencies]
bytes = "0.4"
futures = "0.1"
tokio-io = "0.1"
tokio-proto = "0.1"
tokio-service = "0.1"
tokio-core = "0.1"

$ cat src/main.rs 
extern crate tokio_proto;
extern crate tokio_core;
extern crate tokio_io;
extern crate futures;
extern crate tokio_service;
extern crate bytes;

use std::io;

use futures::future;
use futures::{Future, BoxFuture};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::{Framed, Encoder, Decoder};
use bytes::BytesMut;
use tokio_proto::TcpServer;
use tokio_proto::pipeline::ServerProto;
use tokio_service::Service;

struct LineCodec;

impl Encoder for LineCodec {
  type Item = String;
  type Error = io::Error;

  fn encode(&mut self, out: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
      Ok(())
  }
}

impl Decoder for LineCodec {
  type Item = String;
  type Error = io::Error;

  fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
      Ok(None)
  }
}

struct LineProto;

impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for LineProto {
    type Request = String;
    type Response = String;
    type Transport = Framed<T, LineCodec>;
    type BindTransport = Result<Self::Transport, io::Error>;
    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(LineCodec))
    }
}

struct Echo;

impl Service for Echo {
    type Request = String;
    type Response = String;
    type Error = io::Error;
    type Future = BoxFuture<Self::Response, Self::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        future::ok(req).boxed()
    }
}

fn main() {
    // Specify the localhost address
    let addr = "0.0.0.0:12345".parse().unwrap();

    // The builder requires a protocol and an address
    let server = TcpServer::new(LineProto, addr);

    // We provide a way to *instantiate* the service for each new
    // connection; here, we just immediately return a new instance.
    server.serve(|| Ok(Echo));
}

$ strace -f ./target/x86_64-unknown-linux-gnu/debug/tokiotest
execve("./target/x86_64-unknown-linux-gnu/debug/tokiotest", ["./target/x86_64-unknown-linux-gn"...], [/* 29 vars */]) = 0
brk(NULL)                               = 0x31b00e97000
access("/etc/ld.so.nohwcap", F_OK)      = -1 ENOENT (No such file or directory)
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a773ddb000
access("/etc/ld.so.preload", R_OK)      = -1 ENOENT (No such file or directory)
open("/etc/ld.so.cache", O_RDONLY|O_CLOEXEC) = 3
fstat(3, {st_mode=S_IFREG|0644, st_size=328047, ...}) = 0
mmap(NULL, 328047, PROT_READ, MAP_PRIVATE, 3, 0) = 0x3a773d8a000
close(3)                                = 0
access("/etc/ld.so.nohwcap", F_OK)      = -1 ENOENT (No such file or directory)
open("/lib/x86_64-linux-gnu/libdl.so.2", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\320\16\0\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0644, st_size=14664, ...}) = 0
mmap(NULL, 2109712, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x3a7739b9000
mprotect(0x3a7739bc000, 2093056, PROT_NONE) = 0
mmap(0x3a773bbb000, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x2000) = 0x3a773bbb000
close(3)                                = 0
access("/etc/ld.so.nohwcap", F_OK)      = -1 ENOENT (No such file or directory)
open("/lib/x86_64-linux-gnu/librt.so.1", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\3\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0P#\0\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0644, st_size=31784, ...}) = 0
mmap(NULL, 2128920, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x3a7737b1000
mprotect(0x3a7737b8000, 2093056, PROT_NONE) = 0
mmap(0x3a7739b7000, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x6000) = 0x3a7739b7000
close(3)                                = 0
access("/etc/ld.so.nohwcap", F_OK)      = -1 ENOENT (No such file or directory)
open("/lib/x86_64-linux-gnu/libpthread.so.0", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\320n\0\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0755, st_size=137384, ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a773d89000
mmap(NULL, 2213008, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x3a773594000
mprotect(0x3a7735ac000, 2093056, PROT_NONE) = 0
mmap(0x3a7737ab000, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x17000) = 0x3a7737ab000
mmap(0x3a7737ad000, 13456, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_ANONYMOUS, -1, 0) = 0x3a7737ad000
close(3)                                = 0
access("/etc/ld.so.nohwcap", F_OK)      = -1 ENOENT (No such file or directory)
open("/lib/x86_64-linux-gnu/libgcc_s.so.1", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\260*\0\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0644, st_size=90096, ...}) = 0
mmap(NULL, 2185952, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x3a77337e000
mprotect(0x3a773394000, 2093056, PROT_NONE) = 0
mmap(0x3a773593000, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x15000) = 0x3a773593000
close(3)                                = 0
access("/etc/ld.so.nohwcap", F_OK)      = -1 ENOENT (No such file or directory)
open("/lib/x86_64-linux-gnu/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\3\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0P\34\2\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0755, st_size=1738176, ...}) = 0
mmap(NULL, 3844640, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x3a772fd3000
mprotect(0x3a773174000, 2097152, PROT_NONE) = 0
mmap(0x3a773374000, 24576, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x1a1000) = 0x3a773374000
mmap(0x3a77337a000, 14880, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_ANONYMOUS, -1, 0) = 0x3a77337a000
close(3)                                = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a773d88000
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a773d86000
arch_prctl(ARCH_SET_FS, 0x3a773d86840)  = 0
mprotect(0x3a773374000, 16384, PROT_READ) = 0
mprotect(0x3a7737ab000, 4096, PROT_READ) = 0
mprotect(0x3a7739b7000, 4096, PROT_READ) = 0
mprotect(0x3a773bbb000, 4096, PROT_READ) = 0
mprotect(0x31b0002d000, 28672, PROT_READ) = 0
mprotect(0x3a773ddd000, 4096, PROT_READ) = 0
munmap(0x3a773d8a000, 328047)           = 0
set_tid_address(0x3a773d86b10)          = 5837
set_robust_list(0x3a773d86b20, 24)      = 0
rt_sigaction(SIGRTMIN, {sa_handler=0x3a77359a9b0, sa_mask=[], sa_flags=SA_RESTORER|SA_SIGINFO, sa_restorer=0x3a7735a3890}, NULL, 8) = 0
rt_sigaction(SIGRT_1, {sa_handler=0x3a77359aa40, sa_mask=[], sa_flags=SA_RESTORER|SA_RESTART|SA_SIGINFO, sa_restorer=0x3a7735a3890}, NULL, 8) = 0
rt_sigprocmask(SIG_UNBLOCK, [RTMIN RT_1], NULL, 8) = 0
getrlimit(RLIMIT_STACK, {rlim_cur=8192*1024, rlim_max=RLIM64_INFINITY}) = 0
readlink("/etc/malloc.conf", 0x3ffacf57300, 4096) = -1 ENOENT (No such file or directory)
brk(NULL)                               = 0x31b00e97000
mmap(NULL, 2097152, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a772dd3000
munmap(0x3a772dd3000, 2097152)          = 0
mmap(NULL, 4190208, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a772bd4000
munmap(0x3a772bd4000, 180224)           = 0
munmap(0x3a772e00000, 1912832)          = 0
open("/sys/devices/system/cpu/online", O_RDONLY|O_CLOEXEC) = 3
read(3, "0-3\n", 8192)                  = 4
close(3)                                = 0
rt_sigaction(SIGPIPE, {sa_handler=SIG_IGN, sa_mask=[PIPE], sa_flags=SA_RESTORER|SA_RESTART, sa_restorer=0x3a7730080e0}, {sa_handler=SIG_DFL, sa_mask=[], sa_flags=0}, 8) = 0
mmap(NULL, 2097152, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a772a00000
open("/proc/self/maps", O_RDONLY|O_CLOEXEC) = 3
getrlimit(RLIMIT_STACK, {rlim_cur=8192*1024, rlim_max=RLIM64_INFINITY}) = 0
fstat(3, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a773dda000
read(3, "31affd19000-31affe2e000 r-xp 000"..., 1024) = 1024
read(3, "0-3a773394000 r-xp 00000000 fd:1"..., 1024) = 1024
read(3, "librt-2.19.so\n3a7739b7000-3a7739"..., 1024) = 1024
read(3, "                       /lib/x86_"..., 1024) = 428
close(3)                                = 0
munmap(0x3a773dda000, 4096)             = 0
sched_getaffinity(5837, 32, [0, 1, 2, 3]) = 8
mmap(0x3ffac759000, 4096, PROT_NONE, MAP_PRIVATE|MAP_FIXED|MAP_ANONYMOUS, -1, 0) = 0x3ffac759000
rt_sigaction(SIGSEGV, {sa_handler=0x31affdbc140, sa_mask=[], sa_flags=SA_RESTORER|SA_STACK|SA_SIGINFO, sa_restorer=0x3a7735a3890}, NULL, 8) = 0
rt_sigaction(SIGBUS, {sa_handler=0x31affdbc140, sa_mask=[], sa_flags=SA_RESTORER|SA_STACK|SA_SIGINFO, sa_restorer=0x3a7735a3890}, NULL, 8) = 0
sigaltstack(NULL, {ss_sp=NULL, ss_flags=SS_DISABLE, ss_size=0}) = 0
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x3a773dd9000
sigaltstack({ss_sp=0x3a773dd9000, ss_flags=0, ss_size=8192}, NULL) = 0
futex(0x3a773bbc0c8, FUTEX_WAKE_PRIVATE, 2147483647) = 0
epoll_create1(EPOLL_CLOEXEC)            = 3
pipe2([4, 5], O_NONBLOCK|O_CLOEXEC)     = 0
epoll_ctl(3, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLET, {u32=4294967295, u64=18446744073709551615}}) = 0
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 6
ioctl(6, FIOCLEX)                       = 0
setsockopt(6, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(6, {sa_family=AF_INET, sin_port=htons(12345), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(6, 1024)                         = 0
ioctl(6, FIONBIO, [1])                  = 0
epoll_ctl(3, EPOLL_CTL_ADD, 6, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2, u64=2}}) = 0
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, [{EPOLLIN, {u32=2, u64=2}}], 1024, -1) = 1


write(5, "\1", 1)                       = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128)                      = 1
read(4, 0x3ffacf56588, 128)             = -1 EAGAIN (Resource temporarily unavailable)
accept4(6, {sa_family=AF_INET, sin_port=htons(35182), sin_addr=inet_addr("127.0.0.1")}, [128->16], SOCK_CLOEXEC) = 7
ioctl(7, FIONBIO, [1])                  = 0
epoll_ctl(3, EPOLL_CTL_ADD, 7, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=4, u64=4}}) = 0
accept4(6, 0x3ffacf55dd0, [128], SOCK_CLOEXEC) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(3, [{EPOLLOUT, {u32=4, u64=4}}], 1024, 0) = 1
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=4, u64=4}}], 1024, -1) = 1


write(5, "\1", 1)                       = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128)                      = 1
read(4, 0x3ffacf56588, 128)             = -1 EAGAIN (Resource temporarily unavailable)
recvfrom(7, "dsfadsf\n", 8192, 0, NULL, NULL) = 8
recvfrom(7, 0x3a772a5a008, 8184, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=4, u64=4}}], 1024, -1) = 1
write(5, "\1", 1)                       = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128)                      = 1
read(4, 0x3ffacf56588, 128)             = -1 EAGAIN (Resource temporarily unavailable)
recvfrom(7, "213213214324\n", 8184, 0, NULL, NULL) = 13
recvfrom(7, 0x3a772a5a015, 8171, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, [{EPOLLIN|EPOLLOUT|EPOLLRDHUP, {u32=4, u64=4}}], 1024, -1) = 1
write(5, "\1", 1)                       = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128)                      = 1
read(4, 0x3ffacf56588, 128)             = -1 EAGAIN (Resource temporarily unavailable)
recvfrom(7, "", 8171, 0, NULL, NULL)    = 0
close(7)                                = 0
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, ^Cstrace: Process 5837 detached
 <detached ...>

`TcpClient` leaks connection

I don't know if this is an expected behavior, but it seems that a TcpClient somehow leaks its internal connection (a TcpStream instance?) into the tokio_core::reactor::Core it's running on.

I created an example test case that has two tests. One test forces the Core instance to drop before the test asserts that the connection is closed (somewhat indirectly, by checking if a dummy server thread has finished). The other lets the Core instance live beyond the assertion, so it's only dropped in the end of the test. The first test succeeds, while the second fails.

Audit TCP/UDP APIs

The net types are expected to match std's net types. The API should be audited to ensure that they still match up before we release 0.1 and commit to an API.

Consider providing an accessor for the inner `std` types.

It could be useful for Tokio's TcpStream et al. to provide an accessor that returns a ref to the std::TcpStream. Specifically, crates like net2 provide useful helpers but only against std::TcpStream. Tokio (and mio) redefines all of these fns, but adding a new fn requires updating many crates.

Consider extracting `UdpFramed`

In general, traits that could be useful in situations that do not require a reactor should not live in the tokio crate. This is why tokio-io was extracted initially.

Anyway, maybe we should not include something that may or may not be extract in the future. It would be easy enough to include these types in a tokio-udp crate.

How to shutdown the global reactor

Original comment: #57 (comment)

How to shutdown the global reactor? The only (remaining) reason that I think it should be shutdown is to avoid FD leaks (the epoll FD never getting closed) which would be raised by linting tools and would be pretty annoying if there was no way to avoid. There doesn't need to be a way to restart it once it is shutdown and I think it would be pretty trivial to have a fn Reactor::shutdown_default_global or something that signals global::HelperThread to terminate and blocks until it does so.

Consider removing `Core::id`.

I'm not sure exactly why it was added in the first place. Specifically, I know it was added to provide some token to use as a map key, but I don't recall the exact scenario that required this to be in the library.

The implementation also doesn't handle usize wrapping, which is a very edge case, but it seems like it should be handled.

Basically, enough has changed w/ the usage patterns of how the reactor should be used that I would like to remove Core::id from the initial release and re-evaluate if it is needed.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.