akka / akka-projection

Akka Projections is intended for building systems with the CQRS pattern and for facilitating event-based service-to-service communication.

Home Page: https://doc.akka.io/docs/akka-projection/current/

License: Other

Scala 84.97% Java 14.74% Shell 0.23% Smalltalk 0.01% JavaScript 0.05%

akka-projection's Introduction

Akka

The Akka family of projects is managed by teams at Lightbend with help from the community.

We believe that writing correct concurrent & distributed, resilient and elastic applications is too hard. Most of the time it's because we are using the wrong tools and the wrong level of abstraction.

Akka is here to change that.

Using the Actor Model we raise the abstraction level and provide a better platform to build correct concurrent and scalable applications. This model is a perfect match for the principles laid out in the Reactive Manifesto.

For resilience, we adopt the "Let it crash" model which the telecom industry has used with great success to build applications that self-heal and systems that never stop.

Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.

Learn more at akka.io.

Reference Documentation

The reference documentation is available at doc.akka.io, for Scala and Java.

Current versions of all Akka libraries

The current versions of all Akka libraries are listed on the Akka Dependencies page. Releases of the Akka core libraries in this repository are listed on the GitHub releases page.

Community

You can join these groups and chats to discuss and ask Akka related questions:

In addition to that, you may enjoy following:

Contributing

Contributions are very welcome!

If you see an issue that you'd like to see fixed, or want to shape out some ideas, the best way to make it happen is to help out by submitting a pull request implementing it. We welcome contributions from all, even if you are not yet familiar with this project. We are happy to get you started, and will guide you through the process once you've submitted your PR.

Refer to the CONTRIBUTING.md file for more details about the workflow, and general hints on how to prepare your pull request. You can also ask for clarifications or guidance in GitHub issues directly, or in the akka/dev chat if more real-time communication would be of benefit.

License

Akka is licensed under the Business Source License 1.1; please see the Akka License FAQ.

Tests and documentation are under a separate license, see the LICENSE file in each documentation and test root directory for details.

akka-projection's People

Contributors

aldenml, aludwiko, chbatey, claudio-scandura, ennru, franciscolopezsancho, greyplane, huntc, ignasi35, ihostage, johanandren, jrudolph, leviramsey, octonato, patriknw, pvlugter, rayroestenburg, roiocam, scala-steward, sebastian-alfers, seglo, steffenhaak, yiksanchan


akka-projection's Issues

SlickProjection + handler that is not using db

  • Using SlickProjection for managing the offset, but the event handler is not using the db, or not the same db
  • handler: Envelope => Future[Done]
  • This would be at-least-once or at-most-once
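A minimal sketch of why such a setup can only be at-least-once or at-most-once: when the handler's side effects and the offset store are not in one transaction, the only choice is whether the offset is saved after or before the handler runs. `Envelope`, `InMemoryOffsetStore` and `Delivery` are hypothetical stand-ins (not the SlickProjection API), and `Future[Unit]` stands in for `Future[Done]`.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-ins; not the akka-projection API.
final case class Envelope(offset: Long, payload: String)

final class InMemoryOffsetStore {
  @volatile private var stored: Option[Long] = None
  def saveOffset(offset: Long): Future[Unit] = { stored = Some(offset); Future.successful(()) }
  def readOffset(): Option[Long] = stored
}

object Delivery {
  // At-least-once: run the handler first, then store the offset.
  // A crash between the two steps means the envelope is redelivered after a restart.
  def atLeastOnce(store: InMemoryOffsetStore, handler: Envelope => Future[Unit])(env: Envelope)(
      implicit ec: ExecutionContext): Future[Unit] =
    handler(env).flatMap(_ => store.saveOffset(env.offset))

  // At-most-once: store the offset first, then run the handler.
  // A crash between the two steps means the envelope is never processed.
  def atMostOnce(store: InMemoryOffsetStore, handler: Envelope => Future[Unit])(env: Envelope)(
      implicit ec: ExecutionContext): Future[Unit] =
    store.saveOffset(env.offset).flatMap(_ => handler(env))
}
```

With a failing handler, the at-least-once variant leaves the offset unsaved, while the at-most-once variant has already moved past the envelope.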

datastax-java-driver profile

We need a profile to set consistency = QUORUM

See reference.conf of akka-persistence-cassandra.

We should also mention advanced.reconnect-on-init = true in the docs.

speedup SlickProjectionSpec

It must be possible to speed this up.

[info] SlickProjectionSpec:
[info] A Slick projection
[info] - must persist projection and offset in same the same write operation (transactional) (2 seconds, 550 milliseconds)
[info] - must restart from previous offset - fail with DBIOAction.failed (6 seconds, 200 milliseconds)
[info] - must restart from previous offset - fail with throwing an exception (6 seconds, 206 milliseconds)
[info] - must restart from previous offset - fail with bad insert on user code (6 seconds, 192 milliseconds)

Corresponding CassandraProjectionSpec:

[info] CassandraProjectionSpec:
[info] A Cassandra projection
[info] - must persist projection and offset (662 milliseconds)
[info] - must restart from previous offset - handler throwing an exception (3 seconds, 745 milliseconds)
[info] - must save offset after number of elements (919 milliseconds)
[info] - must save offset after idle duration (2 seconds, 296 milliseconds)

Make sure ApiMayChange and InternalApi

Before the final release we should make sure that ApiMayChange is sprinkled at strategic places (not necessarily everywhere) and mentioned in the reference docs.

Also, make sure that nothing more than necessary is public.

atLeastOnce parameters

Does CassandraProjection.atLeastOnce have too many parameters to be convenient?
We could remove saveOffsetAfterElements and saveOffsetAfterDuration and place them in configuration.

Then provide a builder-style API to define them programmatically. atLeastOnce would then return AtLeastOnceCassandraProjection extends CassandraProjection, which has the additional withSaveOffsetAfterElements and withSaveOffsetAfterDuration.

CassandraProjection.atLeastOnce(
  projectionId,
  sourceProvider,
  offsetExtractor) { env => ... }
  .withSaveOffsetAfterElements(100)
  .withSaveOffsetAfterDuration(1.second)

This is probably not nice with the handler as a second parameter list, but we have talked about making the handler a trait instead of a function, and then it would no longer be a second parameter list.
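A minimal sketch of the proposed builder style, assuming defaults are loaded from configuration and each `with...` method returns a new immutable instance. `AtLeastOnceSettings`, the private constructor and the default values (100 elements, 500 ms) are illustrative assumptions, not the actual CassandraProjection code.

```scala
import scala.concurrent.duration._

// Hypothetical settings holder for the two offset-saving parameters.
final case class AtLeastOnceSettings(
    saveOffsetAfterElements: Int,
    saveOffsetAfterDuration: FiniteDuration)

final class AtLeastOnceCassandraProjection private (
    val projectionId: String,
    val settings: AtLeastOnceSettings) {

  // Each `with...` returns a copy; the original instance is left untouched.
  def withSaveOffsetAfterElements(n: Int): AtLeastOnceCassandraProjection =
    new AtLeastOnceCassandraProjection(projectionId, settings.copy(saveOffsetAfterElements = n))

  def withSaveOffsetAfterDuration(d: FiniteDuration): AtLeastOnceCassandraProjection =
    new AtLeastOnceCassandraProjection(projectionId, settings.copy(saveOffsetAfterDuration = d))
}

object AtLeastOnceCassandraProjection {
  // Stand-in for values read from configuration (assumed defaults).
  private val fromConfig =
    AtLeastOnceSettings(saveOffsetAfterElements = 100, saveOffsetAfterDuration = 500.millis)

  def apply(projectionId: String): AtLeastOnceCassandraProjection =
    new AtLeastOnceCassandraProjection(projectionId, fromConfig)
}
```

Keeping the projection immutable means a partially configured instance can be shared and refined without surprising other users of it.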

enable Xfatal-warnings

Since Scala 2.13.2 @nowarn is built in, but we might also need a build for 2.12, and for that we need the silencer plugin.

Projection Telemetry SPI

(edited the list of desired metrics)

Short description

Provide an SPI so that projection implementations can raise certain events that 3rd-party tooling can measure and report to monitoring tools.

Details

Given the nature of a projection, the first idea that comes to mind is to measure "Wait, Service, and Residence Time" (as described in The Essential Guide To Queueing Theory, thanks @seglo for the reference). Summing up:

  • Waiting Time: time the element spent on the queue
  • Service Time: time the element spent being processed (from its removal from the queue until it is readable by a read-side thread)
  • Residence Time: total time in the system (the sum of Waiting and Service times)

akka-projection can't promise to deliver Wait Time, since that depends on each queuing technology adding a timestamp of the instant when the element was enqueued. Still, the SPI could include the necessary methods so that custom implementations of akka-projection can take the measure when the data is available.
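The three definitions can be expressed directly from timestamps. In this sketch (with `ElementTimestamps` and `QueueTimes` as hypothetical names and epoch milliseconds as the assumed unit), the enqueue instant is optional, so Wait Time and Residence Time are only available when the queuing technology supplies it, while Service Time always is.

```scala
// Hypothetical record of the instants (epoch millis) an element passes through.
// enqueuedAt is optional because not every queuing technology records it.
final case class ElementTimestamps(enqueuedAt: Option[Long], dequeuedAt: Long, readableAt: Long)

object QueueTimes {
  // Service Time: from removal from the queue until readable on the read side.
  def serviceTime(t: ElementTimestamps): Long = t.readableAt - t.dequeuedAt

  // Wait Time: time spent on the queue; unknown without an enqueue timestamp.
  def waitTime(t: ElementTimestamps): Option[Long] = t.enqueuedAt.map(t.dequeuedAt - _)

  // Residence Time = Wait Time + Service Time, so it is only known when Wait Time is.
  def residenceTime(t: ElementTimestamps): Option[Long] = waitTime(t).map(_ + serviceTime(t))
}
```

For example, an element enqueued at 1000, dequeued at 1250 and readable at 1400 has a Wait Time of 250, a Service Time of 150 and a Residence Time of 400.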

Getting more details

Once the user is familiar with "Wait, Service, and Residence Time", there are other pieces of relevant information to attach to the measures, so that more precise filtering in the monitoring tools yields more insight.

  1. First, we want counts of elements successfully processed

  2. Then, a rate of failures (failures on the stream per unit of time) and information about the cause, context, etc...

  3. Then we want metadata for the above times, counts and rates. A (name, key) tuple is a pair of Strings uniquely identifying a worker consuming a partitioned/sharded queue (edit: see ProjectionId). This is useful to detect when a particular processor is slow compared to a fast one, when a partition is very hot (meaning the partitioning strategy is not well balanced), or even when a bug in the code is causing a head-of-line blocking issue.

  4. The metadata introduced in 3. could be extended, when available, to add the event (sub)type since each event type runs different code.

  5. In some scenarios, Service Time could be too coarse, so we could provide means for more fine-grained measures (e.g. naming a portion of the stream and measuring only that) to detect hotspots more accurately.

5.1. When batching the offset storage operation (only save one offset every n messages) we need to distinguish the processing time from the time-until-committed.


Scope

  • MUST HAVE: Service Time, count of successfully processed elements (1.), rate of failures (2.), and name/key metadata (3.). The SPI must support providing this data.

  • NICE TO HAVE: named portions of Service Time (4. and 5.), Wait Time and Residence Time. The SPI could let implementations provide that data when available.
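One possible shape for such an SPI, covering the MUST HAVE items (Service Time, success count, failures, name/key metadata) plus a hook for 5.1, sketched in plain Scala. The trait name, its method names and the start-token approach are hypothetical, not the actual akka-projection API; `CountingTelemetry` is a trivial in-memory implementation that a monitoring integration would replace.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

// Hypothetical identifier: name plus key, as described in item 3.
final case class ProjectionId(name: String, key: String)

trait ProjectionTelemetry {
  /** Processing of an element starts; returns a start token (here a nano timestamp). */
  def beforeProcess(id: ProjectionId): Long
  /** Element processed successfully: feeds Service Time and the success count (1.). */
  def afterProcess(id: ProjectionId, startToken: Long): Unit
  /** Processing failed: feeds the failure rate and its cause (2.). */
  def error(id: ProjectionId, cause: Throwable): Unit
  /** Offset committed: separates processing time from time-until-committed (5.1). */
  def onOffsetStored(id: ProjectionId, numberOfEnvelopes: Int): Unit
}

final class CountingTelemetry extends ProjectionTelemetry {
  val processed = new ConcurrentHashMap[ProjectionId, LongAdder]()
  val failures = new ConcurrentHashMap[ProjectionId, LongAdder]()
  val serviceNanos = new ConcurrentHashMap[ProjectionId, LongAdder]()

  private def counter(m: ConcurrentHashMap[ProjectionId, LongAdder], id: ProjectionId): LongAdder =
    m.computeIfAbsent(id, _ => new LongAdder)

  def beforeProcess(id: ProjectionId): Long = System.nanoTime()

  def afterProcess(id: ProjectionId, startToken: Long): Unit = {
    counter(processed, id).increment()
    counter(serviceNanos, id).add(System.nanoTime() - startToken)
  }

  def error(id: ProjectionId, cause: Throwable): Unit = counter(failures, id).increment()

  // Not tracked in this minimal sketch.
  def onOffsetStored(id: ProjectionId, numberOfEnvelopes: Int): Unit = ()
}
```

Keying every metric by `ProjectionId` is what makes a hot partition or a single slow worker visible in the monitoring tool.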

ProjectionRunner should use TestKit.awaitCond

This will be a common scenario for the Projection TestKit: you set up a projection, run it, and check that the results are eventually reflected in the storage.

There is room for improvement here. Instead of wrapping it in ScalaTest's eventually, we should have the retry built in, like Akka's within (with or without dilated time).

We should also take into consideration that users may build a projection for an infinite Source, for example Akka Query eventsByTag. In such a case, we can stop the projection once the passed assertion block succeeds, or when it still fails after some timeout.

Anyway, for later.

Originally posted by @renatocaval in #35
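A minimal sketch of a built-in retry, similar in spirit to Akka TestKit's awaitAssert: the assertion block is re-run until it passes or the deadline expires, and the last failure is rethrown on timeout. `Eventually.awaitAssert` and its default interval are hypothetical names for illustration; a TestKit version could additionally dilate `max`.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object Eventually {
  /** Re-runs `assertion` until it succeeds or `max` elapses, sleeping `interval` between attempts. */
  def awaitAssert[A](max: FiniteDuration, interval: FiniteDuration = 100.millis)(assertion: => A): A = {
    val deadline = max.fromNow

    @tailrec def loop(): A = Try(assertion) match {
      case Success(a) => a
      // Give up and surface the most recent failure once the deadline has passed.
      case Failure(e) if deadline.isOverdue() => throw e
      case Failure(_) =>
        Thread.sleep(interval.toMillis)
        loop()
    }

    loop()
  }
}
```

For an infinite Source, the test can stop the projection right after `awaitAssert` returns or throws.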

Alpakka Kafka Projections (Producer)

Use case:

  • consume from any Source and publish the events in Kafka. This is the equivalent of Topic Producers in Lagom.

Specs:

Revisit database concurrency in KafkaToSlickIntegration example

The count can be updated by another Projection instance in between the read and the insertOrUpdate. See https://github.com/akka/akka-projection/pull/80/files#r421431861

For CRUD like this, I would use optimistic locking: a version number column that is read in findByEventType and then used as a condition in the update, requiring it to still be the same as what was read. The update also increments it by 1. If no row was updated, it means that someone else updated it in between. Then we could fail, and even use the retry mechanism from #95.

Would be pretty nice to include that pattern in this example.

An alternative would be to have one counter per projectionId, so that it's a single writer.
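A minimal in-memory sketch of the optimistic-locking pattern described above. `CounterTable` is a hypothetical stand-in for the database table, and its atomic compare-and-replace plays the role of an `UPDATE ... SET count = ?, version = version + 1 WHERE event_type = ? AND version = ?` that reports how many rows it updated; none of these names come from the example project.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical row: the counter value plus its version column.
final case class Row(count: Long, version: Long)

final class CounterTable {
  private val rows = TrieMap.empty[String, Row]

  def findByEventType(eventType: String): Row = rows.getOrElse(eventType, Row(0L, 0L))

  /** Returns the number of rows updated: 1 on success, 0 if another writer got in between. */
  def updateIfVersion(eventType: String, newCount: Long, expectedVersion: Long): Int = {
    val updated = Row(newCount, expectedVersion + 1)
    val ok = rows.get(eventType) match {
      case None if expectedVersion == 0L => rows.putIfAbsent(eventType, updated).isEmpty
      case Some(current) if current.version == expectedVersion => rows.replace(eventType, current, updated)
      case _ => false
    }
    if (ok) 1 else 0
  }
}

object OptimisticCounter {
  /** Read-modify-write guarded by the version read in findByEventType. */
  def increment(table: CounterTable, eventType: String): Either[String, Long] = {
    val row = table.findByEventType(eventType)
    if (table.updateIfVersion(eventType, row.count + 1, row.version) == 1) Right(row.count + 1)
    else Left(s"concurrent update of $eventType detected; fail (or retry)")
  }
}
```

A `Left` is where the projection would fail the envelope and let a retry strategy re-run it with a fresh read.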

Projection propagation notification

When a command comes into a system, it is convenient to be able to return to the user once the resulting event has been propagated to projections.

This could be implemented via a correlation id published from the projection or via each projection publishing its offset to pub sub or data that can be used to check that the projection has passed the offset of the event. We'd need to expose the offset of an event on the persistent actor side for the second option.

First step could be to document how a user would do this without support from projections via a correlation id.

This was originally brought up by @mckeeh3 - Hugh, did I miss anything?
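A minimal sketch of the correlation-id approach, as a user might build it without support from projections: the command side waits on a `Future` for its correlation id, and the projection handler completes it after projecting the event that carries that id. `PropagationNotifier` and its methods are hypothetical, and entries are never evicted here; a real version would need timeouts or eviction.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, Promise}

final class PropagationNotifier {
  // One promise per correlation id, created by whichever side arrives first.
  private val pending = TrieMap.empty[String, Promise[Unit]]

  private def promiseFor(correlationId: String): Promise[Unit] =
    pending.getOrElseUpdate(correlationId, Promise[Unit]())

  /** Called by the command side before replying to the user. */
  def awaitPropagation(correlationId: String): Future[Unit] = promiseFor(correlationId).future

  /** Called by the projection handler after the event carrying `correlationId` is projected. */
  def projected(correlationId: String): Unit = { promiseFor(correlationId).trySuccess(()); () }
}
```

Creating the promise on either side means a notification that arrives before anyone waits is not lost.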

Offset serialization

Offset is stored as String + manifest to support different offset types. That is a fixed set of types.

@ignasi35 questioned this

Why is the offsetStore responsible to handle the offset types/manifests?

Could readOffset have an extra argument offsetDeser: (ByteString, String) => Offset, so that it's not the responsibility of the offset store to manage deserialization?

I think the background reason for this was to make it human readable for back office tooling.
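A minimal sketch contrasting the two designs. `OffsetSerialization` keeps the current approach: a fixed set of offset types stored as a human-readable (value, manifest) pair. `OpaqueOffsetStore` illustrates the alternative from the question, where the caller supplies the deserializer. All names and manifest strings are hypothetical, and `String` stands in for `ByteString` to avoid an Akka dependency.

```scala
import java.util.UUID

// Hypothetical offset ADT covering a fixed set of types.
sealed trait StoredOffset
final case class LongOffset(value: Long) extends StoredOffset
final case class StringOffset(value: String) extends StoredOffset
final case class TimeBasedUUIDOffset(value: UUID) extends StoredOffset

object OffsetSerialization {
  // Human-readable (value, manifest) pair, convenient for back-office tooling.
  def toStorage(offset: StoredOffset): (String, String) = offset match {
    case LongOffset(v)          => (v.toString, "LONG")
    case StringOffset(v)        => (v, "STRING")
    case TimeBasedUUIDOffset(v) => (v.toString, "TIMEBASED_UUID")
  }

  def fromStorage(value: String, manifest: String): StoredOffset = manifest match {
    case "LONG"           => LongOffset(value.toLong)
    case "STRING"         => StringOffset(value)
    case "TIMEBASED_UUID" => TimeBasedUUIDOffset(UUID.fromString(value))
    case other            => throw new IllegalArgumentException(s"Unknown offset manifest [$other]")
  }
}

// The alternative: the store only persists opaque (value, manifest) pairs,
// and deserialization is supplied by the caller.
final class OpaqueOffsetStore(row: (String, String)) {
  def readOffset[O](offsetDeser: (String, String) => O): O = offsetDeser(row._1, row._2)
}
```

The trade-off is visible in the types: the fixed ADT keeps stored rows readable by tooling, while the opaque store lets users bring offset types the store never heard of.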

Make HandlerRecoveryStrategy a Projection setting instead

Currently it's possible to combine a retry strategy with atMostOnce (only a runtime check prevents it).

Currently it's part of the Handler, which can decide based on the failing Envelope and exception. That is probably overkill and not needed.

Instead, we can make it a setting of the projection.

CassandraProjection.atLeastOnce(...).withHandlerRecovery(HandlerRecoveryStrategy.retryAndFail)

Similar to what we discussed in #59
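A minimal sketch of recovery as a projection setting, in which a type, rather than a runtime check, keeps retries away from at-most-once. The strategy names follow the issue text, but the class hierarchy, `HandlerRecovery.run` and the two projection case classes are hypothetical, and the handler is synchronous for brevity.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

sealed trait HandlerRecoveryStrategy
// Strategies that never re-run the handler, and are therefore safe for at-most-once.
sealed trait StrategyWithoutRetry extends HandlerRecoveryStrategy
case object Fail extends StrategyWithoutRetry
case object Skip extends StrategyWithoutRetry
final case class RetryAndFail(retries: Int, delay: FiniteDuration) extends HandlerRecoveryStrategy

object HandlerRecovery {
  /** Right(()) if processed or skipped, Left(error) on final failure. */
  def run[E](strategy: HandlerRecoveryStrategy, handler: E => Unit)(env: E): Either[Throwable, Unit] = {
    def attempt(remainingRetries: Int): Either[Throwable, Unit] = Try(handler(env)) match {
      case Success(_) => Right(())
      case Failure(e) =>
        strategy match {
          case Fail => Left(e)
          case Skip => Right(())
          case RetryAndFail(_, delay) if remainingRetries > 0 =>
            Thread.sleep(delay.toMillis)
            attempt(remainingRetries - 1)
          case RetryAndFail(_, _) => Left(e)
        }
    }
    val retries = strategy match {
      case RetryAndFail(n, _) => n
      case _                  => 0
    }
    attempt(retries)
  }
}

// Only the at-least-once projection accepts strategies that retry.
final case class AtMostOnceProjection(recovery: StrategyWithoutRetry = Fail) {
  def withHandlerRecovery(s: StrategyWithoutRetry): AtMostOnceProjection = copy(recovery = s)
}
final case class AtLeastOnceProjection(recovery: HandlerRecoveryStrategy = Fail) {
  def withHandlerRecovery(s: HandlerRecoveryStrategy): AtLeastOnceProjection = copy(recovery = s)
}
```

With this split, `AtMostOnceProjection().withHandlerRecovery(RetryAndFail(3, 1.second))` does not compile.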

testcontainers.containers Mounts denied

Do I need to configure anything in Docker Desktop to be able to use the test containers locally?

[info]   Cause: org.testcontainers.containers.ContainerFetchException: Can't get Docker image: RemoteDockerImage(imageNameFuture=java.util.concurrent.CompletableFuture@3e825116[Completed normally], imagePullPolicy=DefaultPullPolicy(), dockerClient=LazyDockerClient.INSTANCE)
[info]   at org.testcontainers.containers.GenericContainer.getDockerImageName(GenericContainer.java:1265)
[info]   at org.testcontainers.containers.GenericContainer.logger(GenericContainer.java:600)
[info]   at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:311)
[info]   at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:302)
[info]   at com.dimafeng.testcontainers.SingleContainer.start(Container.scala:46)
[info]   at akka.projection.cassandra.ContainerSessionProvider$.$anonfun$started$1(ContainerSessionProvider.scala:44)
...
[info]   Cause: com.github.dockerjava.api.exception.InternalServerErrorException: Mounts denied: EOF
[info]   at org.testcontainers.dockerclient.transport.okhttp.OkHttpInvocationBuilder.execute(OkHttpInvocationBuilder.java:287)
[info]   at org.testcontainers.dockerclient.transport.okhttp.OkHttpInvocationBuilder.execute(OkHttpInvocationBuilder.java:265)
[info]   at org.testcontainers.dockerclient.transport.okhttp.OkHttpInvocationBuilder.post(OkHttpInvocationBuilder.java:126)
[info]   at com.github.dockerjava.core.exec.StartContainerCmdExec.execute(StartContainerCmdExec.java:28)

Ensure that in-flight messages from KafkaSourceProvider are not processed for revoked partitions

Raised by @patriknw in #80 (comment)

Related thought, but for exactlyOnce: the old projection instance is processing offset 17, but before it has been committed to the database there is a hand-over of that topic-partition to another projection instance, which reads offset 16 and processes 17. The old instance successfully processes 17 and saves offset 17. The new instance will also process 17, so it's not exactly-once.

Any way we can control the hand-over for such scenario?

Possible solution: #80 (comment)

Ensure Projection.runWithBackoff() is properly tested/testable

In #110, we introduced a Projection.runWithBackoff() that is intended to be used internally by ProjectionBehavior.

Currently, projection implementors need to take care of this and write tests exercising this method.

We should review this or add tests for our own implementation.

Start / Stop Projection API

Similar to the current Lagom Projection API. It allows inspecting and managing projections.

  • Which projections do we have in the system?
  • What is the state? Are they running or stopped? Are they failing?
  • Stop/Start all projections of a given group (e.g. stop all UserView projections)
  • Stop/Start specific projection inside a group (eg: stop UserView projection for tag 'UserEvent-3')
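The four questions map naturally onto a registry keyed by projection name (the group) and key. This in-memory sketch only tracks states; `ProjectionRegistry`, the state names and the method names are hypothetical, and a real API would also have to reach the running projection instances across the cluster.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical identifier: `name` is the group (e.g. UserView), `key` a specific instance (e.g. a tag).
final case class ProjectionId(name: String, key: String)

sealed trait ProjectionState
case object Running extends ProjectionState
case object Stopped extends ProjectionState
final case class Failing(cause: String) extends ProjectionState

final class ProjectionRegistry {
  private val states = TrieMap.empty[ProjectionId, ProjectionState]

  def register(id: ProjectionId): Unit = states.update(id, Running)

  /** Which projections exist, and what state is each one in? */
  def all: Map[ProjectionId, ProjectionState] = states.readOnlySnapshot().toMap

  /** Stop/start one projection inside a group; unknown ids are ignored. */
  def stop(id: ProjectionId): Unit = { states.replace(id, Stopped); () }
  def start(id: ProjectionId): Unit = { states.replace(id, Running); () }

  /** Stop/start every projection of a group. */
  def stopGroup(name: String): Unit = states.keys.filter(_.name == name).foreach(stop)
  def startGroup(name: String): Unit = states.keys.filter(_.name == name).foreach(start)
}
```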

[epic] JDBC Projections

Work has started in #192

  • JdbcSession API (in #192)
  • JdbcOffsetStore (in #192)
  • Basic JdbcProjection impl (exactly-once) (in #192)
  • Dialects: Postgres, MySQL and H2
  • Dialect: MSSQL (in #280)
  • Dialect: Oracle - postponed to v0.4
  • User defined dialects (nice to have, can be deferred)
  • support Recovery Strategy
  • support StatusObserver
  • support ProjectionManagement
  • support for Async Handlers and atLeastOnce
  • documentation, including how to define custom JdbcSession (JPA/Hibernate) (in #273)

Use cases:

  • read from any source and track the offset on a JPA table (transactional and non-transactional)
  • read from Akka Persistence journal (Cassandra or JDBC) and track the offset on a JPA table (transactional and non-transactional). This is a specialization of the above, but since it is very common we can provide a boilerplate-free variant.

Specs:

  • Offset is managed by a JDBC/JPA table
  • needs an offset extraction function to extract the offset from the envelope
  • For transactional projection, event handlers must be a function (EntityManager, E) => CompletionStage[Done] where E is any envelope.
  • For non-transactional projection, event handlers can be a function E => CompletionStage[Done] where E is any envelope (e.g. project to Elastic Search or send a command to an Entity).
  • Provide a JPA Offset Store based on Long. When reading Kafka, the offset will be a Long. Other offset types may be supported and can be added when needed (or implemented by the user)
  • Convenience API when reading from Akka Persistence Journal.
    We have a privileged situation when reading from Akka Persistence Journal: as we know what the envelope looks like, we know how to extract the offset (no need for an extractor). However, we need a special offset store that knows how to write and read akka.persistence.Offset

Cassandra Projection

The goal is to provide a projection that is backed by a Cassandra offset storage that can be used to track offsets.

The offset storage should support different kinds of offsets: Long, akka.persistence.Sequence, akka.persistence.TimeBasedUUID.

This projection won't support transactions and can therefore offer delayed commits (commit every 100 events, for example).
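A minimal sketch of delayed commits: the offset is written only after a number of envelopes, or once some time has passed since the last write, whichever comes first. Envelopes processed since the last write may be processed again after a restart, which is why this is at-least-once. `OffsetBatcher` is hypothetical, not thread-safe (it assumes single-threaded use, as inside one stream stage), and takes an injectable clock so the behaviour can be tested.

```scala
import scala.concurrent.duration._

final class OffsetBatcher[O](
    afterElements: Int,
    afterDuration: FiniteDuration,
    save: O => Unit,
    nowNanos: () => Long = () => System.nanoTime()) {

  private var pending: Option[O] = None
  private var count = 0
  private var lastSave = nowNanos()

  /** Call after the handler has successfully processed the envelope with `offset`. */
  def processed(offset: O): Unit = {
    pending = Some(offset)
    count += 1
    if (count >= afterElements || nowNanos() - lastSave >= afterDuration.toNanos) flush()
  }

  /** Writes the latest pending offset, if any; also call this on idle timeout or shutdown. */
  def flush(): Unit = {
    pending.foreach(save)
    pending = None
    count = 0
    lastSave = nowNanos()
  }
}
```

Only the latest offset of each batch is written, so a batch of 100 envelopes costs one Cassandra write instead of 100.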

Use cases:

  • read from any source and track the offset on a Cassandra table (non-transactional)
  • read from Akka Persistence journal (Cassandra or JDBC) and track the offset on a Cassandra table (non-transactional). This is a specialization of the above, but since it is very common we can provide a boilerplate-free variant.

Specs:

  • Offset is managed in a Cassandra table
  • needs an offset extraction function to extract the offset from the envelope
  • requires a Flow that commits to a predefined Cassandra table (non-transactional), supports at-least-once and at-most-once
    Since it is non-transactional, the event handler can be a function E => Future[Done], where E is any envelope
  • Provide a Cassandra Offset Store based on Long. When reading Kafka, the offset will be a Long. Other offset types may be supported and can be added when needed (or implemented by the user)
  • Convenience API when reading from Akka Persistence Journal.
    We have a privileged situation when reading from Akka Persistence Journal: as we know what the envelope looks like, we know how to extract the offset (no need for an extractor). However, we need a special offset store that knows how to write and read akka.persistence.Offset

Tasks:

  • OffsetStore #54
  • atLeastOnce #54
  • atMostOnce #62
  • Convenience for eventsByTag source
  • javadsl #84

Consider returning Mat value on `Projection.stop`

Maybe the returned Future should be the materialized Future of Sink.ignore?
Then we should maybe also expose that as a getter. Not sure what to name it; perhaps def whenStopped: Future[Done].
The purpose of that is to be able to observe that the projection failed or completed for reasons other than stop().

Originally posted by @patriknw in #35
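A minimal sketch of the idea: the projection holds on to the completion of its stream (in Akka Streams, the materialized Future of Sink.ignore) and exposes it as `whenStopped`, with `stop()` returning the same Future. Here a `Promise` stands in for the stream, `Future[Unit]` stands in for `Future[Done]`, and `RunningProjection` and `streamFailed` are hypothetical names.

```scala
import scala.concurrent.{Future, Promise}

final class RunningProjection {
  // Stand-in for the materialized value of Sink.ignore: completed when the stream ends.
  private val completion = Promise[Unit]()

  /** Completes when the projection stops for any reason; fails if the stream failed. */
  def whenStopped: Future[Unit] = completion.future

  /** Requested stop: completes the stream normally and returns the same Future. */
  def stop(): Future[Unit] = { completion.trySuccess(()); whenStopped }

  /** Simulates the stream ending on its own, e.g. a failure in the source or handler. */
  def streamFailed(cause: Throwable): Unit = { completion.tryFailure(cause); () }
}
```

A supervisor can then react to `whenStopped` failing without having called `stop()` itself.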

Use testcontainers project for implementations

It would be beneficial to use real underlying datasources for integration testing of implementations (Slick/JDBC, Cassandra, Kafka, etc.). The testcontainers-java library allows the developer to automatically orchestrate Docker containers during the lifetime of a test. This has several benefits:

  • Does not pollute Java test classpath with "embedded" technologies like embedded Kafka, Cassandra, H2, etc.
  • Test multiple implementations when applicable, such as Postgres and MariaDB, or Cassandra and its Datastax enterprise counterpart.
  • Run actual datasources that don't have an embedded or mockable Java dependency.

Alpakka Kafka Projections (Consumer)

Create projection for Alpakka Kafka Source where the offset is tracked in Kafka (committing)

Use case:

  • reads from any Alpakka Kafka Source and commits offset on the Topic. The user only provides a Source/Flow that runs all operations and commits back to the Topic.

Specs:

  • minimal projection only to run it in a managed and distributed fashion
  • Source is Kafka, projection target is whatever the user decides
  • all logic can be defined at the Source and by the user. This allows for arbitrary complexity.
  • Offset is managed by Kafka, supports at-least-once and at-most-once

fail: wrong logger loaded

Test failure with:

[info] HandlerRecoveryImplSpec:
[info] akka.projection.internal.HandlerRecoveryImplSpec *** ABORTED *** (2 milliseconds)
SLF4J: A number (3) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
[info]   java.lang.IllegalArgumentException: Requires Logback logger for [], it was a [org.slf4j.helpers.SubstituteLogger]
[info]   at akka.actor.testkit.typed.internal.LogbackUtil$.getLogbackLogger(LogbackUtil.scala:26)
[info]   at akka.actor.testkit.typed.internal.CapturingAppender$.get(CapturingAppender.scala:21)
[info]   at akka.actor.testkit.typed.scaladsl.LogCapturing.$init$(LogCapturing.scala:36)
[info]   at akka.projection.internal.HandlerRecoveryImplSpec.<init>(HandlerRecoveryImplSpec.scala:43)
[info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

[epic] Slick Projections

Use cases:

  • read from any source and track the offset on a Slick table (transactional and non-transactional)
  • read from Akka Persistence journal (Cassandra or JDBC) and track the offset on a Slick table (transactional and non-transactional). This is a specialization of the above, but since it is very common we can provide a boilerplate-free variant.

Specs:

  • Offset is managed by a Slick table
  • needs an offset extraction function to extract the offset from the envelope
  • Projection event handler is a function E => DBIO[Done] where E is any envelope.
    Note that, unlike JPA, we don't need two APIs because we have DBIO.
  • Provide a Slick Offset Store based on Long. When reading Kafka, the offset will be a Long. Other offset types may be supported and can be added when needed (or implemented by the user)
  • Convenience API when reading from Akka Persistence Journal.
    We have a privileged situation when reading from Akka Persistence Journal: as we know what the envelope looks like, we know how to extract the offset (no need for an extractor). However, we need a special offset store that knows how to write and read akka.persistence.Offset

Breakdown:

  • support for transactional projections - see PR #35
  • support for non-transactional projections (offset buffering / at-least-once) - see PR #69
  • Returning Mat value on Projection.stop - see #44
  • Consider the introduction of EventHandler trait - see #43
  • java.sql.Timestamp for Spanner - see #41 - not part of epic
  • Convenience API to integrate with akka-persistence-query API - see #46

Exactly once Kafka projections

Short description

Support an exactly once scenario with offset management and projection into Kafka.

Details

A KafkaProjection implementation could store processed offsets and the entity projection in Kafka in a single Kafka transaction. Offsets could be stored in a compacted topic with our own data structure, in the same way Kafka-managed offsets are stored. Because Kafka topics offer limited query options, all entries of the compacted topic must be streamed to retrieve the offsets for the right ProjectionId, but the number of entries should stay limited thanks to compaction. Kafka works around this limitation by keeping an in-memory key-value store (the Consumer Group coordinator) on a Kafka broker that caches the latest consumer group offsets.
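How offsets would be recovered from such a compacted topic can be sketched without Kafka: replay every record and keep the last offset seen per projection id. Compaction eventually retains only the latest record per key, but a reader cannot rely on it having run, so last-write-wins during replay gives the same result either way. `OffsetRecord` and `CompactedOffsets` are hypothetical names, and tombstones are ignored in this sketch.

```scala
// Hypothetical record in the offsets topic: key = projection id, value = offset.
final case class OffsetRecord(projectionId: String, offset: Long)

object CompactedOffsets {
  /** Replays the whole topic, keeping the last offset seen for each projection id. */
  def latestOffsets(records: Iterator[OffsetRecord]): Map[String, Long] =
    records.foldLeft(Map.empty[String, Long])((acc, r) => acc.updated(r.projectionId, r.offset))

  def offsetFor(records: Iterator[OffsetRecord], projectionId: String): Option[Long] =
    latestOffsets(records).get(projectionId)
}
```

Caching the result of `latestOffsets` in memory, and updating it on each write, would mirror what the Consumer Group coordinator does for consumer offsets.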

Consider the introduction of EventHandler trait

Doesn't look too bad for this simple event handler, but worth exploring.

I think a class also signals that the handler can have state. That's of course possible with a function too, but it's less obvious that it's safe, since most people (should) think of a function as stateless. We could always provide a convenience implementation of such a class that just wraps a function.

Originally posted by @patriknw in #35
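A minimal sketch of the trait-based handler with a convenience constructor that wraps a function. `EventHandler`, `CountingHandler` and the use of `Future[Unit]` in place of `Future[Done]` are assumptions for illustration. The stateful example is only safe if the projection calls `process` sequentially, one envelope at a time, which is also an assumption here.

```scala
import scala.concurrent.Future

trait EventHandler[E] {
  def process(envelope: E): Future[Unit]
}

object EventHandler {
  /** Convenience: wrap a plain (stateless) function as a handler. */
  def apply[E](f: E => Future[Unit]): EventHandler[E] = new EventHandler[E] {
    def process(envelope: E): Future[Unit] = f(envelope)
  }
}

// A class makes it obvious that the handler may keep state between envelopes.
final class CountingHandler extends EventHandler[String] {
  private var count = 0
  def current: Int = count
  def process(envelope: String): Future[Unit] = { count += 1; Future.unit }
}
```

Because the handler is no longer a bare function, it also stops being a second parameter list in the `atLeastOnce` builder discussed earlier.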
