
Comments (4)

jlaanstra commented on August 18, 2024

Awaiting an IObservable<T> is equal to doing a LastOrDefaultAsync() on the observable. When an Observable is awaited the subscription starts.

In the AsyncWithObservable method, the observable is awaited after the OnNext(1) call, which is why you see 2 as the result.

In the AsyncWithTask and WithTask cases, the ToTask() call creates the subscription, which is why you see 1.

I am not entirely sure why removing the sleep causes blocking but it might have to do with the fact that both s.First() and t.Wait() block the current thread and therefore might cause deadlock issues.


slorion commented on August 18, 2024

Ok, thank you for the fast answer! I now understand why the behavior is like that, but I still find it very surprising and it is quite easy to get caught by it. From an API consumer perspective, I would expect AsyncWithObservable and AsyncWithTask to have the same behavior.

I included the tests with s.First() for completeness' sake, but we are not using that method. It was just a curiosity I noticed while writing the repro code.


rxteam commented on August 18, 2024

There are no unexpected results in any of these examples; everything behaves as it was designed.

Regular subjects don’t have memory for OnNext messages. Whoever is tuned in when the message is produced will see it. Hard cheese for late subscribers.

var s = new Subject<int>();

var t = Task.Run(() => s.First());

s.OnNext(42);

var f = await t;

is a textbook race condition. The result depends on when the task’s body runs relative to the main thread’s call to OnNext. It’s possible for the thread to get switched between the Task.Run call and the s.OnNext call; your addition of a sleep of 100ms just made such a switch very likely (but still no guarantee the task gets picked up within that time).

Behaviorally, this is equivalent to:

var a = default(Action<int>);

var e = new ManualResetEvent(false);
var f = 0;
new Thread(() =>
{
    var h = default(Action<int>);
    h = x => { f = x; e.Set(); a -= h; };
    a += h;
}).Start();

if (a != null)
    a(42);

e.WaitOne();

The += on the event is similar to the Subscribe call made by First. The -= on the event is similar to the Dispose call made by First upon receiving the first element. The null check on the delegate is similar to the subject’s internal use of a NopObserver in case no-one is listening.
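
One way to take the race out of the Subject sample is to make sure the subscription exists before the value is produced. The following is a minimal sketch, assuming the System.Reactive package is referenced; it replaces the blocking First inside Task.Run with FirstAsync().ToTask(), which is a substitution made for illustration and not something the original repro did.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var s = new Subject<int>();

        // ToTask subscribes right away, on this thread, so the subscription
        // is already in place when OnNext is called below.
        Task<int> t = s.FirstAsync().ToTask();

        s.OnNext(42);

        int f = await t;
        Console.WriteLine(f); // prints 42
    }
}
```

Because no second thread is involved in setting up the subscription, the result no longer depends on scheduling.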

For the other samples, FirstAsync returns a cold observable. It’s not until you subscribe to it or await that you’ll receive messages. The Async suffix may be a misnomer and has a long history:

  1. First there were First/Last/Single and their OrDefault friends that were all blocking, i.e. IO<T> -> T. We felt this was the right thing to do:

     a. Decomposing a sequence a la head/tail is a common thing to do in functional circles.

     b. There are asynchronous counterparts of most of these, e.g. Take(1) for an asynchronous first and TakeLast(1) for an asynchronous last.

     c. All of this was before there was a “future” type in the BCL, i.e. Task. We’d likely have made these things return Task if there was.

  2. The blocking behavior of these turned out to be something users (and us) weren’t able to judge well in various contexts.

     a. For example:
        xs.Window(n).Select(w => w.Last())
        will deadlock, because the OnNext of a message in xs causes an OnNext of a window and the blocking subscription made by Last in the selector function causes the thread to be stalled, hence no further messages of xs can be processed.

     b. We had to add non-blocking variants to make this obvious, and we chose to use the Async suffix for these variants. We couldn’t remove the old ones because it’d be a breaking change and this was after v1.0 RTM shipped.

     c. All of this was before there was the TAP convention of using the “Async” suffix for “async methods” introduced with .NET 4.5.

  3. Then the async/await features came:

     a. The “Async” naming suffix for our methods was merely an accident (2c), but folks would expect to be able to await the outcome of these methods.

     b. Other than the “Async” suffix, these aren’t any different from other operators.

     c. So we added await support for every IObservable<T>.
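
As a rough illustration of the non-blocking counterparts mentioned in the list, the Window example can be written so that the selector never leaves IObservable<T>. This sketch assumes the System.Reactive package; the source sequence (Observable.Range) and the window size are made up for the example, and TakeLast(1) is used as the asynchronous last because it simply yields nothing for an empty window.

```csharp
using System;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // Hypothetical source and window size, chosen only for illustration.
        IObservable<int> xs = Observable.Range(1, 7);
        const int n = 3;

        // TakeLast(1) returns an observable instead of blocking for a value,
        // so the thread delivering OnNext to the window is never stalled.
        IObservable<int> lasts = xs.Window(n).SelectMany(w => w.TakeLast(1));

        // Print the last element of each window as it becomes available.
        using (lasts.Subscribe(x => Console.WriteLine(x)))
        {
        }
    }
}
```

The design point is that the inner sequence is flattened with SelectMany rather than forced to a value with a blocking call inside Select.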

If we were to do it again, we’d likely keep the First/Last/Single family async from the get-go, i.e. IO<T> -> IO<T>, providing just a single blocking way to escape the monad (which is where the unsafePerformIO (http://www.haskell.org/ghc/docs/latest/html/libraries/base/System-IO-Unsafe.html) danger of the operation, namely the reduction of concurrency and hence the chance of deadlocks, is obvious). But that’s hindsight now ☺.

ToTask makes a sequence hot, because one rarely expects to see a cold task (they exist, cf. the constructor on Task). In fact, it’d cause unexpected behavior if you’d await it (http://stackoverflow.com/questions/24237266/why-awaiting-cold-task-does-not-throw). Other than that, the call to GetAwaiter on an IObservable<T> is also an act that makes it hot. It’s similar to the typical example of building custom awaiters for e.g. a button Click event, so you can await the button being clicked. It’s not until the await is started that an event handler is added to the button. Any click events that happen before that point in time are not observed.
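
The difference in subscription timing can be made visible with a small sketch, assuming the System.Reactive package is referenced. The variable names are invented for the example, and GetAwaiter is called explicitly (await does this implicitly) so that the second value can be sent from the same thread after the subscription has been made.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var s = new Subject<int>();

        // ToTask subscribes immediately.
        Task<int> viaTask = s.FirstAsync().ToTask();

        // FirstAsync alone is cold: nothing is subscribed yet.
        IObservable<int> viaObservable = s.FirstAsync();

        s.OnNext(1); // seen by viaTask only

        // GetAwaiter is what makes the observable hot; the subscription
        // starts here, after 1 has already been sent.
        var pending = viaObservable.GetAwaiter();

        s.OnNext(2); // first value seen through viaObservable

        Console.WriteLine(await viaTask); // prints 1
        Console.WriteLine(await pending); // prints 2
    }
}
```

This mirrors the results described earlier in the thread for the AsyncWithTask and AsyncWithObservable cases.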

From: slorion [mailto:[email protected]]
Sent: Tuesday, August 26, 2014 7:57 AM
To: Reactive-Extensions/Rx.NET
Subject: Re: [Rx.NET] Unexpected behavior with async/await on IObservable (#16)



slorion commented on August 18, 2024

OK, thank you for the very detailed and informative answer. As you point out, the confusion comes from the method name. We will just have to keep that in mind in the future. Just for the record, our use case is that we have a device (NI CompactRIO) that immediately returns a value upon connection, so we have to subscribe to the event before actually connecting to it.

