Giter Site home page Giter Site logo

Comments (5)

mattpodwysocki avatar mattpodwysocki commented on August 18, 2024

malayeri wrote Oct 16, 2013 at 2:39 PM

I asked Bart de Smet for some more context, since he designed some of these operators, and here's his reply:

It’s a matter of defaults once more, but some that you can’t always control right now as a user. Given that the completion of a Task that’s being converted to an observable can trigger the whole downstream processing pipeline (which can take a long time to run), ExecuteSynchronously was deemed inappropriate (cf. MSDN “Only very short-running continuations should be executed synchronously.”). I’d only introduce synchronous execution if we have a scheduler around and use that one to schedule the continuation on:
task.ContinueWith(t => … scheduler.Schedule(…) …, ExecuteSynchronously)
It’s fine to invoke, synchronously, the continuation that makes a call to Schedule which effectively is asynchronous (unless the immediate scheduler would be used). IIRC, theToObservable conversion for Task<T> doesn’t take a scheduler, but it could, effectively doing what’s shown above. It’d be great for the overload without an IScheduler to retain the original behavior though.

That leaves us with SelectMany (etc. whose Task-returning methods are basically macros for using ToObservable. We wouldn’t want a bunch of overloads of those that add an IScheduler parameter for this reason alone, so I’d rather keep those untouched, stating they have the default ToObservable behavior with no parameter, and if one wants to control what’s going on, use the IScheduler-based overload by hand:
xs.SelectMany(x => ftask(x)) == xs.SelectMany(x => ftask(x).ToObservable(/* default behavior */))
If you want control, you have to convert yourself:
xs.SelectMany(x => ftask(x).ToObservable(/* specify some scheduler here */))
I’m curious though why ObserveOnDispatcher didn’t help in Dave’s case.
Does that help explain things?

from reactive.

mattpodwysocki avatar mattpodwysocki commented on August 18, 2024

Yes, it does. Please thank Bart for me.

However, I'm not entirely convinced yet that it's the correct default behavior.

I now agree that Rx must respect ContinueWith's contract by only executing synchronously if the continuation is fast, per the cited MSDN documentation.

I also like the proposal for a new overload of ToObservable accepting a scheduler.

But I still don't like the default behavior for the parameterless ToObservable or SelectMany. Forcing a continuation to use the task pool when it's just going to turn around and marshal back to the UI thread anyway seems wasteful and for unsuspecting developers that fail to do so it could potentially introduce race conditions or other threading bugs in programs unexpectedly. (I'm proof that it happens :)

It seems that the primary use case nowadays for Task is to be used with await, in which we expect the synchronization context to be preserved unless it's explicitly avoided via ConfigureAwait, so I feel that converting a Task<T> into an IObservable<T> should respect the principal of least surprise and not force unnecessary concurrency.

Therefore, how about a compromise? The parameter-less ToObservable operator could attempt to get a scheduler for the current synchronization context and pass it to Bart's proposed overload; otherwise, it would fallback to the existing behavior in Rx 2.0 (whatever ContinueWith decides). Furthermore, the proposed overload is perhaps sufficient for applying ConfigureAwait(false)-like behavior for conversions by simply passing in Scheduler.TaskPool.

At the very least, even if one could argue that this isn't a sensible default behavior for ToObservable, then would it at least be sensible for SelectMany?

For example, and to answer Bart's question about why ObserveOnDispatcher (alone) didn't help in my particular scenario, imagine a query that uses multiple consecutive SelectMany queries. I see this as a declarative program using observables analogous to an imperative program that uses await:

var q = from x in t1.ToObservable()
        from y in t2
        from z in t3
        select z;

var x = await t1;
var y = await t2;
var z = await t3;

All t* are Task<T>.

The problem now is that these two programs seem to be semantically the same, yet depending upon whether a synchronization context currently exists they'll behave differently. I find it really strange that Rx is actually the thing introducing concurrency, in this case. It seems like TPL was classically all about introducing concurrency to solve problems, while Rx has always been about taming concurrency. Async/await is now also about taming concurrency, yet interoperating with Rx introduces concurrency.

It's true that q could be "fixed" by applying ObserveOnDispatcher before subscribing; however, in reality it's not always so simple, or intuitive.

For example:

var q = from x in t1.ToObservable()
        from y in DoSomethingAsync()
        from z in DoSomethingElseAsync()
        select z;

The *Async methods return Task<T>.

In reality, it's often necessary to execute async methods on the UI thread, just like the default behavior of the await keyword when an async method begins executing on the UI thread.

To solve this problem in the above example, we have to explicitly force all observables onto the UI thread, even when we know that all of our async methods will complete on the UI thread. That's counterintuitive, IMHO.

var q = from x in t1.ToObservable().ObserveOnDispatcher()
        from y in DoSomethingAsync().ToObservable().ObserveOnDispatcher()
        from z in DoSomethingElseAsync().ToObservable().ObserveOnDispatcher()
        select z;

-OR, with the proposed overload-

var q = from x in t1.ToObservable().ObserveOnDispatcher()
        from y in DoSomethingAsync().ToObservable(DispatcherScheduler.Current)
        from z in DoSomethingElseAsync().ToObservable(DispatcherScheduler.Current)
        select z;

It's especially counter-intuitive when we consider what's really happening with this "fix". The current Rx marshaling behavior goes from UI->ThreadPool->ThreadPool->ThreadPool->ThreadPool->ThreadPool because each use of ToObservable apparently marshals to a new pooled thread, even if the current continuation is already executing on a pooled thread. However, we want to begin each continuation on the UI instead, so we're avoiding the default behavior by introducing a no-op scheduling operation: UI->UI->UI->UI->UI->UI. In fact, what we actually want (and what I had expected) is synchronous behavior for continuations: UI->UI->UI.

Given async/await, it no longer seems like a sensible default behavior to require developers to add extra code on each line when converting from Task-based asynchrony to observables, which I suspect is the common case nowadays. At least, it certainly doesn't seem like it guides developers into a pit of success.

That example also illustrates the reason that ObserveOnDispatcher didn't work for me. I wasn't aware that Rx was introducing concurrency, so I was surprised to discover that adding ObserveOnDispatcher in one place, rather than in every place, wasn't enough to fix the problem. Of course, ObserveOnDispatcher is useless if the Dispatcher isn't available in the context of a subsequent from clause, and I should've realized that; although, ObserveOn(control.Dispatcher) still works. But it's all about expectations: I knew that my *Async methods were completing on the UI thread, but I was getting an error in Subscribe, so I was confused that applying ObserveOnDispatcher before Subscribe didn't work. After I applied it, I started getting threading errors in the other Async methods too (perhaps it was race conditions), until I realized that Rx was introducing concurrency in SelectMany. This was counter-intuitive for me.

I guess it boils down to these questions:
1.How do most developers use the conversion from Task to Observable, including overloads of SelectMany et. al.?
2.Do they expect await semantics or do they expect concurrency to be introduced automatically regardless of the thread on which the Task completes?

Thanks for considering the matter,
Dave

from reactive.

mattpodwysocki avatar mattpodwysocki commented on August 18, 2024

weq wrote Sep 28 at 11:31 PM

+1 for ToObservable(IScheduler) overload

This is a big gap in the API for me! I use it alot in testing.

from reactive.

mattpodwysocki avatar mattpodwysocki commented on August 18, 2024

davedev wrote Sep 29 at 5:04 AM

You know, I never followed up with Bart's email on this. The last thing we discussed was that capturing the SyncContext to achieve a similar affect as async/await isn't ideal although it's certainly reasonable, similar to how FromEventPattern does the same thing now. It's all about user expectations. But Bart cautioned that it's worth considering the consequences and the specifics about the actual behavior before implementing it.

I'll reconsider this feature when I get some time and check in a fix (that is, unless the Rx team has some other plans - please let me know!). Thanks for bringing this back to my attention.

from reactive.

bartdesmet avatar bartdesmet commented on August 18, 2024

We added a ToObservable(IScheduler) overload which will use TaskContinuationOptions.Synchronous and use the specified scheduler to run the continuation. If Scheduler.Immediate is used, this typically means it will run where the task is completed from, though the TPL may still reschedule it on the task pool in case the stack is too deep already (as to avoid stack overflow exceptions).

The default overload ToObservable() stays as-is with its v2.x behavior for compatibility reasons. Similarly, we didn't change the shortcuts in SelectMany that use ToObservable() underneath. More elaborate mechanisms using synchronization contexts etc. were considered but deemed too subtle, and the stack probing behavior of ContinueWith does not allow us to make any guarantee about synchronous completion; one has to be aware a thread jump may still occur, so it'd always have to be tamed using an explicit ObserveOn, e.g. using a SynchronizationContextScheduler.

For a future Rx.NET release, we may revisit some of this, but for the upcoming refresh of v2.x, we'll simply go for the addition of the IScheduler-based overload.

from reactive.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.