
Comments (44)

headinthebox avatar headinthebox commented on August 18, 2024 10

IMHO the whole back pressure idea was never desired for any version of Rx, because it is a pull model instead of a push model for "first class event streams", which is what Rx was supposed to be. UI events and time-based sources do not naturally support the notion of "requesting" items/pagination; they produce new values at their own pace, and the consumer has to deal with it one way or another. Back pressure is interactive, not reactive.

Hence "back pressure" is really an alternative for IAsyncEnumerable, which has existed in .NET since we shipped. That is obviously useful, no doubt about it, but it is something completely different from what Rx is intended for. Since .NET has async await, leveraging that seems like the most user-friendly option even if it is not optimal performance-wise. But you can probably make things "vectorized" under the covers without changing the user API. Plus, the whole point of using async is that there is a significant speed difference between producer and consumer, so the glue can easily absorb some of that.

As a more fundamental issue, I don't buy the argument that unbounded buffers are evil. As a developer I find it condescending that an API decides for me what is good or bad. That turns an unopinionated API into an opinionated framework, again 180 degrees opposite of the core design principles of Rx. Rx is simply glue, and the user of the glue should decide if the system they glue together satisfies the constraints imposed on it. The glue should stay out of it.

To make a long story short, at this point I do not pretend that I have any control over what people call "Rx" anymore, nor do I care. Everyone is free to come up with whatever model they want, and call it whatever they like. Dilution is the price you pay for taking a simple and clear conceptual idea mainstream. But like with "functional programming", I care more about the fact that "Rx" is now mainstream than about purity.

[In any case, I have switched to my own implementations of Rx in .NET and Scala/Java that remain true to my original vision of what Rx should be, and have not used, or felt any desire to use, any other implementation of "Rx" since then]

from reactive.

shiftkey avatar shiftkey commented on August 18, 2024 6

As one of the current "gatekeepers" I'll add a couple of notes here before I try and close this out:

  • At this point in the lifecycle of Rx.NET and Ix.NET (that is, relative API stability, but lots of concepts are still relatively new to users) I'd rather not add more concepts to the core unless it benefits the overall ecosystem.
  • It sounds like a few workarounds have been proposed here, and I have no strong feelings on which is preferable.
  • I'm kind of indifferent to the current backpressure APIs implemented from the ReactiveX website. To me at least they don't really feel naturally suited to Rx.NET's push model, and don't really address the unbounded queue problem beyond throwing away items (which you can do in other ways), but perhaps I've not encountered the right scenarios to feel this pain.


headinthebox avatar headinthebox commented on August 18, 2024 3

Unbounded buffers & back pressure are orthogonal issues.

Back pressure as encoded in reactive streams is really just asynchronous pull where the consumer is in charge of the pace of the computation. In .NET the "standard" approach for that is IAsyncEnumerable (which plays nicely with await, and allows for nice language integration).

IMHO the behavior of each kind of stream should be encoded in the type. So pure push should be different from asynchronous pull, and pure pull, with explicit conversions between them that implement buffering/dropping/blocking policies.

BTW, I always have been uncomfortable about the "across network boundaries" argument because it sucks you into http://en.wikipedia.org/wiki/Fallacies_of_distributed_computing. Never assume the other side of the network boundary cooperates.


khellang avatar khellang commented on August 18, 2024 1

I'd love to see something around back pressure for Rx.NET 3.0 👍


benjchristensen avatar benjchristensen commented on August 18, 2024 1

my gut feeling says that if backpressure-at-the-core is possible, it's not really a push producer,

If backpressure means "stop emitting" then yes, only pull works. But the backpressure signal being propagated allows for push streams to choose what to do if the downstream can't handle the flow. This is how it gets used in RxJava on push streams.

The producing Observable can have flow control composed on or into it for what to do if a backpressure signal is received. For example:

  • fail (the default RxJava behavior, instead of buffering as Rx.Net/RxJS do)
  • buffer
  • drop
  • temporal flow control (sample, debounce, etc)
  • block
  • autoscale (stream processors for example)

The `onBackpressure*` operators are examples of operators that get composed onto a push stream that can conditionally react based on the backpressure signal.
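
As a rough illustration of the "drop" strategy listed above, here is a minimal sketch in plain Java (JDK only). It is not RxJava's `onBackpressureDrop` implementation; `DropGate`, `request`, and `droppedCount` are hypothetical names chosen for this example. The source keeps pushing, and the gate forwards a value only while the consumer has outstanding demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical illustration of "drop on backpressure"; names are assumptions.
final class DropGate<T> {
    private final Consumer<T> downstream;
    private final AtomicLong requested = new AtomicLong(); // outstanding consumer demand
    private final AtomicLong dropped = new AtomicLong();   // values discarded so far

    DropGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Backpressure signal from the consumer: "I can take n more items".
    void request(long n) {
        requested.addAndGet(n);
    }

    // Push side: the source calls this whenever it has a value, ready or not.
    void onNext(T item) {
        long r;
        do {
            r = requested.get();
            if (r == 0) {            // no demand: drop instead of buffering
                dropped.incrementAndGet();
                return;
            }
        } while (!requested.compareAndSet(r, r - 1));
        downstream.accept(item);
    }

    long droppedCount() {
        return dropped.get();
    }
}

final class DropGateDemo {
    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        DropGate<Integer> gate = new DropGate<>(seen::add);
        gate.request(2);                       // consumer can handle two items
        for (int i = 1; i <= 5; i++) {
            gate.onNext(i);                    // source pushes five
        }
        return seen;                           // the first two pass, the rest are dropped
    }
}
```

The source never blocks and never learns about the consumer; only the gate sees the demand counter, which is the sense in which the strategy is composed onto the stream.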


SeanFarrow avatar SeanFarrow commented on August 18, 2024 1

Hi,

ReactiveStreams seems to solve this. Should we not do what RxJava has done and provide an implementation of the ReactiveStreams interfaces?
I'm happy to work on this as I need it pretty soon in a project. The only question is which branch/repo it belongs in, or whether we should have a new repository.


boekabart avatar boekabart commented on August 18, 2024 1

There are .NET implementations of ReactiveStreams already; I would say to keep Rx and Ix 'pure' to their interfaces, and to provide lots of proper 'conversions' (buffers/queues with various behaviours) between IObservable and IAsyncEnumerable. And of course, educate the general public when, where and how to switch from (hot) observable to ix-async.


Igorbek avatar Igorbek commented on August 18, 2024

Hi, is there any movement in this direction?
I was thinking a little bit about back-pressure in Rx and believe there's no way to do it without modifying/extending the contract like RxJava did.
Especially for producers that follow a pattern common in pull-based collections, there should be a way to fall back to pull-based behavior (produce on request). Most of the standard observable factories without timers (Range, Generate, ...) are useful in compositions.
Imagine we need to do some long-running asynchronous work for each element of a synchronously generated infinite (or very long) sequence. A naive (and pretty straightforward) implementation would be:

Observable.Range(1, 10000) // not parameterized by time
.Select(i => Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => i))  // some async work
.Concat() // or Merge(1)
.Take(TimeSpan.FromSeconds(5)) // let's limit with some time

By the time the observable completes, Range will already have produced all 10000 values.
We need some way to tell the producer to generate the next value(s). I don't like RxJava's way with request(n).
Maybe it will be better to introduce some interface like:

public interface IConsumer<T> : IObserver<T>
{
    void OnSubscribe(IDisposable subscription); // allows unsubscribing before Subscribe returns
    IObservable<bool> IsReady { get; } // signals whether the consumer is ready to process new values
}

Of course, IsReady should be informative only.
In this case producers will be able to stop/resume generating based on IsReady stream.
And operators like Concat will be able to inform producers when to stop/resume.


RxDave avatar RxDave commented on August 18, 2024

Well, for your Range example I believe that an IAsyncObservable<T> would meet your needs in a more general way. The hypothetical IAsyncObserver<T> that you would pass to it would define Task OnNextAsync(T value). Thus, the observer could signal back to the observable when it's ready for the next value by simply completing the Task. This would also be a natively async variation on my "WindowIntrospective" operator that I described above, but as you've mentioned it would require a whole new monad. That's why I invented the "Introspective" family of operators in the first place -- it works right now with Rx 2.x.


Igorbek avatar Igorbek commented on August 18, 2024

I guess IAsyncObservable still isn't released. @RxDave do you know of any plans by the Rx team to make it? I saw on Bart's slides that they are going to make a preview of a new Rx duality set. When will that be?


RxDave avatar RxDave commented on August 18, 2024

I'm in the dark on their plans like everyone else, sorry.


headinthebox avatar headinthebox commented on August 18, 2024

Why not simply use asyncenumerable.


benjchristensen avatar benjchristensen commented on August 18, 2024

I don't like RxJava's way with request(n). Maybe it will be better to introduce some interface like:

We tried that approach (similarly related to the "pause" model) before landing on micro-leasing. It works well in-process but fails when going over truly async boundaries such as network connections. It fails because one of two things happens:

  1. Every onNext must first be preceded by a check of isReady which is very inefficient, particularly over network boundaries.
  2. The isReady/pause signal being sent to the producer races onNext events and thus the consumer must have an unbounded buffer as the race is non-deterministic. Having unbounded buffers defeats the point of backpressure.

These and other approaches were also discussed as part of the collaboration on Reactive Streams which solidified the request(n) semantics. See spec at https://github.com/reactive-streams/reactive-streams-jvm/.
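
To make the request(n) idea concrete, here is a small sketch using the `java.util.concurrent.Flow` interfaces and `SubmissionPublisher` that ship with the JDK since Java 9. The `BatchingSubscriber` class and the batch size of 4 are assumptions for illustration only. Instead of exposing an isReady flag, the consumer leases demand in small batches, so the number of items in flight is bounded by the lease and there is no isReady signal that can race with onNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical consumer that leases demand in batches via request(n).
final class BatchingSubscriber implements Flow.Subscriber<Integer> {
    private final int batch;
    private final List<Integer> received = new ArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private int remainingInBatch;

    BatchingSubscriber(int batch) {
        this.batch = batch;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        remainingInBatch = batch;
        s.request(batch);               // initial lease: at most `batch` items may arrive
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);             // stand-in for real processing
        if (--remainingInBatch == 0) {  // lease used up: grant the next one
            remainingInBatch = batch;
            subscription.request(batch);
        }
    }

    @Override
    public void onError(Throwable t) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    List<Integer> awaitAll() {
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}

final class RequestNDemo {
    static List<Integer> run() {
        BatchingSubscriber subscriber = new BatchingSubscriber(4);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 10; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the submitted items are delivered
        return subscriber.awaitAll();
    }
}
```

Because the demand is granted ahead of time, the producer does not need a round trip per item, which is the inefficiency described in point 1.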


benjchristensen avatar benjchristensen commented on August 18, 2024

Why not simply use asyncenumerable.

It is not just pull use cases that are negatively impacted by unbounded buffers in merge, flatMap, observeOn, zip, etc. Additionally, composing push and pull or hot and cold streams is a common reality, and this does not work well without the ability of backpressure slowing down the cold/pull stream to the velocity of the hot stream. Try zipping a hot event stream with a cold 'range' as an exercise.

So if everything is pull then yes, asyncenumerable is a great choice. If push is needed or wanted then I suggest that the benefits of adding backpressure to Observable have been worth the effort.


benjchristensen avatar benjchristensen commented on August 18, 2024

Another experiment from the Java side was having this signature:

Future<Void> onNext(T t)

This can work functionally but was too inefficient on the JVM. Allocation and garbage collection overhead on high throughput streams is non-trivial. Composing and waiting on 'n' futures also has a non-trivial cost. Object pooling is also very hard to make reliable when a type like this is included in the public API and anyone can retain references to it.

Perhaps the CLR has a solution for these efficiency issues. If so it would be interesting to better understand how it is done without affecting performance.
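
For reference, the shape of that experiment can be sketched with `CompletionStage` from the JDK. `AsyncObserver` and `AsyncEmitter` are hypothetical names, not types from RxJava or Rx.NET. The sketch shows both the appeal (the producer waits for the observer) and the cost (one future per item, plus the chaining).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical observer whose onNext reports readiness through a future.
interface AsyncObserver<T> {
    CompletionStage<Void> onNext(T item); // completes when the observer can take more
}

final class AsyncEmitter {
    // Emits start..end-1, invoking onNext for a value only after the stage
    // returned for the previous value has completed. The chain is built
    // eagerly, so this is only suitable for short ranges.
    static CompletionStage<Void> emit(int start, int end, AsyncObserver<Integer> observer) {
        CompletionStage<Void> chain = CompletableFuture.completedFuture(null);
        for (int i = start; i < end; i++) {
            final int value = i;
            chain = chain.thenCompose(ignored -> observer.onNext(value));
        }
        return chain;
    }
}

final class AsyncObserverDemo {
    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        // A slow observer: it accepts a value but does not acknowledge it yet.
        AsyncObserver<Integer> slowObserver = item -> {
            seen.add(item);
            CompletableFuture<Void> ready = new CompletableFuture<>();
            pending.add(ready);
            return ready;
        };
        AsyncEmitter.emit(0, 3, slowObserver);
        int beforeAck = seen.size();     // only the first value has been delivered
        pending.get(0).complete(null);   // observer signals it is ready
        int afterAck = seen.size();      // the second value follows
        return List.of(beforeAck, afterAck);
    }
}
```

Every value allocates at least one `CompletableFuture` and one continuation, which is the per-event overhead described above.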


boekabart avatar boekabart commented on August 18, 2024

@mattpodwysocki doesn't the 'Introspection' approach you describe lead (in case of prolonged consumer undercapacity) to unbounded buffers anyway? Incoming OnNexts are buffered only while outgoing OnNext is blocking, but if for an outgoing OnNext with a list of 3 items, more than 3 are buffered, the next OnNext(4+ items) is going to take longer, so even more will be buffered. Ultimately, the number of waiting items doesn't get less, and that's what defines the buffer size.

So while I'm not a fan of the RxJava (setProducer) way at all (burdening the consumer with the queuing strategy), I don't think that the introspection approach by itself helps at all.

The only advantage I can think of is that it gives the consumer complete insight into the size of the backlog, and that you can easily build strategies upon it. So maybe combining that with a sampling/dropping strategy could be a solution. [ Eg. SelectMany(l => l.Take(n)) is a simple one that implicitly bounds the buffer size to 'whatever comes in during the processing of n elements' - it accepts certain peak volumes but not sustained overload. ]

But as for backpressure - I think what is needed is an ability to work with combinations of push and pull streams, IObservables and (async)enumerables. Reason: my gut feeling says that if backpressure-at-the-core is possible, it's not really a push producer, but an enumerable-in-observables-clothes instead. Observable.Range being the most obvious example.


Igorbek avatar Igorbek commented on August 18, 2024

@benjchristensen thank you for explaining why you made that decision.
Did you consider doing it with a special kind of scheduler? If producer and consumer share one controllable scheduler (which should be just a lightweight wrapper over some real scheduler), then the consumer can lock the scheduler until it is ready to consume new values. In this case there's no need to propagate an interface implementation through the whole operator chain.
With this approach, all the producer needs to do is schedule its work on this scheduler.


Igorbek avatar Igorbek commented on August 18, 2024

in addition:

interface IControllableScheduler: IScheduler
{
  bool IsLocked { get; } // not sure it is required
  IDisposable Lock(); // in this case, only locker has a way to unlock
}

The only problem I see is that the scheduler has to be known by the subscriber. And I'm not sure whether we need a specific channel (like OnSubscribe) for passing the scheduler.


boekabart avatar boekabart commented on August 18, 2024

And then I just realized something. The RxJava 'request(n)' combined with OnNext... is asyncEnumerable: request() is (a repeated) GetNextAsync(), onNext the 'completion'.
So actually, it injects into the observer/observable pattern/interface, the exact opposite pattern: iterator/iterable. That doesn't feel clean!

Additionally, composing push and pull or hot and cold streams is a common reality

Completely agree with that, except I feel it's about push & pull only, because hot or cold observables, if true observables, should not have to (or be able to) react to backpressure. For them the only solution is a selection mechanism BETWEEN producer and consumer, based on use-case.

and this does not work well without the ability of backpressure slowing down the cold/pull stream to the velocity of the hot stream. Try zipping a hot event stream with a cold 'range' as an exercise.

Exactly: range should be an enumerable, not an observable. It's a hack.

So, we need something like IObservable<T> Zip<T>(this IObservable<T> x, IAsyncEnumerable<T> y)
etc. for the known situations where push/pull combinations currently don't work well.

And separately, completely unrelated, a solution where a selection strategy can be applied in case of overpressure.


headinthebox avatar headinthebox commented on August 18, 2024

https://www.youtube.com/watch?v=pOl4E8x3fmw


benjchristensen avatar benjchristensen commented on August 18, 2024

we need smth like IObservable Zip(this IObservable x, IAsyncEnumerable y)
etc. for the known situations where push/pull combinations currently don't work well.

We found this unsatisfactory as it meant Rx Observable async operators must still be unbounded and we risked buffer bloat. We preferred having the system fail and making the user choose a flow control strategy (temporal, buffer, drop, etc).

For example, 2 push streams like this:

Zip(Interval(1, TimeUnit.Seconds), Interval(1, TimeUnit.Milliseconds))

This will buffer bloat if there is not a bounded buffer inside zip. RxJava will instead fail via onError with a MissingBackpressureException and force a choice of flow control. This permits use of bounded buffers.
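
A minimal sketch of that fail-fast behaviour, in plain Java and single-threaded for brevity. `BoundedZip` is a hypothetical class, and the `IllegalStateException` stands in for signalling onError with a MissingBackpressureException; this is not RxJava's zip implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical zip stage with a bounded buffer per input (not thread-safe).
final class BoundedZip {
    private final int capacity;
    private final Queue<Long> left = new ArrayDeque<>();
    private final Queue<Long> right = new ArrayDeque<>();
    private final List<Long> output = new ArrayList<>();

    BoundedZip(int capacity) {
        this.capacity = capacity;
    }

    void onLeft(long value) {
        offer(left, value, "left");
        drain();
    }

    void onRight(long value) {
        offer(right, value, "right");
        drain();
    }

    private void offer(Queue<Long> queue, long value, String side) {
        if (queue.size() == capacity) {
            // Fail instead of growing: the user must pick a flow control strategy.
            throw new IllegalStateException("buffer overflow on " + side + " input");
        }
        queue.add(value);
    }

    // Pair up values while both inputs have one available.
    private void drain() {
        while (!left.isEmpty() && !right.isEmpty()) {
            output.add(left.poll() + right.poll());
        }
    }

    List<Long> output() {
        return output;
    }
}
```

With a capacity of 2, one slow value on the left and a burst on the right produce one pair and then an overflow error on the third unmatched right-hand value, rather than an ever-growing queue.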

In other words, saying that buffer bloat was acceptable unless developers did exactly the right thing was not something we were okay with.

the RxJava way

This isn't just for RxJava. Reactive Streams was a collaboration across companies and is now being proposed for Java 9.

The Reactive Streams interfaces are cleaner and RxJava 2 will adjust to implement them directly. RxJava 1 fit the functionality into existing types additively. So I agree that the setProducer thing is awkward. The proper way is via onSubscribe as per the Reactive Stream interface definitions.


benjchristensen avatar benjchristensen commented on August 18, 2024

special kind of scheduler

We did yes. But we didn't pursue it because that meant the library would still have unbounded buffers and only work safely with this particular scheduler. Also, that would mean everything would always need to be scheduled which is unnecessary overhead in many cases. It also does not work well with actor style message passing or over network boundaries.


boekabart avatar boekabart commented on August 18, 2024

What I'm saying is that you indeed DO need backpressure handling (or actually, bounded buffers), and indeed, fail fast(er) rather than having buffer bloat is a much better default, but my most important points are that backpressure:

  • shouldn't be used for producers that are actually pull (use (async)enumerable instead)
  • should be handled (well, set up/configured) by 'the pipeline', not by the observable or observer themselves. They should just do their simple task. And what I gather from your referenced slide deck, that's actually an option in RxJava today?


james-world avatar james-world commented on August 18, 2024

An asyncenumerable producer making decisions to fail/buffer/drop/sample etc. based on the pace of the consumer does feel much more natural than putting that logic into observables.


Igorbek avatar Igorbek commented on August 18, 2024

@headinthebox

If I do this:

AsyncEnumerable.Range(0, 1000)
  .ToObservable()
  .ToAsyncEnumerable()
  .ForEach(async i => { await Task.Delay(TimeSpan.FromSeconds(1)); });

Will Range still generate values only as fast as the slow consumer takes them?


Igorbek avatar Igorbek commented on August 18, 2024

Just tested and no, AsyncEnumerable.Range generates values very fast once it's converted to observable.


headinthebox avatar headinthebox commented on August 18, 2024

@Igorbek the current conversions were not designed with that behavior in mind. When going from observable to asyncenumerable, we'll have to specify a policy such as block, drop, buffer, ...
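
One way to picture such a policy-carrying conversion is a small bridge with a bounded buffer between the push side and the pull side. The sketch below is in plain Java; `PushToPullBridge` and `OverflowPolicy` are hypothetical names, not part of Rx.NET, Ix, or RxJava. A blocking policy is left out to keep the example single-threaded; it could be built on `BlockingQueue.put`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical policies for what to do when the buffer is full.
enum OverflowPolicy { DROP_NEWEST, DROP_OLDEST, FAIL }

// Hypothetical bridge: the push side calls push(), the pull side calls pull().
final class PushToPullBridge<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int capacity;
    private final OverflowPolicy policy;

    PushToPullBridge(int capacity, OverflowPolicy policy) {
        this.capacity = capacity;
        this.policy = policy;
    }

    synchronized void push(T item) {
        if (buffer.size() < capacity) {
            buffer.addLast(item);
            return;
        }
        switch (policy) {
            case DROP_NEWEST:
                return;                    // ignore the incoming value
            case DROP_OLDEST:
                buffer.removeFirst();      // make room by discarding the oldest
                buffer.addLast(item);
                return;
            case FAIL:
                throw new IllegalStateException("consumer fell behind");
        }
    }

    // Returns the next buffered value, or null if nothing is available yet.
    synchronized T pull() {
        return buffer.pollFirst();
    }
}
```

A capacity of 1 with DROP_OLDEST gives "latest value wins" semantics, while a capacity of 1 with DROP_NEWEST keeps the first unconsumed value.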


benjchristensen avatar benjchristensen commented on August 18, 2024

Sorry for the late response ... catching up on an unbounded email queue after 2 weeks of travel.

backpressure shouldn't be used for producers that are actually pull (use (async)enumerable instead)

Agreed about using the right type, and we generally do this by just using Iterable when that's the right type to represent pull data. We then use Observable.from(Iterable) to convert into the Observable world for composition. C# obviously offers better opportunities here because of extension methods and the fact that Enumerable and Observable are designed into the language for interop.

The other consideration for us was that we needed both push and pull to be composable AND non-blocking to work in fully async event-loop runtimes. Iterable can't be directly used in that type of use case since its APIs are blocking. Thus the RxJava/ReactiveStream state machine with request allows doing async/await style behavior in an environment that doesn't support it.

You have far different design opportunities because C# offers async/await whereas we don't have that in Java.

Importantly to us as well, the AsyncEnumerable API has non-trivial performance impact (at least on the JVM) since it is an extra object allocation per event and the overhead of managing the waiting on the Future/Task that is emitted on each event. The use of Future for each onNext was explored and quickly abandoned due to the significant impact on throughput.

Thus, the RxJava Observable and Reactive Streams Publisher support microbatching and that signal can then be used by either pull or push sources to make a decision about flow control in the push case or pulling in batches in the pull case. This is achieved without overhead of object allocation or management of a Future per onNext.

Something like mouse events that can't be backpressured are modeled to emit as they need to – true push events. The backpressure signal (the request count) is then used as needed for flow control. If flow control is ignored, all will work fine as long as the consumers with bounded buffers keep up. If they ever fall behind then an error will be emitted since a flow control strategy was not provided.

A pull use case, such as reading from an Iterable or a file on disk can behave differently. There the backpressure signal works instead to trigger pull against the queue of data and emit data at the rate the consumer requests it.

These are shown starting at this slide: https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=90

should be handled (well, set up/configured) by 'the pipeline', not by the observable or observer themselves. They should just do their simple task. And what I gather from your referenced slide deck, that's actually an option in RxJava today?

Yes, if something (like the mouse events example) can't directly support backpressure, it just emits via onNext and leaves it to the composition of the Observable pipeline to deal with it. The backpressure signal is propagated so that flow control strategies can be applied when necessary. If backpressure never occurs then no flow control is applied.

Observable<ME> me = Observable.create(s -> {
  mouseEvents.addListener(event -> {
    s.onNext(event);
  });
});

Then flow control can be composed through the many operators Rx offers.

me.sample(1, TimeUnit.SECONDS)
me.debounce(500, TimeUnit.MILLISECONDS)
me.onBackpressureBuffer()
me.onBackpressureDrop()
etc


benjchristensen avatar benjchristensen commented on August 18, 2024

the current conversions were not designed with that behavior in mind

This was part of the solution we pursued with composing Iterable and Observable. For example, the following was always a problem:

Observable.zip(Observable.interval(1, TimeUnit.SECONDS),
               Observable.range(1, 10000000),
               (a, b) -> a + b)

Prior to supporting backpressure this would buffer bloat as range emitted as fast as it could.

The range operator was rewritten to work as a state machine within the request semantics so it emits only as fast as the consumer requests. With async/await this would be far easier. In Java we have had to implement that logic by hand in the RxJava library.
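
A simplified, single-threaded sketch of such a request-driven range, written against the JDK's `java.util.concurrent.Flow` interfaces. It is not the actual RxJava code; `RangePublisher` is a hypothetical class, and the sketch omits the atomics a real implementation needs for concurrent request calls. It also completes only in response to a request, so an empty range needs one request before it signals completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical range source that emits only what has been requested.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int count;

    RangePublisher(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
    }

    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int end;
        private int next;          // next value to emit
        private long requested;    // demand not yet satisfied
        private boolean emitting;  // guards against re-entrant request() from onNext
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int start, int end) {
            this.subscriber = subscriber;
            this.next = start;
            this.end = end;
        }

        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("n must be positive"));
                return;
            }
            long sum = requested + n;
            requested = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
            if (emitting) {
                return; // the loop already running below will see the new demand
            }
            emitting = true;
            while (requested > 0 && next < end && !done) {
                requested--;
                subscriber.onNext(next++);
            }
            emitting = false;
            if (next == end && !done) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }
}

final class RangeDemo {
    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        Flow.Subscription[] handle = new Flow.Subscription[1];
        new RangePublisher(1, 10_000_000).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { handle[0] = s; }
            @Override public void onNext(Integer i) { seen.add(i); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        handle[0].request(2);
        handle[0].request(1);
        return seen; // only the three requested values were ever produced
    }
}
```

Zipped with a one-second interval, a source like this would produce one value per tick instead of filling a buffer with ten million integers.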


akarnokd avatar akarnokd commented on August 18, 2024

Is this still desired for Rx.NET?

I've been fiddling with a port of RxJava 2.0 preview to C# here. The good thing is that there seems to be no roadblock in doing that. The bad thing is that I have limited experience with C# as a platform and may not match Java features (or lack of features like primitive generics) and behavior properly. From what I've heard in Bart's talk, Rx.NET 3.0 will focus on cloud integration instead.


masaeedu avatar masaeedu commented on August 18, 2024

@Igorbek I know you posted your original snippet a long time ago, but would an extension method like this solve your issue?

public static IObservable<U> SelectAsync<T, U>(this IObservable<T> source, Func<T, Task<U>> selector, int limit)
{
    var sem = new SemaphoreSlim(limit);
    sem.Wait();

    return source
        .Select(async x =>
        {
            var result = await selector(x);
            sem.Release();

            return result;
        })
        .Do(_ => sem.Wait())
        .Concat();
}

Taking your original code (with stuff added for logging):

Observable.Range(1, 10000)
    .Do(x => WriteLine($"Produced {x}"))
    .Select(i => Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => i))
    .Concat()
    .Do(x => WriteLine($"Consumed {x}"));

You would change it to this:

Observable.Range(1, 20)
    .Do(x => WriteLine($"Produced {x}"))
    .SelectAsync(async i => await Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => i), 5)
    .Do(x => WriteLine($"Consumed {x}"));

Imagining that Observable.Range produced some really memory intensive objects instead of just integers, the 5 passed as the limit argument of SelectAsync would now ensure that no more than 5 are pending consumption at any time.


Igorbek avatar Igorbek commented on August 18, 2024

@masaeedu Thanks for the suggestion. However, it doesn't seem to be a clean solution as it uses a blocking API.
Actually, I came up with some sort of controlled scheduler that I can pass to fast producers to prevent them from iterating. For example, something like this:

public interface IControlledScheduler: IScheduler
{
  IDisposable Suspend();
}

var controlledScheduler = new ControlledScheduler(Scheduler.Default); // it redirects work to the inner scheduler until it gets suspended
Observable.Range(1, 100000, controlledScheduler)
  .SelectMany(async x =>
  {
    using (controlledScheduler.Suspend())
      await ConsumeAsync(x);
  })


Igorbek avatar Igorbek commented on August 18, 2024

I need to find the sources for that scheduler in order to share with you.


akarnokd avatar akarnokd commented on August 18, 2024

To support non-blocking backpressure, you need an architecture change. An example of it is the Reactive.Streams base API where the upstream IPublisher calls the downstream ISubscriber with an ISubscription that allows requesting more elements and cancelling the stream. There is already a working library with the same concept.


boekabart avatar boekabart commented on August 18, 2024


SeanFarrow avatar SeanFarrow commented on August 18, 2024

There are some, but you can’t use Rx with them.
That’s what I’m looking for!

On 21 September 2016, Bart de Boer wrote:

There are .NET implementations of ReactiveStreams already; I would say to keep Rx and Ix 'pure' to their interfaces, and to provide lots of proper 'conversions' (buffers/queues with various behaviours) between IObservable and IAsyncEnumerable. And of course, educate the general public when, where and how to switch from (hot) observable to ix-async.


boekabart avatar boekabart commented on August 18, 2024

I'd say: write 'converters' that convert IObservable/IAsyncEnumerable to ReactiveStream, and converter(s) that convert ReactiveStream to IAsyncEnumerable, and you'll be able to chain it all together. Converting ReactiveStream to IObservable is weird, because the converter would have to 'set the pace' which, well, it can't really do without knowing the 'consumption' capacity.
These converters will have to be specific to the exact ReactiveStream interface, but don't need to depend on Rx at all, since they just use the .NET IObservable interface. For Ix, obviously they do need a dependency in order to know IAsyncEnumerable...


SeanFarrow avatar SeanFarrow commented on August 18, 2024

good point, thanks!

On 21 September 2016, Bart de Boer wrote:

I'd say: write 'converters' that convert IObservable/IAsyncEnumerable to ReactiveStream, and converter(s) that convert ReactiveStream to IAsyncEnumerable, and you'll be able to chain it all together. Converting ReactiveStream to IObservable is weird, because the converter would have to 'set the pace' which, well, it can't really do without knowing the 'consumption' capacity.
These converters will have to be specific to the exact ReactiveStream interface, but don't need to depend on Rx at all, since they just use the .NET IObservable interface. For Ix, obviously they do need a dependency in order to know IAsyncEnumerable...


bradphelan avatar bradphelan commented on August 18, 2024

@boekabart You say

In Rx/Ix, you can do a similar thing by switching from IObservable to IAsyncEnumerable. The 'Buffer block' to perform this switch (implementing IObserver and IAsyncEnumerable) has a particular behaviour, e.g. buffer all (with a limit), or just buffer one value, or don't buffer at all (only 'forward' if subscriber is waiting).

But I can only find in AsyncEnumerable.cs

public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>
    (this IObservable<TSource> source)

and when I dive into the code it uses an unbounded queue.

  public ToAsyncEnumerableObserver()
  {
    this.Values = new Queue<T>();
  }

I really need a single element buffer but I can't see any API that matches what you suggest. The Buffer extension generates a moving window over the input and doesn't relate.


boekabart avatar boekabart commented on August 18, 2024

The provided ToAsyncEnumerable extension indeed provides the conversion, but it is unbounded, as are for example most Subjects in Rx. But that shouldn't stop you from writing your own converter with exactly the behaviour you need...
I think I actually hinted in that direction:

I would say to keep Rx and Ix 'pure' to their interfaces, and to provide lots of proper 'conversions' (buffers/queues with various behaviours) between IObservable and IAsyncEnumerable.


bradphelan avatar bradphelan commented on August 18, 2024

Ok. That is what I am indeed doing. I kind of inferred from what you wrote that it already existed and considering the length of this thread I would have thought there would be some implementations lying around. I'll post back my solution when I think it works.


bradphelan avatar bradphelan commented on August 18, 2024

@boekabart. This is what I came up with. It works but I'm not sure if I should be putting any locks or sync in there. But it's super nice.

public static class ToAsyncEnumerableSupport
{
    public class AsyncEnumeratorFromObservableWithBufferOneStrategy<T> : IAsyncEnumerator<T>
    {
        private TaskCompletionSource<T> _Tcs;
        private readonly IDisposable _Subscription;
        private bool _Canceled;

        public AsyncEnumeratorFromObservableWithBufferOneStrategy(IObservable<T> source)
        {
            _Tcs = new TaskCompletionSource<T>();
            _Subscription = source.Subscribe
                ( onNext: v => _Tcs.TrySetResult(v)
                  , onCompleted: Cancel);
        }

        private void Cancel()
        {
            _Tcs.TrySetCanceled();
            _Canceled = true;
        }

        public void Dispose() => _Subscription.Dispose();

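        public async Task<bool> MoveNext(CancellationToken cancellationToken)
        {
            try
            {
                var tcs = _Tcs;

                // Stop once the source has completed, unless a value is still
                // buffered; in that case deliver it before finishing.
                if (_Canceled && !tcs.Task.IsCompleted)
                    return false;

                Current = await tcs.Task.ConfigureAwait( false );
                // Swap in a fresh buffer slot. Values that arrived while the
                // previous slot was already filled were dropped.
                _Tcs = new TaskCompletionSource<T>();
                return true;
            }
            catch(Exception)
            {
                // The pending task was canceled because the source completed.
                return false;
            }
        }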

        public T Current { get; private set; }
    }

    public class AsyncEnumerableFromObservableWithBufferOneStrategy<T> : IAsyncEnumerable<T>
    {
        private readonly IObservable<T> _Source;

        public AsyncEnumerableFromObservableWithBufferOneStrategy(IObservable<T> source)
        {
            _Source = source;
        }

        public IAsyncEnumerator<T> GetEnumerator()
        {
            return new AsyncEnumeratorFromObservableWithBufferOneStrategy<T>(_Source);
        }
    }


    public static IAsyncEnumerable<T> ToAsyncEnumerableWithBufferOneStrategy<T>(this IObservable<T> source)
    {
        return new AsyncEnumerableFromObservableWithBufferOneStrategy<T>(source);
    }
}
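
On the locks question: one way to avoid hand-written synchronization is to let a bounded channel do the buffering. This is only a minimal sketch, assuming .NET Core 3.0 or later, where IAsyncEnumerable<T> and System.Threading.Channels ship with the framework (so it targets that interface, not the Ix one used above). The names ToAsyncEnumerableLatest, ChannelObserver and ArraySource are made up for illustration. Note one behavioural difference: the channel is configured to keep the newest value (DropOldest), whereas the code above keeps the first value that arrives and drops later ones until it has been consumed.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ObservableChannelSketch
{
    // Hypothetical helper: exposes an observable as a pull-based stream
    // with a one-element buffer that always holds the most recent value.
    public static async IAsyncEnumerable<T> ToAsyncEnumerableLatest<T>(
        this IObservable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(1)
        {
            // When the slot is occupied, replace the old value with the new one.
            FullMode = BoundedChannelFullMode.DropOldest
        });

        // The subscription is disposed when enumeration ends or is abandoned.
        using var subscription = source.Subscribe(new ChannelObserver<T>(channel.Writer));

        await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
            yield return item;
    }

    // Forwards observer callbacks to the channel; the channel is thread-safe.
    private sealed class ChannelObserver<T> : IObserver<T>
    {
        private readonly ChannelWriter<T> _writer;
        public ChannelObserver(ChannelWriter<T> writer) => _writer = writer;
        public void OnNext(T value) => _writer.TryWrite(value);
        public void OnError(Exception error) => _writer.TryComplete(error);
        public void OnCompleted() => _writer.TryComplete();
    }
}

// Hypothetical demo source: pushes every item synchronously on Subscribe,
// then completes. It only exists so the sketch runs without Rx.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;
    public ArraySource(params T[] items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new Noop();
    }

    private sealed class Noop : IDisposable
    {
        public void Dispose() { }
    }
}
```

Because ArraySource pushes everything before the consumer reads anything, enumerating new ArraySource<int>(1, 2, 3).ToAsyncEnumerableLatest() with await foreach should yield only the last value, 3. Switching FullMode to DropWrite would instead give the keep-first behaviour of the code above.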

akarnokd commented on August 18, 2024

The preferred IAsyncEnumerable does this by default and otherwise libraries supporting the Reactive Streams .NET interfaces have been created by 3rd parties since this issue.

softlion commented on August 18, 2024

The preferred IAsyncEnumerable does this by default and otherwise libraries supporting the Reactive Streams .NET interfaces have been created by 3rd parties since this issue.

Could you post links to such libraries, plus links to examples that use IAsyncEnumerable to solve the following use case? Thanks.

Use case: prevent a timer from triggering an API call if the previous call took too long.

timerSubscription = this.WhenChanged(o => o.IsEnabled)
    .Select(isEnabled => isEnabled ? Observable.Timer(DateTimeOffset.Now, timerPeriod) : Observable.Empty<long>())
    .Switch()
    //.OnBackpressureDrop()
    .SelectMany(_ => Observable.FromAsync(() => ApiCall()))
    .Subscribe();

Edit: a solution

private readonly BehaviorSubject<bool> apiSubject = new(true);

    timerSubscription ??= this.WhenChanged(o => o.IsEnabled)
        .Select(isEnabled => isEnabled ? apiSubject : Observable.Empty<bool>())
        .Switch()
        .SelectMany(async _ =>
        {
            await MainThread.InvokeOnMainThreadAsync(async () => await Api());
            await Task.Delay(timerPeriod);
            return true;
        })
        .Subscribe(_ => apiSubject.OnNext(true));
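
For comparison, the same "call, wait, call again" behaviour can be written as a pull-based async stream. This is a minimal sketch, assuming C# 8 and .NET Core 3.0 or later. PollingSketch, Ticks and RunAsync are illustrative names, and apiCall stands in for the Api method above. Because await foreach only asks for the next tick after the loop body has finished, calls cannot overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Hypothetical tick source: yields a counter immediately, then waits
    // `period` before producing the next one. Nothing is produced unless
    // the consumer asks for it.
    public static async IAsyncEnumerable<long> Ticks(
        TimeSpan period,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (long i = 0; ; i++)
        {
            yield return i;
            await Task.Delay(period, cancellationToken);
        }
    }

    // Runs apiCall once per tick. The next tick is requested only after the
    // current call has completed, so a slow call simply delays the next one.
    public static async Task RunAsync(
        Func<Task> apiCall, TimeSpan period, CancellationToken cancellationToken)
    {
        await foreach (var tick in Ticks(period, cancellationToken))
        {
            await apiCall();
        }
    }
}
```

Cancelling the token ends the loop by throwing an OperationCanceledException from Task.Delay, so callers would typically catch that around RunAsync.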
