Giter Site home page Giter Site logo

raquo / airstream Goto Github PK

View Code? Open in Web Editor NEW
235.0 13.0 25.0 1.39 MB

State propagation and event streams with mandatory ownership and no glitches

License: MIT License

Scala 100.00%
functional-reactive-programming reactive-streams state-management scala scalajs scala-js

airstream's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

airstream's Issues

Improve/replace combineWith

Consider EventStream.combineWith:

  def combineWith[AA >: A, B](otherEventStream: EventStream[B]): EventStream[(AA, B)] = {
    new CombineEventStream2[AA, B, (AA, B)](
      parent1 = this,
      parent2 = otherEventStream,
      combinator = CombineObservable.guardedCombinator((_, _))
    )
  }

// usage
case class Point(x: Int, y: Int)
val $x: EventStream[Int] = ???
val $y: EventStream[Int] = ???
val $p: EventStream[Point] = $x.combineWith($y).map2(Point)

This combinator can be replaced with the more generalized form:

// simply exposing the additional parameter avoids unnecessary tuple allocation
// also, type parameter AA is redundant
  def zipWith[B, C](that: EventStream[B])(f: (A, B) => C): EventStream[C] =
    new CombineEventStream2(
      parent1 = this,
      parent2 = that,
      combinator = CombineObservable.guardedCombinator(f)
    )

// replacement for combineWith
  def zip[B](that: EventStream[B]): EventStream[(A, B)] =
    zipWith(that)(_ -> _)

// usage
case class Point(x: Int, y: Int)
val $x: EventStream[Int] = ???
val $y: EventStream[Int] = ???
val $p: EventStream[Point] = $x.zipWith(y)(Point)

What do you think?

Test: Verify Signal's initialValue laziness

The following code and comment in Signal.scala:

/** Here we need to ensure that Signal's default value has been evaluated.
    * It is important because if a Signal gets started by means of its .changes
    * stream acquiring an observer, nothing else would trigger this evaluation
    * because initialValue is not directly used by the .changes stream.
    * However, when the events start coming in, we will need this initialValue
    * because Signal needs to know when its current value has changed.
    */
  override protected[this] def onStart(): Unit = {
    tryNow() // trigger setCurrentValue if we didn't initialize this before
    super.onStart()
  }

is seemingly inconsistent with the behaviour initial value Failure() when .changes is the only consumer test and the documentation written based on the tested behaviour.

Figure out who is right, and if it's the documentation, figure out why the code isn't doing what it says it's doing. Perhaps the test behaviour is due to the very immediate nature of EventStream.fromSeq?

Also, we need to decide which behaviour is actually desirable, and document our reasons.

  • If we needlessly evaluate initialValue, how do we report errors from it? State needed special treatment for that.
  • If we don't evaluate initialValue, how is that a problem? Make a test that demonstrates the problem.

Rename the concept of Transaction?

I can never explain the meaning of a transaction without heavily referring the concept of propagation, and that tells me it's named incorrectly.

Update multiple Var-s in the same transaction

Calling Var.writer.onNext(event) emits the event in a new transaction. So, currently you can't update multiple Var-s "simultaneously". Which is to say, you will introduce an entirely avoidable glitch if you do, if you have a downstream observable that synchronously depends on both of the updated Vars.

We should have a method like Var.onNext(var1 -> value1, var2 -> value2, ...) that would let us update multiple Vars in the same transaction.

I feel like I've done this before, but I can't find any evidence of that.

PeriodicEventStreamSpec randomly failed (just once so far)

Just got this test fail randomly (just once out of many runs). Not sure if it's fine or maybe some sneaky race condition there.

[info] PeriodicEventStreamSpec:
[info] - emitInitial=true, resetOnStop=true *** FAILED ***
[info]   WrappedArray(Effect(obs1,4), Effect(obs1,1)) did not equal WrappedArray(Effect(obs1,1)) (PeriodicEventStreamSpec.scala:49)
[info]   Analysis:
[info]   WrappedArray(0: Effect(obs1,4) -> Effect(obs1,1), 1: Effect(obs1,1) -> )

...

[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error] 	com.raquo.airstream.eventstream.PeriodicEventStreamSpec

Investigate Observable Completion

Currently Airstream has no concept of "completing" observables.

Completion is a special message emitted by an observable that notifies all observers – both internal and external – that the observable is done and will not produce any more events. Imagine onComplete() in addition to onNext(event) and onError(err).

This concept is a natural fit for event streams, but it needs to be adapted to signals. Unlike streams, Signals carry a current value, so for Signals completion also means that their current value will no longer change.

For example, Val-s would be completed on initialization. stream.take(1) would complete after one event. If Airstream had a take operator, that is. It doesn't, and I think we must at least have that much before we go all the way to implement completion.

The benefits of completion as a feature are not quite clear to me at the moment. I'm yet to see a real life pattern that requires completion. Which is not to say that such patterns don't exist, I just haven't run into them myself yet, or maybe I just tend to structure my code differently because I have Signals in Airstream. Not quite sure.

As for performance...

Completion will allow all child observables to remove themselves as internal observers from the completed observable. The completed observable can actually do this by itself without the completion feature, but what the completion feature allows is propagating this completion down the chain of observables – since an observable that only depends on completed observables is (generally) considered completed.

This chain reaction can result in early disposal of completed observables and subscriptions, which could potentially reduce memory usage (a subscription that looks at a completed observable can be killed without waiting for the owner to kill it).

One practical application where this could be useful is a pattern where an a parent component renders a dynamic list of children, and has an event bus that is sourced from streams generated by those children. With completion feature, we would complete the streams exposed by a child component when said child gets unmounted, the parent's event bus would be notified about that and would stop listening to this completed stream.

However, we already have a solution for this: eventBus.writer.addSource requires an Owner which will remove the source observable from event bus (e.g. when the component it belongs to is unmounted). Granted, this is a rather ad-hoc solution whereas completion would be a generalized solution to this problem.

But I'm not sure how useful completion is outside of this pattern. All the cases I ran into where I wished for completion for a performance gain involved firing one event before completing. That does not feel like much of an Observable to me, and I'm not sure if Observables should be optimized for what essentially is a singular callback pattern.

Is component.$mountEvents.collect { case ev: NodeDidMount => ev }.foreach(doStuff) all that better than a simple component.onMount(doStuff) callback? Is it such a big a problem that this stream does not stop after a NodeDidMount event was published (let's assume there will only ever be one such event)? Should this be solved differently, perhaps with a .once(doStuff), a new method that will unsubscibe the observer after one event?

Ultimately, observable completion is a big feature, and has complex interactions with other features. Without a clear need, it will remain unimplemented for now. I just wanted to get this out of my head.

Cats effect support

I would love to see some way of using an effect type with this library. I understand that the library itself doesn't need it, but interfacing with pure fp code, it would make sense to allow observers to pass wrapped effects instead of effectful functions.

Var#updater

Var#writer is great when you want to set a new value:

stream --> myVar.writer

But updating a Var in a similar manner is more annoying:

stream --> Observer[Ev](ev => myVar.update(currValue => mod(currValue, ev)))

I think it could just be:

stream --> myVar.updater[Ev]((ev, currValue) => mod(currValue, ev))

Or, well, something like that. The updater would return an Observer, similar to writer.

map or contramap on Observer?

The signature of map on Observer[A] is

def map[B](project: (B)  A): Observer[B]

which looks similar to the concept of contramap

Would it be more intuitive to rename it?

Glitch when updating a Var while reading its state

While Airstream solves vast majority of FRP glitches, a particular grief-causing pattern has emerged:

var n = 0
val clickBus = new EventBus[Unit]
val state = Var[List[Int]](Nil)
clickBus
  .events
  .map(_ => n = n + 2)
  .flatMap(_ => EventStream.seq(List(n -2, n - 1), emitOnce = true))
  .withCurrentValueOf(state.signal)
  .foreach { (next, currState) =>
    state.set(currState :+ next)
  }
}
clickBus.writer.onNext(())
println(state.now())

source: gitter

User expects state to contain List(0, 1) at the end, however in fact state will contain only List(1) because when the second event happens, currState will still be Nil because every state.set call will only update Var's state in a new transaction, which is created and thus scheduled AFTER both EventStream.seq transactions were created & scheduled, so it will also run AFTER they have completed.

To simplify, here is a similar pattern that results in the same issue for similar reasons:

var n = 0
val clickBus = new EventBus[Unit]
val state = Var[List[Int]](Nil)
clickBus
  .events
  .foreach { _ =>
    n = n + 2
    state.set(state.now() :+ (n - 2)
    state.set(state.now() :+ (n - 1)
  }
}
clickBus.writer.onNext(())
println(state.now())

source: also gitter, but older, can't find it now

In the example above, the call to state.now() will return Nil both times, because the transaction kicked off by state.set(state.now() :+ (n - 2) hasn't processed yet, because the current transaction hasn't finished yet.

One solution could be to replace state.set(state.now() :+ (n - 1) with new Transaction(_ => state.set(state.now() :+ (n - 1)), which would delay evaluation of state.now() until a new transaction which is scheduled after the transaction created by state.set(state.now() :+ (n - 2). However, this is ugly both visually and conceptually. This call will actually create two transactions. Maybe we can add a nowSync(currValue => ()) to Var which would trigger the callback only after the current transaction or even ALL transactions have been cleared?

Alternatively, we already have update(mod: A => A) method on Var, however it evaluates the next state immediately when called. Perhaps it should instead delay the evaluation until the transaction that it creates actually starts executing.

Currently we have:

def update(mod: A => A): Unit = {
    val unsafeValue = now()
    val nextValue = Try(mod(unsafeValue))
    new Transaction(setCurrentValue(nextValue, _))
  }

It would be pretty simple to move unsafeValue and nextValue evaluation inside the Transaction callback.

Yet another alternative is for the user to introduce an asynchronous delay somewhere between two events that they fire. So add delay(0) to EventStream.seq(List(n -2, n - 1), emitOnce = true) in the first example, or wrap state.set(state.now() :+ (n - 1) in js.timers.setTimeout in the second one.

Also I should document this issue and workarounds more clearly in Laminar & Airstream docs.

If anyone has other ideas or concerns about proposed solutions, please speak up. This issue will likely require breaking changes to the API where it will still compile but the timing logic would be different, requiring manual review of your usages of Var.update and such.

Error Handling

As documented, currently an exception thrown anywhere in observables or observers is undefined behaviour. Obviously this is a non-starter for any production application, so the next version of Airstream will include error handling.

I haven't thought much about the specifics, so no special design decisions just yet, but in broad strokes I think it'll look something like this:

  • the failure in an observable can be recovered from using methods like recover(err => value)
  • the failure should be toxic (if unhandled, dependent observables should fail as well)
    • All of them, or just some? Does e.g. EventBus need an ErrorHandlingStrategy param?
  • observers should be notified of errors with an optional onError
  • it should be possible to print out all unhandled errors to dev console. Maybe a global stream exposing unhandled errors?

I will consider introducing observable completion at the same time, but no promises that I'll decide to implement it (see #1)

Bug: Observable.removeObserver can cause a (usually minor) memory leak

Adding an Observer to an Observable requires you provide an Owner, and produces a Subscription. Normally the owner should take care of killing this subscription, but you can also kill it manually before the owner decides to do that.

The right way to do this is to call subscription.kill().

However, Airstream also provides a removeObserver(observer) method on Observable. While it does correctly remove the observer from the observable, it does not call subsription.kill() because it has no reference to the subscription. This means the Owner continues to keep a reference to the subscription (and thus to both observable and observer) until the Owner decides to kill them.

This is an error in my API design, and is contrary to users expectations. I will fix this one way or another, likely by removing the removeObserver method. However, the exact solution depends on how I decide to implement raquo/Laminar#33.

For now, avoid using the removeObserver method in favour of subscription.kill

InvalidStateError: XMLHttpRequest state must be OPENED.

Setting headers in AjaxEventStream.initRequest results in this error at runtime.
This can be fixed by setting the headers in AjaxEventStream.sendRequest instead.

For reference,

  /**
   * Sets the value of an HTTP request header. You must call setRequestHeader()
   * after open(), but before send(). If this method is called several times with the
   * same header, the values are merged into one single request header.
   *
   * MDN
   */
  def setRequestHeader(header: String, value: String): Unit = js.native

Custom operator : a non commutative variant of combineWith

Hi I am trying to implement a custom operator that has the same type as combineWith but with different semantics.It emits only when the first stream emits ,combining with the latest element of the second stream.This can be think of as an non commutative version of current combineWith.However the required modification seems not obvious :

/** Stream that combines the latest values from two streams into a tuple.
  * Only fires after both streams have sent a value.
  *
  * Note: this stream forgets any previous values of parent streams once it's stopped
  *
  * @param combinator Note: Must not throw. Must have no side effects. Can be executed more than once per transaction.
  */
class CombineEventStream2[A, B, O](
  parent1: EventStream[A],
  parent2: EventStream[B],
  combinator: (Try[A], Try[B]) => Try[O]
) extends EventStream[O] with CombineObservable[O] {

  override protected[airstream] val topoRank: Int = (parent1.topoRank max parent2.topoRank) + 1

  private[this] var maybeLastParent1Value: Option[Try[A]] = None
  private[this] var maybeLastParent2Value: Option[Try[B]] = None

  parentObservers.push(
    InternalParentObserver.fromTry[A](parent1, (nextParent1Value, transaction) => {
      // println(s"> updated p1 value to $nextParent1Value")
      maybeLastParent1Value = Some(nextParent1Value)
      maybeLastParent2Value.foreach { lastParent2Value =>
        internalObserver.onTry(combinator(nextParent1Value, lastParent2Value), transaction)
      }
    }),
    InternalParentObserver.fromTry[B](parent2, (nextParent2Value, transaction) => {
      // println(s"> updated p1 value to $nextParent1Value")
      maybeLastParent2Value = Some(nextParent2Value)
      maybeLastParent1Value.foreach { lastParent1Value =>
        internalObserver.onTry(combinator(lastParent1Value, nextParent2Value), transaction)
      }
    })
  )

  override protected[this] def onStop(): Unit = {
    maybeLastParent1Value = None
    maybeLastParent2Value = None
    super.onStop()
  }

}

Create more combineN / tupleN classes

We need combineWith and map that supports more than two params.

Find the most compact way to do this. For mapN perhaps switch from implicit classes to methods with evidence restricting the class's type params. For combine, perhaps implement a generic class than can handle N parents.

JVM support

@raquo
I would like to use Airstream to build Java FX applications.
Would you be open to cross publishing the library? I can submit a PR for the same.

Settle on a Signal `==` performance solution

Signals only emit values that are different from their current state. "different" means Signal does an == check. This is very convenient, and in general improves performance by eliminating redundant DOM calls, but can potentially be expensive in certain situations, e.g. when comparing large, almost identical Map-s.

Currently we have com.raquo.airstream.util.Ref to deal with this, but the wrapping is annoying to deal with. Perhaps I should provide some helpers to deal with wrapped values easier, but I'm not convinced that this is the best solution.

I will look more into this when I get the time to work on memoization in Laminar (raquo/Laminar#38), I think there might be some common ground between these issues.

Re-evaluate signal's current value when restarting signal

val state =  Var(0)
val x10signal = state.signal.map(_ * 10)
val x100signal = x10signal.map(_ * 10)

Currently, when you first start x100signal, it initializes its initial value (not a typo – signals' initial values are lazy) by pulling the current value of x10signal, which initializes the initial value of x10signal by pulling state.signal's current value. However, if afterwards you stop x100signal, then after some time start it again, this pulling-current-value chain will not happen, x100signal will simply start listening to new x10signal updates from this point on. This means that if state was updated while x100signal was stopped, x100signal will not receive this update when it's started again.

This is documented behavior – observables are lazy, so you shouldn't stop the ones you care about – but it would be more useful if x100signal could recursively pull its parent's current value not just when initializing the signal's initial value, but every time it's started.

This would have convenient practical effects in Laminar when re-mounting unmounted elements – those would be brought up to date when they're re-mounted.

However, very importantly, this measure can't cancel out laziness per se, and specifically this can not possibly replay the EventStream events that fired while the signal was stopped, so we will not be able to pull an updated current value for signals that depend on event streams, such as stream.foldLeft(...) or stream.startWith(...).

This caveat will make undesired behavior more rare, but more specialized and harder to debug. However, it is probably a good tradeoff, since it should be more obvious that any events you've missed will not be re-emitted, than how your signal won't pull a parent's current value when it's reactivated.

gitter context

Should flatMap restart the inner observable if it's re-emitted?

Consider the following code for both SwitchStreamStrategy and SwitchSignalStrategy:

parentObservable.flatMap(parentEvent => makeChildObservable(parentEvent))

if parentObservable emits 1 then 2, and makeChildObservable(parentEvent) returns the exact same childObservable both times, that childObservable will be stopped and then started again when the parent emits 2.

I'm inclined to change the behaviour so that childObservable is not restarted in this case, so that it just continues running, but I'm not sure. Some streams have side effects when they start (such as the new AjaxEventStream), so while this should be a rare issue, it's not just a minor performance concern, but a potentially breaking behaviour change.

My thinking is that if the reference to childObservable is exactly the same in both cases, the user likely intended to cache it for some reason, probably to avoid recreating / restarting it.

The main question is, what do users expect to happen in this case? Do you expect nothing to happen when switching from childObservable to the exact same childObservable (same reference, not just ==), or do you expect it to be stopped and immediately started again? Do you have code that would be affected by this?

I could make this configurable if there is demand for that, but I still need to figure out which default would make the most sense.

Consider keep-alive mechanisms to extend ownership duration

Observable completion (#23) issue talks about killing subscriptions from the stream / producer side, usually to terminate the subscription early, such as the take / takeWhile operator (#33).

On the other hand, sometimes we actually want to extend the duration of the subscription for longer than the owner would normally allow. For example, in Laminar, element owners kill their subscriptions on unmount. So if you make an Ajax request and unmount the component before the response comes in, and nothing else listens to the response, the response will be ignored. Usually this is desired, but if the response observer has a side effect that you want to run regardless of the element's mount status, that side effect won't run.

To mitigate this we could mark this observer with something like keepAlive(maxEvents = 1, timeoutMs = 30000), meaning that any subscription created from it will not terminate immediately but will remain active for a maximum of one incombing event or a maximum of 30 seconds after the owner killed it, whichever comes first.

However, I'm not 100% sure if this is a good idea, because I think maybe the real solution is to use an observer owned by a global owner rather than by an element in such cases. Currently it's not really an option because the global owner would then accumulate many subscriptions from now-unmounted components over time, but maybe if we implement observable completion, we would be able to solve this (e.g. an ajax response stream would complete after emitting the result and any subscriptions listening to it would be killed since no new updates are expected).

Just some thoughts to get this out of my head.

Implement EventStream.periodic

Need a stream that emits values at an interval when it's active. it's possible to emulate with a DIY EventBus hack, but we should have a proper way to do this.

For API design, bare minimum is to emit Unit-s. But it might be quite cheap and useful to emit an index-like number instead. But then we need an option to reset the number on deactivation. Maybe make the counter a separate method.

Also, consider putting a limit on number of emitted values? But that will be more useful as a take() or until(), and well that's more of a #23 thing, so maybe another day.

Add spacedWith combinator

def spaced(millis: Int): EventStream[I] =
  spacedWith(_ => millis)

def spacedSome(f: PartialFunction[I, Int]): EventStream[I] =
  spacedWith(f.applyOrElse(_, (_: Any) => 0))

/* Introduces time intervals between events */
def spacedWith(f: I => Int): EventStream[I]

Behavior:

time          0s        1s         2s         3s

events        ----------e1e2-------e3

delay(1s)     ---------------------e1e2-------e3

spaced(1s)    ----------e1---------e2---------e3

I can submit a PR if this looks useful.

Provide a way to update Var-s

Desired API:

val counter = Var(0)
counter.writer.update(_ + 1)

Whereas currently you need to mess with creating a SignalViewer of Var's signal, and read current value from there.

Note: this will expose the Var's current value to anyone with a Var reference. This is not desirable. writer exists as a separate entity because of that, so maybe if we are fine violating that we should merge writer into the Var itself, but if we do that, do we do the same to EventBus?

Alternatively, we could put the update method on the Var itself, AND also copy over onNext / onError / onTry methods to it that would just call into Writer.

Concatenation of EventStreams

Can we concatenate multiple EventStreams?

The use case is that I have two streams that should emit elements after each other:

val e1: EventStream = ???
val e2: EventStream = ???

 // all elements in e2 should come only after e1 has finished emitting items
val combined = e1 concat e2

In the readme, I saw that there is no concept of closing Observables. So, I was wondering whether this is possible?

How to use or override protected[airstream] members when extending observables?

When extending Observable you need to provide an implementation for topoRank. But it's protected[airstream], which is annoying, because you either need to make that field public (ew), or put your subclass into the com.raquo.airstream package (double ew).

Instead what we should do is make topoRank simply protected (without [airstream]), and offer a private[airstream] def topoRank(o: Observable[_]): Int on the Observable companion object, or something like that.

Now, there are a bunch of other methods which are protected[airstream] and also have a concrete implementation. This includes tryNow, now, onNext, onError, onTry. I'm not sure if you can call them or even override them from your subclass. I think I need to fix those too, similarly to topoRank.

Semantics of `toSignal`

What's the semantics of toSignal?

I guess that it's an operator that takes an initial values, and creates a signal that, whenever an event occurs in the source event stream, switches to that event.

Like FRP stepper.

stepper

(Image from to Reactive Banana documentation)

But how does toSignal achieve this effect, taking into consideration that both EventStream and Signal are "lazy"? How does it ensure that it doesn't "miss" any event (which would be reflected in its state)?

Add support for Either[Throwable, *]

This can be achieved with the following additions.

object EventStream {

  def fromEither[A](ea: Either[Throwable, A]): EventStream[A] =
    EventStream.fromCustomSource[A](_ == 1, (fa, fe, _, _) => ea.fold(fe, fa), _ => ())
}

implicit object SwitchEitherEventStream extends FlattenStrategy[Observable, Either[Throwable, *], EventStream] {

  def flatten[A](parent: Observable[Either[Throwable, A]]): EventStream[A] =
    new SwitchEventStream[Either[Throwable, A], A](parent, EventStream.fromEither)
}

This is particularly useful for decoding JSON using libraries like Circe.

Should mapTo take its parameter by name?

mapTo(value) is very useful to reduce the map(_ => value) boilerplate, however it accepts the parameter by value, i.e. value is evaluated once, when the stream is defined. However, it is often desirable to pass value by name, i.e. re-evaluate it on every fired event, for example to grab current values from the DOM in Laminar: clickStream.mapTo(myInput.value).

I faced this question when I implemented mapTo the very first time, and decided to use a by-value parameter because that's what Monix does, and that's effectively what all JS libraries do (because JS has no concept of by-name parameters). So the only reason was to satisfy people's expectations. This actually went against my personal desires.

@lolgab mentioned that the current behaviour was not intuitive to him either, so I think it's time to review it.

Personally I am willing to overlook the difference in behaviour with Monix because Airstream is specifically designed as a reactive layer for UI libraries, and thus needs to be convenient to use around mutable JS DOM references. We simply can't get away from them, they're part of the JS platform.

I think mapTo accepting its parameter by name would yield a net reduction in unexpected results. However, it would also make the current mapTo functionality unavailable, so maybe we should also introduce a mapToValue(value) method that would accept its param by value like mapTo does today.

Looking for comments and feedback on this proposal.

Airstream users, do you use emitOnce = true / false? How much and why?

Question prompted by @ajaychandran's suggestion in another issue / pr to make this part of the API smoother.

The methods EventStream.fromSeq, EventStream.fromValue and EventStream.fromTry currently require you to specify this parameter. It's not optional because originally I wasn't sure what the default should be. My impression right now is that emitOnce = false is the most reasonable default.

So I'm inclined to add a default value for this paramtere, emitOnce = false, or, if people are actually using emitOnce = true for good reasons, to provide separate fromSeqOnce / fromValueOnce / fromTryOnce methods.

Make topoRank field public

val topoRank being protected[airstream] prevents users from extending EventStream or Signal, which is silly. The field should probably just be public.

EDIT: Is this actually true? I think I confused something. People are extending EventStream just fine it seems.

Add splitByIndex

Airstream provides a split operator that can used to improve the rendering performance of a list of elements.
However this only works if the element can be mapped to a unique key (or in other words, the list has no duplicates).

A list with duplicates could be handled by using the index of an element as a key.
WDYT?

Throttle timing logic is wrong

ThrottleEventStream is supposed to update lastEventTimeMillis only when the it emits, not every time the parent emits.

The way it is, it behaves more like debounce where it waits for a pause in upstream events.

Easy fix, I'll get to it some time this week.

Sampling a signal shouldn't cause the overhead of observers

Currently we add empty / noop observers to keep the sampled signal started. That's fine, but it inflates the array of the sampled signal's observers for no reason. Since these noop observers have no payload, we could instead increment / decrement a noopObservers counter. We'll need to provide public noop Observer and InternalObserver instances to compare against when adding an observer, and make sure to use them.

This will cost us one Javascipt Number (64-bit) per every observable. Well, maybe we make it js.UndefOr[Int] to avoid that.

Anyway, just a small perf improvement for certain uses, for when I'm in the mood.

akka stream 's `Flow` like abstractions?

Flow in akka stream allows processing pipelines to be reused both for source and sink,which are correspondingly called events and writer in airstream. I have worked with akka-stream and really find this powerful,Can we implement this ? Would like to help

Add EventStream.mapError

This can be implemented using:

final class MapErrorEventStream[A](val parent: Observable[A], f: PartialFunction[Throwable, Throwable])
    extends EventStream[A]
    with SingleParentObservable[A, A]
    with InternalNextErrorObserver[A] {

  protected[airstream] val topoRank: Int = parent.topoRank + 1

  protected[airstream] def onNext(nextValue: A, transaction: Transaction): Unit =
    fireValue(nextValue, transaction)

  protected[airstream] def onError(nextError: Throwable, transaction: Transaction): Unit =
    try fireError(f.applyOrElse(nextError, identity[Throwable]), transaction)
    catch { case NonFatal(t) => fireError(AirstreamError.ObserverErrorHandlingError(t, nextError), transaction) }
}

This is useful when using Ajax streams with services that can return specific errors.

Sort out doc linking warnings

[warn] <...>src/main/scala/com/raquo/airstream/eventstream/DebounceEventStream.scala:29:3: Could not find any member to link for "delayFromLastEventMillis".
[warn]   /** Every time [[parent]] emits an event, we clear the previous timer and set a new one.
[warn]   ^
[warn] <...>src/main/scala/com/raquo/airstream/ownership/Owned.scala:42:3: Could not find any member to link for "Owned".
[warn]   /** This method will be called when this [[Owned]] resource is killed, either manually or by the owner.
[warn]   ^

This mildly annoys me, but not enough to spend time looking for a proper solution (rather than just silencing the warnings). IntelliJ picks up the links just fine to jump to definition, don't know why sbt itself wants fully qualified names.

I'm not yet generating docs so this is not a priority, but if anyone knows how to deal with this please chime in.

Dynamic merge of streams?

Is it possible to achieve sth like that? (Sorry if I missed some part of readme)

val globalStream=???

def newLocal()={
val s1 = ???
globalStream add s1
}

and globalStream would merge all streams inside newLocal() whenever this method is called.

Var#zoom

If we have a Var[A] we should be able to get Var[B] given A => B and B => A (or (B, A) => A for an "updater" version).

Implementation concerns:

  • Timing of transactions must be consistent with "Var#update" and other methods
  • For the purpose of hasDuplicateTupleKeys used in Var.set et al, the zoomed var should be considered to be the original underlying var (even after several zooming layers) to make sure we don't allow updating the same Var more than once in the same transaction.

Regarding Functional Reactive Programming

Hello,

thank you for creating this library! It looks very interesting and promising.

I've noticed you used the term "FRP" in the description, and obviously the library seems to be an FRP implementation (or at very least, strongly inspired by FRP).

What's your FRP background? What are your opinions on FRP? Are you basing your work on Conal Elliot's FRP semantics? What other FRP solutions is Airstream inspired by?

I'm asking out of interest; I'm very interested in practical FRP implementations, especially all efforts that incorporate mixing reactive operations with I/O.

Errors not propagated in flatMap

In the following example, errors are not propagated to the observers of step3.
Is this the expected behavior?

val bus  = new EventBus[Int]
val once = false
val src  = bus.events

val step1 = src.flatMap(i => EventStream.fromTry(Try(1 / i), once))
val step2 = step1.map(identity)
val step3 = step1.flatMap(EventStream.fromValue(_, once))

def logger(tag: Any): Observer[Any] = new Observer[Any] {
  def onNext(nextValue: Any): Unit     = dom.console.info("[%s] %s", tag, nextValue)
  def onError(err: Throwable): Unit    = dom.console.error("[%s] %s", tag, err)
  def onTry(nextValue: Try[Any]): Unit = nextValue.fold(onError, onNext)
}

step1.addObserver(logger(1))(unsafeWindowOwner)
step2.addObserver(logger(2))(unsafeWindowOwner)
step3.addObserver(logger(3))(unsafeWindowOwner)

bus.writer.onNext(0)
bus.writer.onNext(1)
bus.writer.onNext(0)
bus.writer.onNext(1)
[1] java.lang.ArithmeticException: / by zero
[2] java.lang.ArithmeticException: / by zero
[1] 1
[2] 1
[3] 1
[1] java.lang.ArithmeticException: / by zero
[2] java.lang.ArithmeticException: / by zero
[1] 1
[2] 1
[3] 1

How should error handlers treat Fatal errors?

As per Scala JS semantics,

UndefinedBehaviorErrors are fatal in the sense that they are not matched by case NonFatal(e) handlers. This makes sure that they always crash your program as early as possible, so that you can detect and fix the bug. It is never OK to catch an UndefinedBehaviorError (other than in a testing framework), since that means your program will behave differently in fullLinkJS stage than in fastLinkJS.

Does this imply that error handlers such as the one in EventStream.fireValue should be modified to handle only NonFatal errors?

Make observers handle their own errors

As discussed in gitter with @mattjacobus

  • Add handleObserverErrors to Observer factories that let you handle errors like withRecover and fromTry
  • Default that param to true (breaking change!) so that Observer's onError callback can process an error originating in its onNext callback
  • When processing an observer error, internally set handleObserverErrors flag to false, so that errors in error handling logic fall through as unhandled (to prevent infinite loops)
  • Not sure if this will make it into v0.12.0, swamped with other prs and issues atm
  • Note: check the Airstream error handling docs first

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.