tokio-rs / async-stream
Asynchronous streams for Rust using async & await notation
License: Other
Yielding here inside tokio `select!` doesn't work. I can bypass the error with a variable, but I'm facing a compiler crash with assignments like this:
exit = Notification::StreamEnd(e.into());
So I'm mixing yields and assignments to make the code compile.
As the description says, yielding inside `select!` will be helpful for my use case :)
Hi, thanks for this crate. I was able to easily create an async method `stream_many()` that accepts an `impl IntoIterator` of indexes and returns an `impl Stream` of key/value pairs for only those indices. Pretty cool.
Next I was wondering if there is any way I can write a method that returns a Stream that can mutate `&mut self`? E.g.:
/// Returns a stream that allows modifying each value.
pub fn stream_mut(&mut self) -> StreamMut<'_, T>
Perhaps with some kind of lending iterator? Or are there any plans in this area?
Hello,
when I check the Cargo.toml file at v0.3.5 ( https://github.com/tokio-rs/async-stream/blob/v0.3.5/async-stream/Cargo.toml ), I see tokio_test in there:
tokio-test = "0.4"
However, crates.io does not list it: https://crates.io/crates/async-stream/0.3.5/dependencies
When I check the actual crate ( https://crates.io/api/v1/crates/async-stream/0.3.5/download ), the tokio_test is indeed not in the Cargo.toml.orig:
[dev-dependencies]
futures-util = "0.3"
rustversion = "1"
tokio = { version = "1", features = ["full"] }
trybuild = "1"
Sorry if this is a stupid question, I do not know Rust much, but how was the crate generated? I would have assumed it would match the source code at the git tag.
Thank you
This is my first time using this library, and I'm currently facing this issue. The following code fails to compile:
use async_stream::try_stream;
use futures_core::Stream;
fn test() -> impl Stream<Item = Result<u32, ()>> {
    try_stream! {
        yield Err(())?;
    }
}
With the compiler showing this error:
|
5 | / try_stream! {
6 | | yield Err(())?;
| | ^ cannot use the `?` operator in an async block that returns `()`
7 | | }
| |_____- this function should return `Result` or `Option` to accept `?`
|
= help: the trait `FromResidual<Result<Infallible, ()>>` is not implemented for `()`
While this code works:
use async_stream::try_stream;
use futures_core::Stream;
fn test() -> impl Stream<Item = Result<u32, ()>> {
    try_stream! {
        let temp = Err(())?;
        yield temp;
    }
}
This playground is an example of invoking UB using only the pub, safe API of this crate (`pair()`, `AsyncStream::new()`, and `Sender::send()`): https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=8cf61ab15c81d7a946cdbf60a1fd4c46
The gist is that inside the "generator" passed to `AsyncStream::new(receiver, generator)`, we can construct a `Sender`, `Receiver` pair for a different type from the `Receiver` the `AsyncStream` is yielding results from. We can use this `Sender` to send a `u8` while generating an `AsyncStream`, which results in the `AsyncStream` yielding a `String` value that causes a segmentation fault when printed.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (mut sndstr, mut rcvstr) = pair::<String>();
    let stream = AsyncStream::new(rcvstr, async {
        let (mut sndint, mut rcvint) = pair::<u8>();
        let send_fut = sndint.send(5);
        // hack to get tokio to wake again after Send::send
        tokio::select! {
            _ = send_fut => {}
            _ = async {
                for _ in 1..=10 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                }
            } => {}
        }
    });
    for _ in 1..=10 {
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }
    stream.for_each(|item| {
        println!("about to segfault:");
        println!("item: {item:?}");
        futures::future::ready(())
    }).await;
    println!("done");
}
I'm not familiar enough with the crate implementation to say which part should be marked `unsafe`, but I think this shows at least one of (`AsyncStream::new`, `Sender::send`, `pair`) needs to be marked `unsafe`.
I had async_stream pinned to "=0.3.0", and as a result, ran into the following error:
error[E0432]: unresolved imports `async_stream_impl::stream`, `async_stream_impl::try_stream`
--> external/raze__async_stream__0_3_0/src/lib.rs:171:29
|
171 | pub use async_stream_impl::{stream, try_stream};
| ^^^^^^ ^^^^^^^^^^ no `try_stream` in the root
| |
| no `stream` in the root
error: aborting due to previous error
We try to pin our main dependencies, to keep a little more control over how things update. Updating to 0.3.1 for async stream 0.3.1 isn't a problem, but should 0.3.1 have been 0.4?
Requiring `pin_mut!(s);` at the usage site may be efficient, but it is an awkward requirement for users of such an API.
I've found that wrapping the returned stream in `Box::pin` makes it "just work". I suspect that for network-bound streams the overhead will be negligible.
Could you mention this alternative in the docs?
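One reason `Box::pin` "just works" is that `Pin<Box<T>>` is `Unpin` for every `T`, so the boxed value can be polled through a plain `&mut` without `pin_mut!`. The sketch below checks that property with a standard-library `Future` standing in for a stream (an assumption made so the example needs no external crates); `boxed_job` and `assert_unpin` are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;

/// Compiles only when `T` is `Unpin`.
fn assert_unpin<T: Unpin>(_: &T) {}

/// Hypothetical producer: an async block is not `Unpin` by itself,
/// but once moved into `Box::pin` the resulting handle is.
fn boxed_job() -> Pin<Box<dyn Future<Output = u32>>> {
    Box::pin(async { 42 })
}
```

The cost is one heap allocation per boxed value, plus dynamic dispatch when it is returned as a trait object as here.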
Proc macros in expression position have been stable for a couple releases now, it shouldn't be necessary to use hacks anymore.
I'm working on a project where I use `async_stream` for interfacing with an optional crate. I wish to propose a stub for platforms that don't have support for this crate.
The following naive attempt does not work (the compiler tells me I'd rather use the unit type as the return value), likely because the lack of a `yield` does not allow the macro to derive an item type:
pub fn stream(&mut self) -> impl Stream<Item = io::Result<NetEvent>> + '_ {
    try_stream! {
    }
}
How can I achieve that?
fn stream() -> impl Stream<Item = Result<u8, ()>> {
    yield 1;
    let Ok(x) = Err("a") else {
        Err(())?;
    };
}
expected !, found ()
rust-analyzer[type-mismatch](https://rust-analyzer.github.io/manual.html#type-mismatch)
`else` clause of `let...else` does not diverge
expected type `!`
found unit type `()`
try adding a diverging expression, such as `return` or `panic!(..)`
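The diagnostic above comes from the rule that the `else` block of `let ... else` must have type `!`. A statement ending in `?` is not treated as diverging, since `?` may also evaluate to the `Ok` value. A minimal sketch in an ordinary function (the name `parse_port` is made up for illustration) uses `return` instead. Inside `try_stream!` the body is not a function returning `Result`, so a possible workaround there, untested here, is to follow `Err(())?;` with `unreachable!()`.

```rust
/// Hypothetical helper showing a `let ... else` whose else block diverges.
fn parse_port(raw: &str) -> Result<u16, ()> {
    let Ok(port) = raw.parse::<u16>() else {
        // `return` has type `!`, so the compiler accepts this else block.
        return Err(());
    };
    Ok(port)
}
```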
Take a simple example like
fn main() {
    let s = async_stream::stream! {
        for i in 0..3 {
            yield i;
        }
    };
}
in a new Rust project. Add some spaces to break the indentation somewhere in the macro invocation. Run `cargo fmt`. See that rustfmt completely ignores the code inside of the macro.
This already happens in a simple block like
let s = async_stream::stream! {
    let a = 0u32;
};
I'm not sure if this is a bug in rustfmt, something this crate can fix, or whether there are rustfmt options that can be changed to fix this.
My current workaround is to remove the `async_stream::stream!` part, run rustfmt, and add it back.
Greetings!
Why does `Err` close the stream? I can't use `.filter(..)` in that case.
use async_stream::{stream, try_stream};
use futures_util::{pin_mut, Stream, StreamExt};
use std::future::ready;
fn error_or_ok(rcx: usize) -> Result<usize, String> {
    if rcx % 2 == 0 {
        Ok(rcx)
    } else {
        Err(format!("Error{rcx}"))
    }
}
fn return_stream() -> impl Stream<Item = Result<usize, String>> {
    //stream! {
    try_stream! {
        let mut rcx = 0usize;
        loop {
            //yield Ok(error_or_ok(rcx)?); // with stream! closes
            //yield {move || { Ok(error_or_ok(rcx)?) }}(); // with stream! ok
            yield error_or_ok(rcx)?; // with try_stream! closes
            rcx += 1;
        }
    }
    .filter(|data| {
        let ret = if let Err(error) = data {
            eprintln!("Got error: `{error}`.");
            false
        } else {
            true
        };
        ready(ret)
    })
}
#[tokio::main]
async fn main() {
    let stream = return_stream();
    pin_mut!(stream);
    println!("Some: `{:?}`", stream.next().await);
    println!("None: `{:?}`", stream.next().await);
    println!("Some: `{:?}`", stream.next().await);
    println!("None: `{:?}`", stream.next().await);
}
The intro reads:
Provides two macros, stream! and try_stream!, allowing the caller to define asynchronous streams of elements. These are implemented using async & await notation. The stream! macro works without unstable features.
From this wording, I assumed that the `stream!` macro works on stable while `try_stream!` requires unstable features. But they both seem to compile fine on stable.
Maybe that interpretation was unintended; in that case the wording could be changed to "this crate works without unstable features".
Master is on `0.3.0-alpha.18` of `futures-preview`, but the released version is on `0.3.0-alpha.17`.
Some consumers of streams expect streams to implement `Sync`. This is true, for example, of tonic gRPC servers when the result of a method is a stream.
Currently, an `AsyncStream` type automatically gets `Sync` if the generator also implicitly implements `Sync`. But it's easy to write a generator that doesn't implement `Sync`: just call any async function on a trait that uses `async_trait`. The return type of such a function is `Pin<Box<dyn Future + Send + 'async_trait>>`. Notice it doesn't include `+ Sync`.
My guess is that `async_trait` is doing the right thing here, not adding unnecessary trait bounds. And I don't know why one would want to access a future simultaneously from multiple threads anyway.
For that matter, I don't know why one would want to access a stream simultaneously from multiple threads. But if I understand `Sync` correctly, it would be safe to explicitly implement it on `AsyncStream` (though it would require an unsafe block), because the generator is only ever accessed inside a method that takes a mutable reference to the `AsyncStream`, so IIUC only one thread at a time should be able to call that method.
Note: I'm still pretty new to Rust, so my understanding may be way off. Currently I'm stuck trying to call a Rusoto function inside a `try_stream!` block, and I'm getting a compiler error because the resulting generator doesn't implement `Sync` and tonic needs `Sync` on the stream.
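The argument above (a value that is reachable only through `&mut` can be shared between threads safely) can be sketched with a small wrapper. `SyncWrapper` here is a hypothetical type written for illustration and is not part of async-stream; the `sync_wrapper` crate offers a comparable type. The `unsafe impl` is only justified because no method takes `&self`.

```rust
/// Hypothetical wrapper that is `Sync` regardless of `T`.
pub struct SyncWrapper<T>(T);

// SAFETY (under the assumption stated above): a shared `&SyncWrapper<T>`
// exposes no way to reach the inner value, so concurrent shared access
// cannot touch `T` at all. Adding any `&self` accessor would break this.
unsafe impl<T> Sync for SyncWrapper<T> {}

impl<T> SyncWrapper<T> {
    pub fn new(value: T) -> Self {
        SyncWrapper(value)
    }

    /// Exclusive access only: requires `&mut self`.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.0
    }

    pub fn into_inner(self) -> T {
        self.0
    }
}

/// Compiles only when `T` is `Sync`.
fn assert_sync<T: Sync>() {}
```

A type such as `std::cell::Cell<u8>` is not `Sync`, yet `SyncWrapper<Cell<u8>>` passes `assert_sync`, which mirrors what an explicit `Sync` implementation on the stream type would do.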
I've been thinking that it would be possible for this crate to provide an API that doesn't use proc macros at all, which has a couple of benefits:
The API could look like this:
/// async-stream ///
pub fn stream<T, F, Fut>(f: F) -> impl Stream<Item = T>
where
    F: FnOnce(Yielder<T>) -> Fut,
    Fut: Future<Output = ()>,
{ /* ... */ }
pub fn try_stream<T, E, F, Fut>(f: F) -> impl Stream<Item = Result<T, E>>
where
    F: FnOnce(Yielder<T>) -> Fut,
    Fut: Future<Output = Result<(), E>>,
{ /* ... */ }
// This macro will shadow the yielder with a function that borrows from a local.
//
// It will panic if called after the first poll or from a different stream.
#[macro_export]
macro_rules! start_stream {
    // $yielder must be of type Yielder<T>
    ($yielder:ident) => { /* ... */ };
}
/// Usage ///
let stream = async_stream::stream(|yielder| async move {
    // Must be called in the first poll, otherwise the stream will panic
    start_stream!(yielder);
    yielder(1).await;
    yielder(2).await;
    yielder(3).await;
});
I'm pretty sure this would be sound. Ergonomically, we'd lose the nice `for await` and `yield` syntax as well as the ability to use `?` in regular streams (although users can always use a `try_stream` and then flatten the results if they want something like that), but we'd also gain the ability to specify the type of stream with turbofish syntax. I think it might be nice to support both versions in the library, depending on users' preferences. Any thoughts on the design?
try_stream! {
    {
        let _guard = mutex.try_lock().unwrap();
        Err(200)?;
    }
    {
        yield 100;
    }
}
`Err(200)?` is expanded by `try_stream!` to something like `__yield_tx.send(::core::result::Result::Err(200)).await;`
Because the `MutexGuard` is still alive at that await point, this makes the stream `!Send`.
note: future is not `Send` as this value is used across an await
This can be fixed if async-stream captures the error and calls `tx.send` in the outermost block.
let result: Result<_, _> = || {
    .. user code ..
};
match result {
    Ok(..) => ..
    Err(err) => __yield_tx.send(..).await,
}
A workaround is to capture the error in `result: Result<..>` and emit the error after the `MutexGuard` is dropped.
However, async-stream does not recognize `?` in try-blocks.
try_stream! {
    let result = {
        let _guard = mutex.try_lock().unwrap();
        let result: Result<..> = try {
            Err(200)?; // async-stream expands this to `tx.send(..)`.
            Ok(())
        };
        result
    };
    result?;
    {
        yield 100;
    }
}
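The workaround described above, capturing the `Result` while the guard is alive and only awaiting after it is dropped, can be checked in plain async Rust without the macro. In this sketch `emit` is a hypothetical stand-in for the macro's internal send, and `assert_send` fails to compile if its argument is not `Send`.

```rust
use std::sync::Mutex;

/// Compiles only when `T` is `Send`.
fn assert_send<T: Send>(_: &T) {}

/// Hypothetical stand-in for forwarding a value to the stream's consumer.
async fn emit(_value: Result<u32, u32>) {}

async fn guarded(mutex: &Mutex<u32>) {
    let outcome: Result<u32, u32> = {
        let guard = mutex.lock().unwrap();
        let value = *guard;
        if value == 0 { Err(200) } else { Ok(value) }
    }; // `guard` is dropped here, before any await point

    // Only the plain `Result` is held across this await, so the
    // future returned by `guarded` stays `Send`.
    emit(outcome).await;
}
```

Moving `emit(..).await` inside the inner block would keep the guard alive across the await and the `assert_send` check would no longer compile.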
fn a() -> impl Stream<Item = Result<u8, ()>> {
    try_stream! {
        yield 1;
    }
}
fn b() -> impl Stream<Item = Result<u8, ()>> {
    try_stream! {
        yield 2;
    }
}
// The parameter is named `flag` so that it does not shadow the function `a`.
fn decide(flag: bool) -> impl Stream<Item = Result<u8, ()>> {
    if flag {
        a()
    } else {
        b()
    }
}
fn main() {
    decide(false);
}
Will complain that `a` and `b` have incompatible types: distinct uses of "impl Trait" result in different opaque types
Boxing them will make me unable to pin them going forward, so what should be the way forward here?
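A common way around distinct opaque types is to erase both behind one pinned trait object. The sketch below shows the pattern with standard-library futures standing in for streams (an assumption made to keep it dependency-free); for streams the analogous return type would be `Pin<Box<dyn Stream<Item = Result<u8, ()>>>>`. `block_on` is a deliberately naive, hypothetical helper that busy-polls and is only suitable for futures that complete without waiting.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

async fn a() -> u8 { 1 }
async fn b() -> u8 { 2 }

/// One concrete type that both branches can be coerced to.
type BoxedJob = Pin<Box<dyn Future<Output = u8>>>;

fn decide(flag: bool) -> BoxedJob {
    if flag {
        Box::pin(a())
    } else {
        Box::pin(b())
    }
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Naive executor for the example: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

The value returned by `Box::pin` is already pinned, so it can be polled directly and does not need a further `pin_mut!`.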
async_stream::try_stream! {
    let a = Ok::<_, String>(Ok::<_, String>(123))??;
    for _ in (1..10) {
        yield a;
    }
}
The compiler complains: cannot use the `?` operator in an async block that returns `()`.
After decoupling to:
async_stream::try_stream! {
    let a = Ok::<_, String>(Ok::<_, String>(123))?;
    let a = a?;
    for _ in (1..10) {
        yield a;
    }
}
It compiles.
Given
struct ErrorA(u8);
struct ErrorB(u8);
impl From<ErrorA> for ErrorB {
    fn from(a: ErrorA) -> ErrorB {
        ErrorB(a.0)
    }
}
"Normal" (synchronous Rust) will convert error types:
fn sync() -> Result<&'static str, ErrorB> {
    if true {
        Err(ErrorA(1))?;
    } else {
        Err(ErrorB(2))?;
    }
    Ok("unreachable")
}
But `try_stream!` does not:
fn asynchronous() -> impl Stream<Item = Result<&'static str, ErrorB>> {
    try_stream! {
        if true {
            Err(ErrorA(1))?;
        } else {
            Err(ErrorB(2))?;
        }
        yield "unreachable";
    }
}
Gives (some variation of):
= note: expected type `ErrorA`
found type `ErrorB`
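Where the implicit `From` conversion performed by `?` is not available, converting explicitly before `?` removes the dependence on inference. The sketch below shows that form in an ordinary function; `fallible_a` and `explicit` are hypothetical names, and whether the same line is accepted inside `try_stream!` in a given release is an assumption that was not verified here.

```rust
#[derive(Debug, PartialEq)]
struct ErrorA(u8);

#[derive(Debug, PartialEq)]
struct ErrorB(u8);

impl From<ErrorA> for ErrorB {
    fn from(a: ErrorA) -> ErrorB {
        ErrorB(a.0)
    }
}

/// Hypothetical operation that fails with the "inner" error type.
fn fallible_a(fail: bool) -> Result<u8, ErrorA> {
    if fail { Err(ErrorA(1)) } else { Ok(7) }
}

/// Converts `ErrorA` into `ErrorB` by hand, so `?` sees matching error types.
fn explicit(fail: bool) -> Result<u8, ErrorB> {
    let value = fallible_a(fail).map_err(ErrorB::from)?;
    Ok(value)
}
```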
I have multiple streams that I am joining in a select along with a timeout future, which is how I ran into this. I tried to boil it down to a minimal code sample that can trigger the problem. It is as follows:
use std::io;
use std::future::Future;
use socket2::{Socket, Domain, Type, Protocol};
use futures::Stream;
use async_stream::try_stream;
use futures::select;
use tokio::time::sleep;
use std::time::Duration;
use futures::FutureExt;
use futures::StreamExt;
fn await_results(
    mut stream1: impl Stream<Item = io::Result<i32>> + Unpin,
) -> impl Stream<Item = io::Result<i32>> {
    try_stream! {
        let mut stream1_future = stream1.next().fuse();
        select!{
            result = stream1_future => {
                stream1_future = stream1.next().fuse();
                match result {
                    Some(x) => {
                        let y = x?;
                        yield y;
                    }
                    None => {
                    }
                }
            }
        }
    }
}
#[tokio::main]
async fn main() -> io::Result<()> {
    Ok(())
}
This fails with the error:
error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
--> src/main.rs:22:33
|
15 | / try_stream! {
16 | | let mut stream1_future = stream1.next().fuse();
17 | | select!{
18 | | result = stream1_future => {
... |
22 | | let y = x?;
| | ^^ cannot use the `?` operator in an async block that returns `()`
... |
30 | | }
31 | | }
| |_____- this function should return `Result` or `Option` to accept `?`
|
= help: the trait `Try` is not implemented for `()`
= note: required by `from_error`
And I confirmed I'm on 0.3.2 so I have the fix for #27
Hi! Is it possible for `async-stream` to have optional compatibility with `#![no_std]` crates? `async-stream` seems to have only one crate similar in idea, `futures-async-stream`, which has `#![no_std]` support but, at least in my case, works really badly with macros.
What would a filter example look like, e.g. taking the example from the README and yielding only odd numbers based on a source stream?
fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}
/// Stream of only odd numbers
fn odds<S: Stream<Item = u32>>(input: S)
    -> impl Stream<Item = u32>
{
    stream! {
        pin_mut!(input);
        while let Some(value) = input.next().await {
            // ???
        }
    }
}
I'm new to rust and async-stream so pardon me if this is a silly question.
I compiled my program and it said:
error[E0308]: mismatched types
|
= note: this error originates in a macro (in Nightly builds, run with -Z macro-backtrace for more info)
error: aborting due to previous error; 1 warning emitted
So I re-compiled with `cargo rustc -- -Z macro-backtrace`, and this time I get:
error[E0308]: mismatched types
--> /Users/can/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.2.1/src/lib.rs:222:22
|
218 | / macro_rules! stream {
219 | | ($($body:tt)*) => {{
220 | | let (mut __yield_tx, __yield_rx) = $crate::yielder::pair();
221 | | $crate::AsyncStream::new(__yield_rx, async move {
222 | | #[derive($crate::AsyncStreamHack)]
| | ^^^^^^^^^^^^^^^^^^^^^^^
| | |
| | expected struct `std::string::String`, found `&str`
| | help: try using a conversion method: `$crate::AsyncStreamHack.to_string()`
| | in this expansion of `stream_0!` (#8)
... |
227 | | $crate::dispatch!(($($body)*))
| | ------------------------------ in this macro invocation (#2)
228 | | })
229 | | }}
230 | | }
| |_- in this expansion of `async_stream::stream!` (#1)
...
286 | macro_rules! dispatch {
| _-
| |_|
| |_|
| |_|
| |_|
| |
287 | | (() $($bang:tt)*) => {
288 | | $crate::count!($($bang)*)
| | ------------------------- in this macro invocation (#7)
289 | | };
... |
297 | | $crate::dispatch!(($($first)* $($rest)*) $($bang)*)
| | --------------------------------------------------- in this macro invocation (#4)
... |
306 | | $crate::dispatch!(($($rest)*) $($bang)*)
| | ----------------------------------------
| | |
| | in this macro invocation (#3)
| | in this macro invocation (#5)
| | in this macro invocation (#6)
307 | | };
308 | | }
| | -
| |_|
| |_in this expansion of `$crate::dispatch!` (#2)
| |_in this expansion of `$crate::dispatch!` (#3)
| |_in this expansion of `$crate::dispatch!` (#4)
| |_in this expansion of `$crate::dispatch!` (#5)
| in this expansion of `$crate::dispatch!` (#6)
...
312 | / macro_rules! count {
313 | () => {
314 | stream_0!()
| ----------- in this macro invocation (#8)
315 | };
...
507 | };
508 | | }
| |_- in this expansion of `$crate::count!` (#7)
error: aborting due to previous error; 1 warning emitted
For more information about this error, try `rustc --explain E0308`.
Not sure what's going on but it seems to be a bug, please help.
When using the `stream!` or `try_stream!` macros, autocomplete with rust-analyzer works fine until a `yield` statement is written. This is because the macro no longer expands if there is any invalid Rust (but only once a `yield` statement has been written). See the rust-analyzer issue explaining in more detail how autocomplete fails when macros fail to expand (specifically for this crate's macros): rust-lang/rust-analyzer#12759
I have no experience writing proc macros. Is it possible to change the way the proc macros are written so that they always expand, even if there is a `yield` statement and the Rust code is invalid? This would allow rust-analyzer autocomplete to function properly in all cases. I tested and noticed that the tokio `select!` macro seems to expand consistently, which allows autocomplete to function normally.
How can one exit early from `stream! {}`?
It would be nice if a `yield*` equivalent was supported.
Hi, I am trying to migrate the blocking code below to use `async-stream`, but I don't understand what is needed; the examples in the repo do not include a way to implement this macro against `tokio::stream::Stream`.
struct StreamBody {
    file: tokio::fs::File,
    buffer: Vec<u8>,
}
impl StreamBody {
    fn new(file: tokio::fs::File, chunk_size: usize) -> Self {
        let buffer = vec![0u8; chunk_size];
        StreamBody {
            file,
            buffer,
        }
    }
}
impl tokio::stream::Stream for StreamBody {
    type Item = Result<Bytes, ResponseError>;
    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        use std::ops::DerefMut;
        let stream = self.deref_mut();
        match stream.file.read(&mut stream.buffer).await {
            Ok(count) => {
                if count > 0 {
                    Poll::Ready(Some(Ok(Bytes::copy_from_slice(&self.buffer[..count]))))
                } else {
                    Poll::Ready(None)
                }
            },
            Err(e) => Poll::Ready(Some(Err(Error::dump_read_failed(e).into())))
        }
    }
}
Could you please describe in the examples how to convert this to use this crate? Thank you!
Often I get compile errors when the compiler cannot deduce the type when using `try_stream!`:
error[E0698]: type inside `async fn` body must be known in this context
--> src/server.rs:213:18
|
213 | let stream = try_stream! {
| __________________^
214 | | while let Some(chunk) = body.data().await {
215 | | let chunk = chunk?;
216 | | trace!("Body: {:?}", &chunk);
... |
219 | | }
220 | | };
| |_____^ cannot infer type for type parameter `E` declared on the enum `Result`
This can be resolved by moving the code into a function that returns the appropriate generic type. However, is it also possible to put a type annotation somewhere? (`impl T` doesn't seem to work for annotating the type of a local variable.)
For performance reasons, I'd like to have one reusable buffer `buf: Vec<u8>` and yield a `&buf[..]`. The user needs to drop the reference before another `next().await`.
Unlike streaming-iterator, which only requires `&mut self` during `next()`, this is not easy in async-stream because the generator loop keeps holding the mutable reference.
pub struct Reader<R: AsyncRead> {
    r: R,
    buf: Vec<u8>,
}
impl<R: AsyncRead + Unpin> Reader<R> {
    pub fn stream(self: &mut Self) -> impl futures::Stream<Item = Result<&[u8]>> {
        async_stream::try_stream! {
            loop {
                ..
                yield &self.buf[..];
            }
        }
    }
}
error[E0502]: cannot borrow `self.buf` as mutable because it is also borrowed as immutable
--> src/main.rs:20:9
|
20 | / async_stream::try_stream! {
21 | |
22 | | loop {
... |
42 | | yield &self.buf[..];
| | -------- immutable borrow occurs here
43 | | }
44 | | }
| | ^
| | |
| | mutable borrow occurs here
| |_________lifetime `'1` appears in the type of `__yield_tx`
| argument requires that `self.buf` is borrowed for `'1`
|
It would be great to see an example of yielding borrowed data.
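For comparison, the contract mentioned above, where each item borrows from `&mut self` only until the next call, can be written in synchronous Rust with a generic associated type. This is a sketch of the lending pattern only, not an async-stream API: `LendingIterator` and `ChunkReader` are hypothetical names, and an in-memory slice stands in for the `AsyncRead` source.

```rust
/// An iterator whose items may borrow from the iterator itself.
trait LendingIterator {
    type Item<'a>
    where
        Self: 'a;

    fn next(&mut self) -> Option<Self::Item<'_>>;
}

/// Hypothetical reader that copies fixed-size chunks into one reusable buffer.
struct ChunkReader<'d> {
    data: &'d [u8],
    pos: usize,
    chunk: usize,
    buf: Vec<u8>,
}

impl<'d> ChunkReader<'d> {
    fn new(data: &'d [u8], chunk: usize) -> Self {
        assert!(chunk > 0, "chunk size must be positive");
        ChunkReader { data, pos: 0, chunk, buf: Vec::with_capacity(chunk) }
    }
}

impl<'d> LendingIterator for ChunkReader<'d> {
    type Item<'a> = &'a [u8]
    where
        Self: 'a;

    fn next(&mut self) -> Option<Self::Item<'_>> {
        if self.pos >= self.data.len() {
            return None;
        }
        let end = (self.pos + self.chunk).min(self.data.len());
        // Reuse the same allocation for every chunk.
        self.buf.clear();
        self.buf.extend_from_slice(&self.data[self.pos..end]);
        self.pos = end;
        // The returned slice borrows `self`, so the caller must release it
        // before calling `next` again.
        Some(&self.buf[..])
    }
}
```

The `Stream` trait has a fixed `Item` type with no lifetime tied to each `poll_next` call, which is why the same shape cannot be expressed directly through it.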
When using the `format_err!` macro inside `try_stream!`:
use failure::*;
pub fn stream() -> impl Stream<Item = failure::Fallible<i32>> + Send {
    async_stream::try_stream! {
        Err(format_err!("adasdasd"))?;
        for i in (1..10) {
            yield 123;
        }
    }
}
It doesn't compile: `*mut (dyn std::ops::Fn() + 'static)` cannot be shared between threads safely.
But when using it without `format_err!`:
use failure::*;
#[derive(Debug, Fail)]
#[fail(display = "my error")]
pub struct MyError;
pub fn stream() -> impl Stream<Item = Fallible<i32>> + Send {
    async_stream::try_stream! {
        Err(MyError)?;
        for i in (1..10) {
            yield 123;
        }
    }
}
The compiler accepts it.