Giter Site home page Giter Site logo

Comments (5)

adamjones2 avatar adamjones2 commented on August 18, 2024

I also note

var source = Observable.Return(-1L).ObserveOn(ThreadPoolScheduler.Instance).Concat(Observable.Interval(TimeSpan.FromSeconds(1)));

works, but the windowing function can't modify the source that way without putting the entire thing on that scheduler, so this is only a hack of the input to workaround the function's deficiency and not a true solution. Perhaps useful for diagnosis though.

EDIT: I've also realised this causes the subscribe action of the original sequence (Observable.Interval(TimeSpan.FromSeconds(1))) to be offloaded onto another thread as well, which is causing issues for my use case, so I can't use it even as a workaround.

from reactive.

adamjones2 avatar adamjones2 commented on August 18, 2024

In the end I had to hand-spin the implementation of the function a different way, but I'd still appreciate some insight into this and would suggest the below as a candidate for inclusion into the library.

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isWindowStart) => 
    Observable.Create<IObservable<T>>(observer =>
    {
        Subject<T>? currentWindow = null;
        return source.Subscribe(Observer.Create<T>(
            next =>
            {
                if (currentWindow == null || isWindowStart(next))
                {
                    currentWindow?.OnCompleted();
                    currentWindow = new Subject<T>();
                    observer.OnNext(currentWindow);
                }
                currentWindow.OnNext(next);
            },
            ex => { currentWindow?.OnError(ex); observer.OnError(ex); },
            () => { currentWindow?.OnCompleted(); observer.OnCompleted(); }
        ));
    });

from reactive.

idg10 avatar idg10 commented on August 18, 2024

We can make a relatively simple change to your original example, i.e. this:

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isStartOfNewWindow)
{
    var shared = source.Publish().RefCount();
    var windowEdge = shared.Where(isStartOfNewWindow).Publish().RefCount();
    return shared.Window(windowEdge, _ => windowEdge);
}

We can replace the last line with this:

return shared.Window(_ => windowEdge);

The significance of this single-argument Window overload is that it guarantees to partition the input elements. I.e., each input element will appear in exactly one window.

There is an inherent limitation with the openings/closing overload, which is what your example (and also the IntroToRx sample on which that was based) uses:

public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(
    this IObservable<TSource> source,
    IObservable<TWindowOpening> windowOpenings,
    Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector)

It's impossible for this overload to make that promise of always delivering each item into exactly one window. This turns out to be a necessary (although perhaps not entirely obvious) upshot of the fact that this overload is designed specifically to allow gaps or overlaps.

The only reason you would specify openings and closings separately is if you didn't necessarily want strict partitioning. This overload allows gaps between windows (in which case any items falling between windows will be dropped, by design), or overlaps (in which case the same element will appear in multiple windows if those windows overlap).

Since this openings/closings overload accepts two distinct observables, and since Rx does not offer any way for two distinct observables to emit items at exactly the same time, it's not actually possible to use this openings/closings overload as a more general version of the closings-only overload shown above. You might expect that in situations where the closings callback returns the same observable source as was passed as the openings source, it would be possible for Rx to detect that this is actually the same thing, and to therefore somehow detect when two events emerging from these two sources are somehow "the same" event. But in general that can't work because Window subscribes to each observable returned by the closing selector. In general in Rx, multiple subscriptions to the same source are allowed to produce different sequences. (Rx has no way of detecting when something is in fact a 'hot' source, where all subscribers receive exactly the same events at logically the same time.)

If events in Rx were timestamped, then it would be possible to say definitely whether two events occurred at exactly the same time, but since they are not, there is no concept of precisely simultaneous events.

So this openings/closings overload is inherently imprecise (because when what we might think of as 'the same' event emerges through multiple subscriptions, these multiple deliveries happen at slightly different times). Fundamentally, because there's no way to represent the idea that two distinct events happened at precisely the same time, this openings/closings operator is always going to be a bit fuzzy around the edges.

So why does introtorx.com suggest the openings/closings form? I can't provide a definitive answer to that because that example dates back to the original edition of the book, which I did not write.

When I made updates to the book to produce the 2nd (current) edition, I'm afraid I did not notice that the example you found can go wrong in this way.

I've created a new issue identifying that example as a doc bug: #2130

If you were able to try this change out and see if it works for you that would be great—if it turns out that there are other reasons you can't do it this way, it would be good to know.

from reactive.

adamjones2 avatar adamjones2 commented on August 18, 2024

Hi @idg10, many thanks for your detailed and thought out response, it's appreciated! That context all makes sense. I assume by

return shared.Window(_ => windowEdge);

you mean to say

return shared.Window(windowEdge);

as I can't find an overload like the former. I've tested it and it does appear to work, thanks! Simple solution in the end, don't know how I missed that one.

from reactive.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.