
akka-persistence-inmemory


akka-persistence-inmemory is a plugin for akka-persistence that stores journal and snapshot messages in memory, which is very useful when testing persistent actors, persistent FSMs, and Akka Cluster.

Installation

Add the following to your build.sbt:

// the library is available in Bintray repository
resolvers += Resolver.bintrayRepo("dnvriend", "maven")

// akka 2.5.x
libraryDependencies += "com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2"

// akka 2.4.x
libraryDependencies += "com.github.dnvriend" %% "akka-persistence-inmemory" % "2.4.20.1"

Contribution policy

Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.

License

This code is open source software licensed under the Apache 2.0 License.

Configuration

Add the following to the application.conf:

akka {
  persistence {
    journal.plugin = "inmemory-journal"
    snapshot-store.plugin = "inmemory-snapshot-store"
  }
}

Configuring the query API

The query API can be configured by overriding the defaults by placing the following in application.conf:

inmemory-read-journal {
  # Absolute path to the write journal plugin configuration section to get the event adapters from
  write-plugin = "inmemory-journal"

  # There are two modes: "sequence" or "uuid". If set to "sequence" and NoOffset
  # is requested, the query will return Sequence offset types. If set to "uuid"
  # and NoOffset is requested, the query will return TimeBasedUUID offset types.
  # When the query is called with a Sequence offset it will return Sequence
  # offset types, and when called with a TimeBasedUUID it will return
  # TimeBasedUUID offset types.
  offset-mode = "sequence"

  # ask timeout on Futures
  ask-timeout = "10s"

  # New events are retrieved (polled) with this interval.
  refresh-interval = "100ms"

  # How many events to fetch in one query (replay) and keep buffered until they
  # are delivered downstream.
  max-buffer-size = "100"
}

Clearing Journal and Snapshot messages

It is possible to manually clear the journal and snapshot storage, for example:

import akka.actor.ActorSystem
import akka.persistence.inmemory.extension.{ InMemoryJournalStorage, InMemorySnapshotStorage, StorageExtension }
import akka.testkit.TestProbe
import org.scalatest.{ BeforeAndAfterEach, Suite }

trait InMemoryCleanup extends BeforeAndAfterEach { _: Suite =>

  implicit def system: ActorSystem

  override protected def beforeEach(): Unit = {
    val tp = TestProbe()
    tp.send(StorageExtension(system).journalStorage, InMemoryJournalStorage.ClearJournal)
    tp.expectMsg(akka.actor.Status.Success(""))
    tp.send(StorageExtension(system).snapshotStorage, InMemorySnapshotStorage.ClearSnapshots)
    tp.expectMsg(akka.actor.Status.Success(""))
    super.beforeEach()
  }
}

From Java (setup shown for completeness; StorageExtensionImpl is the concrete extension type returned by StorageExtension.get):

TestProbe tp = new TestProbe(system);
StorageExtensionImpl extension = (StorageExtensionImpl) StorageExtension.get(system);

ActorRef journal = extension.journalStorage();
InMemoryJournalStorage.ClearJournal clearJournal = InMemoryJournalStorage.clearJournal();
tp.send(journal, clearJournal);
tp.expectMsg(new Status.Success(""));

// Note: clearing snapshots must target the snapshot storage actor,
// not the journal storage actor.
ActorRef snapshots = extension.snapshotStorage();
InMemorySnapshotStorage.ClearSnapshots clearSnapshots = InMemorySnapshotStorage.clearSnapshots();
tp.send(snapshots, clearSnapshots);
tp.expectMsg(new Status.Success(""));

offset-mode

akka-persistence-query introduces akka.persistence.query.Offset, an ADT that defines akka.persistence.query.NoOffset, akka.persistence.query.Sequence and akka.persistence.query.TimeBasedUUID. These offsets can be used with the queries akka.persistence.query.scaladsl.EventsByTagQuery2 and akka.persistence.query.scaladsl.CurrentEventsByTagQuery2 to request an offset into the stream of events.

Because akka-persistence-inmemory implements both the Sequence-based offset strategy and the TimeBasedUUID strategy, you must configure inmemory-read-journal.offset-mode so that akka-persistence-inmemory knows which kind of journal it should emulate when a NoOffset type is requested. The EventEnvelope will contain a Sequence when the configuration is "sequence", or a TimeBasedUUID when the configuration is "uuid".

By default the setting is sequence.
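The rules above can be summarized as a small decision table. The following self-contained sketch models them with a toy offset ADT; these types are stand-ins for the real akka.persistence.query classes, purely to illustrate which offset type a query emits:

```scala
// Toy stand-ins for akka.persistence.query.Offset and friends (illustration only).
sealed trait Offset
case object NoOffset extends Offset
final case class Sequence(value: Long) extends Offset
final case class TimeBasedUUID(uuid: java.util.UUID) extends Offset

object OffsetMode {
  // Which offset type the query emits, given the configured offset-mode
  // and the offset the caller passed in. NoOffset defers to the configuration;
  // an explicit offset always determines the emitted type.
  def emittedOffsetType(offsetMode: String, requested: Offset): String =
    requested match {
      case NoOffset if offsetMode == "sequence" => "Sequence"
      case NoOffset if offsetMode == "uuid"     => "TimeBasedUUID"
      case _: Sequence                          => "Sequence"
      case _: TimeBasedUUID                     => "TimeBasedUUID"
      case other                                => sys.error(s"unsupported: $offsetMode / $other")
    }
}
```

For example, `OffsetMode.emittedOffsetType("uuid", Sequence(5L))` is "Sequence": calling a query with an explicit Sequence offset overrides the configured mode.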

query and event-adapters

Write plugins (i.e. akka-persistence plugins that write events) can define event adapters. These event adapters can be reused when executing a query, so that the EventEnvelope contains the application-domain event and not the data-model representation of that event. Set inmemory-read-journal.write-plugin to the name of the write plugin to take the adapters from (it defaults to "inmemory-journal").

Refresh Interval

The async query API uses polling to query the journal for new events. The refresh interval can be configured, e.g. "1s" to poll the journal every second. This setting is global and applies to all async queries: allPersistenceIds, eventsByTag and eventsByPersistenceId.

Max Buffer Size

When an async query is started, a number of events will be buffered and will use memory when not consumed by a Sink. The default size is 100.

How to get the ReadJournal using Scala

The ReadJournal is retrieved via the akka.persistence.query.PersistenceQuery extension:

import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl._

lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
  .asInstanceOf[ReadJournal
    with CurrentPersistenceIdsQuery
    with AllPersistenceIdsQuery
    with CurrentEventsByPersistenceIdQuery
    with CurrentEventsByTagQuery
    with EventsByPersistenceIdQuery
    with EventsByTagQuery]

How to get the ReadJournal using Java

The ReadJournal is retrieved via the akka.persistence.query.PersistenceQuery extension:

import akka.persistence.query.PersistenceQuery
import akka.persistence.inmemory.query.journal.javadsl.InMemoryReadJournal

final InMemoryReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(InMemoryReadJournal.class, InMemoryReadJournal.Identifier());

Persistence Query

The plugin supports the following queries:

AllPersistenceIdsQuery and CurrentPersistenceIdsQuery

allPersistenceIds and currentPersistenceIds are used for retrieving the persistenceIds of all persistent actors.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.Source
import akka.persistence.query.PersistenceQuery
import akka.persistence.inmemory.query.journal.scaladsl.InMemoryReadJournal

implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()(system)
val readJournal: InMemoryReadJournal = PersistenceQuery(system).readJournalFor[InMemoryReadJournal](InMemoryReadJournal.Identifier)

val willNotCompleteTheStream: Source[String, NotUsed] = readJournal.allPersistenceIds()

val willCompleteTheStream: Source[String, NotUsed] = readJournal.currentPersistenceIds()

The returned event stream is unordered and you can expect a different order for multiple executions of the query.

When using the allPersistenceIds query, the stream is not completed when it reaches the end of the currently used persistenceIds, but it continues to push new persistenceIds when new persistent actors are created.

When using the currentPersistenceIds query, the stream is completed when the end of the current list of persistenceIds is reached, thus it is not a live query.

The stream is completed with failure if there is a failure in executing the query in the backend journal.

EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery

eventsByPersistenceId and currentEventsByPersistenceId are used for retrieving events for a specific PersistentActor identified by its persistenceId.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.Source
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.persistence.query.scaladsl._

implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()(system)

lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
 .asInstanceOf[ReadJournal
    with CurrentPersistenceIdsQuery
    with AllPersistenceIdsQuery
    with CurrentEventsByPersistenceIdQuery
    with CurrentEventsByTagQuery
    with EventsByPersistenceIdQuery
    with EventsByTagQuery]

val willNotCompleteTheStream: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)

val willCompleteTheStream: Source[EventEnvelope, NotUsed] = readJournal.currentEventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)

You can retrieve a subset of all events by specifying fromSequenceNr and toSequenceNr or use 0L and Long.MaxValue respectively to retrieve all events. Note that the corresponding sequence number of each event is provided in the EventEnvelope, which makes it possible to resume the stream at a later point from a given sequence number.
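Since fromSequenceNr is inclusive, resuming typically means querying from the last processed sequence number plus one. A minimal helper for that bookkeeping could look like the sketch below; this is an illustration of the resume pattern, not part of the plugin's API:

```scala
object Resume {
  // Compute the inclusive fromSequenceNr for the next eventsByPersistenceId
  // query, given the sequence number of the last event that was processed
  // (if any). 0L means "replay from the beginning".
  def fromSequenceNr(lastProcessed: Option[Long]): Long =
    lastProcessed.fold(0L)(_ + 1L)
}
```

After persisting the last seen sequenceNr on the read side, pass `Resume.fromSequenceNr(stored)` as the fromSequenceNr argument of the next query.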

The returned event stream is ordered by sequence number, i.e. the same order in which the PersistentActor persisted the events. The same prefix of stream elements (in the same order) is returned for multiple executions of the query, except when events have been deleted.

The stream is completed with failure if there is a failure in executing the query in the backend journal.

EventsByTag and CurrentEventsByTag

eventsByTag and currentEventsByTag are used for retrieving events that were marked with a given tag, e.g. all domain events of an Aggregate Root type.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.Source
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.persistence.query.scaladsl._

implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()(system)

lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
 .asInstanceOf[ReadJournal
    with CurrentPersistenceIdsQuery
    with AllPersistenceIdsQuery
    with CurrentEventsByPersistenceIdQuery
    with CurrentEventsByTagQuery
    with EventsByPersistenceIdQuery
    with EventsByTagQuery]

val willNotCompleteTheStream: Source[EventEnvelope, NotUsed] = readJournal.eventsByTag("apple", 0L)

val willCompleteTheStream: Source[EventEnvelope, NotUsed] = readJournal.currentEventsByTag("apple", 0L)

To tag events you'll need to create an event adapter that wraps the event in an akka.persistence.journal.Tagged class with the given tags. The Tagged class instructs the persistence plugin to tag the event with the given set of tags; the plugin does not store the Tagged wrapper itself in the journal. It strips the tags from the wrapper and stores only the payload in the message field of the journal, using the tags solely to mark the event.

import akka.persistence.journal.{ Tagged, WriteEventAdapter }
import com.github.dnvriend.Person.{ LastNameChanged, FirstNameChanged, PersonCreated }

class TaggingEventAdapter extends WriteEventAdapter {
  override def manifest(event: Any): String = ""

  def withTag(event: Any, tag: String) = Tagged(event, Set(tag))

  override def toJournal(event: Any): Any = event match {
    case _: PersonCreated =>
      withTag(event, "person-created")
    case _: FirstNameChanged =>
      withTag(event, "first-name-changed")
    case _: LastNameChanged =>
      withTag(event, "last-name-changed")
    case _ => event
  }
}

The EventAdapter must be registered under the identifier of the persistence plugin, which for the inmemory plugin is inmemory-journal. Add the following to the root of application.conf; see the demo-akka-persistence-jdbc project for more information.

inmemory-journal {
  event-adapters {
    tagging = "com.github.dnvriend.TaggingEventAdapter"
  }
  event-adapter-bindings {
    "com.github.dnvriend.Person$PersonCreated" = tagging
    "com.github.dnvriend.Person$FirstNameChanged" = tagging
    "com.github.dnvriend.Person$LastNameChanged" = tagging
  }
}

You can retrieve a subset of all events by specifying an offset, or use 0L to retrieve all events with a given tag. The offset corresponds to an ordered sequence number for the specific tag. Note that the corresponding offset of each event is provided in the EventEnvelope, which makes it possible to resume the stream at a later point from a given offset.

In addition to the offset, the EventEnvelope also provides persistenceId and sequenceNr for each event. The sequenceNr is the sequence number for the persistent actor with the persistenceId that persisted the event. The persistenceId + sequenceNr is a unique identifier for the event.

The returned event stream contains only events that correspond to the given tag, and is ordered by the creation time of the events. The same stream elements (in the same order) are returned for multiple executions of the same query. Deleted events are not removed from the tagged event stream.
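Note that since version 2.4.16.0 (see the changelog) the requested offset of the byTag queries is exclusive, so the offset you store on the read side can be passed straight back into the query with no plus-one step. A sketch of that bookkeeping, using a hypothetical Envelope class in place of the real EventEnvelope:

```scala
// Hypothetical stand-in for akka.persistence.query.EventEnvelope (illustration only).
final case class Envelope(offset: Long, persistenceId: String, sequenceNr: Long, event: Any)

object TagResume {
  // The byTag offset in this plugin is exclusive: passing the last stored
  // offset back to eventsByTag resumes with the next event, no plus-one needed.
  def nextQueryOffset(stored: Option[Long]): Long = stored.getOrElse(0L)

  // Track the latest offset while processing envelopes, e.g. to persist it
  // on the read side after each batch.
  def latestOffset(envelopes: Seq[Envelope], initial: Long = 0L): Long =
    envelopes.foldLeft(initial)((acc, e) => math.max(acc, e.offset))
}
```

Contrast this with eventsByPersistenceId, where fromSequenceNr is inclusive and resuming does require the plus-one.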

Storage extension

You can change the default storage so that a separate journal is kept per value of the configured property keys, using the configuration below. This can be useful to emulate behavior similar to Cassandra keyspaces.

# the storage in use
inmemory-storage {
  # storage using inmemory journal for each different value for the configured property keys
  class = "akka.persistence.inmemory.extension.StorageExtensionByProperty"
  # property keys in journal plugin configuration, for each different value a own journal will be stored
  property-keys = ["keyspace"]
}
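Conceptually, StorageExtensionByProperty keeps one journal per distinct value of the configured property key. The sketch below is a toy model of that behavior (a map of keyspace to event list, not the real extension's internals), just to show how events from different keyspaces stay isolated:

```scala
object StorageByProperty {
  // Toy model: one journal (here a Vector of events) per distinct value of
  // the configured property key, e.g. "keyspace". Appending to one keyspace
  // never affects the events stored under another.
  def append(
      journals: Map[String, Vector[String]],
      keyspace: String,
      event: String): Map[String, Vector[String]] =
    journals.updated(keyspace, journals.getOrElse(keyspace, Vector.empty) :+ event)
}
```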

Changelog

2.5.15.2 (2019-06-28)

  • Scala 2.11.x, 2.12.x, 2.13.x support
  • Akka 2.5.15 -> 2.5.23

2.4.20.1 (2019-06-28)

  • Merged PR #59 "Pluggable storage" by Beat Sager, thanks!

2.5.15.1 (2018-09-09)

  • Java 8 binary release

2.5.15.0 (2018-08-29)

  • Applied PR #50 "Fix for Akka Typed Persistence" by Lukasz Sanek, thanks!
  • Java 10 binary release

2.4.20.0 (2018-08-29)

  • Merged PR #50 "Fix for Akka Typed Persistence" by Lukasz Sanek, thanks!
  • Merged PR #52 "Provide nice Java API for clearing journal" by Christopher Batey, thanks!

2.5.1.2 (2018-08-14)

2.4.18.2 (2017-12-03)

  • Merged PR #42 "Scala 2.12.4 support" by sullis, thanks!

2.5.1.1 (2017-05-23)

  • Fix for issue #35 "no serializer for internal plugin messages"

2.4.18.1 (2017-05-23)

  • Fix for issue #35 "no serializer for internal plugin messages"

2.5.1.0 (2017-05-03)

  • Akka 2.5.0 -> 2.5.1

2.4.18.0 (2017-05-03)

  • Akka 2.4.17 -> 2.4.18

2.5.0.0 (2017-04-13)

  • Support for Akka 2.5.0

2.5.0.0-RC2 (2017-04-03)

  • Support for Akka 2.5.0-RC2

2.5.0.0-RC1 (2017-03-21)

  • Support for Akka 2.5.0-RC1

2.5.0.0-M2 (2017-02-24)

  • Support for Akka 2.5-M2
  • Changed to a simpler Time-based UUID generator.

2.4.17.3 (2017-02-24)

  • Changed to a simpler Time-based UUID generator.

2.4.17.2 (2017-02-16)

  • Fix for issue #33 'InMemoryReadJournal.eventsByPersistenceId returns deleted messages'

2.5.0.1-M1 (2017-02-16)

  • Fix for issue #33 'InMemoryReadJournal.eventsByPersistenceId returns deleted messages'
  • Fix for PR #31 'eventsByTag including substrings of tag' by jibbers42, thanks!
  • Tags will be matched against the whole tag so tag 'foo' will be matched against 'foo' and not 'fo' or 'f' which was the previous behavior.

2.4.17.1 (2017-02-12)

  • Fix for PR #31 'eventsByTag including substrings of tag' by jibbers42, thanks!
  • Tags will be matched against the whole tag so tag 'foo' will be matched against 'foo' and not 'fo' or 'f' which was the previous behavior.

2.4.17.0 (2017-02-11)

  • Akka 2.4.16 -> 2.4.17

2.4.16.0 (2017-01-29)

  • New versioning scheme; now using the version of Akka with the akka-persistence-inmemory version appended to it, starting from .0
  • Support for Akka 2.4.16
  • Support Scala 2.11.x and 2.12.x
  • Changed how the byTag queries work: the requested offset is exclusive. For example, if you ask for Sequence(2), the stream starts at Sequence(3). This supports the use case where you store the latest offset on the read side; you can pass that value straight into the query and the stream continues with the next offset, with no manual plus-one.

2.5.0.0-M1 (2017-01-29)

  • New versioning scheme; now using the version of Akka with the akka-persistence-inmemory version appended to it, starting from .0
  • Support for Akka 2.5-M1
  • Support Scala 2.11.x and 2.12.x
  • You need Java 8 or higher
  • Please read the Akka 2.4 -> 2.5 Migration Guide
  • Changed how the byTag queries work: the requested offset is exclusive. For example, if you ask for Sequence(2), the stream starts at Sequence(3). This supports the use case where you store the latest offset on the read side; you can pass that value straight into the query and the stream continues with the next offset, with no manual plus-one.

1.3.18 (2016-12-21)

  • Akka 2.4.14 -> 2.4.16

1.3.17 (2016-12-08)

  • Scala 2.12.0 -> 2.12.1

1.3.16 (2016-11-22)

  • Akka 2.4.13 -> 2.4.14

1.3.15 (2016-11-19)

  • Akka 2.4.12 -> 2.4.13

1.3.14 (2016-11-03)

  • cross scala 2.11.8 and 2.12.0 build

1.3.13 (2016-11-01 - Birthday Edition!)

  • Implemented support for the akka.persistence.query.TimeBasedUUID.
  • Set the new configuration key inmemory-read-journal.offset-mode = "uuid" (defaults to "sequence") to produce EventEnvelope2 instances that contain TimeBasedUUID offset fields.

1.3.12 (2016-10-28)

  • Akka 2.4.11 -> 2.4.12
  • Support for the new queries CurrentEventsByTagQuery2 and EventsByTagQuery2, please read the akka-persistence-query documentation to see what has changed.
  • The akka-persistence-inmemory plugin only supports the akka.persistence.query.NoOffset or akka.persistence.query.Sequence offset types.
  • There is no support for the akka.persistence.query.TimeBasedUUID offset type. When used, akka-persistence-inmemory will throw an IllegalArgumentException.

1.3.11 (2016-10-23)

  • Scala 2.11.8 and 2.12.0-RC2 compatible

1.3.10 (2016-09-30)

  • Akka 2.4.10 -> 2.4.11

1.3.9 (2016-09-22)

  • Adapted version of PR #28 by Yury Gribkov - Fix bug: It doesn't adapt events read from journal, thanks!
  • As event adapters are not (yet) first-class citizens of akka-persistence-query, a workaround based on the configuration of akka-persistence-cassandra has been implemented in the inmemory journal, based on the work of Yury Gribkov. Basically, the query journal looks for a write-plugin entry in the inmemory-read-journal configuration of your application.conf that must point to the writePluginId that writes the events to the journal. That write plugin has all event adapters configured, and if applicable those adapters are used to adapt events from the data model to the application model, so that, when configured correctly, you effectively have application-model events in your EventEnvelope.
  • Removed the non-official and never-to-be-used bulk loading interface

1.3.8 (2016-09-07)

  • Akka 2.4.9 -> Akka 2.4.10

1.3.7 (2016-08-21)

  • Fix for EventsByPersistenceId should terminate when toSequenceNumber is reached as pointed out by monktastic, thanks!

1.3.6 (2016-08-20)

  • Akka 2.4.9-RC2 -> Akka 2.4.9

1.3.6-RC2 (2016-08-06)

  • Akka 2.4.9-RC1 -> 2.4.9-RC2

1.3.6-RC1 (2016-08-03)

  • Akka 2.4.8 -> 2.4.9-RC1

1.3.5 (2016-07-23)

  • Support for the non-official bulk loading interface akka.persistence.query.scaladsl.EventWriter added. I need this interface to load massive amounts of data that will be processed by many actors, but initially I just want to create and store one or more events belonging to an actor that will eventually handle the business rules. Using actors, or a shard region for that matter, just gives too much actor life-cycle overhead, i.e. too many calls to the data store. The akka.persistence.query.scaladsl.EventWriter interface is non-official and puts all responsibility for ensuring the integrity of the journal on you. This means that when strange things happen because the data was loaded incorrectly, thereby breaking the integrity and rule set of akka-persistence, all responsibility for fixing it is on you, not on the Akka team.

1.3.4 (2016-07-17)

  • Codacy code cleanup release.

1.3.3 (2016-07-16)

  • No need for Query Publishers with the new akka-streams API.

1.3.2 (2016-07-09)

  • Journal entry 'deleted' fixed, must be set manually.

1.3.1 (2016-07-09)

  • Akka 2.4.7 -> 2.4.8,
  • Behavior of akka-persistence-query *byTag query should be up to spec,
  • Refactored the inmemory plugin code base, should be more clean now.

1.3.0 (2016-06-09)

  • Removed the queries eventsByPersistenceIdAndTag and currentEventsByPersistenceIdAndTag as they are not supported by Akka natively and can be configured by filtering the event stream.
  • Implemented true async queries using the polling strategy

1.2.15 (2016-06-05)

  • Akka 2.4.6 -> 2.4.7

1.2.14 (2016-05-25)

  • Fixed issue Unable to differentiate between persistence failures and serialization issues
  • Akka 2.4.4 -> 2.4.6

1.2.13 (2016-04-14)

  • Akka 2.4.3 -> 2.4.4

1.2.12 (2016-04-01)

  • Scala 2.11.7 -> 2.11.8
  • Akka 2.4.2 -> 2.4.3

1.2.11 (2016-03-18)

  • Fixed issue on the query api where the offset on eventsByTag and eventsByPersistenceIdAndTag queries were not sequential

1.2.10 (2016-03-17)

  • Refactored the akka-persistence-query interfaces, integrated it back again in one jar, for jcenter deployment simplicity

1.2.9 (2016-03-16)

  • Added the appropriate Maven POM resources to publish to Bintray's JCenter

1.2.8 (2016-03-03)

  • Fix for propagating serialization errors to akka-persistence so that any error regarding the persistence of messages will be handled by the PersistentActor's onPersistFailure callback.

1.2.7 (2016-02-18)

  • Better storage implementation for journal and snapshot

1.2.6 (2016-02-17)

  • Akka 2.4.2-RC3 -> 2.4.2

1.2.5 (2016-02-13)

  • akka-persistence-jdbc-query 1.0.0 -> 1.0.1

1.2.4 (2016-02-13)

  • Akka 2.4.2-RC2 -> 2.4.2-RC3

1.2.3 (2016-02-08)

  • Compatibility with Akka 2.4.2-RC2
  • Refactored the akka-persistence-query extension interfaces to its own jar: "com.github.dnvriend" %% "akka-persistence-jdbc-query" % "1.0.0"

1.2.2 (2016-01-30)

  • Code is based on akka-persistence-jdbc
  • Supports the following queries:
    • allPersistenceIds and currentPersistenceIds
    • eventsByPersistenceId and currentEventsByPersistenceId
    • eventsByTag and currentEventsByTag
    • eventsByPersistenceIdAndTag and currentEventsByPersistenceIdAndTag

1.2.1 (2016-01-28)

  • Supports for the javadsl query API

1.2.0 (2016-01-26)

  • Compatibility with Akka 2.4.2-RC1

1.1.6 (2015-12-02)

  • Compatibility with Akka 2.4.1
  • Merged PR #17 Evgeny Shepelyuk Upgrade to AKKA 2.4.1, thanks!

1.1.5 (2015-10-24)

1.1.4 (2015-10-17)

  • Compatibility with Akka 2.4.0
  • Merged PR #12 Evgeny Shepelyuk Live version of eventsByPersistenceId, thanks!

1.1.3 (2015-10-02)

  • Compatibility with Akka 2.4.0
  • Akka 2.4.0-RC3 -> 2.4.0

1.1.3-RC3 (2015-09-24)

  • Merged PR #10 Evgeny Shepelyuk Live version of allPersistenceIds, thanks!
  • Compatibility with Akka 2.4.0-RC3
  • Use the following library dependency: "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.1.3-RC3"

1.1.1-RC3 (2015-09-19)

  • Merged Issue #9 Evgeny Shepelyuk Initial implementation of Persistence Query for In Memory journal, thanks!
  • Compatibility with Akka 2.4.0-RC3
  • Use the following library dependency: "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.1.1-RC3"

1.1.0-RC3 (2015-09-17)

  • Merged Issue #6 Evgeny Shepelyuk Conditional ability to perform full serialization while adding messages to journal, thanks!
  • Compatibility with Akka 2.4.0-RC3
  • Use the following library dependency: "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.1.0-RC3"

1.1.0-RC2 (2015-09-05)

  • Compatibility with Akka 2.4.0-RC2
  • Use the following library dependency: "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.1.0-RC2"

1.0.5 (2015-09-04)

  • Compatibility with Akka 2.3.13
  • Akka 2.3.12 -> 2.3.13

1.1.0-RC1 (2015-09-02)

  • Compatibility with Akka 2.4.0-RC1
  • Use the following library dependency: "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.1.0-RC1"

1.0.4 (2015-08-16)

  • Scala 2.11.6 -> 2.11.7
  • Akka 2.3.11 -> 2.3.12
  • Apache-2.0 license

1.0.3 (2015-05-25)

  • Merged Issue #2 Sebastián Ortega Regression: Fix corner case when persisted events are deleted, thanks!
  • Added test for the corner case issue #1 and #2

1.0.2 (2015-05-20)

  • Refactored from the ConcurrentHashMap implementation to a pure Actor managed concurrency model

1.0.1 (2015-05-16)

  • Some refactoring, fixed some misconceptions about the behavior of Scala Futures one year ago :)
  • Akka 2.3.6 -> 2.3.11
  • Scala 2.11.1 -> 2.11.6
  • Scala 2.10.4 -> 2.10.5
  • Merged Issue #1 Sebastián Ortega Fix corner case when persisted events are deleted, thanks!

1.0.0 (2014-09-25)

  • Moved to bintray

0.0.2 (2014-09-05)

  • Akka 2.3.4 -> 2.3.6

0.0.1 (2014-08-19)

  • Initial Release

Have fun!

akka-persistence-inmemory's People

Contributors

acjay, beatsager, chbatey, dlisin, dnvriend, eshepelyuk, gitter-badger, jibbers42, lomigmegard, psliwa, vasily-kirichenko


akka-persistence-inmemory's Issues

Pluggable storage for akka 2.5

MR #59 allows defining keyspaces to have different journals but this MR wasn't backported to the akka 2.5 branch.

Is it something you could do?

SnapshotStorage dead letter

I encountered an error of a dead letter sent to SnapshotStorage when saving a snapshot. Any advice on what could be the issue? I didn't change any of the default configurations.

InMemoryReadJournal.eventsByPersistenceId returns deleted messages

In commit 9ce9a6b "find deleted messages" you changed the way currentEventsByPersistenceId (and therefore eventsByPersistenceId) worked so that they now return deleted messages. I think the expected behaviour for them would be to not return deleted messages. I believe that's how your akka-persistence-jdbc library works, messages flagged as deleted are filtered out.

I stumbled across this while writing test code which was supposed to test that the correct messages were being deleted, but the journal was always returning them despite the fact they were deleted.

So I think this is a bug, but could be wrong.

Thanks

Wrong resolver unresolved dependency: com.github.dnvriend#akka-persistence-inmemory_2.12;2.5.15.1: not found

[warn]           +- com.technov:workshop-akka-persistence_2.12:1.0-SNAPSHOT
[error] sbt.librarymanagement.ResolveException: unresolved dependency: com.github.dnvriend#akka-persistence-inmemory_2.12;2.5.15.1: not found
[error]     at sbt.internal.librarymanagement.IvyActions$.resolveAndRetrieve(IvyActions.scala:334)
[error]     at sbt.internal.librarymanagement.IvyActions$.$anonfun$updateEither$1(IvyActions.scala:208)
[error]     at sbt.internal.librarymanagement.IvySbt$Module.$anonfun$withModule$1(Ivy.scala:243)
[error]     at sbt.internal.librarymanagement.IvySbt.$anonfun$withIvy$1(Ivy.scala:204)
[er

In Scala 2.12 it doesn't work with resolvers += Resolver.jcenterRepo,
but it works with resolvers += Resolver.bintrayRepo("dnvriend", "maven").

if the message is not serializable, then system does not persist - but no warning given

If the object that is being persisted turns out not to be serializable, then the store fails. However, no warning is given, the thing simply is not there on a recovery operation.
I presume this is not part of the TCK (and will probably raise it as an issue there).

config settings

assertion failed: expected GotBack(Some(Thing(2))), found GotBack(None)
java.lang.AssertionError: assertion failed: expected GotBack(Some(Thing(2))), found GotBack(None)

akka {
loglevel = DEBUG
logger-startup-timeout = 30s
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor.debug.fsm = true
loggers = ["akka.event.slf4j.Slf4jLogger"]

extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]

persistence {
// journal.plugin = "akka-persistence-sql-async.journal"
// snapshot-store.plugin = "akka-persistence-sql-async.snapshot-store"
journal.plugin = "inmemory-journal"
snapshot-store.plugin = "inmemory-snapshot-store"
journal-plugin-fallback {
replay-filter {
mode = fail
}
}
}

//----------------- Kryo config ----------------------

actor {
serialize-messages = off

serializers {
  java = "akka.serialization.JavaSerializer"
  # Define kryo serializer
  kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}

serialization-bindings {
  "java.io.Serializable" = java
}

kryo {
  type = "nograph"
  idstrategy = "default"
  kyro-trace = true
}

}
}
////////////////////////////
code
/////////////////////////////
package com.optrak.vrp.ddd

import java.util.UUID

import akka.actor.{ActorLogging, PoisonPill, Props}
import akka.persistence.PersistentActor
import com.optrak.opkakka.test.TestSupport.AkkaTestkitContext
import com.optrak.vrp.ddd.SimplePersistor.PersistMe
import org.specs2.mutable.Specification

/**
  * Created by tim on 21/02/16.
  */

object SimplePersistor {
case class PersistMe(k: AnyRef)

case class GotBack(kOpt: Option[AnyRef])

case object Request

def props = Props(new SimplePersistor("hello"))
}
import SimplePersistor._

case class SimplePersistor(persistenceId: String) extends PersistentActor with ActorLogging {

var local: Option[AnyRef] = None

def handler(msg: PersistMe) = {
local = Some(msg.k)
}
override def receiveRecover: Receive = {
case pm: PersistMe =>
local = Some(pm.k)
}

override def receiveCommand: Receive = {
case pm: PersistMe =>
persist(pm)(handler)
case Request =>
sender ! GotBack(local)

}

}

class TestAkkaSerializability extends Specification {
sequential //our set always has the same persistence id, so have to

trait Checkit extends AkkaTestkitContext {

def checkItOut(k: AnyRef) = {
  val persy1 = system.actorOf(SimplePersistor.props)
  persy1 ! Request
  expectMsg(GotBack(None))

  persy1 ! PersistMe(k)
  persy1 ! Request
  expectMsg(GotBack(Some(k)))

  persy1 ! PoisonPill
  Thread.sleep(200)

  val persy2 = system.actorOf(SimplePersistor.props)
  persy2 ! Request
  expectMsg(GotBack(Some(k)))

}

}

"persistent set" should {
  "work with String" in new Checkit() {
    checkItOut("ho")
  }

  "fails with unserializable" in new Checkit() {
    val x = 1
    case class Thing(y: Int)
    val tricky = new Thing(2) {
      def xMe = x
    }

    checkItOut(tricky)
  }
}

}

highestSequenceNo should be kept on message deletion

With the Akka 2.4.1 release comes a clarification of the highestSequenceNo behavior on message deletion. Issue akka/akka#18559 adds tests to the plugin TCK that enforce maintaining highestSequenceNo for a persistenceId after message deletion or journal cleanup.

Currently akka-persistence-inmemory doesn't maintain highestSequenceNo; it is reset on journal cleanup. This should be fixed for future compatibility with Akka 2.4.1.

@dnvriend I'd like to pick up and fix the issue if you have no objections.

Improve doc for Java

As a beginner with Akka (using the javadsl) I had some trouble sending the "ClearJournal" message to the in-memory store. Currently it does not seem to be possible to do this with a proper Java API; I had to use some strange "$"-classes (strange at least to me, because I don't know Scala).

The documentation should be improved with a "full" example:

TestProbe tp = new TestProbe(akkaSetup.actorSystem());
StorageExtensionImpl extension = (StorageExtensionImpl) StorageExtension.get(akkaSetup.actorSystem());
ActorRef actorRef = extension.journalStorage();

InMemoryJournalStorage.ClearJournal$ myvar = InMemoryJournalStorage.ClearJournal$.MODULE$;
tp.send(actorRef, myvar);
tp.expectMsg(new Status.Success(""));
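For comparison, the Scala side of the same cleanup is a one-liner against the case object (a sketch following the plugin's documented test-cleanup pattern; exact package paths may differ between versions):

    import akka.actor.Status
    import akka.persistence.inmemory.extension.{ InMemoryJournalStorage, StorageExtension }
    import akka.testkit.TestProbe

    val tp = TestProbe()
    val journal = StorageExtension(system).journalStorage
    tp.send(journal, InMemoryJournalStorage.ClearJournal)
    tp.expectMsg(Status.Success(""))

From Java, the `ClearJournal$.MODULE$` incantation is how a Scala case object is reached; a dedicated Java factory method would hide that.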

Also, I'm not really sure about the versions: in Maven I have 2.5.15.2 configured, but the latest version tagged on GitHub is 2.5.15.1.

Using wrong compareTo function for TimeBasedUUIDs in eventsByTag

Hi,

I am using akka-persistence-inmemory not only for unit testing, but also for a proof-of-concept application that runs for longer than a few seconds. The application "writes" events to the inmemory journal and synchronizes them via PersistenceQuery using readJournal.eventsByTag. The offset mode is set to uuid.

During testing I stumbled over the problem that after a (random) amount of time the eventsByTag source stops emitting events that are persisted. I currently assume this is because InMemoryJournalStorage.eventsByTag uses > to compare two TimeBasedUUIDs, which in turn uses java.util.UUID.compareTo (see InMemoryJournalStorage).

However, this compareTo does not use the same ordering as the time-based UUIDs generated by UUIDs.timeBased().

Therefore it may happen that a newer time-based UUID compares as less than an older one, and thus eventsByTag does not return the old (but not yet delivered) event.

A small failing test confirms this behaviour:

    import java.time.{OffsetDateTime, ZoneOffset}
    import akka.persistence.inmemory.util.UUIDs
    import akka.persistence.query.TimeBasedUUID
    import org.scalatest.{Matchers, WordSpecLike}

    class TimeBasedUUIDSpec
      extends WordSpecLike with Matchers {

      "The TimeBasedUUID" when {
        "comparing two TimeBasedUUIDs that differ by a second using '<' " should {
          "compare correctly" in {
    
            val one = TimeBasedUUID(UUIDs.startOf(OffsetDateTime.of(2002, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toEpochSecond * 1000))
            val two = TimeBasedUUID(UUIDs.startOf(OffsetDateTime.of(2002, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC).toEpochSecond * 1000))

            one.value.timestamp() < two.value.timestamp() shouldBe true
            one < two shouldBe true
          }
        }
      }
    }

A possible fix could be to use a different compare function, like it is done here.
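To illustrate why java.util.UUID.compareTo is unsuitable here: it compares the most significant bits as signed longs, while a version-1 UUID stores the low 32 bits of its 60-bit timestamp in exactly that word, so the order inverts whenever the sign bit flips. A sketch of a timestamp-respecting ordering, using only the JDK (names are mine):

    import java.util.UUID

    // A sketch: order version-1 (time-based) UUIDs by their 60-bit timestamp
    // instead of UUID.compareTo's signed most-significant-bits comparison.
    val byTimestamp: Ordering[UUID] = Ordering.by((u: UUID) => u.timestamp())

    // Two version-1 UUIDs whose time_low field crosses the sign bit:
    val older = new UUID(0x7FFFFFFF00001000L, 0x8000000000000000L) // timestamp 0x7FFFFFFF
    val newer = new UUID(0x8000000000001000L, 0x8000000000000000L) // timestamp 0x80000000

    // Signed comparison inverts the order...
    assert(newer.compareTo(older) < 0)
    // ...while the timestamp ordering is correct.
    assert(byTimestamp.lt(older, newer))

A full fix would also break ties on the clock-sequence and node fields, as Cassandra-style time-based comparators do.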

Akka 2.6

Do you plan to support Akka 2.6.x?

Seems like eventsByTag query misses events (at buffer's boundary).

Hi,

I use Akka 2.5.14 and akka-persistence-inmemory 2.5.1.1, and it seems I have run into the following issue:
I emit a lot of events quite fast (more than the default max-buffer-size) and sometimes the 101st, 202nd and 303rd events are not pushed into the stream.
I tried to read the code, and the following place looks suspicious to me:

  override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
    Source.unfoldAsync[Offset, Seq[EventEnvelope]](offset) { (from: Offset) =>
      def nextFromOffset(xs: Seq[EventEnvelope]): Offset = {
        if (xs.isEmpty) from else xs.last.offset match {
          case Sequence(n)         => Sequence(n)
          case TimeBasedUUID(time) => TimeBasedUUID(UUIDs.startOf(UUIDs.unixTimestamp(time) + 1))
        }
      }
      ticker.flatMapConcat(_ => currentEventsByTag(tag, from)
        .take(maxBufferSize)).runWith(Sink.seq).map { xs =>
        val next = nextFromOffset(xs)
        Some((next, xs))
      }
    }.mapConcat(identity)

Let's consider an example (real values from a test; sorry, I cannot share the test yet):
100th event has TimeBasedUUID(fa7225e0-a223-11e8-b71e-e9435a127f49)
101th event has TimeBasedUUID(fa7225e1-a223-11e8-b71e-e9435a127f49)

According to my logs, my stream processing logic never got the 101st event. If we run the code from nextFromOffset:

@ val time1 = UUID.fromString("fa7225e0-a223-11e8-b71e-e9435a127f49") 
time1: UUID = fa7225e0-a223-11e8-b71e-e9435a127f49
@ val next1 = TimeBasedUUID(UUIDs.startOf(UUIDs.unixTimestamp(time1) + 1)) 
next1: TimeBasedUUID = TimeBasedUUID(fa724cf0-a223-11e8-8080-808080808080)
@ val time2 = UUID.fromString("fa7225e1-a223-11e8-b71e-e9435a127f49") 
time2: UUID = fa7225e1-a223-11e8-b71e-e9435a127f49
@ TimeBasedUUID(time2).compare(next1) 
res9: Int = -1

So if currentEventsByTag returned 101 events, the last one is dropped by take(100), and the next offset returned to unfold will be fa724cf0-a223-11e8-8080-808080808080, which is after the 101st event's timestamp.

If I set max-buffer-size to a value above the number of events my test can generate, everything works fine.
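As a workaround, raising the read journal's buffer above the largest expected burst avoids the boundary entirely (a sketch; the key name follows the plugin's reference.conf, which should be checked for the exact spelling and default):

    inmemory-read-journal {
      # must exceed the largest burst of events between two ticks
      max-buffer-size = "10000"
    }

This only hides the off-by-one in nextFromOffset; the real fix is to advance the offset from the last delivered event rather than from its truncated timestamp.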

What do you think about my analysis?

Ability to get all saved snapshots

For now there is a SnapshotForMaxSequenceNr message in InMemorySnapshotStorage, but it returns just one snapshot. It would be great to have a message like Snapshots to get all snapshots for a persistenceId. This is useful in testing scenarios where someone needs to verify proper snapshot saving and deletion.

[question] Instantiating the read journal

Hi Dennis,

On the README file you show how to instantiate the read journal in Scala as such:

lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
  .asInstanceOf[ReadJournal
    with CurrentPersistenceIdsQuery
    with AllPersistenceIdsQuery
    with CurrentEventsByPersistenceIdQuery
    with CurrentEventsByTagQuery
    with EventsByPersistenceIdQuery
    with EventsByTagQuery]

Why aren't you promoting it like the following?

  lazy val readJournal =
    PersistenceQuery(system)
      .readJournalFor[InMemoryReadJournal](InMemoryReadJournal.Identifier)

It's not only more concise, but also less error-prone. Any thoughts?

Dropping support for Akka 2.4 for 2.5

Akka 2.4.20 is now a year old, so I am dropping support for it. The 2.5.0.0 branch will be renamed to master and the codebase will be refactored to support 2.5.0. The current master will be renamed to 2.4.20.

Missing support for keyspace similar to akka-persistence-cassandra

In our product we use this great plugin when we start up the developers' local servers. We have many applications that store their data in different Cassandra keyspaces in the deployed environment. In order to test all these applications together, we need this feature locally, in memory, as well.

As discussed in #39 I created #59

Problem serializing scalaxb autogenerated classes

Hello, we have a project that we tested using the in-memory journal and snapshot store. Our project uses scalaxb to autogenerate case classes from XSD schemas. We switched to your plugin so we can do persistence queries. The problem is that persistence does not work anymore: state (in a PersistentFSM) is not persisted, and the only error we could find is "Rejected to persist event type ... due to [scalaxb.XMLStandardTypes$$anon$12]." I thought Java serialization would be used and it would not fail; this worked with the default in-memory journal we used before. I also tried to write an event adapter to work around this by persisting a string representation of the case class instead. Still not working. Any idea what the issue could be here?

Snapshot sequence number is not increased when snapshotting

Hello. I have a project that uses akka-persistence-inmemory for testing. The problem is that the snapshot sequence number is not increased. I cannot publish the code since it is private, but I just create a simple PersistentActor and call saveSnapshot(...) 3 times. I have another actor recovering with the same persistence ID; it gets the latest snapshot, but the sequence number is 0 when it should be 3. I checked the log and I do get 3 SaveSnapshotSuccess(---) messages, but they all have the same sequence number. Does anyone know why?

Fails with exception on start

Akka version: 2.4.6 (Scala 2.11)
library version: 1.2.14

in application.conf I put:

akka ...
actor ...
persistence {
journal.plugin = "inmemory-journal"
snapshot-store.plugin = "inmemory-snapshot-store"
}

I'm getting an exception which looks like a version incompatibility:

2016-05-26 11:04:13.828 ERROR [coreservices-akka.actor.default-dispatcher-37] [akka.actor.OneForOneStrategy] head of empty list
java.util.NoSuchElementException: head of empty list
	at scala.collection.immutable.Nil$.head(List.scala:420) ~[scala-library-2.11.8.jar:na]
	at scala.collection.immutable.Nil$.head(List.scala:417) ~[scala-library-2.11.8.jar:na]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.actor.ActorCell.invoke(ActorCell.scala:495) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.6.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_40]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_40]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_40]

2016-05-26 11:04:13.836 ERROR [coreservices-akka.actor.default-dispatcher-37] [akka.actor.ActorSystemImpl] Uncaught fatal error from thread [coreservices-akka.persistence.dispatchers.default-plugin-dispatcher-40] shutting down ActorSystem [coreservices]
java.lang.NoSuchMethodError: akka.stream.ActorMaterializer$.apply(Lscala/Option;Lscala/Option;Lakka/actor/ActorRefFactory;)Lakka/stream/ActorMaterializer;
	at akka.persistence.inmemory.snapshot.InMemorySnapshotStore.<init>(InMemorySnapshotStore.scala:33) ~[akka-persistence-inmemory_2.11-1.2.14.jar:1.2.14]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_40]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_40]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_40]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422) ~[na:1.8.0_40]
	at java.lang.Class.newInstance(Class.java:442) ~[na:1.8.0_40]
	at akka.util.Reflect$.instantiate(Reflect.scala:44) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.actor.NoArgsReflectConstructor.produce(IndirectActorProducer.scala:105) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.actor.Props.newActor(Props.scala:213) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.actor.ActorCell.newActor(ActorCell.scala:562) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.actor.ActorCell.create(ActorCell.scala:588) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.6.jar:na]
	at akka.dispatch.Mailbox.run(Mailbox.scala:223) ~[akka-actor_2.11-2.4.6.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_40]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_40]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_40]

Error during actor creation due to misconfiguration

I'm sure this is user error, but I am unsure how to get past it. Help?

According to the docs, I shouldn't need any more than:

    akka.persistence.journal.plugin = "in-memory-journal"

correct? However, doing this yields the following error:

requirement failed: 'reference.conf' is missing persistence plugin config path: 'akka.persistence.journal.in-memory-journal'
akka.actor.ActorInitializationException: exception during creation

Using akka.persistence.journal.plugin = "akka.persistence.journal.in-memory-journal" yields the same results.
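For reference, the README's configuration uses the plugin id inmemory-journal (no hyphen between "in" and "memory"), so the minimal application.conf would be:

    akka {
      persistence {
        journal.plugin = "inmemory-journal"
        snapshot-store.plugin = "inmemory-snapshot-store"
      }
    }

Both variants tried above ("in-memory-journal" and "akka.persistence.journal.in-memory-journal") point at config paths that don't exist, which is exactly what the "missing persistence plugin config path" error says.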

I have

  <repositories>
    <repository>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <id>central</id>
      <name>bintray</name>
      <url>http://dl.bintray.com/dnvriend/maven</url>
    </repository>
  </repositories>
...
    <dependency>
      <groupId>com.github.dnvriend</groupId>
      <artifactId>akka-persistence-inmemory_2.11</artifactId>
      <version>1.1.3</version>
    </dependency>

in my pom.

Serialization error in InMemoryAsyncWriteJournal after deleting messages

Hi,
In our project we switched from Akka 2.4.16 to 2.5.17, and also from akka-persistence-inmemory 1.1.6 to 2.5.0. After that we got a problem: sometimes the app crashes with a serialization error while the in-memory plugin is persisting messages.

In our system there are a few kinds of messages which are persisted; some of them are persisted by persistAsync() and some by persist(). We also have a custom serializer for messages that allocates a protostuff buffer once at initialization and then clears it at the end of every toBinary() call. (We don't allocate it on every toBinary() call because messages can be large, and the protostuff docs advise allocating the buffer once and reusing it in such situations.)

With the old versions of Akka and the plugin everything was okay, but in the new versions there are problems when different kinds of messages are persisted at the same time. After debugging we found that they are executed in different threads concurrently and share the same buffer, so protostuff throws an IllegalState error (because the buffer is not in a reset state for one of the threads) and our app crashes.
The strange thing is that this situation occurs only after calling deleteMessages(N), N > 0.

We also got this error with Akka 2.4.16 and plugin version 2.4.18, so it looks like something about persisting and deleting events was changed in the plugin.

Stacktrace of error:

	at akka.persistence.serialization.MessageSerializer.akka$persistence$serialization$MessageSerializer$$persistentPayloadBuilder(MessageSerializer.scala:176) ~[akka-persistence_2.11-2.5.17.jar:2.5.17]
	at akka.persistence.serialization.MessageSerializer.akka$persistence$serialization$MessageSerializer$$persistentMessageBuilder(MessageSerializer.scala:152) ~[akka-persistence_2.11-2.5.17.jar:2.5.17]
	at akka.persistence.serialization.MessageSerializer.toBinary(MessageSerializer.scala:47) ~[akka-persistence_2.11-2.5.17.jar:2.5.17]
	at akka.serialization.Serialization$$anonfun$serialize$1$$anonfun$apply$1.apply(Serialization.scala:178) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.serialization.Serialization$$anonfun$serialize$1$$anonfun$apply$1.apply(Serialization.scala:178) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.12.jar:na]
	at akka.serialization.Serialization$$anonfun$serialize$1.apply(Serialization.scala:178) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.serialization.Serialization$$anonfun$serialize$1.apply(Serialization.scala:177) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.serialization.Serialization.akka$serialization$Serialization$$withTransportInformation(Serialization.scala:168) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.serialization.Serialization.serialize(Serialization.scala:177) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal.serialize(InMemoryAsyncWriteJournal.scala:51) ~[akka-persistence-inmemory_2.11-2.5.1.1.jar:2.5.1.1]
	at akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal.akka$persistence$inmemory$journal$InMemoryAsyncWriteJournal$$$anonfun$4(InMemoryAsyncWriteJournal.scala:65) ~[akka-persistence-inmemory_2.11-2.5.1.1.jar:2.5.1.1]
	at akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal$lambda$$$nestedInAnonfun$3$1.apply(InMemoryAsyncWriteJournal.scala:64) ~[akka-persistence-inmemory_2.11-2.5.1.1.jar:2.5.1.1]
	at akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal$lambda$$$nestedInAnonfun$3$1.apply(InMemoryAsyncWriteJournal.scala:64) ~[akka-persistence-inmemory_2.11-2.5.1.1.jar:2.5.1.1]
	at akka.stream.impl.fusing.Map$$anon$7.onPush(Ops.scala:47) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:499) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:401) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:571) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:659) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:701) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:716) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:709) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.actor.Actor$class.aroundPreStart(Actor.scala:528) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650) ~[akka-stream_2.11-2.5.1.jar:na]
	at akka.actor.ActorCell.create(ActorCell.scala:652) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:523) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:545) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283) [akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.dispatch.Mailbox.run(Mailbox.scala:224) [akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.17.jar:2.5.17]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.17.jar:2.5.17]
Caused by: java.lang.ArrayIndexOutOfBoundsException: null
	at java.lang.System.arraycopy(Native Method) ~[na:1.8.0_112]
	at io.protostuff.WriteSession.toByteArray(WriteSession.java:149) ~[protostuff-api-1.3.8.jar:1.3.8]
	at io.protostuff.ProtostuffIOUtil.toByteArray(ProtostuffIOUtil.java:194) ~[protostuff-core-1.3.8.jar:1.3.8]  ```

2.4-M3 Compatibility

Hi - it would be brilliant if you could update for M3 compatibility, since I'm desperate to get my hands on all the new persistence goodies but all my akka persistence unit tests use inmemory so I never have to worry about existing state.

"Old Akka Persistence plugins released for 2.3 are not compatible with the changes that were done in 2.4-M2. We do not plan to do more changes to the plugin API so now is a good time for Akka Persistence plugin maintainers to migrate and release a preview for 2.4-M3. Even though there are rather many API changes since 2.3 in the Persistence plugin API it should not be difficult to migrate. "

SBT cannot find the package

build.sbt

scalaVersion := "2.12.7"
val akka_version = "2.5.17"

resolvers ++= Seq(
    "jitpack" at "https://jitpack.io",
    Resolver.jcenterRepo
)

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-actor-typed" % akka_version,
    "com.typesafe.akka" %% "akka-stream-typed" % akka_version,
    "com.typesafe.akka" %% "akka-persistence-typed" % akka_version,
    "com.typesafe.akka" %% "akka-persistence-query" % akka_version,
    "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akka_version,
    "com.typesafe.akka" %% "akka-cluster-typed" % akka_version,
    "com.typesafe.akka" %% "akka-actor-testkit-typed" % akka_version % Test,
    "com.typesafe.akka" %% "akka-stream-testkit" % akka_version % Test,
    "com.typesafe.akka" %% "akka-http" % "10.1.5",
    "com.typesafe.akka" %% "akka-slf4j" % akka_version,
    "com.typesafe.akka" %% "akka-distributed-data" % akka_version,
    "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0",

    "com.github.dnvriend" % "akka-persistence-inmemory_2.11" % "2.5.15.1" % Test

    // some other deps here
)

Result:

[warn] ==== jitpack: tried
[warn]   https://jitpack.io/com/github/dnvriend/akka-persistence-inmemory_2.11/2.5.15.1/akka-persistence-inmemory_2.11-2.5.15.1.pom
[warn] ==== jcenter: tried
[warn]   https://jcenter.bintray.com/com/github/dnvriend/akka-persistence-inmemory_2.11/2.5.15.1/akka-persistence-inmemory_2.11-2.5.15.1.pom
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn] 	::          UNRESOLVED DEPENDENCIES         ::
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn] 	:: com.github.dnvriend#akka-persistence-inmemory_2.11;2.5.15.1: not found
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] 	Note: Unresolved dependencies path:
[warn] 		com.github.dnvriend:akka-persistence-inmemory_2.11:2.5.15.1 (C:\WLCR\google-play-crawler\build.sbt#L19-51)
[warn] 		  +- default:google-play-crawler_2.12:0.1
[error] sbt.librarymanagement.ResolveException: unresolved dependency: com.github.dnvriend#akka-persistence-inmemory_2.11;2.5.15.1: not found
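Neither jitpack nor jcenter hosts the artifact; per the README it is published to the author's Bintray repository. In addition, with scalaVersion 2.12.7 the hardcoded _2.11 artifact cannot match even when found. A sketch of the fix:

    // build.sbt
    resolvers += Resolver.bintrayRepo("dnvriend", "maven")

    // use %% so sbt picks the _2.12 artifact matching scalaVersion
    libraryDependencies +=
      "com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2" % Test

The %% operator appends the Scala binary version suffix automatically, which is why mixing a bare % with a _2.11 suffix under Scala 2.12 fails.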

unresolved dependency: com.github.dnvriend#akka-persistence-inmemory_2.11;1.1.5: not found

My plugins.sbt looks like

logLevel := Level.Warn

resolvers += Classpaths.typesafeReleases
resolvers += Classpaths.sbtPluginReleases
resolvers += "dnvriend at bintray" at "http://dl.bintray.com/dnvriend/maven"
resolvers += Resolver.url("bintray-sbt-plugin-releases", url("http://dl.bintray.com/content/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.8")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.3")

and Build.scala is

lazy val persistence = project
    .settings(commonSettings: _*)
    .settings(libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-persistence" % "2.4-SNAPSHOT",
      "org.iq80.leveldb" % "leveldb" % "0.7",
      "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8",
      "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.1.5" % "test"
    ))

when I try to update my project, I see

> update
[info] Updating {file:/Users/harit/IdeaProjects/akka101/}akka101...
[info] Updating {file:/Users/harit/IdeaProjects/akka101/}test_harness...
[info] Updating {file:/Users/harit/IdeaProjects/akka101/}cluster_simple...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Updating {file:/Users/harit/IdeaProjects/akka101/}persistence...
[info] Resolving com.github.dnvriend#akka-persistence-inmemory_2.11;1.1.5 ...
[warn]  module not found: com.github.dnvriend#akka-persistence-inmemory_2.11;1.1.5
[warn] ==== local: tried
[warn]   /Users/harit/.ivy2/local/com.github.dnvriend/akka-persistence-inmemory_2.11/1.1.5/ivys/ivy.xml
[warn] ==== public: tried
[warn]   https://repo1.maven.org/maven2/com/github/dnvriend/akka-persistence-inmemory_2.11/1.1.5/akka-persistence-inmemory_2.11-1.1.5.pom
[warn] ==== Typesafe Repository: tried
[warn]   http://repo.typesafe.com/typesafe/releases/com/github/dnvriend/akka-persistence-inmemory_2.11/1.1.5/akka-persistence-inmemory_2.11-1.1.5.pom
[info] Resolving jline#jline;2.12.1 ...
[warn]  ::::::::::::::::::::::::::::::::::::::::::::::
[warn]  ::          UNRESOLVED DEPENDENCIES         ::
[warn]  ::::::::::::::::::::::::::::::::::::::::::::::
[warn]  :: com.github.dnvriend#akka-persistence-inmemory_2.11;1.1.5: not found
[warn]  ::::::::::::::::::::::::::::::::::::::::::::::
[warn] 
[warn]  Note: Unresolved dependencies path:
[warn]      com.github.dnvriend:akka-persistence-inmemory_2.11:1.1.5 (/Users/harit/IdeaProjects/akka101/project/Akka101Build.scala#L33)
[warn]        +- com.learner:persistence_2.11:0.1.0
[trace] Stack trace suppressed: run last persistence/*:update for the full output.
[error] (persistence/*:update) sbt.ResolveException: unresolved dependency: com.github.dnvriend#akka-persistence-inmemory_2.11;1.1.5: not found
[error] Total time: 1 s, completed Nov 26, 2015 10:01:03 PM
> 

What am I doing wrong?

eventsByPersistenceId query from an actor makes actor's context = null

Finally pinpointed the problem. I define a trait:

trait InMemQuerySupport extends Actor {
  import akka.NotUsed
  import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
  import akka.stream.scaladsl.Source
  import akka.persistence.inmemory.query.scaladsl.InMemoryReadJournal

  def queryJournal(idToQuery: String, fromSequenceNr: Long = 0L,
                   toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] =
    PersistenceQuery(context.system)
      .readJournalFor[InMemoryReadJournal](InMemoryReadJournal.Identifier)
      .eventsByPersistenceId(idToQuery, fromSequenceNr, toSequenceNr)

  def queryJournalFrom(idToQuery: String, fromSequenceNr: Long = 0L): Source[EventEnvelope, NotUsed] = {
    assert(context != null, "This passes.")
    val source = queryJournal(idToQuery, fromSequenceNr, Long.MaxValue)
    assert(context != null, "This fails.")
    source
  }
}

which I mix in with a persistent actor. Calling queryJournalFrom(queryId, offsetForNextFetch) inside the actor in a test, during the actor's recovery phase, causes the assertion above to fail. This is the full context of the code above, and although the system works in the demo, integrating it into another project with the same environment fails. This doesn't seem to happen with Cassandra's journal plugin.

unknown actor creator [class akka.persistence.inmemory

with version 1.2.5

[Error][2016-02-16 15:55:39]: unknown actor creator [class akka.persistence.inmemory.journal.InMemoryJournal]

akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:172)
at akka.actor.ActorCell.create(ActorCell.scala:606)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: unknown actor creator [class akka.persistence.inmemory.journal.InMemoryJournal]
at akka.actor.IndirectActorProducer$.apply(IndirectActorProducer.scala:62)
at akka.actor.Props.producer(Props.scala:132)
at akka.actor.Props.<init>(Props.scala:145)
at akka.persistence.Persistence.akka$persistence$Persistence$$createPlugin(Persistence.scala:280)
at akka.persistence.Persistence$PluginHolderExtensionId.createExtension(Persistence.scala:300)
at akka.persistence.Persistence$PluginHolderExtensionId.createExtension(Persistence.scala:294)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:757)
at akka.actor.ExtensionId$class.apply(Extension.scala:79)
at akka.persistence.Persistence$PluginHolderExtensionId.apply(Persistence.scala:294)
at akka.persistence.Persistence.pluginHolderFor(Persistence.scala:258)
at akka.persistence.Persistence.journalConfigFor(Persistence.scala:214)
at akka.persistence.Eventsourced$class.$init$(Eventsourced.scala:55)
at qgame.engine.pay.module.order.UnHandledReceiptActor.<init>(UnHandledReceiptActor.scala:17)
at qgame.engine.pay.module.order.UnHandledReceiptActor$$anonfun$props$1.apply(UnHandledReceiptActor.scala:71)
at qgame.engine.pay.module.order.UnHandledReceiptActor$$anonfun$props$1.apply(UnHandledReceiptActor.scala:71)
at akka.actor.TypedCreatorFunctionConsumer.produce(IndirectActorProducer.scala:87)
at akka.actor.Props.newActor(Props.scala:214)
at akka.actor.ActorCell.newActor(ActorCell.scala:562)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more

Publish for Scala 2.13

Publishing a release for Scala 2.13 would simplify using this for testing in newer projects. Is that something that would be possible?

Should not use scheduler with repeat-delay=0

In InMemoryReadJournal there is a repeat with zero delay:

    Source.tick(refreshInterval, 0.seconds, 0)

Why is zero used as the repeat delay? Using, say, 10 milliseconds would avoid the problem in the following combination: when using https://github.com/miguno/akka-mock-scheduler this causes an infinite loop. The problem is an implementation detail of the mock scheduler that results in an endless loop when the repeat delay is zero (see https://github.com/miguno/akka-mock-scheduler/blob/develop/src/main/scala/com/miguno/akka/testing/MockScheduler.scala#L33).
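A sketch of the proposed change (the 10 ms value is the reporter's suggestion, not a tested fix; refreshInterval stands in for the journal's configured refresh interval):

    import scala.concurrent.duration._
    import akka.stream.scaladsl.Source

    // sketch: use a small positive repeat interval instead of 0.seconds,
    // so schedulers that re-enqueue on each tick cannot spin forever
    def ticker(refreshInterval: FiniteDuration) =
      Source.tick(refreshInterval, 10.milliseconds, 0)

Any positive interval would do; the mock scheduler loops only on an exactly-zero repeat delay.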

no serializer for internal plugin messages

During testing I have akka.actor.serialize-messages = on (with = off everything works OK).
This means every message sent through tell/ask is serialized, then deserialized, then placed in the actor's mailbox.

All my messages inherit from one trait and I provide a serializer for this trait.
Akka also handles its own internal messages with a custom serializer (protobuf, I think).

Messages sent internally by this plugin have no defined serializer, so the default Java serializer is used.

In my particular case I hit trouble in this line:
https://github.com/dnvriend/akka-persistence-inmemory/blob/master/src/main/scala/akka/persistence/inmemory/journal/InMemoryAsyncWriteJournal.scala#L77

WriteList is sent to an actor, which means Akka will serialize and deserialize this message.
Because there is no special serializer, the default Java one is used.
But this object stores my classes internally, which means the Java serializer is applied to my classes; serialization then fails because they must be serialized with my serializer.

One possible idea could be to extend JournalCommand with NoSerializationVerificationNeeded which will force akka to skip serialization of these messages even though serialize-messages = on.
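The proposed fix can be sketched self-containedly. The trait below is a local stand-in for Akka's `akka.actor.NoSerializationVerificationNeeded` marker, and `WriteList` here is a simplified stand-in for the plugin's internal command, not its real definition:

```scala
// Self-contained sketch of the proposed fix. The trait below is a local
// stand-in for Akka's akka.actor.NoSerializationVerificationNeeded marker;
// WriteList is a simplified stand-in for the plugin's internal command.
trait NoSerializationVerificationNeeded

sealed trait JournalCommand extends NoSerializationVerificationNeeded
final case class WriteList(payload: List[Any]) extends JournalCommand

// Mirrors the check performed when serialize-messages = on: messages
// carrying the marker trait are exempt from serialization verification.
def needsSerializationCheck(msg: Any): Boolean =
  !msg.isInstanceOf[NoSerializationVerificationNeeded]
```

Because the marker is inherited through JournalCommand, every internal command is exempted at once, with no serializer configuration needed.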

Issues using akka-persistence-inmemory with fun-cqrs

Hi.

I am trying to use funCQRS with the in-memory backend, but I am having some issues.
The command side works fine and it seems that I can save data.
What doesn't seem to work is the projection side, and I was asked by the maintainer of funCQRS to check the config for the backend. From the docs it seems that the only config required is:

akka.persistence {
  journal.plugin = "inmemory-journal"
  snapshot-store.plugin = "inmemory-snapshot-store"
}

Is this all the config needed?

The funCQRS maintainer also mentioned that we need another entry in application.conf:

akka.persistence {
  tagger = "io.funcqrs.akka.DomainEventTagAdapter"
}

But I see no mention of this anywhere in the docs here.

Thanks in advance.

Publish akka-persistence-inmemory with Java 8

com.github.dnvriend:akka-persistence-inmemory_2.12:2.5.15.0 is published with Java 10. As a result it can only be used with other Java 10 code :-( Can you publish a new version compiled with Java 8, as we are not yet in a position to upgrade?

    2018-09-03 09:26:54.615Z ERROR [akka.actor.ActorSystemImpl] Uncaught error from thread [async-actor-spec-root-akka.actor.default-dispatcher-4]: akka/persistence/inmemory/util/UUIDUtil has been compiled by a more recent version of the Java Runtime (class file version 54.0), this version of the Java Runtime only recognizes class file versions up to 52.0, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[async-actor-spec-root] sourceThread=async-actor-spec-root-akka.actor.default-dispatcher-4, akkaTimestamp=09:26:54.614UTC, akkaSource=akka.actor.ActorSystemImpl(async-actor-spec-root), sourceActorSystem=async-actor-spec-root
    java.lang.UnsupportedClassVersionError: akka/persistence/inmemory/util/UUIDUtil has been compiled by a more recent version of the Java Runtime (class file version 54.0), this version of the Java Runtime only recognizes class file versions up to 52.0

Native call in Cassandra UUIDs causes journal to hang

Hi @dnvriend

You will be amazed by this one.

The Cassandra UUIDs utility class has a native call to collect the process id.
This happens on this line:
https://github.com/datastax/java-driver/blob/af5a17c028b44defc675fb0145e56cd8cd45b651/driver-core/src/main/java/com/datastax/driver/core/utils/UUIDs.java#L155

It turns out that this hangs depending on your system + JVM version. I'm on macOS Sierra running Java 1.8.121.

It hangs only on the first persisted event, when the first JournalEntry is initialised. After that the bug won't happen anymore, because it relies on a static global variable inside the Cassandra driver, PosixLoader.GETPID_AVAILABLE to be precise.

The consequence of this is that the first event ever persisted blocks the actor for quite a while and the caller gets a timeout.

I have tried to work around it by initialising the PID earlier, but wasn't successful.

Currently I initialise it in a support test class in Fun.CQRS, but obviously the fix should be added to the plugin instead.
(see https://github.com/strongtyped/fun-cqrs/blob/feature/75-path-dependent-types/modules/akka/src/test/scala/io/funcqrs/akka/AkkaBackendSupport.scala#L27)
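The eager-initialisation workaround can be illustrated with a self-contained sketch (`SlowStatic` and `warmUp` are hypothetical names; the `Thread.sleep` stands in for the slow native getpid lookup inside the Cassandra driver):

```scala
// Self-contained sketch of the eager-initialisation workaround described
// above. SlowStatic and warmUp are hypothetical names; the Thread.sleep
// stands in for the slow native getpid lookup in the Cassandra driver.
object SlowStatic {
  // lazy val: the expensive initialiser runs once, on first access.
  lazy val processId: Long = {
    Thread.sleep(50) // stand-in for the slow native call
    42L              // stand-in for the real process id
  }
}

object EagerInit {
  // Call this early (e.g. from a test-support base class) so the one-time
  // cost is paid at startup instead of when the first event is persisted.
  def warmUp(): Unit = { SlowStatic.processId; () }
}
```

After `warmUp()` has run at startup, the first persisted event no longer pays the initialisation cost on its critical path.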

Persistence Query JavaDsl

It would be cool if there were a JavaDsl implementation for the read journal. I prepared it for the 1.1.6 version because we are currently on Akka 2.4.1, but I could also do it for the current version and create a pull request.

I would suggest simply doing it like the MongoDB Akka persistence plugin: https://github.com/scullxbones/akka-persistence-mongo/blob/master/common/src/main/scala/akka/contrib/persistence/mongodb/MongoReadJournal.scala

I am also not sure how to test this; any ideas on how to avoid a lot of duplicated test code?
