rust-lang / futures-rs Goto Github PK
View Code? Open in Web Editor NEWZero-cost asynchronous programming in Rust
Home Page: https://rust-lang.github.io/futures-rs/
License: Apache License 2.0
Zero-cost asynchronous programming in Rust
Home Page: https://rust-lang.github.io/futures-rs/
License: Apache License 2.0
The issue is that streams require recursion in order to loop. The current assumptions in the futures library are that, at some point, a future / stream will not be ready, forcing a callback to be registered (essentially a defer). However, this assumption is not always true. The following example demonstrates the problem w/ immediately ready futures but the issue exists in any situation where futures become ready faster than they are consumed.
Example:
extern crate futures;
use futures::Future;
use futures::stream::{channel, Stream, Sender};
fn count_down(i: u32, f: Sender<u32, ()>)
-> Box<Future<Item = (), Error = ()>>
{
let busy = f.send(Ok(i));
if i > 0 {
busy
.map_err(|_| ())
.and_then(move |sender| {
count_down(i - 1, sender)
})
.boxed()
} else {
Box::new(futures::finished::<(), ()>(()))
}
}
fn main() {
let (tx, rx) = channel::<u32, ()>();
rx.for_each(move |v| {
Ok(())
}).forget();
count_down(100_000, tx).forget();
}
the current futures code doesnt compile with stable due to use of the scoped_tls.
I tried to use the code with just future-rs and futures-cpupool.
Question for the comment in locks.rs
" As a futures library the eventual call to epoll
should be the only thing
that ever blocks"
Does that mean that only epoll will be supported for network calls (at least on Linux)?
A Future could hold a blocking network call or any other longer-running computation.
e.g.
let str: Future = count_words_in_rust_doc();
Maybe I do not understand the comment correctly. I assume that futures are meant to be generic.
I like this project a lot and I also like that that Rust community is not rushing this. It took Java many years and JDK 8 to finally get usable futures (CompletableFuture).
Similar for Python and C+++.
And Scala is still improving it's futures:
https://github.com/viktorklang/blog/blob/master/Futures-in-Scala-2.12-part-1.md
So it is good to design this carefully, maybe together with non-blocking IO. It will be crucial for Rust and very important for anyone working on stuff like databases, web servers, big data analytics and many other distributed systems.
The documentation talks about Some
/None
which are nowhere in sight. Instead, it returns NotReady
if the value has already been returned. That's probably fine although returning Some(value)
and then None
forever would be more in-line with Iterator::fuse
.
I know this issue is a bit vague but, beyond fixing the documentation, I don't really know the correct thing to do here.
Just to be clear, this is talking about this Fuse.
xx.rs:
use futures::*;
pub fn yy() -> BoxFuture<u32, ::std::io::Error> {
done(Ok(1)).boxed()
}
main.rs:
extern crate futures;
mod xx;
use futures::Future;
fn main() {
let f = xx::yy();
println!("wait: {:?}", f.wait());
}
This code works.
If use futures::Future
is commented out in main.rs
, compiler complains:
src/main.rs:10:30: 10:34 error: the trait bound `futures::Future<Error=std::io::Error, Item=u32> + Send: std::marker::Sized` is not satisfied [E0277]
src/main.rs:10 println!("wait: {:?}", f.wait());
^~~~
<std macros>:2:27: 2:58 note: in this expansion of format_args!
<std macros>:3:1: 3:54 note: in this expansion of print! (defined in <std macros>)
src/main.rs:10:5: 10:38 note: in this expansion of println! (defined in <std macros>)
src/main.rs:10:30: 10:34 help: run `rustc --explain E0277` to see a detailed explanation
src/main.rs:10:30: 10:34 note: `futures::Future<Error=std::io::Error, Item=u32> + Send` does not have a constant size known at compile-time
error: aborting due to previous error
error: Could not compile `futures-rs-td`.
I'm not sure if is it a bug, or just a usability problem, is it in futures-rs or in rust language, but it is hard to understand error message.
rustc 1.11.0 (9b21dcd6a 2016-08-15), futures-rs from master
This is a feature that's missing from mio
and futures-rs
. All the APIs take a SocketAddr
, and do not offer a way of resolving host names. The lookup_host
method from the standard library is only available on nightly, making it "technically" impossible to resolve host names on stable. I found a hack around this using TcpStream::connect
followed by a call to peer_addr()
but it's far from ideal and it's not asynchronous.
I think a really simple implementation could be offered using a thread pool and a call to getaddrinfo
. Eventually there could be a better solution such as bindings to the getdns
library, or a home made solution.
Be warned: I didn't do much research before writing this and was just wondering if the abstractions provided here allow implementing some of the nice features of node's streams. Commencing brain dump…
I saw that the only place where back pressure in streams is addressed is the documentation for futures::stream::channel
, which says
This channel is unique in that it implements back pressure to ensure that the sender never outpaces the receiver. The Sender::send method will only allow sending one message and the next message can only be sent once the first was consumed.
This sounds like its very specific to that one implementation. Is there a plan to generalize this? I/O streams using internal buffers would also need some kind of 'high water mark' (node.js calls it that way IIRC) or a way to pause streams they depend on when their buffer is full.
This brings me to my next question: Is there a plan to introduce a pipe
method? I.e., a method that works like pipe in a shell by combining multiple stream processors? and_then
allows transforming each item of a stream, but pipe
would go a bit further than that, using the current stream's output as input for the next stream.
For example: A stream of data read from an HTTP request (the body of the request) is piped into a (streaming) JSON parser whose output is piped into CSV serialization which then writes to a file. Ideally, this would 'just' be http::get("https://some.example/data").pipe(Json::array_deserializer::<SomeStruct>()).pipe(Csv::from).pipe(File::writeStream(filename))
. I guess there is a lot of work involved to make something like this work (and a lot more crates, and implicit worker/event loop handling somehow).
A Pipeable
trait might become pretty complicated if you wanted it to handle back pressure across n
piped streams. If I'm not mistaken, it could abstract over map
and and_then
as well (implementing it for closures that return Self::Item
or Future<Item, Error>
resp.).
When experimenting with futures and task, I got the weird panic:
thread 'main' panicked at 'assertion failed: `(left == right)` (left: `4567965872`, right: `4567966032`)', /Users/admytrenko/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task.rs:202
...
7: 0x10fc46e01 - futures::task::Task::get::h11c78cea13602c1a
8: 0x10fc44056 - _<futures_io..task..TaskIoTake<'a, 'b, T>>::new::h733b3b84a5e2dd45
9: 0x10fc460b6 - _<futures_io..task..TaskIoRead<T> as futures_io..ReadTask>::read::h8e19612329de43fb
10: 0x10fc4a451 - _<futures_io..read_exact..ReadExact<A, T> as futures..Future>::poll::ha3f0ac7f0692a919
I understand, that "forgetting" task here probably does not make much sense, but "safe" rust should not panic unless I'm calling some functions that may panic (unwrap, etc).
futures-rs version: f672963 (latest master at this time).
The smallest example I came up with, which crashes when you try to connect to the server:
extern crate futures;
extern crate futures_io;
extern crate futures_mio;
use std::env;
use std::net::SocketAddr;
use futures::Future;
use futures_io::{copy, TaskIo, read_exact, write_all};
use futures::stream::Stream;
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let mut l = futures_mio::Loop::new().unwrap();
let server = l.handle().tcp_listen(&addr);
let done = server.and_then(move |socket| {
// Once we've got the TCP listener, inform that we have it
println!("Listening on: {}", addr);
socket.incoming().for_each(|(socket, addr)| {
let io = TaskIo::new(socket);
let pair = io.map(|io| io.split());
let amt = pair.map(|(reader, writer)| {
read_exact(reader, vec![0; 1]).forget()
}).forget();
Ok(())
})
});
l.run(done).unwrap();
}
Hi,
The Future trait has no more "schedule" method, it was removed in
d07c397
So this section in README.md must be rewritten https://github.com/alexcrichton/futures-rs#the-future-trait
The following sorts of things would be nice:
poll
themselves. I'm not sure how this API would work precisely.wait
blocks indefinitely.Straem.iter_timeout(duration)
.My goal here is to potentially rewrite my UDP networking protocol project on top of futures. At the moment, it's using an API based around callbacks that get called on a background thread, leaving any inter-thread communication up to the end user. This obviously makes for a horrible API. I could force all projects that want to use it to opt into Mio instead, but mio doesn't appear to have game-friendly timing and games are one of the primary applications.
I'm not sure which of the above are possible, but this does appear to be a set of missing functionality. Even my first item would make Channel drastically more useful.
When I first saw the forget
method, I immediately assumed that this was a way to cancel the future (forget about it). While I understand this was named after the std function mem::forget
and it means "don't clean this is up but let it finish", assuming that the user will know about this relatively obscure std method is probably not a good idea.
Personally, I recommend renaming it to ensure
like the python method asyncio.ensure_future
as it seems to do something fairly similar.
Currently CpuPool::new
accepts u32
but num_cpus::get
returns usize
.
Unfortunately the forget()
function requires Self: Sized
, which Box<Future>
doesn't implement.
Hello,
Just seen
One of the most powerful features of Guava futures and Akka is the possibility to attach callbacks to a future, which are called immediately as soon as the future operation completes. Lack of this operation in the original Java futures libraries made the library almost useless for us as we wanted a full non-blocking system.
Is this feature implemented or planned? It's definitely a deal-breaker otherwise.
Thanks!
The core futures
crate is very close to being buildable in no_std
crates (with liballoc
and libcollections
). I've applied all necessary changes in an experimental branch and it compiles without problems.
There are only two rough spots:
LIMITED
executor can't be used as it relies on a thread local variable. To circumvent this, we can set the default executor to Inline
.catch_unwind
.I'd love to use this library in my no_std
crates. Are there any plans to add no_std
support, e.g. through a cargo feature?
The promise
function returns a tuple of (Promise, Complete)
.
The Promise
object is similar to the Receiver
object of the channels in the stdlib, while the Complete
object is similar to the Sender
object.
However in the stdlib, the channel()
function returns a tuple of (Sender, Receiver)
.
I find it confusing that the order of what promise()
returns is the opposite of what channel()
returns.
The timer wheel used for timeouts is suboptimal in a few cases:
If I understand the specifics of std::mem::drop(_)
, this only drops the &mut
reference, not the Task
. Unfortunately, it is impossible to drop a value by its mutable borrow. I am not sure how to handle this, perhaps the Task
needs a drop(&mut self)
method that can be called instead.
I'm getting a bizarre error at runtime:
thread 'main' panicked at 'assertion failed: (left == right)
(left: 140241379868944
, right: 140241379869200
)', /home/srwalter/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/lib.rs:208
If this is caused by an error in my program, I'm at a loss to understand what I'm doing wrong. Attached is a test case that should reproduce the error. At a high level I'm trying to adapt a socket into a Stream of messages. From the backtrace it seems to be failing in read_exact() called by my poll() implementation, but I can't find anything that looks like the assertion printed.
Note that you'll need a listening socket at localhost:1234 for the program to connect to; I just used netcat: nc -l 127.0.0.1:1234
I guess it probably should.
I get the following error:
src/lib.rs:427:5: 431:6 error: reached the recursion limit while instantiating `<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<empty::Empty<(), ()>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as Future>::boxed````
Right now the safety of TaskData
relies on the fact that the id
field of Task
is never reused. Thinking through this now I'm not actually sure that there's any reasonable way we can reuse it, however, as we would have to otherwise invalidate all existing TaskData
handles.
We may need to alter the scheme to make TaskData
resilient when used with a recycled Task
. I have a few ideas.
cc @aturon
Hi,
first of all great work, futures was what I was waiting for! Is there any plan to keep the HTTP lib, maybe as a separate lib, in a stable way instead of proof-of-concept?
It would be interesting and I can also try to help on that!
Thnx.
Bye,
Paolo
I need to send messages to the event loop from other threads. Event loop works like this (in pseudocode):
loop {
let item = channel.receive();
write(socket, serialize(item));
}
futures-rs provides only synchronous channel, which it not suitable for this case: because sender should not block indefinitely if socket is not available for writing.
Seems like futures-rs could provide another building block: channel with unbounded size and non-blocking sender (as in std::sync::mpsc::channel
) and with Stream
interface to the receiver.
One thing which I noticed was missing from the Streams implementation was some sort of "broadcast" transformer which allowed a stream to be subscribed to multiple times (i.e. polled/scheduled by multiple tasks in the language of futures-rs).
I was wondering if there was any interest in having this as I've been prototyping this and would love to upstream it!
When using read_to_end on streams like UnixStream
, the A
type parameter of read_to_end could be reused.
Hi Alex
You probably know about this already, but when trying to use futures-tls the compilation fails with this message:
futures-tls-0.1.0/src/openssl.rs:66:32: 66:63 error: failed to resolve. Could not find `HandshakeError` in `openssl::openssl::ssl` [E0433]
Digging into it, it looks like it's because the changes you made to the openssl crate (HandshakeError) haven't made it into a release yet. Give me a shout when there's an updated dependency that fixes this, or if you know of an easy workaround (just learning Rust). Looks like a really promising library, so thanks!
Regards
Tom
Channel
and Oneshot
are multithreaded "message passing" constructs. Both include some level of internal storage for in-flight messages.
There would be utility in providing a way for the receiving end to atomically drain & drop (shutdown) their handles. This would provide a way to guarantee that values don't get "lost" in the internal storage.
I implemented this initially as a try_release
fn, but this may or may not be the best way to expose the functionality:
https://github.com/tokio-rs/tokio/blob/master/src/util/future/channel.rs#L126
@alexcrichton, in the tutorial (and other places) you liken Future
to Iterator
, and Stream
to… errr, nothing, IIRC.
But isn't the correlation more like this?
# items | Sync | Async | Common operations |
---|---|---|---|
1 | Result<T, E> |
Future<T, E> |
map , and_then , joinN (a.k.a. zip !) |
∞ | Iterator<_> |
Stream<_> |
map , fold , collect , flatten |
Many operators on Option
/Result
/Iterator
has the same/similar names and semantics (… monads…), so the key differentiator is the number of items they can yield. (With respect to Stream
's error handling, the sync side should be Iterator<Item=Result<T,E>
.)
Or am I missing some key insight of the implementation? I started reading the tutorial on this earlier, and my background is JS async primitives. (That Streams are actually Futures yielding (val, next_future)
is a pretty nice property, btw.)
Consider the following program:
extern crate futures;
extern crate futures_io;
extern crate futures_mio;
use futures::Future;
use futures::stream::Stream;
use futures_io::{copy, TaskIo};
pub fn main() {
let mut l = futures_mio::Loop::new().unwrap();
let srv = l.handle().tcp_listen(&"127.0.0.1:5000".parse().unwrap());
let future = srv.and_then(move |server| {
server.incoming()
.and_then(|s| TaskIo::new(s.0))
.map(|i| i.split())
.map(|(a,b)| copy(a,b).map(|_| ()))
.buffered(10)
.collect()
});
let _ = l.run(future);
}
Before 9804cec, this program successfully acted as a TCP echo server. Now it hangs, pegs a CPU, and fails to echo any data sent to it.
Need lots of docs around who actually calls these methods.
Compiling and executing techempower1
(cargo run --release
) works fine, but getting the following error message from curl:
$ curl https://localhost:8080/json
curl: (35) gnutls_handshake() failed: The TLS connection was non-properly terminated.
In order to conveniently deal with loops which may have a body that completes asynchronously, we should ensure that there are a suite of combinators for working with multiple kinds of loops. Especially in light of #77 where tailcall
is being removed (it never really worked anyway) it'll be important to express looping not through recursion.
Also see #62
Current ideas for loops:
stream::iter(some_iterator).fold(...)
stream::iter(iter::repeat(()).map(Ok)).fold(...)
trampoline
function in clojureAn RFC was recently approved for the !
(Never
) type, which represents the type of a value that can never exist.
Several structs in of futures-rs
use PhantomData
to mark the type of values that can't exist, such as the Error
type of Finished
, the Item
type of Failed
, and the Item
and Error
types of Empty
. I believe that, once this PR lands (and then makes its way to stable), these structs should be rewritten to use the Never
type. Empty
, for example, could be written as follows:
pub struct Empty {}
pub fn empty() -> Empty { // Is this even necessary any more?
Empty {}
}
impl Future for Empty {
type Item = !;
type Error = !;
fn poll(&mut self, _: &mut Task) -> Poll<Self::Item, Self::Error> {
Poll::NotReady
}
fn schedule(&mut self, task: &mut Task) {
drop(task);
}
}
Unfortunately, this feature has yet to land on nightly, let alone stable. Therefore, a transition to Never
is likely a ways off. As this library is already getting lots of attention, I believe there should be a plan for how and if to structure the library so that a future (hehe 😃 ) transition to Never
is possible.
In order to be more consistent with terminology, I also think that Empty
should be renamed to Never
. !
is called Never
rather than Empty
or Void
as a result of the discussion at the end of the Never
PR. The gist of it is that Empty
isn't a future for an Empty
value, it's a future for a value that can Never
exist (i.e. it Never
completes).
P.S. Great work on this library, @alexcrichton & company! I'm excited to use this in my future Rust projects.
Will be UnixStream
supported in futures_mio
?
let tls_handshake = socket.and_then(|socket| {
let mut cx = ClientContext::new().unwrap();
{
let backend_cx = cx.ssl_context_mut();
backend_cx.set_CA_file(ca);
backend_cx.set_certificate_file(crt, X509FileType::PEM);
backend_cx.set_certificate_file(key, X509FileType::PEM);
}
cx.handshake(host_name, socket)
});
I need to use this type
use futures_tls::backend::openssl::x509::X509FileType;
But seems like not everything from backend is exposed.
http://alexcrichton.com/futures-rs/futures_tls/backend/openssl/index.html
Do you think these should be exposed?
I have implemented an alternative implementation of Promise
that provides a cancellation future, but it would be preferable to expose this functionality upstream.
https://github.com/tokio-rs/tokio/blob/master/src/util/future/val.rs#L143
I am not sure if that is the right place to ask this question, so feel free to ignore it.
I have created a proof of concept task system before I even knew about future-rs
. It is not even remotely polished and I haven't touched it in a while. I had a look at future-rs
and I am not sure if it would satisfy my needs.
Let me quickly show how my task system works
fn main() {
let res: Future<i32> = TASK.submit(move || {
println!("Before long running task");
let short_running_task: Future<i32> = TASK.submit(|| 24);
// Submits a long running task
let long_running_task: Future<i32> = TASK.submit(|| {
std::thread::sleep(Duration::from_secs(10));
return 42;
});
// Waits for the short running task to complete, does not block other tasks!
println!("After short running task {}", short_running_task.await());
// Waits for the long running task to complete, does not block other tasks!
println!("After long running task {}", long_running_task.await());
42
});
let v: Vec<Future<()>> = (0 .. 20).map(|i|{
TASK.submit(move || println!("Another Task {}", i))
}).collect();
println!("{}", res.await());
}
which prints
Another Task 2
Another Task 0
Before long running task
Another Task 3
Another Task 5
Another Task 8
Another Task 6
Another Task 17
Another Task 7
Another Task 9
Another Task 1
Another Task 10
Another Task 4
Another Task 13
Another Task 11
Another Task 14
Another Task 12
Another Task 19
Another Task 15
Another Task 16
Another Task 18
After short running task 24
After long running task 42
42
I basically create n-1 local task queues which receives a task, it then wraps that task in a fiber/ coroutine and continuously tries to do some work.
Now I had a very rough look at future-rs
. http://alexcrichton.com/futures-rs/src/futures/src/lib.rs.html#260
And you require a 'static
lifetime, I think that means that I can basically not have references in my task right?
I am currently designing a Vulkan library and I want to be able to do some work on different threads. To satisfy a 'static
would basically mean that I would have to wrap a lot of stuff in Arc
. I made also a reddit post recently about this problem.
The thing is, it complicates the library and I don't think it is actually needed. In my head I want to do something like this
let physical_device = ...;
let ref_physical_device = &physical_device;
let future_something: Future<Something> = TASK.submit(move || {
ref_physical_device.create_something()
};
//Blocks
let something: Something = future_something.await();
The only thing I have to worry about is that the reference lives long enough. But I know that it will live long enough because I am calling await
which will block the main thread or reschedule the fiber (otherwise I could call await in the destructor to make sure that it always lives long enough). I haven't currently implemented this because I wanted to see if future-rs
might also allow this.
What's the reasoning behind using a custom reference counting in Inner in futures-cpupool? The inner is already wrapped in an Arc and sending the close messages on drop can be implemented for Inner itself instead of CpuPool.
In https://github.com/alexcrichton/futures-rs/blob/master/TUTORIAL.md#user-content-send-and-static
As a bonus, these bounds allow for some [interesting optimizations][tailcall].
But there is no URL associated with tailcall
.
(BTW, I know what a tail call optimization is but I don’t see how these bounds enable it. I hope the intended link explains the latter and not just the former :))
Suppose a library wants to do some work on a thread pool and return a future for the result. For concreteness, it might be:
It does not sound like a good idea for each such library to create its own thread pool (CpuPool
). Hence, the library should take the thread pool by dependency injection. This is good for sharing a thread pool among multiple libraries, testing, leaving control in the hands of the library user, etc.
My question is: what type should the library take for the thread pool? Taking a CpuPool
is an option but seems overly restrictive. Does it make sense to have a trait for that, and the library can then take the trait instead of a concrete struct?
For example, in Java the standard method for this is the "executor framework" - particularly the interfaces Executor
, ExecutorService
etc. This was also stolen by python and ruby-concurrency and maybe others.
I was trying to implement a non-http server using future-mio
and the abstractions in io2.rs
look quite useful for any sort of server, and none of its code is specific to http.
I was wondering if there's a plan to extract those outside of minihttp and make other structs like ParseStream
publicly available, or if there's a better suggestion for handling these cases.
Thanks
Memory grows linearly with number of requests, observing about 1.5 K per request in growth.
The futures machinery makes it really hard to look at a heap profiler's output. Here's output from running
valgrind --tool=massif target/release/http
(and compiling with alloc_system
): https://gist.github.com/kamalmarhubi/377504123faef12365ca01e28f1e82ed
The 1.5 K per request is with either jemalloc or system allocator, and is quite linear.
needs more rationale for use cases and such
Dependencies in lots of the sub-libraries are specified as absolute numbers down to the patch. This can pose problems when new versions of the libraries are released or the downstream user needs to use an older one because of various reasons.
For example, in Cargo.toml
of futures-curl
, the version string of curl
should be changed to 0.3
if that works. If not that then ^0.3.3
since 0.3.4
is already out. At the moment I'm unable to compile by specifying 0.3.4
in the Cargo.toml
of my binary because of a version conflict. Similarly, the version of crossbeam
in futures-cpupool
should be changed to ^0.2.10
. Same for scoped-tls
and slab
in futures-mio
. And the Cargo.toml
of futures
itself specifies the version for various sub-libraries as 0.1.0
whereas the sub-libraries reference futures
as 0.1
. I think the former should be changed to 0.1
.
extern crate futures_mio;
extern crate futures;
use std::io;
use futures::*;
use futures_mio::Sender;
fn foo(s: Sender<()>) {
let result: Result<(), io::Error> = Ok(());
done(result)
.map(|x| {
s.send(x);
()
})
.forget();
}
Error is:
tests/channel.rs:16:10: 16:16 error: the trait bound `std::sync::mpsc::Sender<()>: std::marker::Sync` is not satisfied [E0277]
tests/channel.rs:16 .forget();
^~~~~~
tests/channel.rs:16:10: 16:16 help: run `rustc --explain E0277` to see a detailed explanation
tests/channel.rs:16:10: 16:16 note: `std::sync::mpsc::Sender<()>` cannot be shared between threads safely
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `mio::channel::Sender<()>`
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `futures_mio::Sender<()>`
tests/channel.rs:16:10: 16:16 note: required because of the requirements on the impl of `std::marker::Send` for `&futures_mio::Sender<()>`
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `[closure@tests/channel.rs:12:14: 15:10 s:&futures_mio::Sender<()>]`
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `std::option::Option<[closure@tests/channel.rs:12:14: 15:10 s:&futures_mio::Sender<()>]>`
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `futures::Map<futures::Done<(), std::io::Error>, [closure@tests/channel.rs:12:14: 15:10 s:&futures_mio::Sender<()>]>`
error: aborting due to previous error
I'm working on a small program that reads lines from stdin and passes them off to another function, using a stream. From the tutorial document and the rustdoc, I gather that the way to implement this is to use futures::stream::channel
and spawn a thread that loops through lines from stdin and sends them through the Sender
.
Sender
is consumed when a value is sent, so you must use a future combinator to send another value. It'd be really helpful to show an example of this. From what I can tell, this behavior means that the loop cannot be an iteration, but must use recursion because the logic must be passed as a closure/function to the combinator.
The ergonomics of that aren't ideal, but I'm not even sure I'm going about it the right way because I can't get my program to compile using that approach. I'm also concerned about memory usage. If I'm allocating a string for each line read, then recursively calling a function for the next iteration, will the strings from previous loops stay in memory until the loop finally ends? Or is this optimized somehow?
To explain this in pseudo-Rust, I'd like to do something like this:
fn run() -> Box<Stream<Item = String, Error = ::std::io::Error>{
let (tx, rx) = channel();
spawn(move || {
for line in stdin.lock().lines() {
match line {
Ok(line) => {
tx.send(Ok(line));
}
Err(error) => {
tx.send(Err(error));
}
}
}
});
rx.boxed()
}
But I think I have to do something like this:
fn run(self) -> Box<Stream<Item = String, Error = ::std::io::Error> {
let (tx, rx) = channel();
spawn(move || {
fn dispatch_line(tx: Sender<String, ::std::io::Error>) {
let mut buffer = String::new();
match stdin().read_line(&mut buffer) {
Ok(bytes_read) if bytes_read > 0 => {
tx.send(Ok(buffer)).and_then(|tx| dispatch_line(tx));
}
Err(error) => {
tx.send(Err(Error::Io(error))).and_then(|tx| dispatch_line(tx));
}
}
}
dispatch_line(tx);
});
rx.boxed()
}
I can't get the latter form to compile exactly as written because dispatch_line
should be returning a future, but it's getting ugly quickly, so I'm probably going about this really wrong.
I use CpuPool
to perform parallel computations in streams, here is my snippet of code:
let entires: Vec<DirEntry> = ...
let cpu_pool = CpuPool::new(num_cpus::get() as u32);
let stream = futures::stream::iter(entries.into_iter().map(|entry| Ok(entry)))
.map(|entry| {
cpu_pool.execute(move || {
// - open file using `File::open` - returns `Result`
// - read file using `futures_io::read_to_end` - returns `ReadToEnd` future
// - parse content of a file using `ReadToEnd::and_then` (this is a heavy task)
})
})
.buffered(num_cpus::get() * 2)
.filter(...)
.collect();
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.