Giter Site home page Giter Site logo

Comments (19)

alexcrichton avatar alexcrichton commented on June 12, 2024 4

Thanks for the report! The API of channel I do agree isn't the most ergonomic right now. The intention is to model backpressure, so the sender can't send too many messages until the consumer has caught up. I think also the best solution here would indeed be to just solve this at the mio layer, but failing that channel is perhaps the best way to go for now.

The latter example of yours unfortunately may not quite behave as expected, though. Functions and closures in part of and_then are run as part of poll, which is intended to return quickly, but unfortunately this implementation may block (the read_line). Additionally, right now after the send plus and_then the futures are dropped which means that execution won't actually happen. You'll need to tag futures with .forget() to ensure they execute.

I believe though, that this should work for you?

extern crate futures;
extern crate futures_cpupool;

use std::io::{self, BufRead};

use futures::{Future, BoxFuture};
use futures::stream::{Stream, BoxStream, Sender, channel};
use futures_cpupool::CpuPool;

fn stdin() -> BoxStream<String, io::Error> {
    let pool = CpuPool::new(1);
    let (tx, rx) = channel();
    read_line(tx, pool).forget();
    return rx.boxed();

    fn read_line(tx: Sender<String, io::Error>,
                 pool: CpuPool) -> BoxFuture<(), io::Error> {
        let line = pool.execute(|| {
            let input = io::stdin();
            let mut lines = input.lock().lines();
            lines.next().unwrap()
        }).map_err(|_| io::Error::new(io::ErrorKind::Other, "panic"));

        let tx = line.and_then(|line| {
            tx.send(line)
              .map_err(|_| io::Error::new(io::ErrorKind::Other, "send error"))
        });

        tx.and_then(|tx| {
            read_line(tx, pool)
        }).boxed()
    }
}

That creates a sender which executes all of the blocking reads in a separate "thread pool", and then the next line isn't read until the first has been received and the consumers is ready for another.

Does that make sense?

from futures-rs.

alexcrichton avatar alexcrichton commented on June 12, 2024 2

Ok I've added await in 1f51080 to the Future and Stream traits, and otherwise I'm going to close this in favor of tokio-rs/mio#321. Note that my example from before can now be written as:

extern crate futures;

use std::io::{self, BufRead};
use std::thread;

use futures::Future;
use futures::stream::{Stream, BoxStream, channel};

fn stdin() -> BoxStream<String, io::Error> {
    let (mut tx, rx) = channel();
    thread::spawn(move || {
        let input = io::stdin();
        for line in input.lock().lines() {
            match tx.send(line).wait() {
                Ok(s) => tx = s,
                Err(_) => break,
            }
        }
    });
    return rx.boxed();
}

@Stebalien Hm I'm curious about the avoidance of the name await? I would expect this to have a clear analog to async/await which is essentially what futures are compiling to?

from futures-rs.

Stebalien avatar Stebalien commented on June 12, 2024 1

Really, there needs to be a way to read lines from stdin as a stream. There's probably some way to do this but I couldn't find it. That is, futures_io::stdin().lines() -> impl Stream<Item=String, Error=io::Error>.

Also, there should be a way to feed a Stream into a Sender. But, if you have a Stream, you don't really need a channel.

Finally, it might be useful to have a some form of y-combinator to make recursion simpler but streams really are the correct answer here.

from futures-rs.

alexcrichton avatar alexcrichton commented on June 12, 2024 1

@jimmycuadra try:

return rx.then(|e| e.unwrap()).boxed()

That unwrap can never fail because the receiver will never receive an error (we need to improve that), but that should get back to the original signature.

from futures-rs.

Stebalien avatar Stebalien commented on June 12, 2024

Actually, Sender could implement Stream like so:

let (tx, rx) = channel();
// `put` is basically a bucket.
tx.for_each(|put| put(Ok("thing"))).forget(); // if we drop instead of forgetting, it closes the stream.
rx.take(3).collect().map(|v| assert!(v, vec!["thing", "thing", "thing"]));

from futures-rs.

alexcrichton avatar alexcrichton commented on June 12, 2024

@Stebalien oh that's a neat idea! I'd have to think on it but seems plausible at least

from futures-rs.

Stebalien avatar Stebalien commented on June 12, 2024

After thinking about this a bit, I think the real problem here (ignoring the fact that there should be some async way to read from stdin) is the sync/async interface. That is, there needs to be a way to interface between async code and sync code. IMO, the correct way to do this is some form of wait function (e.g. #20). That would allow one to write to and read from a channel both synchronously and asynchronously.


The problem of feeding a channel with function is better addressed by, well, not using a channel:

stream::generate(|| /* returns some promise */)
// or, with existing features (yuck)
stream::iter(iter::repeat(())).and_then(|_|  /* return some promise */)

from futures-rs.

alexcrichton avatar alexcrichton commented on June 12, 2024

Hm I'm not sure I quite follow at this point, is there actually a bug here? Should this be closed?

from futures-rs.

Stebalien avatar Stebalien commented on June 12, 2024

There are two missing features:

  1. A way to read from stdin asynchronously (dependent on the missing mio feature).
  2. A way to easily communicate between synchronous and asynchronous code. That is, there needs to be a way to write to a channel synchronously and read from a channel synchronously from "worker" threads.

from futures-rs.

jimmycuadra avatar jimmycuadra commented on June 12, 2024

We could close this and open two separate tracking issues for those points, if it'd be easier to grok/manage.

from futures-rs.

carllerche avatar carllerche commented on June 12, 2024

Jumping in, I don't know how to solve 1) because STDIN is global and would require globally switching the fd to non-blocking.

currently you can work around this on *nix platforms by manually switching the fd to non-blocking and then using it w/ EventedFd. However, I don't know the best way to provide a portable strategy that works w/ windows as well.

from futures-rs.

Stebalien avatar Stebalien commented on June 12, 2024

It's doable on windows but potentially racy. You can check if there are any pending console events and then read them but if someone else tries to read stdin in-between the check and the read, you'll end up blocking...

from futures-rs.

jcsoo avatar jcsoo commented on June 12, 2024

If STDIN has been redirected to a file, I believe you end up in a situation where read() may block even if you set the fd to non-blocking: http://www.remlab.net/op/nonblock.shtml

from futures-rs.

alexcrichton avatar alexcrichton commented on June 12, 2024

Ok, let's defer the asynchronous stdin to tokio-rs/mio#321 (agreed it's likely very tricky, I remember reams of code to handle this in libuv+libstd back in the day).

I'd like to drill more into "a way to easily communicate between synchronous and asynchronous code", though, to understand what exactly we'd need here. @Stebalien are you basically thinking of a function like await (#20) which resolves a future? Or do you have more bits and pieces you think is necessary?

from futures-rs.

Stebalien avatar Stebalien commented on June 12, 2024

@jcsoo Not quite the same thing. Reading from a file will block when there is data available to read but the OS is busy but won't block otherwise.

@alexcrichton Yes (and maybe a simple way to convert a stream to a blocking iterator). However, I wouldn't call it await because, in most languages, await yields to the event loop (doesn't block).

from futures-rs.

alexcrichton avatar alexcrichton commented on June 12, 2024

Hm yeah that's true, turning a stream into an iterator would also be nice. Shouldn't be too hard to do though with a similar implementation!

from futures-rs.

Stebalien avatar Stebalien commented on June 12, 2024

Hm I'm curious about the avoidance of the name await? I would expect this to have a clear analog to async/await which is essentially what futures are compiling to?

Yes but, in most languages, await yields to the event loop while here await blocks. I'm worried that someone will think let value = future.await() is be equivalent to let value = await future; (doesn't exist in rust but this is the syntax one uses in python).

from futures-rs.

alexcrichton avatar alexcrichton commented on June 12, 2024

Hm yeah that's a good point! I've renamed to wait and Wait to hopefully head off that confusion.

from futures-rs.

jimmycuadra avatar jimmycuadra commented on June 12, 2024

Finally getting back to this.

@alexcrichton, how would your last code example change with the deprecation of futures::stream::channel in favor of futures::sync::mpsc::channel? They don't seem to be identical. This code:

extern crate futures;

use std::io::{self, BufRead};
use std::thread;

use futures::{Future, Sink, Stream};
use futures::stream::BoxStream;
use futures::sync::mpsc::channel;

fn stdin() -> BoxStream<String, io::Error> {
    let (mut tx, rx) = channel(1);
    thread::spawn(move || {
        let input = io::stdin();
        for line in input.lock().lines() {
            match tx.send(line).wait() {
                Ok(s) => tx = s,
                Err(_) => break,
            }
        }
    });
    return rx.boxed();
}

fn main() {
    stdin().for_each(|string| {
        println!("{}", string);
        Ok(())
    });
}

results in this error:

error[E0308]: mismatched types
  --> src/main.rs:21:12
   |
21 |     return rx.boxed();
   |            ^^^^^^^^^^ expected struct `std::io::Error`, found ()
   |
   = note: expected type `Box<futures::Stream<Error=std::io::Error, Item=std::string::String> + std::marker::Send + 'static>`
   = note:    found type `Box<futures::Stream<Error=(), Item=std::result::Result<std::string::String, std::io::Error>> + std::marker::Send + 'static>`

If I change the line causing the error to:

return rx.map_err(|_| io::Error::new(io::ErrorKind::Other, "boom")).boxed();

then I get:

error[E0308]: mismatched types
  --> src/main.rs:21:12
   |
21 |     return rx.map_err(|_| io::Error::new(io::ErrorKind::Other, "boom")).boxed();
   |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `std::string::String`, found enum `std::result::Result`
   |
   = note: expected type `Box<futures::Stream<Error=std::io::Error, Item=std::string::String> + std::marker::Send + 'static>`
   = note:    found type `Box<futures::Stream<Error=std::io::Error, Item=std::result::Result<std::string::String, std::io::Error>> + std::marker::Send + 'static>`

I don't understand futures well enough to understand why that change makes the concrete type of Stream::Item change. I only transformed Stream::Error to be the expected type, why would that change Stream::Item?

If I then change line 15 to unwrap the line from stdin so that a String is passed to tx.send instead of a Result:

match tx.send(line.unwrap()).wait() {

Then the program compiles, but with a warning:

warning: unused result which must be used: streams do nothing unless polled, #[warn(unused_must_use)] on by default
  --> src/main.rs:25:5
   |
25 |       stdin().for_each(|string| {
   |  _____^ starting here...
26 | |         println!("{}", string);
27 | |         Ok(())
28 | |     });
   | |_______^ ...ending here

But of course I don't want to unwrap the line, I want to handle possible errors correctly.

I don't think there are any bugs here, it's just me not being able to grok futures well enough yet.

from futures-rs.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.