async-stream's People

Contributors

anguslees, carllerche, darrentsung, davidpdrsn, goffrie, ids1024, kestrer, kpp, luciofranco, peng1999, sabrinajewson, sergiobenitez, taiki-e, trevyn, vorot93, wolthom

async-stream's Issues

yield inside tokio select! doesn't work

Yielding here inside tokio select! doesn't work. I can bypass the error with a variable but I'm facing a compiler crash with assignments like this

exit = Notification::StreamEnd(e.into());

So I'm mixing yields and assignments to make the code compile.

As the description says, yielding inside select! will be helpful for my use case :)

stream_mut() equivalent to iter_mut()?

Hi, thx for this crate. I was able to easily create an async method stream_many() that accepts an impl IntoIter of indexes and returns an impl Stream of key/val pairs for only those indices. pretty cool.

Next I was wondering if there is any way I can write a method that returns a Stream that can mutate &mut self? eg:

/// Returns a stream that allows modifying each value.
pub fn stream_mut(&mut self) -> StreamMut<'_, T> 

Perhaps with some kind of lending iterator?

or any plans in this area?

Dependency on tokio_test is missing in the crate

Hello,

when I check the Cargo.toml file at v0.3.5 ( https://github.com/tokio-rs/async-stream/blob/v0.3.5/async-stream/Cargo.toml ), I see tokio_test in there:

tokio-test = "0.4"

However, crates.io does not list it: https://crates.io/crates/async-stream/0.3.5/dependencies

When I check the actual crate ( https://crates.io/api/v1/crates/async-stream/0.3.5/download ), the tokio_test is indeed not in the Cargo.toml.orig:

[dev-dependencies]
futures-util = "0.3"
rustversion = "1"
tokio = { version = "1", features = ["full"] }
trybuild = "1"

Sorry if this is a stupid question, I do not know Rust much, but how was the crate generated? I would have assumed it would match the source code at the git tag.

Thank you

Cannot use "yield" and "?" on the same line

This is my first time using this library, and I'm currently facing this issue. The following code fails to compile:

use async_stream::try_stream;
use futures_core::Stream;

fn test() -> impl Stream<Item = Result<u32, ()>> {
    try_stream! {
        yield Err(())?;
    }
}

With the compiler showing this error:

    |
5   | /     try_stream! {
6   | |         yield Err(())?;
    | |                      ^ cannot use the `?` operator in an async block that returns `()`
7   | |     }
    | |_____- this function should return `Result` or `Option` to accept `?`
    |
    = help: the trait `FromResidual<Result<Infallible, ()>>` is not implemented for `()`

While this code works:

use async_stream::try_stream;
use futures_core::Stream;

fn test() -> impl Stream<Item = Result<u32, ()>> {
    try_stream! {
        let temp = Err(())?;
        yield temp;
    }
}

safe doc(hidden) APIs (async_stream::AsyncStream::new, async_stream::Sender::send, async_stream::pair) allow UB

This playground is an example of invoking UB using only the pub, safe API of this crate (pair(), AsyncStream::new(), and Sender::send()): https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=8cf61ab15c81d7a946cdbf60a1fd4c46

The gist of this is inside of the "generator" passed to AsyncStream::new(receiver, generator), we can construct a Sender, Receiver pair for a different type from the Receiver the AsyncStream is yielding results from. We can use this Sender to send a u8 while generating an AsyncStream, which results in the AsyncStream yielding a String value that causes a segmentation fault when printed.

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (mut sndstr, mut rcvstr) = pair::<String>();
    let stream = AsyncStream::new(rcvstr, async {
        let (mut sndint, mut rcvint) = pair::<u8>();
        let send_fut = sndint.send(5);
        
        // hack to get tokio to wake again after Send::send
        tokio::select! {
            _ = send_fut => {} 
            _ = async {
                for _ in 1..=10 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                }
            } => {}
        }
    });
    
    for _ in 1..=10 {
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }

    stream.for_each(|item| {
        println!("about to segfault:");
        println!("item: {item:?}");
        futures::future::ready(())
    }).await;
    
    println!("done");
}

I'm not familiar enough with the crate implementation to say which part should be marked unsafe, but I think this shows at least one of (AsyncStream::new, Sender::send, pair) needs to be marked unsafe.

async_stream_impl 0.3.1 removes stream, try_stream, breaking async_stream =0.3.0

I had async_stream pinned to "=0.3.0", and as a result, ran into the following error:

error[E0432]: unresolved imports `async_stream_impl::stream`, `async_stream_impl::try_stream`
   --> external/raze__async_stream__0_3_0/src/lib.rs:171:29
    |
171 | pub use async_stream_impl::{stream, try_stream};
    |                             ^^^^^^  ^^^^^^^^^^ no `try_stream` in the root
    |                             |
    |                             no `stream` in the root

error: aborting due to previous error

We try to pin our main dependencies, to keep a little more control over how things update. Updating to 0.3.1 for async stream 0.3.1 isn't a problem, but should 0.3.1 have been 0.4?

Box::pin vs pin_mut!(s);

Requiring pin_mut!(s); at the usage site may be efficient, but it is an awkward requirement for users of such API.

I've found that wrapping returned stream in Box::pin makes it "just work". I suspect that for network-bound streams the overhead will be negligible.

Could you mention this alternative in the docs?
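
As a rough illustration of the trade-off, here is a std-only sketch that uses a plain future as a stand-in for a stream, since the pinning rules are the same. The helper names (drive, plain, boxed, NoopWaker) are made up for this example; drive plays the role of an API that, like StreamExt::next, only accepts Unpin values.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough to poll futures that never park.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for an API that requires `Unpin`.
fn drive<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        // `Pin::new` is only available because `F: Unpin`.
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}

/// An async block is not `Unpin`, much like the value produced by `stream!`.
fn plain() -> impl Future<Output = u32> {
    async { 7 }
}

/// Pinning on the heap inside the API: callers get an `Unpin` value.
fn boxed() -> Pin<Box<dyn Future<Output = u32>>> {
    Box::pin(plain())
}

fn main() {
    // `drive(plain())` would not compile, because the async block is not `Unpin`.

    // Option 1: the caller pins on the stack (no allocation).
    let on_stack = pin!(plain());
    assert_eq!(drive(on_stack), 7);

    // Option 2: the API boxes (one allocation), the caller does nothing special.
    assert_eq!(drive(boxed()), 7);
}
```

The boxed variant costs one heap allocation per stream, which is why it is likely negligible for network-bound work, as suggested above.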

Example for an empty stream for stubbing

I'm working on a project where I use async_stream for interfacing with an optional crate. I wish to propose a stub for platforms that don't have support for this crate.

The following naive attempt does not work (the compiler tells me I'd rather use unit type as return value), likely because the lack of a yield does not allow the macro to derive an item type:

    pub fn stream(&mut self) -> impl Stream<Item = io::Result<NetEvent>> + '_ {
        try_stream! {
        }
    }

How can I achieve that ?

`let` - `else` expressions don't work with try stream

fn stream() -> impl Stream<Item = Result<u8, ()>> {
  try_stream! {
    yield 1;

    let Ok(x) = Err("a") else {
      Err(())?;
    };
  }
}

expected !, found ()
rust-analyzer[type-mismatch](https://rust-analyzer.github.io/manual.html#type-mismatch)
`else` clause of `let...else` does not diverge
  expected type `!`
found unit type `()`
try adding a diverging expression, such as `return` or `panic!(..)`

Rustfmt cannot format stream! blocks

Take a simple example like

fn main() {
    let s = async_stream::stream! {
        for i in 0..3 {
            yield i;
        }
    };
}

in a new rust project. Add some spaces to break the indentation somewhere in the macro invocation. Run cargo fmt. See that Rustfmt completely ignores the code inside of the macro.

This already happens in a simple block like

    let s = async_stream::stream! {
         let a = 0u32;
    };

I'm not sure if this is a bug in Rustfmt or something this crate can fix or if there are some Rustfmt options that can be changed to fix this.
My current workaround is to remove the async_stream::stream! part, run Rustfmt, add it back.

`try_stream!` / `stream!` with error close stream

Greetings!

Why does an Err close the stream? I can't use .filter(..) in that case.

use async_stream::{stream, try_stream};
use futures_util::{pin_mut, Stream, StreamExt};
use std::future::ready;

fn error_or_ok(rcx: usize) -> Result<usize, String> {
    if rcx % 2 == 0 {
        Ok(rcx)
    } else {
        Err(format!("Error{rcx}"))
    }
}

fn return_stream() -> impl Stream<Item = Result<usize, String>> {
    //stream! {
    try_stream! {
        let mut rcx = 0usize;
        loop {
            //yield Ok(error_or_ok(rcx)?); // with stream! closes
            //yield {move || { Ok(error_or_ok(rcx)?) }}(); // with stream! ok
            yield error_or_ok(rcx)?; // with try_stream! closes
            rcx += 1;
        }
    }
    .filter(|data| {
        let ret = if let Err(error) = data {
            eprintln!("Got error: `{error}`.");
            false
        } else {
            true
        };

        ready(ret)
    })
}

#[tokio::main]
async fn main() {
    let stream = return_stream();
    pin_mut!(stream);

    println!("Some: `{:?}`", stream.next().await);
    println!("None: `{:?}`", stream.next().await);
    println!("Some: `{:?}`", stream.next().await);
    println!("None: `{:?}`", stream.next().await);
}

Clarification regarding unstable features

The intro reads:

Provides two macros, stream! and try_stream!, allowing the caller to define asynchronous streams of elements. These are implemented using async & await notation. The stream! macro works without unstable features.

From this wording, I assumed that the stream! macro works on stable while try_stream! requires unstable features. But they both seem to compile fine on stable.

Maybe that interpretation was unintended. In that case the wording could be changed to "this crate works without unstable features" 🙂

Maybe AsyncStream should explicitly implement Sync

Some consumers of streams expect streams to implement Sync. This is true, for example, of tonic GRPC servers when the result of a method is a stream.

Currently, an AsyncStream type automatically gets Sync if the generator also implicitly implements Sync. But it's easy to write a generator that doesn't implement Sync. Just call any async function on a trait that uses async_trait. The return type of such a function is Pin<Box<dyn Future + Send + 'async>>. Notice it doesn't include + Sync.

My guess is that async_trait is doing the right thing here, not adding unnecessary trait bounds. And I don't know why one would want to access a future simultaneously from multiple threads anyway.

For that matter, I don't know why one would want to access a stream simultaneously from multiple threads. But if I understand Sync correctly, it would be safe to explicitly implement it on AsyncStream (though it would require an unsafe block), because the generator is only ever accessed inside a method that takes a mutable reference to the AsyncStream, so IIUC only one thread at a time should be able to call that method.

Note: I'm still pretty new to Rust, so my understanding may be way off. Currently I'm stuck trying to call a Rusoto function inside a try_stream! block, and I'm getting a compiler error because the resulting generator doesn't implement Sync and tonic needs Sync on the stream.
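
The reasoning above (a value that is only reachable through &mut can be shared safely) can be sketched outside the crate as a small wrapper. This is a minimal std-only sketch under that assumption, not something async-stream provides; SyncWrapper and assert_sync are hypothetical names. Wrapping an actual stream would additionally need a pin-projecting Stream impl, which is left out here.

```rust
use std::cell::Cell;

/// Hypothetical wrapper that exposes the inner value only by `&mut self`
/// or by value, never through `&self`.
pub struct SyncWrapper<T>(T);

impl<T> SyncWrapper<T> {
    pub fn new(value: T) -> Self {
        SyncWrapper(value)
    }

    /// Exclusive access: at most one thread can hold `&mut self`.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.0
    }

    pub fn into_inner(self) -> T {
        self.0
    }
}

// SAFETY: a shared `&SyncWrapper<T>` offers no way to reach the inner `T`,
// so sharing the wrapper between threads never creates shared access to `T`.
unsafe impl<T> Sync for SyncWrapper<T> {}

/// Compile-time check that a value is `Sync`.
fn assert_sync<T: Sync>(_: &T) {}

fn main() {
    // `Cell<u32>` is `Send` but not `Sync`; the wrapper is `Sync`.
    let mut wrapped = SyncWrapper::new(Cell::new(1u32));
    assert_sync(&wrapped);

    wrapped.get_mut().set(2);
    assert_eq!(wrapped.into_inner().get(), 2);
}
```

Note that the wrapper must not derive or implement anything that reads the inner value through &self (Debug, Clone and so on), otherwise the safety argument no longer holds.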

Experiment with a proc-macro-free API

I've been thinking that it would be possible for this crate to provide an API that doesn't use proc macros at all, which has a couple of benefits:

  • IDEs will be happier
  • Rustfmt will work better (#68)

The API could look like this:

/// async-stream ///

pub fn stream<T, F, Fut>(f: F) -> impl Stream<Item = T>
where
    F: FnOnce(Yielder<T>) -> Fut,
    Fut: Future<Output = ()>,
{ /* ... */ }

pub fn try_stream<T, E, F, Fut>(f: F) -> impl Stream<Item = Result<T, E>>
where
    F: FnOnce(Yielder<T>) -> Fut,
    Fut: Future<Output = Result<(), E>>,
{ /* ... */ }

// This macro will shadow the yielder with a function that borrows from a local.
//
// It will panic if called after the first poll or from a different stream.
#[macro_export]
macro_rules! start_stream {
    // $yielder must be of type Yielder<T>
    ($yielder:ident) => { /* ... */ };
}

/// Usage ///

let stream = async_stream::stream(|yielder| async move {
    // Must be called in the first poll, otherwise the stream will panic
    start_stream!(yielder);
    yielder(1).await;
    yielder(2).await;
    yielder(3).await;
});

I'm pretty sure this would be sound. Ergonomically, we'd lose the nice for await and yield syntax as well as the ability to use ? in regular streams (although users can always use a try_stream and then flatten the results if they want something like that), but we'd also gain the ability to specify the type of stream with turbofish syntax. I think it might be nice to support both versions in the library, depending on users' preferences. Any thoughts on the design?

try_stream is !Send with ? and MutexGuard or try-block

  1. MutexGuard
try_stream! {
  {
    let _guard = mutex.try_lock().unwrap();
    Err(200)?;
  }
  {
    yield 100;
  }
}

Err(200)? is expanded by try_stream! to something like __yield_tx.send(::core::result::Result::Err(200)).await;
This makes the stream !Send, because _guard is still alive across that await.

note: future is not `Send` as this value is used across an await

This can be fixed if async-stream captures the error and tx.send in the outermost block.

let result: Result<_, _> = (|| {
 .. user code ..
})();
match result {
  Ok(..) => ..
  Err(err) => __yield_tx.send(..).await;
}

  2. try-block

A workaround is to capture the error in result: Result<..> and emit the error after MutexGuard is dropped.
However, async-stream does not recognize ? in try-blocks.

try_stream! {
  let result = {
    let _guard = mutex.try_lock().unwrap();
    let result: Result<..> = try {
          Err(200)?; // async-stream expands this to `tx.send(..)`.
          Ok(())
    };
    result
  };
  result?;
  {
    yield 100;
  }
}
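
The workaround described here (compute the Result while the guard is held, then let the guard drop before any await) can be sketched with std only. This is an illustration with a plain future instead of try_stream!; checked_step, assert_send, block_on and NoopWaker are hypothetical helpers, and std::future::ready stands in for the await that the macro would generate.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough to poll futures that never park.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny executor for futures that complete without real wake-ups.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Compile-time check that a value is `Send`.
fn assert_send<T: Send>(_: &T) {}

/// Hypothetical step: inspect shared state under a lock, then await.
fn checked_step(limit: Arc<Mutex<u32>>) -> impl Future<Output = Result<u32, u32>> {
    async move {
        // The guard lives only inside this block, so it is dropped
        // before the await below and never stored in the future.
        let outcome: Result<u32, u32> = {
            let guard = limit.lock().unwrap();
            let value = *guard;
            if value > 10 { Err(200) } else { Ok(value) }
        };

        // Stand-in for the await point the macro would insert.
        std::future::ready(()).await;
        outcome
    }
}

fn main() {
    let fut = checked_step(Arc::new(Mutex::new(3)));
    assert_send(&fut); // compiles only because no guard crosses the await
    assert_eq!(block_on(fut), Ok(3));
    assert_eq!(block_on(checked_step(Arc::new(Mutex::new(50)))), Err(200));
}
```

Moving the await inside the inner block, while guard is still in scope, would make assert_send fail to compile, which mirrors the error reported above.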

Please issue a new release

As of this writing, the most recent release is v0.3.3, published in March 2022. Of particular interest are some changes that reduce unsafe usage (d48ec2c) and make the remaining usages easier to audit in code review (e1d440f).

Thanks!

Return a stream conditionally

fn a() -> impl Stream<Item = Result<u8, ()>> {
    try_stream!{
        yield 1;
    }
}

fn b() -> impl Stream<Item = Result<u8, ()>> {
    try_stream!{
        yield 2;
    }
}

fn decide(flag: bool) -> impl Stream<Item = Result<u8, ()>> {
    if flag {
        a()
    } else {
        b()
    }
}

fn main(){
    decide(false);
}

This fails to compile: the values returned by a() and b() have incompatible types, because distinct uses of "impl Trait" result in different opaque types.

Boxing them will make me unable to pin them going forward, so what should be the way forward here?
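
One common way out is to erase both types behind a pinned box. The sketch below shows this with plain std futures instead of streams, since the opaque-type rule is the same; for streams the analogous return type would be Pin<Box<dyn Stream<Item = ...>>>. The helpers block_on, assert_unpin and NoopWaker are hypothetical names for this example. A Pin<Box<...>> is already pinned and is itself Unpin, so it can still be polled or passed to APIs that require Unpin.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough to poll futures that never park.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny executor for futures that complete without real wake-ups.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Compile-time check that a value is `Unpin`.
fn assert_unpin<T: Unpin>(_: &T) {}

// Two async functions: each returns its own distinct opaque type.
async fn a() -> u8 {
    1
}

async fn b() -> u8 {
    2
}

/// Both branches coerce to the same boxed trait object.
fn decide(flag: bool) -> Pin<Box<dyn Future<Output = u8>>> {
    if flag {
        Box::pin(a())
    } else {
        Box::pin(b())
    }
}

fn main() {
    let chosen = decide(true);
    assert_unpin(&chosen); // the pinned box itself can be moved freely
    assert_eq!(block_on(chosen), 1);
    assert_eq!(block_on(decide(false)), 2);
}
```

The cost is one allocation and dynamic dispatch per returned value. An enum with one variant per branch avoids both, at the price of writing or importing the forwarding impl.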

try_stream! does not compile when using several ? operators in a row.

async_stream::try_stream! {
     let a = Ok::<_,  String>(Ok::<_,  String>(123))??;
     for _ in (1..10) {
            yield a;
     }
 }

The compiler complains: cannot use the ? operator in an async block that returns ().
After splitting it into two statements:

async_stream::try_stream! {
     let a = Ok::<_,  String>(Ok::<_,  String>(123))?;
     let a = a?;
     for _ in (1..10) {
         yield a;
     }
 }

It compiles.

`try_stream! {foo?}` doesn't convert Error types like regular `foo?`

Given

struct ErrorA(u8);
struct ErrorB(u8);
impl From<ErrorA> for ErrorB {
    fn from(a: ErrorA) -> ErrorB {
        ErrorB(a.0)
    }
}

"Normal" (synchronous Rust) will convert error types:

fn sync() -> Result<&'static str, ErrorB> {
        if true {
            Err(ErrorA(1))?;
        } else {
            Err(ErrorB(2))?;
        }
        Ok("unreachable")
}

But try_stream! does not:

fn asynchronous() -> impl Stream<Item = Result<&'static str, ErrorB>> {
    try_stream! {
            if true {
                Err(ErrorA(1))?;
            } else {
                Err(ErrorB(2))?;
            }
            yield "unreachable";
    }
}

Gives (some variation of):

   = note: expected type `ErrorA`
              found type `ErrorB`
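
In regular Rust, ? calls From::from on the error before returning it. Assuming the macro version in use skips that step, a possible workaround is to perform the conversion explicitly with map_err so that every ? sees the same error type. The sketch below shows the explicit form in a synchronous function; first_failure is a hypothetical name, and the same map_err call could be placed before ? inside try_stream!.

```rust
#[derive(Debug, PartialEq)]
struct ErrorA(u8);

#[derive(Debug, PartialEq)]
struct ErrorB(u8);

impl From<ErrorA> for ErrorB {
    fn from(a: ErrorA) -> ErrorB {
        ErrorB(a.0)
    }
}

/// Every `?` below already carries an `ErrorB`, so no implicit
/// `From` conversion is needed at the `?` site.
fn first_failure(use_a: bool) -> Result<&'static str, ErrorB> {
    if use_a {
        // Explicit conversion: Result<(), ErrorA> -> Result<(), ErrorB>.
        Err::<(), _>(ErrorA(1)).map_err(ErrorB::from)?;
    } else {
        Err::<(), _>(ErrorB(2))?;
    }
    Ok("unreachable")
}

fn main() {
    assert_eq!(first_failure(true), Err(ErrorB(1)));
    assert_eq!(first_failure(false), Err(ErrorB(2)));
}
```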

Re-land #53

#53 was reverted in #55, but I'd like to consider re-landing it in the next major version.

#55 (comment)

#53 contained improvements, but at the same time caused breaking changes.

I tend to accept it at 0.4, but I feel I need some testing to see if it has any other impact.
So for now I'd like to revert it.

Can't use ? if try_stream! wraps select!

I have multiple streams I am joining in a select along with a timeout future that caused me to run into this. I tried to boil it down to a minimal code sample that can trigger the problem. It is as follows:

use std::io;
use std::future::Future;
use socket2::{Socket, Domain, Type, Protocol};
use futures::Stream;
use async_stream::try_stream;
use futures::select;
use tokio::time::sleep;
use std::time::Duration;
use futures::FutureExt;
use futures::StreamExt;

fn await_results(
    mut stream1: impl Stream<Item = io::Result<i32>> + Unpin,
) -> impl Stream<Item = io::Result<i32>> {
    try_stream! {
        let mut stream1_future = stream1.next().fuse();
        select!{
            result = stream1_future => {
                stream1_future = stream1.next().fuse();
                match result {
                    Some(x) => {
                        let y = x?;
                        yield y;
                    }
                    None => {

                    }
                }
            }
        }
    }
}


#[tokio::main]
async fn main() -> io::Result<()> {
    Ok(())
}

This fails with the error:

error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
  --> src/main.rs:22:33
   |
15 | /     try_stream! {
16 | |         let mut stream1_future = stream1.next().fuse();
17 | |         select!{
18 | |             result = stream1_future => {
...  |
22 | |                         let y = x?;
   | |                                 ^^ cannot use the `?` operator in an async block that returns `()`
...  |
30 | |         }
31 | |     }
   | |_____- this function should return `Result` or `Option` to accept `?`
   |
   = help: the trait `Try` is not implemented for `()`
   = note: required by `from_error`

And I confirmed I'm on 0.3.2 so I have the fix for #27

`#![no_std]` compatibility

Hi! Is it possible for async-stream to have optional compatibility with #![no_std] crates? There seems to be only one crate similar in idea to async-stream, futures-async-stream, which has #![no_std] support but, at least in my case, works really badly with macros.

Filter example

What would a filter example look like, e.g. taking the example from the README and yielding only odd numbers based on a source stream?

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

/// Stream of only odd numbers
fn odds<S: Stream<Item = u32>>(input: S)
    -> impl Stream<Item = u32>
{
    stream! {
        pin_mut!(input);
        while let Some(value) = input.next().await {
            // ???
        }
    }
}

Mismatched type for $crate::AsyncStreamHack

I'm new to rust and async-stream so pardon me if this is a silly question.
I compiled my program and it said:

error[E0308]: mismatched types
  |
  = note: this error originates in a macro (in Nightly builds, run with -Z macro-backtrace for more info)

error: aborting due to previous error; 1 warning emitted

So I re-compile with cargo rustc -- -Z macro-backtrace, this time I get:

error[E0308]: mismatched types
   --> /Users/can/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.2.1/src/lib.rs:222:22
    |
218 | / macro_rules! stream {
219 | |     ($($body:tt)*) => {{
220 | |         let (mut __yield_tx, __yield_rx) = $crate::yielder::pair();
221 | |         $crate::AsyncStream::new(__yield_rx, async move {
222 | |             #[derive($crate::AsyncStreamHack)]
    | |                      ^^^^^^^^^^^^^^^^^^^^^^^
    | |                      |
    | |                      expected struct `std::string::String`, found `&str`
    | |                      help: try using a conversion method: `$crate::AsyncStreamHack.to_string()`
    | |                      in this expansion of `stream_0!` (#8)
...   |
227 | |             $crate::dispatch!(($($body)*))
    | |             ------------------------------ in this macro invocation (#2)
228 | |         })
229 | |     }}
230 | | }
    | |_- in this expansion of `async_stream::stream!` (#1)
...
286 |   macro_rules! dispatch {
    |  _-
    | |_|
    | |_|
    | |_|
    | |_|
    | |
287 | |     (() $($bang:tt)*) => {
288 | |         $crate::count!($($bang)*)
    | |         ------------------------- in this macro invocation (#7)
289 | |     };
...   |
297 | |         $crate::dispatch!(($($first)* $($rest)*) $($bang)*)
    | |         --------------------------------------------------- in this macro invocation (#4)
...   |
306 | |         $crate::dispatch!(($($rest)*) $($bang)*)
    | |         ----------------------------------------
    | |         |
    | |         in this macro invocation (#3)
    | |         in this macro invocation (#5)
    | |         in this macro invocation (#6)
307 | |     };
308 | | }
    | | -
    | |_|
    | |_in this expansion of `$crate::dispatch!` (#2)
    | |_in this expansion of `$crate::dispatch!` (#3)
    | |_in this expansion of `$crate::dispatch!` (#4)
    | |_in this expansion of `$crate::dispatch!` (#5)
    |   in this expansion of `$crate::dispatch!` (#6)
...
312 | / macro_rules! count {
313 |       () => {
314 |           stream_0!()
    |           ----------- in this macro invocation (#8)
315 |       };
...
507 |       };
508 | | }
    | |_- in this expansion of `$crate::count!` (#7)

error: aborting due to previous error; 1 warning emitted

For more information about this error, try `rustc --explain E0308`.

Not sure what's going on but it seems to be a bug, please help.

Change Macros to Support Rust Analyzer Autocomplete

When using the stream! or try_stream! macros, autocomplete with rust analyzer works fine until a yield statement is written. This is because the macro no longer expands if there is any invalid Rust (but only once a yield statement has been written). See the rust analyzer issue here explaining in more detail how autocomplete fails when macros fail to expand (specifically for this crate's macros): rust-lang/rust-analyzer#12759

I have no experience writing proc macros. Is it possible to change the way the proc macros are written so that they always expand, even if there is a yield statement and the Rust code is invalid? This would allow rust analyzer autocomplete to function properly in all cases. I tested and noticed the tokio select! macro seems to consistently expand, which allows autocomplete to function normally.

Add example of how to replace "tokio::stream::Stream::poll_next" function with async-stream

Hi, I am trying to migrate the blocking code below to use async-stream, but I don't understand what is needed. The examples in the repo do not include a way to implement this macro against tokio::stream::Stream.

struct StreamBody {
    file: tokio::fs::File,
    buffer: Vec<u8>,
}

impl StreamBody {
    fn new(file: tokio::fs::File, chunk_size: usize) -> Self {
        let buffer = vec![0u8; chunk_size];
        StreamBody {
            file,
            buffer,
        }
    }
}

impl tokio::stream::Stream for StreamBody {
    type Item = Result<Bytes, ResponseError>;
    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        use std::ops::DerefMut;
        let stream = self.deref_mut();
        match stream.file.read(&mut stream.buffer).await {
            Ok(count) => {
                if count > 0 {
                    Poll::Ready(Some(Ok(Bytes::copy_from_slice(&self.buffer[..count]))))
                } else {
                    Poll::Ready(None)
                }
            },
            Err(e) => Poll::Ready(Some(Err(Error::dump_read_failed(e).into())))
        }
    }
}

Could you please describe in the examples how to convert this to use this crate? Thank you!

Type annotations

Often I get compile errors when the compiler cannot deduce the type when using try_stream!:

error[E0698]: type inside `async fn` body must be known in this context
   --> src/server.rs:213:18
    |
213 |       let stream = try_stream! {
    |  __________________^
214 | |         while let Some(chunk) = body.data().await {
215 | |             let chunk = chunk?;
216 | |             trace!("Body: {:?}", &chunk);
...   |
219 | |         }
220 | |     };
    | |_____^ cannot infer type for type parameter `E` declared on the enum `Result`

This can be resolved by moving the code into a function that returns the appropriate generic type. However, is it also possible to put a type annotation somewhere? (impl T doesn't seem to work for annotating the type of a local variable.)

Example for Streaming Borrowed Item

For performance reasons, I'd like to have one reusable buffer buf: Vec<u8> and yield a &buf[..]. The user needs to drop the reference before another next().await.

Unlike streaming-iterator which only requires &mut self during next(), this is not easy in async-stream because the generator loop keeps holding the mutable reference.

pub struct Reader<R: AsyncRead> {
    r: R,
    buf: Vec<u8>,
}

impl<R: AsyncRead + Unpin> Reader<R> {
    pub fn stream(self: &mut Self) -> impl futures::Stream<Item = Result<&[u8]>> {
        async_stream::try_stream! {
            loop {
                ..
                yield &self.buf[..];
            }
        }
    }
}

error[E0502]: cannot borrow `self.buf` as mutable because it is also borrowed as immutable
  --> src/main.rs:20:9
   |
20 | /         async_stream::try_stream! {
21 | |
22 | |             loop {
...  |
42 | |                 yield &self.buf[..];
   | |                        -------- immutable borrow occurs here
43 | |             }
44 | |         }
   | |         ^
   | |         |
   | |         mutable borrow occurs here
   | |_________lifetime `'1` appears in the type of `__yield_tx`
   |           argument requires that `self.buf` is borrowed for `'1`
   |

It would be great to see an example of yielding borrowed data.
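
For contrast, the lending pattern that streaming-iterator relies on can be sketched synchronously with std only. This is not an async-stream example: ChunkReader and next_chunk are hypothetical names, and the point is only that the returned slice borrows from &mut self for the duration of one call, which is the shape a Stream with a fixed Item type cannot express.

```rust
/// Hypothetical reader that reuses a single buffer for every chunk.
struct ChunkReader<'a> {
    src: &'a [u8],
    buf: Vec<u8>,
    chunk: usize,
}

impl<'a> ChunkReader<'a> {
    fn new(src: &'a [u8], chunk: usize) -> Self {
        assert!(chunk > 0, "chunk size must be non-zero");
        ChunkReader { src, buf: Vec::with_capacity(chunk), chunk }
    }

    /// Lending-style `next`: the slice borrows `self.buf`, so the caller
    /// has to let go of it before asking for the next chunk.
    fn next_chunk(&mut self) -> Option<&[u8]> {
        if self.src.is_empty() {
            return None;
        }
        let src = self.src;
        let n = self.chunk.min(src.len());
        let (head, tail) = src.split_at(n);

        // Overwrite the shared buffer instead of allocating a new one.
        self.buf.clear();
        self.buf.extend_from_slice(head);
        self.src = tail;
        Some(&self.buf[..])
    }
}

fn main() {
    let mut reader = ChunkReader::new(b"abcdefg", 3);
    let mut lengths = Vec::new();
    while let Some(chunk) = reader.next_chunk() {
        lengths.push(chunk.len());
    }
    assert_eq!(lengths, vec![3, 3, 1]);
}
```

With a Stream, every item must be allowed to outlive the next call to next(), so the usual alternatives are yielding owned chunks or sharing the buffer behind a reference-counted handle.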

failure::format_err! inside try_stream! strange behavior

If using format_err! macro inside try_stream!:

use failure::*;

pub fn stream() -> impl Stream<Item = failure::Fallible<i32>> + Send {
    async_stream::try_stream! {
        Err(format_err!("adasdasd"))?;
        for i in (1..10) {
            yield 123;
        }
    }
}

It doesn't compile - *mut (dyn std::ops::Fn() + 'static) cannot be shared between threads safely.
But when using it without format_err!:

use failure::*;

#[derive(Debug, Fail)]
#[fail(display = "my error")]
pub struct MyError;

pub fn stream() -> impl Stream<Item = Fallible<i32>> + Send {
    async_stream::try_stream! {
        Err(MyError)?;
        for i in (1..10) {
            yield 123;
        }
    }
}

Compiler says all Ok.
