
fahrenheit's Introduction

fahrenheit (formerly known as toykio)

FOR LEARNING PURPOSES ONLY

This is a greatly simplified implementation of an async runtime, consisting of (a minimal usage sketch follows the list):

  • a std::future::Future-compatible executor on top of a select(2) event loop
  • AsyncRead/AsyncWrite TcpStream implementations
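
A minimal usage sketch, adapted from the example code in the issues below (old nightly async/await syntax with await!; the signatures of AsyncTcpListener and its incoming() stream are taken from those issues and may differ between versions, so treat this as illustrative):

#![feature(futures_api, async_await, await_macro)]
extern crate fahrenheit;
extern crate futures;

use fahrenheit::AsyncTcpListener;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use futures::stream::StreamExt;
use std::net::SocketAddr;

// Echo the first chunk of every connection back to the client,
// handling connections one at a time to keep the sketch minimal.
async fn serve(addr: &str) {
    let addr: SocketAddr = addr.parse().unwrap();
    let listener = AsyncTcpListener::bind(addr).unwrap();
    let mut incoming = listener.incoming();

    while let Some(mut stream) = await!(incoming.next()) {
        let mut buf = vec![0u8; 1024];
        if let Ok(n) = await!(stream.read(&mut buf)) {
            let _ = await!(stream.write_all(&buf[..n]));
        }
    }
}

fn main() {
    fahrenheit::run(serve("127.0.0.1:4000"))
}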

todo:

  • more comments

fahrenheit's Issues

AsyncRead with Vec::with_capacity()

Hi, I am playing around with async/await and I have run into an issue reading bytes into a Vec created with with_capacity (or just new).

This is my code for the plain read scenario, where it always returns 0 bytes read for some reason:

async fn process(mut stream: AsyncTcpStream) {
    let mut buf = Vec::with_capacity(100);
    await!(stream.read(&mut buf)); 
    println!("{}", String::from_utf8_lossy(&buf));
}

The same applies to read_exact:

async fn process(mut stream: AsyncTcpStream) {
    let mut buf = Vec::with_capacity(100);
    await!(stream.read_exact(&mut buf)); 
    println!("{}", String::from_utf8_lossy(&buf));
}

However, read_to_end works fine (unfortunately it has to wait until the connection is closed).

Can someone explain whether I am doing something wrong, and if not, why it behaves like this? Thanks :)
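
A note on why this happens (an explanation added here, not part of the original thread): Vec::with_capacity(100) only reserves memory, so the vector's length is still 0; &mut buf then coerces to an empty &mut [u8], and read / read_exact have no room to write into, which is why 0 bytes come back. Giving the buffer a real length fixes it; a minimal sketch in the same old-nightly syntax:

use fahrenheit::AsyncTcpStream;
use futures::io::AsyncReadExt;

async fn process(mut stream: AsyncTcpStream) {
    // vec![0u8; 100] has length 100 (not just capacity 100),
    // so `read` has a real 100-byte slice to fill.
    let mut buf = vec![0u8; 100];
    let n = await!(stream.read(&mut buf)).unwrap();
    println!("{}", String::from_utf8_lossy(&buf[..n]));
}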

panicked at 'already borrowed: BorrowMutError'

code

#![feature(futures_api,async_await,await_macro)]
extern crate futures;
extern crate fahrenheit;

use futures::io::{AsyncWriteExt,AsyncReadExt};
use futures::stream::{StreamExt};
use fahrenheit::AsyncTcpStream;
use fahrenheit::AsyncTcpListener;
use std::net::SocketAddr;
use std::io;

async fn listen(addr: &str) {
    let addr: SocketAddr = addr.parse().unwrap();
    let listener = AsyncTcpListener::bind(addr).unwrap();
    let mut incoming = listener.incoming();

    while let Some(stream) = await!(incoming.next()) {
        fahrenheit::spawn(process(stream));
    }
}

async fn process(mut stream: AsyncTcpStream) {
    let mut buf = vec![0;1024];
    await!(stream.read_exact(&mut buf));
}

fn main() {
    fahrenheit::run(listen("127.0.0.1:12345"))
}

backtrace:

thread 'main' panicked at 'already borrowed: BorrowMutError', libcore/result.rs:983:5
stack backtrace:
   0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
             at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
   1: std::sys_common::backtrace::print
             at libstd/sys_common/backtrace.rs:71
             at libstd/sys_common/backtrace.rs:59
   2: std::panicking::default_hook::{{closure}}
             at libstd/panicking.rs:211
   3: std::panicking::default_hook
             at libstd/panicking.rs:227
   4: std::panicking::rust_panic_with_hook
             at libstd/panicking.rs:475
   5: std::panicking::continue_panic_fmt
             at libstd/panicking.rs:390
   6: rust_begin_unwind
             at libstd/panicking.rs:325
   7: core::panicking::panic_fmt
             at libcore/panicking.rs:77
   8: core::result::unwrap_failed
             at /checkout/src/libcore/macros.rs:26
   9: <core::result::Result<T, E>>::expect
             at /checkout/src/libcore/result.rs:809
  10: <core::cell::RefCell<T>>::borrow_mut
             at /checkout/src/libcore/cell.rs:885
  11: fahrenheit::EventLoop::do_spawn
             at ./src/lib.rs:183
  12: fahrenheit::spawn::{{closure}}
             at ./src/lib.rs:43
  13: <std::thread::local::LocalKey<T>>::try_with
             at /checkout/src/libstd/thread/local.rs:294
  14: <std::thread::local::LocalKey<T>>::with
             at /checkout/src/libstd/thread/local.rs:248
  15: fahrenheit::spawn
             at ./src/lib.rs:43
  16: server::listen::{{closure}}
             at examples/server.rs:18
  17: <std::future::GenFuture<T> as core::future::future::Future>::poll::{{closure}}
             at /checkout/src/libstd/future.rs:46
  18: std::future::set_task_cx
             at /checkout/src/libstd/future.rs:82
  19: <std::future::GenFuture<T> as core::future::future::Future>::poll
             at /checkout/src/libstd/future.rs:46
  20: <alloc::boxed::Box<F> as core::future::future_obj::UnsafeFutureObj<'a, T>>::poll
             at /checkout/src/liballoc/boxed.rs:948
  21: <core::future::future_obj::LocalFutureObj<'a, T> as core::future::future::Future>::poll
             at /checkout/src/libcore/future/future_obj.rs:83
  22: <core::future::future_obj::FutureObj<'a, T> as core::future::future::Future>::poll
             at /checkout/src/libcore/future/future_obj.rs:133
  23: fahrenheit::Task::poll
             at src/lib.rs:87
  24: fahrenheit::EventLoop::run
             at ./src/lib.rs:273
  25: fahrenheit::run::{{closure}}
             at ./src/lib.rs:39
  26: <std::thread::local::LocalKey<T>>::try_with
             at /checkout/src/libstd/thread/local.rs:294
  27: <std::thread::local::LocalKey<T>>::with
             at /checkout/src/libstd/thread/local.rs:248
  28: fahrenheit::run
             at ./src/lib.rs:39
  29: server::main
             at examples/server.rs:28
  30: std::rt::lang_start::{{closure}}
             at /checkout/src/libstd/rt.rs:74
  31: std::panicking::try::do_call
             at libstd/rt.rs:59
             at libstd/panicking.rs:310
  32: __rust_maybe_catch_panic
             at libpanic_unwind/lib.rs:102
  33: std::rt::lang_start_internal
             at libstd/panicking.rs:289
             at libstd/panic.rs:392
             at libstd/rt.rs:58
  34: std::rt::lang_start
             at /checkout/src/libstd/rt.rs:74
  35: main
  36: __libc_start_main
  37: <unknown>
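
What the backtrace shows (an observation, not an official diagnosis): fahrenheit::spawn is called from inside a future that EventLoop::run is currently polling, so EventLoop::do_spawn tries to borrow_mut a RefCell on which the running loop already holds a mutable borrow. A minimal standalone sketch of that failure mode, using hypothetical types rather than fahrenheit's actual code:

use std::cell::RefCell;

struct ToyLoop {
    tasks: RefCell<Vec<&'static str>>,
}

impl ToyLoop {
    fn spawn(&self, name: &'static str) {
        // Second mutable borrow of the same RefCell while `run` still
        // holds the first one -> "already borrowed: BorrowMutError".
        self.tasks.borrow_mut().push(name);
    }

    fn run(&self) {
        // The RefMut returned by borrow_mut() lives for the whole loop...
        for _task in self.tasks.borrow_mut().iter() {
            // ...so "spawning from inside a task" panics here.
            self.spawn("child");
        }
    }
}

fn main() {
    let ev = ToyLoop { tasks: RefCell::new(vec!["parent"]) };
    ev.run();
}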

panicked again at 'already borrowed: BorrowMutError'

code

#![feature(futures_api,async_await,await_macro, pin, arbitrary_self_types)]
extern crate futures;
extern crate fahrenheit;

use futures::stream::{StreamExt};
use futures::channel::mpsc;
use futures::future::Future;
use futures::task::Poll;
use futures::task::Context;
use std::pin::PinMut;

async fn consumer(mut rx: mpsc::UnboundedReceiver<i32>) {
    while let Some(v) = await!(rx.next()) {
        println!("received {}", v);
    }
}

fn main() {
    let (tx, rx) = mpsc::unbounded();

    let prod = Producer {chan:tx, index:0};
    fahrenheit::spawn(prod);
    fahrenheit::run(consumer(rx))
}

struct Producer {
    chan : mpsc::UnboundedSender<i32>,
    index : i32,
}

impl Future for Producer {
    type Output = ();

    fn poll(mut self: PinMut<Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.chan.unbounded_send(self.index).unwrap();
        self.index += 1;
        if self.index == 1000 {
            Poll::Ready(())
        } else {
            cx.waker().wake();
            Poll::Pending
        }
    }
}

stack backtrace:

   0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
             at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
   1: std::sys_common::backtrace::print
             at libstd/sys_common/backtrace.rs:71
             at libstd/sys_common/backtrace.rs:59
   2: std::panicking::default_hook::{{closure}}
             at libstd/panicking.rs:211
   3: std::panicking::default_hook
             at libstd/panicking.rs:227
   4: std::panicking::rust_panic_with_hook
             at libstd/panicking.rs:477
   5: std::panicking::continue_panic_fmt
             at libstd/panicking.rs:391
   6: rust_begin_unwind
             at libstd/panicking.rs:326
   7: core::panicking::panic_fmt
             at libcore/panicking.rs:77
   8: core::result::unwrap_failed
             at /checkout/src/libcore/macros.rs:26
   9: <core::result::Result<T, E>>::expect
             at /checkout/src/libcore/result.rs:809
  10: <core::cell::RefCell<T>>::borrow_mut
             at /checkout/src/libcore/cell.rs:885
  11: fahrenheit::EventLoop::wake
             at src/lib.rs:157
  12: <fahrenheit::Token as alloc::task::if_arc::Wake>::wake::{{closure}}
             at src/lib.rs:63
  13: <std::thread::local::LocalKey<T>>::try_with
             at /checkout/src/libstd/thread/local.rs:294
  14: <std::thread::local::LocalKey<T>>::with
             at /checkout/src/libstd/thread/local.rs:248
  15: <fahrenheit::Token as alloc::task::if_arc::Wake>::wake
             at src/lib.rs:58
  16: <alloc::task::if_arc::ArcWrapped<T> as core::task::wake::UnsafeWake>::wake
             at /checkout/src/liballoc/task.rs:71
  17: core::task::wake::Waker::wake
             at /checkout/src/libcore/task/wake.rs:51
  18: <futures_channel::mpsc::Sender<T>>::signal
             at /mnt/f/.cargo/git/checkouts/futures-rs-4ca77cb4f4f05ac4/1baf8a0/futures-channel/src/mpsc/mod.rs:615
  19: <futures_channel::mpsc::Sender<T>>::queue_push_and_signal
             at /mnt/f/.cargo/git/checkouts/futures-rs-4ca77cb4f4f05ac4/1baf8a0/futures-channel/src/mpsc/mod.rs:544
  20: <futures_channel::mpsc::Sender<T>>::do_send_nb
             at /mnt/f/.cargo/git/checkouts/futures-rs-4ca77cb4f4f05ac4/1baf8a0/futures-channel/src/mpsc/mod.rs:520
  21: <futures_channel::mpsc::UnboundedSender<T>>::unbounded_send
             at /mnt/f/.cargo/git/checkouts/futures-rs-4ca77cb4f4f05ac4/1baf8a0/futures-channel/src/mpsc/mod.rs:745
  22: <haha::Producer as core::future::future::Future>::poll
             at examples/haha.rs:36
  23: <alloc::boxed::Box<F> as core::future::future_obj::UnsafeFutureObj<'a, T>>::poll
             at /checkout/src/liballoc/boxed.rs:783
  24: <core::future::future_obj::LocalFutureObj<'a, T> as core::future::future::Future>::poll
             at /checkout/src/libcore/future/future_obj.rs:83
  25: <core::future::future_obj::FutureObj<'a, T> as core::future::future::Future>::poll
             at /checkout/src/libcore/future/future_obj.rs:133
  26: fahrenheit::Task::poll
             at src/lib.rs:87
  27: fahrenheit::EventLoop::run
             at ./src/lib.rs:272
  28: fahrenheit::run::{{closure}}
             at ./src/lib.rs:39
  29: <std::thread::local::LocalKey<T>>::try_with
             at /checkout/src/libstd/thread/local.rs:294
  30: <std::thread::local::LocalKey<T>>::with
             at /checkout/src/libstd/thread/local.rs:248
  31: fahrenheit::run
             at ./src/lib.rs:39
  32: haha::main
             at examples/haha.rs:23
  33: std::rt::lang_start::{{closure}}
             at /checkout/src/libstd/rt.rs:74
  34: std::panicking::try::do_call
             at libstd/rt.rs:59
             at libstd/panicking.rs:310
  35: __rust_maybe_catch_panic
             at libpanic_unwind/lib.rs:103
  36: std::rt::lang_start_internal
             at libstd/panicking.rs:289
             at libstd/panic.rs:392
             at libstd/rt.rs:58
  37: std::rt::lang_start
             at /checkout/src/libstd/rt.rs:74
  38: main
  39: __libc_start_main
  40: <unknown>

Possible to do away with global REACTOR?

This is an awesome project, it's really helped me understand the mechanics of futures better! Thank you!

One thing I haven't quite wrapped my head around is why we need any sort of global REACTOR. Each task is polled from an EventLoop - and indeed we're already passing the EventLoop instance up the stack in the guise of a Handle (er... a Spawn, which of course can't be downcast), as part of the Context. It seems very awkward to require global state when we have all the state we need already passed around to the proper places through a Context object (just inaccessible ☹️).

It seems it would be much cleaner to be able to downcast some part of the Context in order to call add_read_interest in the "leaf" Future implementations (like AsyncTcpStream). This would of course require a change to the Context API.

Thoughts?
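
For readers following along, the pattern under discussion looks roughly like the sketch below: a leaf future cannot recover the concrete EventLoop from the Context it is polled with (the Context only exposes a waker and a type-erased spawner), so it reaches the reactor through a thread_local! handle. Hypothetical names, not fahrenheit's exact code:

use std::cell::RefCell;
use std::os::unix::io::RawFd;

// Hypothetical stand-in for the reactor state held by the event loop.
struct Reactor {
    // fd -> waker registrations would live here
}

impl Reactor {
    fn add_read_interest(&self, _fd: RawFd) {
        // register the fd with the select(2) read set
    }
}

thread_local! {
    // The "global" state the issue is questioning: a per-thread reactor.
    static REACTOR: RefCell<Option<Reactor>> = RefCell::new(None);
}

// A leaf future (e.g. a TcpStream wrapper's poll_read) has no way to
// downcast the Context, so it falls back to the thread-local handle:
fn register_read_interest(fd: RawFd) {
    REACTOR.with(|r| {
        if let Some(reactor) = r.borrow().as_ref() {
            reactor.add_read_interest(fd);
        }
    });
}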

One thread per call to run()?

Firstly, thanks for doing this.

I'm trying to get my head around Rust's async/await code, and this is very helpful.

If I read this correctly, every call to fahrenheit::run() creates a whole new select() loop / thread for that async code.

How would it change if you wanted a bunch of unrelated async {} blocks to use the same single thread / select() loop?

Or am I just reading this all wrong? Good chance of that. I'm new to Rust, but have used async/await in a few other languages.

Lastly: sorry if this is the wrong place to post/ask. I didn't see a comments option on your blog post.
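
(A possible answer, sketched under the assumption that the futures-preview crate in use provides the join! macro: combine the unrelated async blocks into one future before handing it to run(), so only a single EventLoop / select(2) loop is ever created.)

#![feature(futures_api, async_await, await_macro)]
extern crate fahrenheit;
extern crate futures;

use futures::join;

async fn task_a() {
    println!("a");
}

async fn task_b() {
    println!("b");
}

fn main() {
    // One run() call, therefore one event loop; join! drives both
    // futures to completion concurrently on that single loop.
    fahrenheit::run(async {
        join!(task_a(), task_b());
    })
}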

Not currently building on Windows

error[E0433]: failed to resolve: could not find `unix` in `os`
  --> src\lib.rs:17:14
   |
17 | use std::os::unix::io::RawFd;
   |              ^^^^ could not find `unix` in `os`

and related errors :)
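
The reactor is built directly on select(2) and RawFd, which only exist on Unix-like targets, hence the failure at the std::os::unix import. A sketch of how the platform requirement could at least be surfaced explicitly with cfg attributes (an idea, not something the crate does today):

// Only compile the Unix-specific pieces on Unix targets.
#[cfg(unix)]
use std::os::unix::io::RawFd;

// On other targets, fail with a clear message instead of a
// "could not find `unix` in `os`" resolution error.
#[cfg(not(unix))]
compile_error!("fahrenheit's select(2)-based event loop requires a Unix target");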

Removing unsafe by replacing libc with nix

Hi!

First off: thanks so much for making this project! I've learned a lot just by reading it, which is fantastic!

Observation

I was going through the source today (on the futures-0.3 branch actually ✨) and something that stood out to me is that there's quite a bit of unsafe sprinkled throughout the code to interact with the OS through libc. nix is a crate which aims to make libc's functionality safe to use, removing the need for unsafe.

I feel this would have a few benefits:

  • It would emphasize that it's possible to write executors in (mostly) safe code.
  • It's probably a bit more readable for most people (e.g. no need to worry about unsafe and null pointers).

Example

For example, the select(2) call in the reactor currently looks like this:

let rv = unsafe {
    select(
        nfds,
        &mut read_fds,
        &mut write_fds,
        std::ptr::null_mut(),
        &mut tv,
    )
};

// don't care for errors
if rv == -1 {
    panic!("select()");
} else if rv == 0 {
    debug!("timeout");
} else {
    debug!("data available on {} fds", rv);
}

Using nix's select function, this could be rewritten to something like:

// don't care for errors
let rv = select(
    Some(nfds),
    Some(&mut read_fds),
    Some(&mut write_fds),
    None,
    Some(&mut tv),
).unwrap();

debug!("data available on {} fds", rv);

Which might feel a bit more natural for most Rustaceans.

Conclusion

We made a case for replacing the usage of libc with the nix crate, citing potential benefits, and created an example conversion using existing code. I hope this issue comes in handy. Thanks for your time!
