akka-stream-contrib's Introduction

Akka Stream Contrib

This project provides a home for Akka Streams add-ons that do not fit into the core Akka Streams module. There can be several reasons for an add-on not to be included in the core module, such as:

  • the functionality is not considered to match the purpose of the core module
  • it is an experiment or requires iterations with user feedback before inclusion in the stable core module
  • it requires a faster release cycle

This repository is not released as a binary artifact and is only shared as sources.

Caveat Emptor

A component in this project does not have to obey the rule of staying binary compatible between releases. Breaking API changes may be introduced without notice as we refine and simplify based on your feedback. A module may be dropped in any release without prior deprecation. The Lightbend subscription does not cover support for these modules.

akka-stream-contrib's People

Contributors

2m, andreas-sa-schroeder, andreas-schroeder, atamborrino, ennru, ffettinger, hochgi, ignasi35, jbgi, johanandren, ktoso, liff, lostiniceland, lysandergg, manonthegithub, marcin-rzeznicki, mkubala, naferx, otto-ringhofer, patriknw, raboof, radusw, rucek, samueltardieu, shiv4nsh, tkawachi, tkroman, tomasmikula, zaharidichev, zhxiaogg

akka-stream-contrib's Issues

An initially-opened Pulse Stage should emit the first available element

It's not clear to me why this test has an initialTimeout, or why the source repeats '1'. Wouldn't it be a more reliable test with Source(1 to 10) or so?

An initially-opened Pulse Stage
[info] - should emit the first available element *** FAILED *** (118 milliseconds)
[info]   The future returned an exception of type: java.util.concurrent.TimeoutException, with message: The first element has not yet passed through in 2 milliseconds.. (PulseSpec.scala:63)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.concurrent.Futures$FutureConcept.tryTryAgain$1(Futures.scala:531)
[info]   at org.scalatest.concurrent.Futures$FutureConcept.futureValueImpl(Futures.scala:550)
[info]   at org.scalatest.concurrent.Futures$FutureConcept.futureValueImpl$(Futures.scala:479)
[info]   at org.scalatest.concurrent.ScalaFutures$$anon$1.futureValueImpl(ScalaFutures.scala:275)
[info]   at org.scalatest.concurrent.Futures.whenReady(Futures.scala:676)
[info]   at org.scalatest.concurrent.Futures.whenReady$(Futures.scala:675)
[info]   at akka.stream.contrib.PulseSpecAutoFusingOff.whenReady(PulseSpec.scala:13)
[info]   at akka.stream.contrib.PulseSpec.$anonfun$$init$$7(PulseSpec.scala:63)

Sources for requesting from paged APIs

When streaming items from paged APIs I find myself often using unfoldAsync based implementations to solve the issue. Would a GraphStage based approach fit in here?

A possible implementation would support 3 different kinds of pagination:

  • Limit/Offset based
  • Page based
  • Next-link based (like AWS does)

What do you think?
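
For reference in this discussion, the next-link variant can already be approximated with Source.unfoldAsync. The following is only a sketch, assuming Akka 2.6 or later; PagedSource, Page and the fetch function are hypothetical names invented for illustration, not part of akka-stream-contrib or of any AWS SDK.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

object PagedSource {
  // Hypothetical page shape: the items of one page plus an optional token for the next page.
  final case class Page[T](items: List[T], next: Option[String])

  // `fetch` is supplied by the caller; it receives None for the first page.
  def nextLink[T](fetch: Option[String] => Future[Page[T]])(
      implicit ec: ExecutionContext): Source[T, NotUsed] =
    Source
      // State: None means finished, Some(token) means a page is still to be fetched.
      .unfoldAsync[Option[Option[String]], List[T]](Some(None)) {
        case None => Future.successful(None)
        case Some(token) =>
          fetch(token).map { page =>
            // When the API returns no next token, the following step ends the stream.
            val nextState = page.next.map(t => Option(t))
            Some((nextState, page.items))
          }
      }
      // Flatten each page into its individual items.
      .mapConcat(identity)
}
```

A dedicated GraphStage could avoid the nested Option state, but the sketch shows how little is needed for the simplest of the three pagination styles.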

FlowDeathPact

Allow terminating a stage on actor termination, or the other way around. This might be useful in Akka HTTP when we want an actor to live as long as the request/response flow is alive, etc.

I implemented an initial version here: akka/akka#21076

Sample() stage

An operation which "drops every n-th element", useful when for example you just want to log some progress every 1000 elements etc: broadcast to the logging stream then sample().log()

Another use case: I want to migrate my data from X to Y. The migration is resumable and writes are idempotent, so I only need to write my offset occasionally, every 100 writes or so: sample().map(e => store(e.offset)) (also known as mini-spark)
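
The description says "drops", but both use cases need every n-th element to be kept and the rest dropped, so the sketch below assumes that reading. It builds the behavior from existing operators (zipWithIndex and collect) and assumes Akka 2.5 or later; Sample.everyNth is a hypothetical name, not an existing stage.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

object Sample {
  // Hypothetical helper: passes every n-th element downstream and drops the others.
  def everyNth[T](n: Int): Flow[T, T, NotUsed] = {
    require(n > 0, "n must be positive")
    Flow[T].zipWithIndex.collect {
      // zipWithIndex counts from 0, so the n-th element has index n - 1.
      case (element, index) if (index + 1) % n == 0 => element
    }
  }
}
```

For example, Source(1 to 10).via(Sample.everyNth[Int](3)) emits 3, 6 and 9.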

Ability to ack messages downstream from source

Messages are acked just after they are pushed downstream.

In my use-case I have to process the message, and if the processing fails, I have to nack the message (message TTL and dead letter exchange are defined on the AMQP queue).

At the moment, I have created a custom source and sink, and I ack the message in the sink (which is fine for my use case).

This issue is greatly inspired by this one: ScalaConsultants/reactive-rabbit#13

Is this feature useful to anyone? How should it be implemented?

Add Debouncing

A useful bit of functionality I threw together: Given a time window, emit the latest value given a uniqueness function.

It might make sense to generalize this as a "time-aware reduce", rather than a pure "debounce"

See #60

Any interest in my ifChanged filter stage?

A few weeks ago I wrote the following to detect changes on a stream, only emitting new elements if they are not the same (in a user-defined way) as the previous element:

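import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

object StreamUtils {
  // Emits an element only if it differs (according to hasChanged) from the previous one.
  def ifChanged[T](hasChanged: (T, T) => Boolean = (a: T, b: T) => a != b): Flow[T, T, NotUsed] = {
    Flow[T]
      .map(Option(_))
      // Prepend None so that the first real element always forms a window of two.
      .prepend(Source.single(Option.empty[T]))
      .sliding(2)
      .collect {
        case Seq(None, Some(b))                         => b
        case Seq(Some(a), Some(b)) if hasChanged(a, b)  => b
      }
  }
}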

If this is interesting, what would have to be added to make this acceptable? A Java API I assume?

Running graph twice is not safe.

Many of the javadsl methods are not safe when running the same graph twice.

For example, fold takes a concrete object as its first parameter, so all runs of the graph share that object. If you use it with a List, each run of the graph reuses the same list, and the results are jumbled together.

At the other extreme, there is Sink.lazyInit, which has 2 levels of indirection: 1. the function, 2. the future. With this I can compensate for the lack of methods that take a Creator<> instead of a concrete object, but the code becomes a difficult-to-decipher mess.
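
The shared-state problem can be reproduced in a few lines. The sketch below uses the scaladsl rather than the javadsl, since the fold semantics are the same, and assumes Akka 2.6 or later; SharedFoldState is a hypothetical name used only for this illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SharedFoldState {
  // Runs the same blueprint twice and returns the list size reported by each run.
  def demo()(implicit system: ActorSystem): (Int, Int) = {
    // The zero element is one mutable object captured by the blueprint.
    val zero = new java.util.ArrayList[Int]()
    val graph = Source(1 to 3).toMat(
      Sink.fold[java.util.ArrayList[Int], Int](zero) { (acc, e) => acc.add(e); acc }
    )(Keep.right)

    // Each run is awaited before the next starts, so the sizes are deterministic.
    val first = Await.result(graph.run(), 3.seconds).size()
    val second = Await.result(graph.run(), 3.seconds).size()
    (first, second)
  }
}
```

With an implicit ActorSystem in scope, SharedFoldState.demo() returns (3, 6): the second run appends to the list that the first run already filled.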

DelayFlow stage with flexible delay management

I needed a flexible mechanism for delaying a stream under certain circumstances (for example, if an HTTP server goes down, delaying for some time after receiving a number of 500 error codes in a row, so as not to overload it with redundant requests) and thought that it could be useful to others. I'll submit a PR with my implementation soon. It would be great if you decide that it is useful.

FAILED: A bunch of tests failed in ValveSpec

[info] ValveSpec:
[info] A closed valve
[info] - should emit only 3 elements into a sequence when the valve is switched to open (1 second, 14 milliseconds)
[info] - should emit only 5 elements when the valve is switched to open (485 milliseconds)
[info] - should emit only 3 elements when the valve is switch to open/close/open (317 milliseconds)
[info] - should return false when the valve is already closed (542 milliseconds)
[info] - should emit nothing when the source is empty (157 milliseconds)
[info] - should emit nothing when the source is failing (46 milliseconds)
[info] - should not pull elements again when opened and closed and re-opened *** FAILED *** (261 milliseconds)
[info] A timeout occurred waiting for a future to complete. Queried 6 times, sleeping 15 milliseconds between each query. (ValveSpec.scala:171)
[info] org.scalatest.concurrent.Futures$FutureConcept$$anon$1:
[info] at org.scalatest.concurrent.Futures$FutureConcept.tryTryAgain$1(Futures.scala:538)

Source for zip files

A stage that works as a Source of data chunks extracted from zip files. In addition to regular files, the zip file might contain directories and other zip files.

FlowEnrichment stage

While working with akka-streams, I had a couple of flows where I had, for example, a (id, message) tuple but the ID was just carried along the stream. Everything in the stream itself (maps and filters, mostly) only cared about the message. The resulting code was pretty ugly (pattern matching the tuple at every point, with type annotations since the type-inference went haywire), and so I wrote a GraphStage that wraps a Flow and adds/removes the ID.

Do you think this is something that can be added here?

Add UnfoldFlow Source generator

A pattern I see come up often in my code is "unfolding" from a flow.
I'll start with a pretty simple (yet useful) example:

Consider Elasticsearch's scroll API. Using it, you can traverse large amounts of data. With the existing API, one might come up with something similar to (pseudo code, not a real HTTP client):

Source.unfoldAsync(initialScrollId) { scrollId =>
  val res: Future[HttpResponse] = httpClient.request(buildRequestFromId(scrollId))
  res.map { r =>
    val data = extractDataFrom(r)
    val next =  extractNextScrollIdFrom(r)
    if(data.isEmpty) None
    else Some(data -> next)
  }
}

Since we are using streams, it might be much more efficient to use akka-http's connection pool flows. The only problem is that we don't want the redundant substream materialization we would get when combining them with Source.unfoldAsync:

//BAD EXAMPLE
Source.single(buildRequestFromId(scrollId)).via(connectionPoolFlow).runWith(Sink.head)

So, I'm suggesting to create an unfolding source from a flow, by building a graph similar to:

[Diagram: Source.unfoldFlow]

The API should be pretty straightforward:

def unfoldFlow[A,B,C,M](seed: A, flow: Flow[A,B,M])(unfoldWith: B => Option[(A,C)]): Source[C,M]

If you think it's useful, I'd be happy to create a PR for this.
IMO, this is something that could eventually migrate to the main streams module.

Add a retry graph factory, that works on flows

As discussed briefly on Gitter, a generic (as generic as it can be) retry factory for flows would be a really helpful API.

A first draft (with some known issues I haven't fixed yet) is described in this gist:
https://gist.github.com/hochgi/c8ee3273034fff8958d306459072fb22#file-akka-stream-retry-md

A proposed API will also be submitted as a PR shortly (without an implementation just yet).
I would like this issue/PR to be an open discussion on what the best API would be ("best" in terms of simplicity, generality, usefulness, etc.).

Also, if this contribution gets into the main streams code, it could be useful to have retry as a combinator (similar to how via is used).

Add an interval-based rate limiter

Imagine that you have to interact with an external service, which imposes limits on its clients regarding maximum allowed number of operations per some time unit.

Quite often such a service punishes request senders when they exceed that limit and, depending on the service provider, you may be charged extra or banned for several seconds/minutes.

We cannot use FlowOps.throttle in such a case, since it gives control over how many elements are passed within a period of time, but not over when that happens. Due to skew between the throttle's timer and the service's rate guardian ticks, we may exceed the allowed number of operations in a given time unit.

I did some research and created an interval-based rate limiter, which guarantees a minimal interval between the elements emitted downstream.

I'm going to prepare a Pull Request.
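
To make the intended guarantee concrete, the following sketch computes emission times on plain timestamps. It is not the proposed stage and uses no Akka API; IntervalLimiterSketch and emitTimes are hypothetical names, and the millisecond timestamps are an assumption made for illustration.

```scala
object IntervalLimiterSketch {
  // Given arrival times (ms, ascending), returns the time at which each element
  // may be emitted so that consecutive emissions are at least minIntervalMs apart.
  def emitTimes(arrivals: Seq[Long], minIntervalMs: Long): Vector[Long] =
    arrivals.foldLeft(Vector.empty[Long]) { (emitted, arrival) =>
      // Earliest slot allowed by the previous emission, if there was one.
      val earliest = emitted.lastOption.map(_ + minIntervalMs).getOrElse(arrival)
      emitted :+ math.max(arrival, earliest)
    }
}
```

For arrivals at 0, 10, 20 and 500 ms with a 100 ms interval, emitTimes returns Vector(0, 100, 200, 500): the burst is spread out, while the late element passes through without extra delay.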

streams: request new feature: timed sorting

Clone of akka/akka#20643

Can we have a timed sorting operation in Akka Streams?

Similar to takeWithin, this could be sortWithin.

Possible method signatures

def sortWithin(d: FiniteDuration): Source[Out, Mat]
def sortWithin(d: FiniteDuration, c: Comparator): Source[Out, Mat]
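
One possible reading of the request is "sort the elements that arrive within each time window". Under that assumption, the behavior can be sketched with the existing groupedWithin operator; SortWithin is a hypothetical name, and the use of a Scala Ordering instead of a Comparator is a choice made for this sketch.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.duration.FiniteDuration

object SortWithin {
  // Collects the elements arriving within each window of length d,
  // then emits them in sorted order.
  def apply[T: Ordering](d: FiniteDuration): Flow[T, T, NotUsed] =
    Flow[T]
      // No practical size limit: only the timer (or upstream completion) closes a window.
      .groupedWithin(Int.MaxValue, d)
      .mapConcat(batch => batch.sorted(Ordering[T]))
}
```

Each window is buffered in memory until it closes, so a fast upstream combined with a long duration can use a lot of memory.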

Add an AccumulateWhileUnchanged stage

Given a stream of input elements, the stage will observe an arbitrary element property and accumulate the incoming elements while the value of the property remains unchanged. When the property changes, a sequence of accumulated elements will be emitted
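
The grouping rule can be illustrated on a plain collection. This is only a sketch of the semantics, not the stage itself: group is a hypothetical helper, and it assumes that the last group is emitted when the input ends.

```scala
object AccumulateWhileUnchangedSketch {
  // Groups consecutive elements for which `property` yields the same value.
  def group[A, P](elements: Seq[A])(property: A => P): Vector[Vector[A]] =
    elements.foldLeft(Vector.empty[Vector[A]]) { (groups, element) =>
      groups.lastOption match {
        // Property unchanged: extend the group currently being accumulated.
        case Some(current) if property(current.head) == property(element) =>
          groups.init :+ (current :+ element)
        // First element, or property changed: start a new group.
        case _ =>
          groups :+ Vector(element)
      }
    }
}
```

For example, grouping Seq(1, 3, 2, 4, 5) by parity (_ % 2) gives Vector(Vector(1, 3), Vector(2, 4), Vector(5)).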

Valve component pulls port (valve.in) twice

Test:

  "An open valve" should {
    "not pull elements again when closed and re-opened" in {

      val (probe, switch, resultFuture) = TestSource.probe[Int]
        .viaMat(new Valve(SwitchMode.Open))(Keep.both)
        .toMat(Sink.head)((l, r) => (l._1, l._2, r))
        .run()

      val result = for {
        _ <- switch.flip(SwitchMode.Close)
        _ <- switch.flip(SwitchMode.Open)
        _ = probe.sendNext(1)
        _ = probe.sendComplete()
        r <- resultFuture
      } yield r


      whenReady(result) {
        _ shouldBe 1
      }
    }
  }

Error :

The future returned an exception of type: java.lang.IllegalArgumentException, with message: requirement failed: Cannot pull port (valve.in) twice.
ScalaTestFailureLocation: com.omsignal.common.akka.stream.ValveSpec$$anonfun$1$$anonfun$apply$mcV$sp$1 at (ValveSpec.scala:37)
org.scalatest.exceptions.TestFailedException: The future returned an exception of type: java.lang.IllegalArgumentException, with message: requirement failed: Cannot pull port (valve.in) twice.

Directory watch - added, changed, deleted files as events

Idea for a new source: I have one lying around on my computer from earlier, and there were discussions on akka/akka about such a source.

I'm thinking Source[(Path, ChangeType), SomeWayToCancel] or possibly skip that SomeWayToCancel since it can be achieved by combining with a KillSwitch

AWS S3 integration

I just passed AWS certification and am going to do some AWS integration. S3 is perhaps the simplest one, so I am going to start with it.
Is this still a hot topic?

Add PartitionWith stage

A rather simple but useful stage:

class PartitionWith[In,Out0,Out1](p: In => Either[Out0,Out1]) extends GraphStage[FanOutShape2[In,Out0,Out1]]

It is useful when you need partition + map.
Instead of having 1 partition + 2 map stages, the code can be much simplified with this stage.

Documentation?

Is there going to be a standard for documenting what's available here, with usage examples, etc?

FAILED: Pulse Stage should emit the first available element

[info] PulseSpecAutoFusingOff:
[info] Pulse Stage
[info] - should signal demand once every interval (957 milliseconds)
[info] - should keep backpressure if there is no demand from downstream (565 milliseconds)
[info] An initially-opened Pulse Stage
[info] - should emit the first available element *** FAILED *** (174 milliseconds)
[info] A timeout occurred waiting for a future to complete. Queried 7 times, sleeping 15 milliseconds between each query. (PulseSpec.scala:63)
[info] org.scalatest.concurrent.Futures$FutureConcept$$anon$1:
[info] at org.scalatest.concurrent.Futures$FutureConcept.tryTryAgain$1(Futures.scala:538)

Failed CassandraSourceSpec

This test fails quite frequently in CI validation.

[info] - must stream the result of a Cassandra statement with several pages *** FAILED *** (473 milliseconds)
[info]   A timeout occurred waiting for a future to complete. Queried 11 times, sleeping 15 milliseconds between each query. (CassandraSourceSpec.scala:75)
[info]   org.scalatest.concurrent.Futures$FutureConcept$$anon$1:
[info]   at org.scalatest.concurrent.Futures$FutureConcept$class.tryTryAgain$1(Futures.scala:546)
[info]   at org.scalatest.concurrent.Futures$FutureConcept$class.futureValue(Futures.scala:558)
[info]   at org.scalatest.concurrent.ScalaFutures$$anon$1.futureValue(ScalaFutures.scala:74)
[info]   at akka.stream.contrib.cassandra.CassandraSourceSpec$$anonfun$1$$anonfun$apply$mcV$sp$2.apply$mcV$sp(CassandraSourceSpec.scala:75)
[info]   at akka.stream.contrib.cassandra.CassandraSourceSpec$$anonfun$1$$anonfun$apply$mcV$sp$2.apply(CassandraSourceSpec.scala:69)
[info]   at akka.stream.contrib.cassandra.CassandraSourceSpec$$anonfun$1$$anonfun$apply$mcV$sp$2.apply(CassandraSourceSpec.scala:69)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)

Issue with Valve Component and completed Graph

Hello,

I'm using the valve component with akka-stream-kafka to be able to pause and resume our Kafka consumers.

I discovered an issue in the current implementation when trying to flip a valve on a finished stream (either completed or failed). In both cases, the Future returned by flip will never complete, because the callback in the ValveSwitch will never be invoked.

Here are 2 tests that show the issue:

"a valve on an error'd graph" should {
    "return false when trying to flip it" in {
      val ((sourceProbe, switch), sinkProbe) = TestSource.probe[Int]
        .viaMat(Valve(Open))(Keep.both)
        .toMat(TestSink.probe[Int])(Keep.both)
        .run()

      val error = new RuntimeException("Boom !")
      sourceProbe.sendError(error)
      sinkProbe.request(1)
        .expectError(error)

      whenReady(switch.flip(Close)) {
        _ shouldBe false
      }
    }
  }

  "a valve on a completed graph" should {
    "return false when trying to flip it" in {
      val ((sourceProbe, switch), sinkProbe) = TestSource.probe[Int]
        .viaMat(Valve(Close))(Keep.both)
        .toMat(TestSink.probe[Int])(Keep.both)
        .run()

      sourceProbe.sendComplete()
      sinkProbe.expectSubscription()
      sinkProbe.expectComplete()

      whenReady(switch.flip(Open)) {
        _ shouldBe false
      }
    }

  }

The Valve is also affected by akka/akka#20503
Here is a gist that fixes the issue for the complete but not the error one : https://gist.github.com/fchaillou/fa34d1bd46d9c0f4db7c54fc8f535fe3

FAILED: TimeWindow flow test fails occasionally

[info] TimeWindow flow
[info] - should aggregate data for predefined amount of time *** FAILED *** (628 milliseconds)
[info] java.lang.AssertionError: assertion failed: expected OnNext(5), found OnNext(1)
[info] at scala.Predef$.assert(Predef.scala:170)
[info] at akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:388)
[info] at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:373)

UnfoldFlowSpecAutoFusingOn failed

[info] UnfoldFlowSpecAutoFusingOn:
[info] unfoldFlow
[info]   should unfold Collatz conjecture with a sequence of 111 elements
[info]   - should with flow (207 milliseconds)
[info]   - should with buffered flow (158 milliseconds)
[info]   - should with function (117 milliseconds)
[info]   should increment integers & handle KillSwitch
[info]     should with simple flow
[info]       should killSwitch prepended to flow
[info]       - should fail instantly when aborted (11 milliseconds)
[info]       - should fail when inner stream is canceled and pulled before completion *** FAILED *** (23 milliseconds)
[info]         java.lang.AssertionError: assertion failed: expected OnError(java.lang.Exception: KILL!
[info] 	at akka.stream.contrib.UnfoldFlowSpec$$anonfun$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$11.apply(UnfoldFlowSpec.scala:99)
[info] 	at akka.stream.contrib.UnfoldFlowSpec$$anonfun$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$11.apply(UnfoldFlowSpec.scala:97)

consider adding a simpler API for retrying simple flows

This comes from a co-worker, to whom I suggested Retry.
In his case, we had a simple Flow[A,Future[B]],
which can easily be turned into Flow[A,Try[B]], using something like:

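// mapAsync requires a parallelism argument; 1 is an assumed value here.
// onComplete needs an implicit ExecutionContext in scope.
.mapAsync(1) { f: Future[B] =>
  val p = Promise[Try[B]]()
  f.onComplete(t => p.success(t))
  p.future
}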

but in order to do something simple like retrying at most 5 times,
we had to come up with:

val flow: Flow[A,Try[B],M] = ...

val wrapped: Flow[(A,(A,Int)),(Try[B],(A,Int)),M] = Flow.fromGraph(GraphDSL.create(flow){
  implicit b => {
    f => {
      import GraphDSL.Implicits._

      val u = b.add(Unzip[A,(A,Int)]())
      val z = b.add(Zip[Try[B],(A,Int)]())

      u.out0 ~> f ~> z.in0
      u.out1   ~>    z.in1

      FlowShape(u.in, z.out)
    }
  }
})

val s = Flow.fromFunction[A,(A,(A,Int))](a => (a,(a,5)))

s.via(Retry(wrapped){
  case (_,0) => None
  case (a,i) => Some(a -> (a,i-1))
}).map(_._1)

Well, that's a lot of boilerplate...
So I thought I could add something like Retry.simple(flow,retries), which would do exactly what we did.

And maybe also something like:

def simpleWith[I,O](flow: Flow[I,O], retries: Int)(betweenRetries: (Try[O],I) => (Try[O],I))

This would enable recovery attempts when needed.
I'm not sure about this though, since it could be handled with the more basic existing API,
and it's reasonable to assume betweenRetries will make the call "boilerplaty" anyway...

Thoughts?

Retry concat should process multiple elements concurrently

I was using Retry.concat to wrap Akka HTTP connection flow and found out that only a single request was processed at a time.
This is because the RetryConcatCoordinator component does not pull a new element if it already has an existing element in cycle.

Consider adding a distinct stage?

Recently I needed a distinct stage. At first sight, I think it should keep a buffer of the distinct elements seen so far and prevent duplicate elements from being pushed downstream.

Considering that the stream may never stop, it could be dangerous when the buffer keeps growing. But in my scenario the stream will definitely stop and the buffer size is predictable.

Any suggestion?

BTW, I've seen akka/akka#19395, which proposes adding a dedupe stage.

Do we need Accumulate

According to the docs (I wrote): "This stage emits folded values like scan, but the first element emitted is not the zero value but the result of applying the given function to the given zero value and the first pushed element."

The same can be achieved with scan(...)(...).drop(1).
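
As an illustration of the equivalence, here is a running sum written with scan and drop(1). It is a minimal sketch assuming Akka 2.6 or later; AccumulateViaScan is a hypothetical name.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

object AccumulateViaScan {
  // scan emits the zero value first; drop(1) removes it, so the first
  // emitted element is f(zero, firstElement), as described for Accumulate.
  val runningSum: Flow[Int, Int, NotUsed] =
    Flow[Int].scan(0)(_ + _).drop(1)
}
```

For the input 1, 2, 3, 4, scan alone emits 0, 1, 3, 6, 10, and with drop(1) the flow emits 1, 3, 6, 10.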

What do others think?
