polachok / fahrenheit
toy futures executor
License: Other
Line 24 in 3f2f133
I'm new to Rust and would like to ask why we still need an Rc here. As for the lifetime, it's 'static and safe to use. As for shared ownership, we can still have multiple immutable references to it.
code
#![feature(futures_api,async_await,await_macro)]
extern crate futures;
extern crate fahrenheit;
use futures::io::{AsyncWriteExt,AsyncReadExt};
use futures::stream::{StreamExt};
use fahrenheit::AsyncTcpStream;
use fahrenheit::AsyncTcpListener;
use std::net::SocketAddr;
use std::io;
async fn listen(addr: &str) {
    let addr: SocketAddr = addr.parse().unwrap();
    let listener = AsyncTcpListener::bind(addr).unwrap();
    let mut incoming = listener.incoming();
    while let Some(stream) = await!(incoming.next()) {
        fahrenheit::spawn(process(stream));
    }
}
async fn process(mut stream: AsyncTcpStream) {
    let mut buf = vec![0; 1024];
    await!(stream.read_exact(&mut buf));
}
fn main() {
    fahrenheit::run(listen("127.0.0.1:12345"))
}
backtrace:
thread 'main' panicked at 'already borrowed: BorrowMutError', libcore/result.rs:983:5
stack backtrace:
0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
1: std::sys_common::backtrace::print
at libstd/sys_common/backtrace.rs:71
at libstd/sys_common/backtrace.rs:59
2: std::panicking::default_hook::{{closure}}
at libstd/panicking.rs:211
3: std::panicking::default_hook
at libstd/panicking.rs:227
4: std::panicking::rust_panic_with_hook
at libstd/panicking.rs:475
5: std::panicking::continue_panic_fmt
at libstd/panicking.rs:390
6: rust_begin_unwind
at libstd/panicking.rs:325
7: core::panicking::panic_fmt
at libcore/panicking.rs:77
8: core::result::unwrap_failed
at /checkout/src/libcore/macros.rs:26
9: <core::result::Result<T, E>>::expect
at /checkout/src/libcore/result.rs:809
10: <core::cell::RefCell<T>>::borrow_mut
at /checkout/src/libcore/cell.rs:885
11: fahrenheit::EventLoop::do_spawn
at ./src/lib.rs:183
12: fahrenheit::spawn::{{closure}}
at ./src/lib.rs:43
13: <std::thread::local::LocalKey<T>>::try_with
at /checkout/src/libstd/thread/local.rs:294
14: <std::thread::local::LocalKey<T>>::with
at /checkout/src/libstd/thread/local.rs:248
15: fahrenheit::spawn
at ./src/lib.rs:43
16: server::listen::{{closure}}
at examples/server.rs:18
17: <std::future::GenFuture<T> as core::future::future::Future>::poll::{{closure}}
at /checkout/src/libstd/future.rs:46
18: std::future::set_task_cx
at /checkout/src/libstd/future.rs:82
19: <std::future::GenFuture<T> as core::future::future::Future>::poll
at /checkout/src/libstd/future.rs:46
20: <alloc::boxed::Box<F> as core::future::future_obj::UnsafeFutureObj<'a, T>>::poll
at /checkout/src/liballoc/boxed.rs:948
21: <core::future::future_obj::LocalFutureObj<'a, T> as core::future::future::Future>::poll
at /checkout/src/libcore/future/future_obj.rs:83
22: <core::future::future_obj::FutureObj<'a, T> as core::future::future::Future>::poll
at /checkout/src/libcore/future/future_obj.rs:133
23: fahrenheit::Task::poll
at src/lib.rs:87
24: fahrenheit::EventLoop::run
at ./src/lib.rs:273
25: fahrenheit::run::{{closure}}
at ./src/lib.rs:39
26: <std::thread::local::LocalKey<T>>::try_with
at /checkout/src/libstd/thread/local.rs:294
27: <std::thread::local::LocalKey<T>>::with
at /checkout/src/libstd/thread/local.rs:248
28: fahrenheit::run
at ./src/lib.rs:39
29: server::main
at examples/server.rs:28
30: std::rt::lang_start::{{closure}}
at /checkout/src/libstd/rt.rs:74
31: std::panicking::try::do_call
at libstd/rt.rs:59
at libstd/panicking.rs:310
32: __rust_maybe_catch_panic
at libpanic_unwind/lib.rs:102
33: std::rt::lang_start_internal
at libstd/panicking.rs:289
at libstd/panic.rs:392
at libstd/rt.rs:58
34: std::rt::lang_start
at /checkout/src/libstd/rt.rs:74
35: main
36: __libc_start_main
37: <unknown>
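The backtrace shows RefCell::borrow_mut panicking inside EventLoop::do_spawn while EventLoop::run is still polling a task, which suggests a re-entrant mutable borrow. The following is a minimal sketch of that pattern and of one possible workaround (queueing spawns in a separate cell and merging them after polling). ToyLoop and all of its fields and methods are hypothetical and are not fahrenheit's actual types.

```rust
use std::cell::RefCell;

// Hypothetical stand-in for an event loop; not fahrenheit's real structure.
struct ToyLoop {
    tasks: RefCell<Vec<u32>>,
    pending: RefCell<Vec<u32>>,
}

impl ToyLoop {
    fn new() -> Self {
        ToyLoop {
            tasks: RefCell::new(vec![1]),
            pending: RefCell::new(Vec::new()),
        }
    }

    // Spawns straight into `tasks`. Uses try_borrow_mut so the conflict is
    // reported as `false` instead of panicking like borrow_mut would.
    fn spawn_direct(&self, id: u32) -> bool {
        match self.tasks.try_borrow_mut() {
            Ok(mut tasks) => {
                tasks.push(id);
                true
            }
            Err(_) => false,
        }
    }

    // Spawns into a separate queue that is never borrowed during polling.
    fn spawn_deferred(&self, id: u32) {
        self.pending.borrow_mut().push(id);
    }

    // Returns whether the direct spawn made from "inside a poll" succeeded.
    fn run_once(&self) -> bool {
        let direct_ok = {
            // The loop keeps `tasks` mutably borrowed while tasks are polled.
            let _tasks = self.tasks.borrow_mut();
            self.spawn_deferred(2);
            self.spawn_direct(3)
        };
        // After polling, the borrow is released and queued spawns are merged.
        let mut pending = self.pending.borrow_mut();
        self.tasks.borrow_mut().append(&mut pending);
        direct_ok
    }
}

fn main() {
    let lp = ToyLoop::new();
    let direct_ok = lp.run_once();
    println!("direct spawn succeeded: {}", direct_ok);
    println!("tasks after run: {:?}", lp.tasks.borrow());
}
```

Whether this matches the actual cause in fahrenheit would need to be checked against src/lib.rs; the sketch only illustrates why a spawn issued during a poll can hit an active borrow.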
Hi!
First off: thanks so much for making this project! I've learned a lot just by reading it, which is fantastic!
I was going through the source today (on the futures-0.3 branch, actually) and something that stood out to me is that there's quite a bit of unsafe sprinkled throughout the code to interact with the OS through libc. nix is a package which aims to make using libc's functionality safe, removing the need for unsafe.
I feel this would have a few benefits:
For example, the select(2) loop in the reactor currently looks like this:
let rv = unsafe {
    select(
        nfds,
        &mut read_fds,
        &mut write_fds,
        std::ptr::null_mut(),
        &mut tv,
    )
};
// don't care for errors
if rv == -1 {
    panic!("select()");
} else if rv == 0 {
    debug!("timeout");
} else {
    debug!("data available on {} fds", rv);
}
Using the nix select function, this could be rewritten to something like:
// don't care for errors
let rv = select(
    Some(nfds),
    Some(&mut read_fds),
    Some(&mut write_fds),
    None,
    Some(&mut tv),
).unwrap();
debug!("data available on {} fds", rv);
Which might feel a bit more natural for most Rustaceans.
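To illustrate what a safe wrapper buys, here is a rough, standard-library-only sketch of how the three outcomes of a raw select(2) return value (error, timeout, ready descriptors) could be mapped onto a Result. The names SelectOutcome and interpret_select are made up for this example and are not part of nix or libc.

```rust
use std::io;

// Hypothetical type for this sketch; not part of nix or libc.
#[derive(Debug, PartialEq)]
enum SelectOutcome {
    Timeout,
    Ready(usize),
}

// Maps a raw select(2) return value onto a Result, so callers can use `?`
// or `match` instead of comparing against magic integers.
fn interpret_select(rv: i32) -> io::Result<SelectOutcome> {
    match rv {
        // -1 signals failure; the reason is in errno.
        -1 => Err(io::Error::last_os_error()),
        // 0 means the timeout expired with no descriptor ready.
        0 => Ok(SelectOutcome::Timeout),
        n if n > 0 => Ok(SelectOutcome::Ready(n as usize)),
        _ => Err(io::Error::new(
            io::ErrorKind::Other,
            "unexpected select() return value",
        )),
    }
}

fn main() {
    match interpret_select(2) {
        Ok(SelectOutcome::Ready(n)) => println!("data available on {} fds", n),
        Ok(SelectOutcome::Timeout) => println!("timeout"),
        Err(e) => println!("select failed: {}", e),
    }
}
```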
We made a case for replacing the usage of libc with the nix crate, citing potential benefits, and created an example conversion using existing code. I hope this issue is helpful. Thanks for your time!
code
#![feature(futures_api,async_await,await_macro, pin, arbitrary_self_types)]
extern crate futures;
extern crate fahrenheit;
use futures::stream::{StreamExt};
use futures::channel::mpsc;
use futures::future::Future;
use futures::task::Poll;
use futures::task::Context;
use std::pin::PinMut;
async fn consumer(mut rx: mpsc::UnboundedReceiver<i32>) {
    while let Some(v) = await!(rx.next()) {
        println!("received {}", v);
    }
}
fn main() {
    let (tx, rx) = mpsc::unbounded();
    let prod = Producer { chan: tx, index: 0 };
    fahrenheit::spawn(prod);
    fahrenheit::run(consumer(rx))
}
struct Producer {
    chan: mpsc::UnboundedSender<i32>,
    index: i32,
}
impl Future for Producer {
    type Output = ();
    fn poll(mut self: PinMut<Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.chan.unbounded_send(self.index).unwrap();
        self.index += 1;
        if self.index == 1000 {
            Poll::Ready(())
        } else {
            cx.waker().wake();
            Poll::Pending
        }
    }
}
stack backtrace:
0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
1: std::sys_common::backtrace::print
at libstd/sys_common/backtrace.rs:71
at libstd/sys_common/backtrace.rs:59
2: std::panicking::default_hook::{{closure}}
at libstd/panicking.rs:211
3: std::panicking::default_hook
at libstd/panicking.rs:227
4: std::panicking::rust_panic_with_hook
at libstd/panicking.rs:477
5: std::panicking::continue_panic_fmt
at libstd/panicking.rs:391
6: rust_begin_unwind
at libstd/panicking.rs:326
7: core::panicking::panic_fmt
at libcore/panicking.rs:77
8: core::result::unwrap_failed
at /checkout/src/libcore/macros.rs:26
9: <core::result::Result<T, E>>::expect
at /checkout/src/libcore/result.rs:809
10: <core::cell::RefCell<T>>::borrow_mut
at /checkout/src/libcore/cell.rs:885
11: fahrenheit::EventLoop::wake
at src/lib.rs:157
12: <fahrenheit::Token as alloc::task::if_arc::Wake>::wake::{{closure}}
at src/lib.rs:63
13: <std::thread::local::LocalKey<T>>::try_with
at /checkout/src/libstd/thread/local.rs:294
14: <std::thread::local::LocalKey<T>>::with
at /checkout/src/libstd/thread/local.rs:248
15: <fahrenheit::Token as alloc::task::if_arc::Wake>::wake
at src/lib.rs:58
16: <alloc::task::if_arc::ArcWrapped<T> as core::task::wake::UnsafeWake>::wake
at /checkout/src/liballoc/task.rs:71
17: core::task::wake::Waker::wake
at /checkout/src/libcore/task/wake.rs:51
18: <futures_channel::mpsc::Sender<T>>::signal
at /mnt/f/.cargo/git/checkouts/futures-rs-4ca77cb4f4f05ac4/1baf8a0/futures-channel/src/mpsc/mod.rs:615
19: <futures_channel::mpsc::Sender<T>>::queue_push_and_signal
at /mnt/f/.cargo/git/checkouts/futures-rs-4ca77cb4f4f05ac4/1baf8a0/futures-channel/src/mpsc/mod.rs:544
20: <futures_channel::mpsc::Sender<T>>::do_send_nb
at /mnt/f/.cargo/git/checkouts/futures-rs-4ca77cb4f4f05ac4/1baf8a0/futures-channel/src/mpsc/mod.rs:520
21: <futures_channel::mpsc::UnboundedSender<T>>::unbounded_send
at /mnt/f/.cargo/git/checkouts/futures-rs-4ca77cb4f4f05ac4/1baf8a0/futures-channel/src/mpsc/mod.rs:745
22: <haha::Producer as core::future::future::Future>::poll
at examples/haha.rs:36
23: <alloc::boxed::Box<F> as core::future::future_obj::UnsafeFutureObj<'a, T>>::poll
at /checkout/src/liballoc/boxed.rs:783
24: <core::future::future_obj::LocalFutureObj<'a, T> as core::future::future::Future>::poll
at /checkout/src/libcore/future/future_obj.rs:83
25: <core::future::future_obj::FutureObj<'a, T> as core::future::future::Future>::poll
at /checkout/src/libcore/future/future_obj.rs:133
26: fahrenheit::Task::poll
at src/lib.rs:87
27: fahrenheit::EventLoop::run
at ./src/lib.rs:272
28: fahrenheit::run::{{closure}}
at ./src/lib.rs:39
29: <std::thread::local::LocalKey<T>>::try_with
at /checkout/src/libstd/thread/local.rs:294
30: <std::thread::local::LocalKey<T>>::with
at /checkout/src/libstd/thread/local.rs:248
31: fahrenheit::run
at ./src/lib.rs:39
32: haha::main
at examples/haha.rs:23
33: std::rt::lang_start::{{closure}}
at /checkout/src/libstd/rt.rs:74
34: std::panicking::try::do_call
at libstd/rt.rs:59
at libstd/panicking.rs:310
35: __rust_maybe_catch_panic
at libpanic_unwind/lib.rs:103
36: std::rt::lang_start_internal
at libstd/panicking.rs:289
at libstd/panic.rs:392
at libstd/rt.rs:58
37: std::rt::lang_start
at /checkout/src/libstd/rt.rs:74
38: main
39: __libc_start_main
40: <unknown>
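In this backtrace the waker is invoked from inside Task::poll (via unbounded_send), and EventLoop::wake then fails in RefCell::borrow_mut. One way to make waking during a poll harmless is for the waker to only record a flag that the loop checks afterwards. The sketch below shows that idea on stable Rust with the standard library only; FlagWaker, Counter and block_on are hypothetical names for this example, and the code uses today's Pin<&mut Self> API rather than the PinMut API from the report.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker: waking only sets a flag, so it is safe to call
// re-entrantly while the future is being polled.
struct FlagWaker {
    woken: AtomicBool,
}

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.woken.store(true, Ordering::SeqCst);
    }
}

// A self-waking future, similar in spirit to the Producer above.
struct Counter {
    index: i32,
    limit: i32,
}

impl Future for Counter {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        self.index += 1;
        if self.index == self.limit {
            Poll::Ready(self.index)
        } else {
            // Ask to be polled again; this happens *during* poll.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Minimal single-future executor: polls whenever the flag was set.
fn block_on<F: Future>(fut: F) -> F::Output {
    let flag = Arc::new(FlagWaker { woken: AtomicBool::new(true) });
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if flag.woken.swap(false, Ordering::SeqCst) {
            if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
                return out;
            }
        } else {
            // A real executor would block here instead of spinning.
            std::thread::yield_now();
        }
    }
}

fn main() {
    let last = block_on(Counter { index: 0, limit: 1000 });
    println!("finished at index {}", last);
}
```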
This is an awesome project, it's really helped me understand the mechanics of futures better! Thank you!
One thing I haven't quite wrapped my head around is why we need any sort of global REACTOR. Each task is polled from an EventLoop, and indeed we're already passing the EventLoop instance up the stack in the guise of a Handle (er... a Spawn, which of course can't be downcast), as part of the Context. It seems very awkward to require global state when all the state we need is already passed around to the proper places through a Context object (just inaccessible).
It seems it would be much cleaner to be able to downcast some part of the Context in order to call add_read_interest in the "leaf" Future implementations (like AsyncTcpStream). This would of course require a change to the Context API.
Thoughts?
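For readers comparing the two designs, here is a small sketch contrasting a thread-local reactor with an explicitly passed handle. Everything in it (the Reactor struct, its read_interest field, and the register_* helpers) is hypothetical and only borrows the add_read_interest name from the discussion above; it does not reproduce fahrenheit's code.

```rust
use std::cell::RefCell;
use std::collections::BTreeSet;

// Hypothetical reactor that just remembers which fds are of interest.
#[derive(Default)]
struct Reactor {
    read_interest: BTreeSet<i32>,
}

impl Reactor {
    fn add_read_interest(&mut self, fd: i32) {
        self.read_interest.insert(fd);
    }
}

thread_local! {
    // Global-state style: one reactor per thread, reachable from anywhere.
    static REACTOR: RefCell<Reactor> = RefCell::new(Reactor::default());
}

// Style 1: a leaf future reaches for the thread-local reactor.
fn register_global(fd: i32) {
    REACTOR.with(|r| r.borrow_mut().add_read_interest(fd));
}

fn global_has_interest(fd: i32) -> bool {
    REACTOR.with(|r| r.borrow().read_interest.contains(&fd))
}

// Style 2: the reactor is handed down explicitly, e.g. through a context.
fn register_explicit(reactor: &mut Reactor, fd: i32) {
    reactor.add_read_interest(fd);
}

fn main() {
    register_global(3);
    println!("global reactor watches fd 3: {}", global_has_interest(3));

    let mut local = Reactor::default();
    register_explicit(&mut local, 4);
    println!("explicit reactor watches fd 4: {}", local.read_interest.contains(&4));
}
```

The explicit style makes the dependency visible in the signature, at the cost of needing some way to carry the handle to the leaf, which is what the proposed Context change would provide.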
Line 18 in d85b86d
You can collect the data into a Vec<u8> and convert it to a String after the loop ends. If you try to convert each chunk to UTF-8 separately, a chunk may contain invalid UTF-8, because a multi-byte character can be split across chunk boundaries.
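A small sketch of the difference, using only the standard library. The helper names decode_per_chunk and decode_after_collect are invented for this example, and the chunks are hard-coded so that the two bytes of "é" land in different chunks.

```rust
use std::str::Utf8Error;
use std::string::FromUtf8Error;

// Decodes every chunk on its own; fails if a character straddles chunks.
fn decode_per_chunk(chunks: &[&[u8]]) -> Result<String, Utf8Error> {
    let mut out = String::new();
    for chunk in chunks {
        out.push_str(std::str::from_utf8(chunk)?);
    }
    Ok(out)
}

// Collects all bytes first and decodes once at the end.
fn decode_after_collect(chunks: &[&[u8]]) -> Result<String, FromUtf8Error> {
    let mut bytes: Vec<u8> = Vec::new();
    for chunk in chunks {
        bytes.extend_from_slice(chunk);
    }
    String::from_utf8(bytes)
}

fn main() {
    // "é" is encoded as 0xC3 0xA9; the split falls between the two bytes.
    let chunks: [&[u8]; 2] = [b"caf\xC3", b"\xA9"];
    println!("per chunk:     {:?}", decode_per_chunk(&chunks));
    println!("after collect: {:?}", decode_after_collect(&chunks));
}
```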
error[E0433]: failed to resolve: could not find `unix` in `os`
  --> src\lib.rs:17:14
   |
17 | use std::os::unix::io::RawFd;
   |              ^^^^ could not find `unix` in `os`
and related errors :)
Hi, I am playing around with async/await and have run into an issue reading bytes into a Vec created with with_capacity or just new.
This is my code for the plain read scenario, where it always reports 0 bytes read for some reason:
async fn process(mut stream: AsyncTcpStream) {
    let mut buf = Vec::with_capacity(100);
    await!(stream.read(&mut buf));
    println!("{}", String::from_utf8_lossy(&buf));
}
the same applies for read_exact:
async fn process(mut stream: AsyncTcpStream) {
    let mut buf = Vec::with_capacity(100);
    await!(stream.read_exact(&mut buf));
    println!("{}", String::from_utf8_lossy(&buf));
}
However, read_to_end works fine (unfortunately, I then need to wait until the connection has ended).
Can someone explain whether I am doing something wrong and, if not, why it behaves like that? Thanks :)
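A likely explanation is that read and read_exact fill the slice they are given, and a Vec created with with_capacity or new has length 0, so the slice is empty. read_to_end takes the Vec itself and grows it, which is why it behaves differently. The sketch below demonstrates the same effect with the synchronous std::io::Read trait on an in-memory byte slice; it assumes AsyncTcpStream's read follows the same slice-based contract. The helper names are invented for this example.

```rust
use std::io::Read;

// Reads into a Vec that has capacity but length 0: the slice is empty.
fn read_into_with_capacity(mut source: &[u8]) -> usize {
    let mut buf = Vec::<u8>::with_capacity(100);
    source.read(&mut buf).unwrap()
}

// Reads into a Vec that really contains 100 zeroed bytes.
fn read_into_zeroed(mut source: &[u8]) -> (usize, Vec<u8>) {
    let mut buf = vec![0u8; 100];
    let n = source.read(&mut buf).unwrap();
    // Keep only the bytes that were actually read.
    buf.truncate(n);
    (n, buf)
}

fn main() {
    let data = b"hello";
    println!("with_capacity: read {} bytes", read_into_with_capacity(data));
    let (n, buf) = read_into_zeroed(data);
    println!("vec![0; 100]:  read {} bytes: {}", n, String::from_utf8_lossy(&buf));
}
```

Applied to the code above, replacing Vec::with_capacity(100) with vec![0; 100] and printing only the first n bytes returned by read would be the corresponding change.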
Firstly, thanks for doing this.
I'm trying to get my head around Rust's async/await code, and this is very helpful.
If I read this correctly, every call to fahrenheit::run() creates a whole new select() loop / thread for that async code.
How would it change if you wanted a bunch of unrelated async {} blocks to use the same single thread / select() loop?
Or am I just reading this all wrong? Good chance of that. I'm new to Rust, but have used async/await in a few other languages.
Lastly, sorry if this is the wrong place to post/ask. I didn't see a comments option on your blog post.
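The server example earlier on this page already shows one form of this: fahrenheit::spawn is called from inside the future passed to fahrenheit::run, which adds tasks to the running loop. As a generic illustration of several unrelated futures sharing one thread and one loop, here is a standard-library-only sketch on stable Rust. NoopWake, Task and run_all are hypothetical names; the loop simply re-polls pending tasks round-robin and does not wait on select() the way a real reactor would.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Hypothetical waker that does nothing; the loop below re-polls anyway.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

type Task = Pin<Box<dyn Future<Output = ()>>>;

// Drives all tasks to completion on the current thread, round-robin.
// Returns the total number of polls performed.
fn run_all(tasks: Vec<Task>) -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut queue: VecDeque<Task> = tasks.into();
    let mut polls = 0;
    while let Some(mut task) = queue.pop_front() {
        polls += 1;
        if task.as_mut().poll(&mut cx).is_pending() {
            // Not finished yet: put it at the back and try the next task.
            queue.push_back(task);
        }
    }
    polls
}

fn main() {
    let a: Task = Box::pin(async { println!("task a") });
    let b: Task = Box::pin(async { println!("task b") });
    let polls = run_all(vec![a, b]);
    println!("both tasks ran on one thread in {} polls", polls);
}
```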
The project is unaffiliated with Tokio and has a name that leads to confusion.
To be safe and fair to all, I would request that the project is renamed to something that makes it clear that it is not affiliated w/ Tokio.
See https://www.reddit.com/r/rust/comments/97zzig/toykio_and_futures_03/ for discussion.