Comments (19)
Thanks for the report! The API of channel
I do agree isn't the most ergonomic right now. The intention is to model backpressure, so the sender can't send too many messages until the consumer has caught up. I think also the best solution here would indeed be to just solve this at the mio layer, but failing that channel
is perhaps the best way to go for now.
The latter example of yours unfortunately may not quite behave as expected, though. Functions and closures in part of and_then
are run as part of poll
, which is intended to return quickly, but unfortunately this implementation may block (the read_line
). Additionally, right now after the send
plus and_then
the futures are dropped which means that execution won't actually happen. You'll need to tag futures with .forget()
to ensure they execute.
I believe though, that this should work for you?
extern crate futures;
extern crate futures_cpupool;
use std::io::{self, BufRead};
use futures::{Future, BoxFuture};
use futures::stream::{Stream, BoxStream, Sender, channel};
use futures_cpupool::CpuPool;
fn stdin() -> BoxStream<String, io::Error> {
let pool = CpuPool::new(1);
let (tx, rx) = channel();
read_line(tx, pool).forget();
return rx.boxed();
fn read_line(tx: Sender<String, io::Error>,
pool: CpuPool) -> BoxFuture<(), io::Error> {
let line = pool.execute(|| {
let input = io::stdin();
let mut lines = input.lock().lines();
lines.next().unwrap()
}).map_err(|_| io::Error::new(io::ErrorKind::Other, "panic"));
let tx = line.and_then(|line| {
tx.send(line)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "send error"))
});
tx.and_then(|tx| {
read_line(tx, pool)
}).boxed()
}
}
That creates a sender which executes all of the blocking reads in a separate "thread pool", and then the next line isn't read until the first has been received and the consumers is ready for another.
Does that make sense?
from futures-rs.
Ok I've added await
in 1f51080 to the Future
and Stream
traits, and otherwise I'm going to close this in favor of tokio-rs/mio#321. Note that my example from before can now be written as:
extern crate futures;
use std::io::{self, BufRead};
use std::thread;
use futures::Future;
use futures::stream::{Stream, BoxStream, channel};
fn stdin() -> BoxStream<String, io::Error> {
let (mut tx, rx) = channel();
thread::spawn(move || {
let input = io::stdin();
for line in input.lock().lines() {
match tx.send(line).wait() {
Ok(s) => tx = s,
Err(_) => break,
}
}
});
return rx.boxed();
}
@Stebalien Hm I'm curious about the avoidance of the name await
? I would expect this to have a clear analog to async/await which is essentially what futures are compiling to?
from futures-rs.
Really, there needs to be a way to read lines from stdin as a stream. There's probably some way to do this but I couldn't find it. That is, futures_io::stdin().lines() -> impl Stream<Item=String, Error=io::Error>
.
Also, there should be a way to feed a Stream
into a Sender
. But, if you have a Stream
, you don't really need a channel.
Finally, it might be useful to have a some form of y-combinator to make recursion simpler but streams really are the correct answer here.
from futures-rs.
@jimmycuadra try:
return rx.then(|e| e.unwrap()).boxed()
That unwrap
can never fail because the receiver will never receive an error (we need to improve that), but that should get back to the original signature.
from futures-rs.
Actually, Sender
could implement Stream
like so:
let (tx, rx) = channel();
// `put` is basically a bucket.
tx.for_each(|put| put(Ok("thing"))).forget(); // if we drop instead of forgetting, it closes the stream.
rx.take(3).collect().map(|v| assert!(v, vec!["thing", "thing", "thing"]));
from futures-rs.
@Stebalien oh that's a neat idea! I'd have to think on it but seems plausible at least
from futures-rs.
After thinking about this a bit, I think the real problem here (ignoring the fact that there should be some async way to read from stdin) is the sync/async interface. That is, there needs to be a way to interface between async code and sync code. IMO, the correct way to do this is some form of wait function (e.g. #20). That would allow one to write to and read from a channel both synchronously and asynchronously.
The problem of feeding a channel with function is better addressed by, well, not using a channel:
stream::generate(|| /* returns some promise */)
// or, with existing features (yuck)
stream::iter(iter::repeat(())).and_then(|_| /* return some promise */)
from futures-rs.
Hm I'm not sure I quite follow at this point, is there actually a bug here? Should this be closed?
from futures-rs.
There are two missing features:
- A way to read from stdin asynchronously (dependent on the missing mio feature).
- A way to easily communicate between synchronous and asynchronous code. That is, there needs to be a way to write to a channel synchronously and read from a channel synchronously from "worker" threads.
from futures-rs.
We could close this and open two separate tracking issues for those points, if it'd be easier to grok/manage.
from futures-rs.
Jumping in, I don't know how to solve 1) because STDIN is global and would require globally switching the fd to non-blocking.
currently you can work around this on *nix platforms by manually switching the fd to non-blocking and then using it w/ EventedFd
. However, I don't know the best way to provide a portable strategy that works w/ windows as well.
from futures-rs.
It's doable on windows but potentially racy. You can check if there are any pending console events and then read them but if someone else tries to read stdin in-between the check and the read, you'll end up blocking...
from futures-rs.
If STDIN has been redirected to a file, I believe you end up in a situation where read() may block even if you set the fd to non-blocking: http://www.remlab.net/op/nonblock.shtml
from futures-rs.
Ok, let's defer the asynchronous stdin to tokio-rs/mio#321 (agreed it's likely very tricky, I remember reams of code to handle this in libuv+libstd back in the day).
I'd like to drill more into "a way to easily communicate between synchronous and asynchronous code", though, to understand what exactly we'd need here. @Stebalien are you basically thinking of a function like await
(#20) which resolves a future? Or do you have more bits and pieces you think is necessary?
from futures-rs.
@jcsoo Not quite the same thing. Reading from a file will block when there is data available to read but the OS is busy but won't block otherwise.
@alexcrichton Yes (and maybe a simple way to convert a stream to a blocking iterator). However, I wouldn't call it await
because, in most languages, await yields to the event loop (doesn't block).
from futures-rs.
Hm yeah that's true, turning a stream into an iterator would also be nice. Shouldn't be too hard to do though with a similar implementation!
from futures-rs.
Hm I'm curious about the avoidance of the name await? I would expect this to have a clear analog to async/await which is essentially what futures are compiling to?
Yes but, in most languages, await
yields to the event loop while here await
blocks. I'm worried that someone will think let value = future.await()
is be equivalent to let value = await future;
(doesn't exist in rust but this is the syntax one uses in python).
from futures-rs.
Hm yeah that's a good point! I've renamed to wait
and Wait
to hopefully head off that confusion.
from futures-rs.
Finally getting back to this.
@alexcrichton, how would your last code example change with the deprecation of futures::stream::channel
in favor of futures::sync::mpsc::channel
? They don't seem to be identical. This code:
extern crate futures;
use std::io::{self, BufRead};
use std::thread;
use futures::{Future, Sink, Stream};
use futures::stream::BoxStream;
use futures::sync::mpsc::channel;
fn stdin() -> BoxStream<String, io::Error> {
let (mut tx, rx) = channel(1);
thread::spawn(move || {
let input = io::stdin();
for line in input.lock().lines() {
match tx.send(line).wait() {
Ok(s) => tx = s,
Err(_) => break,
}
}
});
return rx.boxed();
}
fn main() {
stdin().for_each(|string| {
println!("{}", string);
Ok(())
});
}
results in this error:
error[E0308]: mismatched types
--> src/main.rs:21:12
|
21 | return rx.boxed();
| ^^^^^^^^^^ expected struct `std::io::Error`, found ()
|
= note: expected type `Box<futures::Stream<Error=std::io::Error, Item=std::string::String> + std::marker::Send + 'static>`
= note: found type `Box<futures::Stream<Error=(), Item=std::result::Result<std::string::String, std::io::Error>> + std::marker::Send + 'static>`
If I change the line causing the error to:
return rx.map_err(|_| io::Error::new(io::ErrorKind::Other, "boom")).boxed();
then I get:
error[E0308]: mismatched types
--> src/main.rs:21:12
|
21 | return rx.map_err(|_| io::Error::new(io::ErrorKind::Other, "boom")).boxed();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `std::string::String`, found enum `std::result::Result`
|
= note: expected type `Box<futures::Stream<Error=std::io::Error, Item=std::string::String> + std::marker::Send + 'static>`
= note: found type `Box<futures::Stream<Error=std::io::Error, Item=std::result::Result<std::string::String, std::io::Error>> + std::marker::Send + 'static>`
I don't understand futures well enough to understand why that change makes the concrete type of Stream::Item
change. I only transformed Stream::Error
to be the expected type, why would that change Stream::Item
?
If I then change line 15 to unwrap the line from stdin so that a String
is passed to tx.send
instead of a Result
:
match tx.send(line.unwrap()).wait() {
Then the program compiles, but with a warning:
warning: unused result which must be used: streams do nothing unless polled, #[warn(unused_must_use)] on by default
--> src/main.rs:25:5
|
25 | stdin().for_each(|string| {
| _____^ starting here...
26 | | println!("{}", string);
27 | | Ok(())
28 | | });
| |_______^ ...ending here
But of course I don't want to unwrap the line, I want to handle possible errors correctly.
I don't think there are any bugs here, it's just me not being able to grok futures well enough yet.
from futures-rs.
Related Issues (20)
- Analogue of .last() method
- parse error in `select!`/`select_biased!` macro HOT 2
- Error on OSX by futures-executir HOT 1
- Consider removing ArcWake, re-export std::task::Wake HOT 3
- Fine-tune the Ordering for num_senders
- Feature Request: make FuturesUnordered splitable.
- `FuturesUnordered` guaranties
- Unbounded memory use of `futures::channel::mpsc` with `SinkExt::feed`
- Question: futures-rs::channels implement Send trait HOT 1
- Non-send future produced by chaining `Stream` combinators HOT 1
- Behavior of any() / all() / try_any() / try_all() is not documented for empty stream
- Feature request: add `StreamExt::eq` like `Iterator::eq`
- Reusing `AbortRegistration`
- ConcurrentStream usage with tokio leads to ACCESS_VIOLATION HOT 3
- [Discussion] `Shared` seems to wake up the same waker that was polling it HOT 1
- Add `StreamExt::map_while`
- io: impl AsyncWrite for Empty
- Stream & Sink Error types could benefit from core::fmt::Debug bound HOT 1
- implement OwnedMappedMutexGuard
- `StreamExt::scan` lacks a non-Option version
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from futures-rs.