
akka / akka-persistence-cassandra


A replicated Akka Persistence journal backed by Apache Cassandra

Home Page: https://doc.akka.io/docs/akka-persistence-cassandra/

License: Other

Scala 99.96% Shell 0.04%
akka-persistence cassandra akka cqrs eventsourcing

akka-persistence-cassandra's Introduction

Akka

The Akka family of projects is managed by teams at Lightbend with help from the community.

We believe that writing correct concurrent, distributed, resilient and elastic applications is too hard. Most of the time it's because we are using the wrong tools and the wrong level of abstraction.

Akka is here to change that.

Using the Actor Model we raise the abstraction level and provide a better platform to build correct concurrent and scalable applications. This model is a perfect match for the principles laid out in the Reactive Manifesto.

For resilience, we adopt the "Let it crash" model which the telecom industry has used with great success to build applications that self-heal and systems that never stop.

Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.

Learn more at akka.io.

Reference Documentation

The reference documentation is available at doc.akka.io, for Scala and Java.

Current versions of all Akka libraries

The current versions of all Akka libraries are listed on the Akka Dependencies page. Releases of the Akka core libraries in this repository are listed on the GitHub releases page.

Community

You can join the community's groups and chats to discuss and ask Akka-related questions.

Contributing

Contributions are very welcome!

If you see an issue that you'd like to see fixed, or want to shape out some ideas, the best way to make it happen is to help out by submitting a pull request implementing it. We welcome contributions from everyone, even if you are not yet familiar with this project. We are happy to get you started and will guide you through the process once you've submitted your PR.

Refer to the CONTRIBUTING.md file for more details about the workflow and general hints on how to prepare your pull request. You can also ask for clarifications or guidance in GitHub issues directly, or in the akka/dev chat if more real-time communication would be of benefit.

License

Akka is licensed under the Business Source License 1.1, please see the Akka License FAQ.

Tests and documentation are under a separate license, see the LICENSE file in each documentation and test root directory for details.

akka-persistence-cassandra's People

Contributors

2m, btomala, chbatey, danischroeter, ennru, helena, hlavki, ignasi35, ihostage, jessepreiner, jewertow, johanandren, jroper, jsfwa, kasper-f, knirski, krasserm, ktoso, magro, marcospereira, matlockx, nvollmar, octonato, patriknw, psliwa, raboof, scala-steward, sebastian-alfers, xuwei-k, zapletal-martin



akka-persistence-cassandra's Issues

mark internal API

Originally there was no public API, but by now a few things are public API.

Set the right visibility and add INTERNAL API markers.
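As a hedged sketch of what such a marker looks like (the object name here is hypothetical, not from this repository), internals in that era were typically flagged with a scaladoc comment plus restricted visibility:

```scala
/**
 * INTERNAL API
 */
private[cassandra] object SomeInternalHelper {
  // private[cassandra]: usable inside the plugin's own packages,
  // invisible to user code; the scaladoc marker signals that no
  // binary or source compatibility is guaranteed for this object
}
```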

Akka persistence not working on 2.4.2_RC1

I just migrated to the new version of the plugin using the new version of Akka, and I'm getting a class-not-found exception.

I'm using Java 1.8, with akka 2.4.2_RC1 and the 0.7 version of the akka-persistence-cassandra plugin

Caused by: java.lang.NoClassDefFoundError: akka/stream/scaladsl/ImplicitMaterializer
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:21)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:20)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.actor.ReflectiveDynamicAccess.getClassFor(ReflectiveDynamicAccess.scala:20)
    at akka.persistence.Persistence.akka$persistence$Persistence$$createPlugin(Persistence.scala:274)
    at akka.persistence.Persistence$PluginHolderExtensionId.createExtension(Persistence.scala:300)
    at akka.persistence.Persistence$PluginHolderExtensionId.createExtension(Persistence.scala:294)
    at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:757)
    at akka.actor.ExtensionId$class.apply(Extension.scala:79)
    at akka.persistence.Persistence$PluginHolderExtensionId.apply(Persistence.scala:294)
    at akka.persistence.Persistence.pluginHolderFor(Persistence.scala:258)
    at akka.persistence.Persistence.journalConfigFor(Persistence.scala:214)
    at akka.persistence.Eventsourced$class.$init$(Eventsourced.scala:54)
    at akka.persistence.UntypedPersistentActor.<init>(PersistentActor.scala:111)
    at com.xtiva.poc.akka.command.codes.CodesAggregate.<init>(CodesAggregate.java:40)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    ... 16 more
Caused by: java.lang.ClassNotFoundException: akka.stream.scaladsl.ImplicitMaterializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 59 more

Type mismatch at akka clustering PersistentShardCoordinator during deserialization

Steps to repro:

  1. Use both akka-cluster-sharding and akka-persistence-cassandra in the project
  2. Initialize clustered system by invoking ClusterSharding(system).start(...,typeName="MyClusteredSystem",...).
  3. Run the app
  4. Stop/kill the app after a while
  5. Run the app again - exception is thrown:

[ERROR] akka.cluster.sharding.PersistentShardCoordinator! Persistence failure when replaying events for persistenceId [/sharding/MyClusteredSystemCoordinator]. Last known sequence number [1]
java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to akka.Done
at akka.persistence.cassandra.journal.CassandraRecovery$$anonfun$asyncReplayMessages$1.apply(CassandraRecovery.scala:48) ~[akka-persistence-cassandra_2.11-0.10.jar:0.10]
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[scala-library-2.11.7.jar:1.0.0-M1]
at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.7.jar:1.0.0-M1]
at scala.util.Success.map(Try.scala:237) ~[scala-library-2.11.7.jar:1.0.0-M1]
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) ~[scala-library-2.11.7.jar:na]
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) ~[scala-library-2.11.7.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[scala-library-2.11.7.jar:na]
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) ~[akka-actor_2.11-2.4.2.jar:na]
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) ~[akka-actor_2.11-2.4.2.jar:na]
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) ~[akka-actor_2.11-2.4.2.jar:na]
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) ~[akka-actor_2.11-2.4.2.jar:na]
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) ~[scala-library-2.11.7.jar:na]
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) ~[akka-actor_2.11-2.4.2.jar:na]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) ~[akka-actor_2.11-2.4.2.jar:na]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:405) ~[akka-actor_2.11-2.4.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na]
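For reference, step 2 of the repro above roughly corresponds to the following sketch, assuming the Akka 2.4 Cluster Sharding API; the entity actor and the message extractors are placeholders, not taken from the report:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

// Hypothetical entity actor, only here to make the sketch self-contained
class MyEntity extends Actor {
  def receive = { case msg => sender() ! msg }
}

object Repro {
  val system = ActorSystem("MyClusteredSystem")

  // Placeholder extractors; real ones depend on the application's messages
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg @ (id: String, _) => (id, msg)
  }
  val extractShardId: ShardRegion.ExtractShardId = {
    case (id: String, _) => (math.abs(id.hashCode) % 100).toString
  }

  // Step 2 of the repro: starting the clustered region, whose
  // PersistentShardCoordinator persists through akka-persistence-cassandra
  val region = ClusterSharding(system).start(
    typeName = "MyClusteredSystem",
    entityProps = Props[MyEntity],
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)
}
```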

deadlock in cassandra-snapshot-store plugin

I have 10 persistent actors and a cassandra-snapshot-store enabled.

Upon system startup, Akka Persistence issues a LoadSnapshot message to the snapshot store for each of my actors (10 messages in total).

They all try to acquire a Cassandra connection at CassandraSnapshotStore:55:
55: private[this] lazy val cassandraSession: CassandraSession

By default, the journal's "default-dispatcher" has parallelism-max = 8.

In my case this means that 7 threads park on the lazy val's initialization monitor, while the remaining one tries to do the initialization, but alas! It needs to wait for a Future to complete at ConfigSessionProvider:85:

84: def clusterBuilder(clusterId: String)(implicit ec: ExecutionContext): Future[Cluster.Builder] = {
85: lookupContactPoints(clusterId).map { cp =>

The problem is that the map function never gets executed, because there are no threads left in the default dispatcher's thread pool (the future appears to be scheduled on that same default dispatcher).

Naturally, if I raise parallelism-max above 10, everything starts working fine, like so:

cassandra-snapshot-store {
  ...
  default-dispatcher {
    ...
    fork-join-executor {
      ...
      parallelism-max = 16
    }
  }
}

Not adjacent sequence number in EventsByTagPublisher

Hi,
using CassandraReadJournal, the eventsByTag stream correctly pushes a tagged event to the stream consumer when, in an actor, I write a first tagged event to the journal via a persist call.
But when I write a second tagged event, the stream pushes nothing, and after 30 seconds the system logs:
[error] ...-a.p.c.q.EventsByTagPublisher: query chunk aborted for tag [...], timBucket [20160222], expected sequence number [14] for [...], but got [15].
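For context, a hedged sketch of the reader side described above; stream wiring and the offset parameter's type vary between plugin versions of that era, and the tag name here is illustrative:

```scala
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.stream.ActorMaterializer

object EventsByTagSketch {
  implicit val system = ActorSystem("example")
  implicit val mat = ActorMaterializer()

  val queries = PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  // Live stream of events carrying the given tag; this is the stream
  // that, per this issue, stalls after the first event and then logs
  // the "query chunk aborted" error
  queries.eventsByTag(tag = "mytag", offset = 0L)
    .runForeach(envelope => println(envelope))
}
```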

Cassandra compatibility

Hi,

we had an issue today with the 0.8 release in conjunction with Cassandra 2.1.7 as well as 2.1.9. The following error was reported:

java.lang.IllegalStateException: Unset value at index 5.
If you want this value to be null, please set it to null explicitly.

Bumping Cassandra to 2.2.4 resolved it for us. Just so you are aware of this.

Snapshots @ Sonatype (Travis?)

Looks like the build is configured to push to Sonatype, but there are no snapshots there. Does this project need to be added to Travis?

PersistentShardCoordinator recovery issue with Cassandra 2.2.4

Hello,

I am experiencing an issue with Akka Sharding 2.4.4 backed by Akka Persistence Cassandra 0.14 with Cassandra 2.2.4.

My config is the following:

akka {
  cluster {
    metrics.enabled = off

    sharding {
      remember-entities = on
      journal-plugin-id = cassandra-journal-akka-sharding
      snapshot-plugin-id = cassandra-snapshot-store-akka-sharding
    }

  }
}

cassandra-journal-akka-sharding = ${cassandra-journal} {
  cassandra-2x-compat = on
  keyspace = "akka_sharding"
}
cassandra-snapshot-store-akka-sharding = ${cassandra-snapshot-store} {
  keyspace = "akka_sharding_snapshot"
}

The first time I run my node, the keyspace and tables are created fine and the system is running (events are put in akka_sharding.messages).
However, when I stop and restart my node, akka.cluster.sharding.PersistentShardCoordinator fails with the following error:

2016-05-10 11:48:29,201 [warn] - akka.persistence.cassandra.query.scaladsl.CassandraReadJournal - Failed to connect to Cassandra and initialize. It will be retried on demand. Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 2:13 no viable alternative at input 'MATERIALIZED' (      [CREATE] MATERIALIZED...)
2016-05-10 11:48:29,213 [error] - akka.cluster.sharding.PersistentShardCoordinator - Persistence failure when replaying events for persistenceId [/sharding/TaskManagerCoordinator]. Last known sequence number [0]
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.SyntaxError: line 2:13 no viable alternative at input 'MATERIALIZED' (      [CREATE] MATERIALIZED...)

I know that cassandra-2x-compat = on is taken into account because when I remove this line, the system fails during the first start when creating the keyspace with the error:

2016-05-10 11:45:29,939 [error] - akka.cluster.sharding.PersistentShardCoordinator - Persistence failure when replaying events for persistenceId [/sharding/TaskManagerCoordinator]. Last known sequence number [0]
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.SyntaxError: line 2:13 no viable alternative at input 'MATERIALIZED' (      [CREATE] MATERIALIZED...)

It seems that when there are events in cassandra-journal-akka-sharding, the PersistentShardCoordinator fails to recover even with cassandra-2x-compat = on.

asyncReplayMessages doesn't always complete

The returned Future is supposed to always be completed with success or failure, not left in a pending, uncompleted state. It is not completed if the C* server is stopped in the middle of an asyncReplayMessages that is in progress.

0.11 Failing to connect to cassandra db

I've been using the 0.7 version of the plugin so far without many problems. Since we are writing a new application, we decided to go with the newer version, but now the plugin is unable to connect to our dev Cassandra database.

The message I get is:

Failed to connect to Cassandra and initialize. It will be retried on demand. Caused by: All host(s) tried for query failed (tried: /172.30.255.78:9042 (com.datastax.driver.core.exceptions.TransportException: [/172.30.255.78] Error writing))

This error is not consistent, though; I get it about 90% of the times I run the code. Just FWIW, the 0.7 version of the plugin works fine.

Improve keyspace and table initialization

Several related issues from krasserm/akka-persistence-cassandra

Potential race on keyspace initialization: krasserm/akka-persistence-cassandra#148

persistence query streams should work even if schema doesn't exist: krasserm/akka-persistence-cassandra#154

CassandraReadJournal doesn't create metadata table: krasserm/akka-persistence-cassandra#151

Retry schema creation from CassandraJournal and CassandraSnapshotStore: krasserm/akka-persistence-cassandra#121

The recovery mechanism for the journal does not work for multiple defined journals.

The current implementation assumes there is only one journal when recovering. As a result, when you define an additional journal, the persistent actor fails to recover, because it uses the default cassandra-query-journal configuration, which points (via cassandra-query-journal.write-plugin) to the default journal (cassandra-journal).

This pull request (#76 ) fixes the problem by making the cassandra-query-journal configurable.
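To illustrate the shape of the fix, a second journal can be paired with its own query-journal section whose write-plugin points at it; the -other names and the keyspace below are hypothetical, not from the source:

```hocon
# Hypothetical additional journal, inheriting the plugin defaults
cassandra-journal-other = ${cassandra-journal} {
  keyspace = "akka_other"
}

# Matching query-journal section so recovery reads from the right journal
cassandra-query-journal-other = ${cassandra-query-journal} {
  write-plugin = "cassandra-journal-other"
}
```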

publishSigned

@2m and @rkuhn discovered an issue with publishSigned combined with the ReleasePlugin in another project and suggested adding:

override def requires = sbtrelease.ReleasePlugin

to

object Publish extends AutoPlugin { ... }

Possibility to use Cassandra Cluster Metrics

Feature request:
After connecting to Cassandra, the session has a registry of metrics. These metrics give insight into Cassandra cluster performance. It would be nice to have the option to retrieve these metrics from the session in order to push them to a Graphite server.
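A rough sketch of the idea, assuming the DataStax Java driver 3.x API; the reporter wiring is left out and the method name is illustrative:

```scala
import com.datastax.driver.core.Session
import com.codahale.metrics.MetricRegistry

object ClusterMetricsSketch {
  // The driver exposes a Dropwizard MetricRegistry per Cluster; once the
  // plugin gives access to its Session, the registry could be handed to
  // e.g. a Dropwizard GraphiteReporter
  def clusterMetrics(session: Session): MetricRegistry =
    session.getCluster.getMetrics.getRegistry
}
```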

rewrite RowIterator

In #74 I have removed almost all blocking calls.

The remaining blocking is in the RowIterator that is used for loading snapshot metadata and the RowIterator that is used when deleting events (ReadLowestSequenceNr). That blocking is now contained by a separate dispatcher, but it would be good to rewrite the RowIterators.

too large batches

Due to the removal of max-message-batch-size in akka/akka#19694, we can easily hit the max batch size limit (batch_size_fail_threshold_in_kb).

CassandraJournalPerfSpec blows up with max-message-batch-size = 500.
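For reference, the server-side limit being hit is configured in cassandra.yaml; 50 KB is the usual default, but check your installation:

```yaml
# cassandra.yaml: a batch larger than this fails the request outright
batch_size_fail_threshold_in_kb: 50
```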

compatibility issue with Cassandra 2.1.9 (protocol v3)

Hi,

We just updated akka-persistence-cassandra from 0.6 to 0.7. We are using Akka Cluster Sharding in our project. We are using Cassandra 2.2.x.

Persistence-cassandra + Sharding worked smoothly with 0.6, but with 0.7 we have the following error when the TaskManagerCoordinator tries to persist a ShardRegionRegistered event:

2016-01-26 09:47:16,572 ERROR [akka.cluster.sharding.PersistentShardCoordinator] (kairos-akka.actor.default-dispatcher-19) Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardRegionRegistered] with sequence number [1] for persistenceId [/sharding/TaskManagerCoordinator].
java.lang.IllegalStateException: Unset value at index 5. If you want this value to be null, please set it to null explicitly.
    at com.datastax.driver.core.BoundStatement.ensureAllSet(BoundStatement.java:1351)
    at com.datastax.driver.core.SessionManager.makeRequestMessage(SessionManager.java:559)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:128)
    at akka.persistence.cassandra.journal.CassandraJournal.akka$persistence$cassandra$journal$CassandraJournal$$execute(CassandraJournal.scala:267)
    at akka.persistence.cassandra.journal.CassandraJournal$$anonfun$5.apply(CassandraJournal.scala:140)
    at akka.persistence.cassandra.journal.CassandraJournal$$anonfun$5.apply(CassandraJournal.scala:137)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.immutable.List.map(List.scala:285)
    at akka.persistence.cassandra.journal.CassandraJournal.asyncWriteMessages(CassandraJournal.scala:137)
    at akka.persistence.journal.AsyncWriteJournal$$anonfun$1$$anonfun$liftedTree1$1$1.apply(AsyncWriteJournal.scala:66)
    at akka.persistence.journal.AsyncWriteJournal$$anonfun$1$$anonfun$liftedTree1$1$1.apply(AsyncWriteJournal.scala:66)
    at akka.pattern.CircuitBreaker$State$class.materialize$1(CircuitBreaker.scala:298)
    at akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker.scala:315)
    at akka.pattern.CircuitBreaker$Closed$.callThrough(CircuitBreaker.scala:363)
    at akka.pattern.CircuitBreaker$Closed$.invoke(CircuitBreaker.scala:371)
    at akka.pattern.CircuitBreaker.withCircuitBreaker(CircuitBreaker.scala:115)
    at akka.persistence.journal.AsyncWriteJournal$$anonfun$1.liftedTree1$1(AsyncWriteJournal.scala:66)
    at akka.persistence.journal.AsyncWriteJournal$$anonfun$1.applyOrElse(AsyncWriteJournal.scala:66)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
    at akka.persistence.cassandra.journal.CassandraJournal.aroundReceive(CassandraJournal.scala:31)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The configured EventAdapters are not used by the CassandraReadJournal queries

Given a configuration:

cassandra-journal {
  event-adapters {
    protobuf = "com.mypackage.ProtoEventAdapter"
  }

  event-adapter-bindings {
    "com.mypackage.PersistedEvent" = protobuf
    "com.google.protobuf.MessageLite" = protobuf
  }
}

The configured event adapters are used only when writing to the journal and when an event is replayed in a PersistentActor. They are not used by the CassandraReadJournal queries.

The LevelDbReadJournal, by contrast, does call the configured event adapters for its queries.
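For context, a minimal stand-in showing the adapter shape involved; the real com.mypackage.ProtoEventAdapter is not shown in the source, so this is an identity sketch of Akka's EventAdapter contract:

```scala
import akka.persistence.journal.{ EventAdapter, EventSeq }

class ProtoEventAdapter extends EventAdapter {
  override def manifest(event: Any): String = ""

  // Write side: domain event -> journal representation;
  // this IS invoked when persisting
  override def toJournal(event: Any): Any = event

  // Read side: invoked on replay in a PersistentActor, but (per this
  // issue) NOT by the CassandraReadJournal queries
  override def fromJournal(event: Any, manifest: String): EventSeq =
    EventSeq.single(event)
}
```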

failed: CassandraSslSpec

When running tests after #19, the CassandraSslSpec* tests fail on my machine (OS X).

01/22 11:03:15 WARN [nioEventLoopGroup-2-3] i.n.c.ChannelInitializer - Failed to initialize a channel. Closing: [id: 0x7b68d077, /127.0.0.1:53524 => /127.0.0.1:53521]
java.lang.IllegalArgumentException: Cannot support TLS_RSA_WITH_AES_256_CBC_SHA with currently installed providers
    at sun.security.ssl.CipherSuiteList.<init>(CipherSuiteList.java:92) ~[na:1.8.0_45]
    at sun.security.ssl.SSLEngineImpl.setEnabledCipherSuites(SSLEngineImpl.java:2038) ~[na:1.8.0_45]
    at org.apache.cassandra.transport.Server$AbstractSecureIntializer.createSslHandler(Server.java:361) ~[cassandra-all-3.0.1.jar:3.0.1]
    at org.apache.cassandra.transport.Server$SecureInitializer.initChannel(Server.java:416) ~[cassandra-all-3.0.1.jar:3.0.1]
    at io.netty.channel.ChannelInitializer.channelRegistered(ChannelInitializer.java:68) ~[netty-transport-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRegistered(AbstractChannelHandlerContext.java:143) [netty-transport-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRegistered(AbstractChannelHandlerContext.java:129) [netty-transport-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRegistered(DefaultChannelPipeline.java:733) [netty-transport-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:450) [netty-transport-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.access$100(AbstractChannel.java:378) [netty-transport-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:424) [netty-transport-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358) [netty-common-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) [netty-transport-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) [netty-common-4.0.33.Final.jar:4.0.33.Final]
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) [netty-common-4.0.33.Final.jar:4.0.33.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

java.lang.IllegalArgumentException: requirement failed: Region Actor not registered

Got this issue when a node was killed (probably because of thread starvation). The only way I could fix it was a full cluster stop and dropping the Cassandra keyspace, which is not preferable but okay for now, since we only use Akka Persistence for sharding.

Akka 2.4.6, akka-persistence-cassandra 0.12, Cassandra 3.0.5

Exception in receiveRecover when replaying event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardHomeAllocated] with sequence number [54001] for persistenceId [/sharding/ProductCoordinator].

java.lang.IllegalArgumentException: requirement failed: Region Actor[akka.tcp://[email protected]:53583/system/sharding/Product#-375408820] not registered: State(Map(45 -> Actor[akka://application/system/sharding/Product#689991690], 34 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 67 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 93 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 12 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 66 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 89 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 51 -> Actor[akka://application/system/sharding/Product#689991690], 84 -> Actor[akka://application/system/sharding/Product#689991690], 73 -> Actor[akka://application/system/sharding/Product#689991690], 78 -> Actor[akka://application/system/sharding/Product#689991690], 19 -> Actor[akka://application/system/sharding/Product#689991690], 23 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 4 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 88 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 77 -> Actor[akka://application/system/sharding/Product#689991690], 15 -> Actor[akka://application/system/sharding/Product#689991690], 90 -> Actor[akka://application/system/sharding/Product#689991690], 44 -> Actor[akka://application/system/sharding/Product#689991690], 33 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 22 -> Actor[akka://application/system/sharding/Product#689991690], 56 -> Actor[akka://application/system/sharding/Product#689991690], 55 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 26 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 50 -> 
Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 37 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 68 -> Actor[akka://application/system/sharding/Product#689991690], 13 -> Actor[akka://application/system/sharding/Product#689991690], 46 -> Actor[akka://application/system/sharding/Product#689991690], 24 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 94 -> Actor[akka://application/system/sharding/Product#689991690], 83 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 35 -> Actor[akka://application/system/sharding/Product#689991690], 16 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 79 -> Actor[akka://application/system/sharding/Product#689991690], 5 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 10 -> Actor[akka://application/system/sharding/Product#689991690], 59 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 87 -> Actor[akka://application/system/sharding/Product#689991690], 48 -> Actor[akka://application/system/sharding/Product#689991690], 21 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 76 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 54 -> Actor[akka://application/system/sharding/Product#689991690], 65 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 71 -> Actor[akka://application/system/sharding/Product#689991690], 32 -> Actor[akka://application/system/sharding/Product#689991690], 80 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 82 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 49 -> Actor[akka://application/system/sharding/Product#689991690], 6 -> Actor[akka://application/system/sharding/Product#689991690], 36 -> 
Actor[akka://application/system/sharding/Product#689991690], 1 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 17 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 25 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 60 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 47 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 31 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 69 -> Actor[akka://application/system/sharding/Product#689991690], 95 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 64 -> Actor[akka://application/system/sharding/Product#689991690], 53 -> Actor[akka://application/system/sharding/Product#689991690], 42 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 75 -> Actor[akka://application/system/sharding/Product#689991690], 0 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 20 -> Actor[akka://application/system/sharding/Product#689991690], 27 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 70 -> Actor[akka://application/system/sharding/Product#689991690], 2 -> Actor[akka://application/system/sharding/Product#689991690], 86 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 38 -> Actor[akka://application/system/sharding/Product#689991690], 81 -> Actor[akka://application/system/sharding/Product#689991690], 92 -> Actor[akka://application/system/sharding/Product#689991690], 18 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 7 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 29 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 41 -> Actor[akka://application/system/sharding/Product#689991690], 63 -> Actor[akka.tcp://[email 
protected]:58390/system/sharding/Product#-596355970], 91 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970], 52 -> Actor[akka://application/system/sharding/Product#689991690], 85 -> Actor[akka://application/system/sharding/Product#689991690], 28 -> Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970]),Map(Actor[akka.tcp://[email protected]:58390/system/sharding/Product#-596355970] -> Vector(34, 67, 93, 12, 66, 89, 23, 4, 88, 33, 55, 26, 50, 37, 24, 83, 16, 5, 59, 21, 76, 65, 80, 82, 1, 17, 25, 60, 47, 31, 95, 42, 0, 27, 86, 18, 7, 29, 63, 91, 28), Actor[akka://application/system/sharding/Product#689991690] -> Vector(45, 51, 84, 73, 78, 19, 77, 15, 90, 44, 22, 56, 68, 13, 46, 94, 35, 79, 10, 87, 48, 54, 71, 32, 49, 6, 36, 69, 64, 53, 75, 20, 70, 2, 38, 81, 92, 41, 52, 85)),Set(Actor[akka.tcp://[email protected]:34712/system/sharding/Product#739142098], Actor[akka.tcp://[email protected]:59506/system/sharding/Product#-1272485012]),Set(),false)
    at scala.Predef$.require(Predef.scala:224)
    at akka.cluster.sharding.ShardCoordinator$Internal$State.updated(ShardCoordinator.scala:289)
    at akka.cluster.sharding.PersistentShardCoordinator$$anonfun$receiveRecover$1.applyOrElse(ShardCoordinator.scala:752)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.persistence.Eventsourced$$anon$3$$anonfun$1.applyOrElse(Eventsourced.scala:467)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
    at akka.cluster.sharding.PersistentShardCoordinator.akka$persistence$Eventsourced$$super$aroundReceive(ShardCoordinator.scala:720)
    at akka.persistence.Eventsourced$$anon$4.stateReceive(Eventsourced.scala:511)
    at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:176)
    at akka.cluster.sharding.PersistentShardCoordinator.aroundReceive(ShardCoordinator.scala:720)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:495)
    at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1)

I did, however, query for the event in Cassandra; I'm not sure whether this is usable.

cqlsh:akka> select * from messages where persistence_id = '/sharding/ProductCoordinator' AND partition_nr = 0 AND sequence_nr = 54001;       

 persistence_id               | partition_nr | sequence_nr | timestamp                            | timebucket | used | event                                                                                                                                                                | event_manifest | message | ser_id | ser_manifest | tag1 | tag2 | tag3 | writer_uuid
------------------------------+--------------+-------------+--------------------------------------+------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------+--------+--------------+------+------+------+--------------------------------------
 /sharding/ProductCoordinator |            0 |       54001 | fefd61a0-2cb5-11e6-ab42-c7c25b8bbbc4 |   20160607 | True |       0x0a01381249616b6b612e7463703a2f2f6170706c69636174696f6e403137322e31362e362e343a35333839382f73797374656d2f7368617264696e672f50726f6475637423363839393931363930 |                |    null |     13 |           AF | null | null | null | 7442711a-5681-4486-8c79-8fb94f9b1795
 /sharding/ProductCoordinator |            0 |       54001 | 07c43cf0-2cb6-11e6-b3d0-c5656fbdf40b |   20160607 | True | 0x0a023336124b616b6b612e7463703a2f2f6170706c69636174696f6e403137322e31362e31312e333a35333538332f73797374656d2f7368617264696e672f50726f64756374232d333735343038383230 |                |    null |     13 |           AF | null | null | null | 3671fc9f-0708-449d-9740-60e25e68daae

(2 rows)

Config refactoring: Enforce one-to-one between the write and the query plugins

The relationship between the write and the query plugins must be one-to-one. It would be more elegant if we could enforce that in the configuration. I'm thinking we could do that if the query journal were always a sub-section of the write journal config.

cassandra-journal {
  query-journal {
  }
}

other-cassandra-journal {
  query-journal {
  }
}

Then we would not need these query-plugin and write-plugin cross references.

To implement that we need: akka/akka#19822
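For reference, today the wiring goes the other way: the query plugin points back at its write journal via a cross reference, roughly like this (key name from memory; check reference.conf):

cassandra-query-journal {
  write-plugin = "cassandra-journal"
}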

Serialise PersistentRepr as Json

I'd like to serialise PersistentRepr as JSON; however, it looks like I would need to heavily override CassandraJournal in order to achieve this.

  1. Do you have any plans to provide the ability to serialise PersistentRepr as JSON?
  2. Are there any ideas on how this could be done on my side with minimal changes to CassandraJournal?
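Until the journal supports this directly, one common workaround is to serialise the events themselves (rather than the enclosing PersistentRepr) as JSON with a custom Akka serializer, which requires no changes to CassandraJournal. A minimal sketch, with an illustrative event type and hand-rolled JSON standing in for a real JSON library:

```scala
import akka.serialization.SerializerWithStringManifest
import java.nio.charset.StandardCharsets

// Illustrative domain event, not part of the plugin.
final case class ProductAdded(id: String, quantity: Int)

class JsonEventSerializer extends SerializerWithStringManifest {
  // Must be unique among serializers in the ActorSystem.
  override val identifier: Int = 9001

  override def manifest(o: AnyRef): String = o.getClass.getName

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case ProductAdded(id, q) =>
      s"""{"id":"$id","quantity":$q}""".getBytes(StandardCharsets.UTF_8)
    case other =>
      throw new IllegalArgumentException(s"Cannot serialize ${other.getClass}")
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    // Crude parsing for the sketch only; use a real JSON library in practice.
    val s = new String(bytes, StandardCharsets.UTF_8)
    val Pattern = """\{"id":"([^"]*)","quantity":(\d+)\}""".r
    s match {
      case Pattern(id, q) => ProductAdded(id, q.toInt)
      case _              => throw new IllegalArgumentException(s"Unexpected JSON: $s")
    }
  }
}
```

The serializer is then bound to the event type in akka.actor.serializers / akka.actor.serialization-bindings, so the journal picks it up through the normal Akka serialization mechanism.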

Resolve Cassandra's host and port dynamically

Akka Persistence Cassandra presently assumes that a Cassandra cluster is accessible at a fixed set of IP addresses. While this may be the case for large Cassandra clusters, it is unlikely to be so for smaller Cassandra clusters that are associated with microservices.

This problem could be resolved by handling a Future[Option[(InetAddress, Int)]] each time a connection is required. We had to solve a similar issue with our internal contrail library: contrail now looks up an actor responsible for service location via actor selection if the config declares that this is required; otherwise it completes a successful Future with the static host and port from the config.
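The shape described above could be sketched as a pluggable lookup with a static fallback covering today's behaviour (the ServiceLocator name and types are illustrative, not part of the plugin's API):

```scala
import java.net.InetAddress
import scala.concurrent.Future

// Resolve the contact point each time a connection is needed,
// instead of reading a fixed address list once from config.
trait ServiceLocator {
  def locate(serviceName: String): Future[Option[(InetAddress, Int)]]
}

// Static fallback: always answer with the configured host and port,
// as an already-completed Future (matches current behaviour).
final class StaticLocator(host: String, port: Int) extends ServiceLocator {
  override def locate(serviceName: String): Future[Option[(InetAddress, Int)]] =
    Future.successful(Some(InetAddress.getByName(host) -> port))
}
```

A dynamic implementation would resolve the same Future through actor selection or an external service registry, and the journal would simply flatMap on the result before opening a session.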

Keyspace creation fails permanently if a PoisonPill is sent to the only PersistentActor

Note: the title of this issue might be wrong; we've spent the last 3 hours trying to diagnose it, and this title is the best description we can give.

On the initial start of our application, the time between the startup of the first PersistentActor and that actor becoming ready was 1 minute 22 seconds (when we allowed it to complete):

[DEBUG] 2016-02-19T10:56:38,434 [akka.persistence.Persistence(akka://octopus)] sistence.Persistence(akka://octopus) - Create plugin: cassandra-journal akka.persistence.cassandra.journal.CassandraJournal
[DEBUG] 2016-02-19T10:56:38,463 [akka.persistence.Persistence(akka://octopus)] sistence.Persistence(akka://octopus) - Create plugin: cassandra-snapshot-store akka.persistence.cassandra.snapshot.CassandraSnapshotStore
...
[DEBUG] 2016-02-19T10:58:00,288 [akka.tcp://octopus@localhost:22452/system/cassandra-journal] e.cassandra.journal.CassandraJournal - initialized CassandraSession successfully

During that time the actor in question had scheduled a PoisonPill to itself after 15 seconds while waiting for its initial state. When the actor was stopped, its guardian restarted it with a 2 second back-off.

When the PoisonPill was processed, it looks like the Cassandra plugin broke something (we're not sure what), because if we stopped the system completely, increased the PoisonPill timeout to 10 minutes and restarted, the system never came up and eventually gave us:

Persistence failure when replaying events for persistenceId [TLO-B]. Last known sequence number [0]
akka.pattern.CircuitBreakerOpenException: Circuit Breaker is open; calls are failing fast

If we changed from the original keyspace name (KS1) to a new one (KS2) and started the system with the 10 minute timeout, the actor received and processed its initial state after 1 minute 22 seconds, and we never saw a persistence failure. Every time we restarted the system under KS2 it started up almost instantly without issue; however, restarting under KS1 gave us the PersistenceFailure messages after a while.

Infrastructure

We are running a 4-node Cassandra cluster in separate VMs on a single host.
Our application sits on another VM on a different host.

Group batch writes based on how much data will be sent to cassandra

Akka Persistence batches requests under the hood. When a batch becomes too large, the Cassandra server will reject the write (the default limit is 50 KiB).

I want control over how much data akka-persistence-cassandra will include in one write batch. My use case is log files, which means the size can vary from message to message. Therefore it would make most sense to expose a setting that lets the user set the maximum size, so that the journal does not go above this limit. The default would be 50 KiB, since that is Cassandra's default.

Since I was curious, I started a rough sketch, but it turned out not to solve the problem in the end, so I need some input on how to proceed.

The code can be viewed in this branch/commit magnusart@7bc41cd
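A minimal sketch of the size-based grouping described above (names and structure are illustrative, not the plugin's internals): serialized writes are folded into batches, starting a new batch whenever adding the next payload would push the current batch over the limit.

```scala
object SizeBoundedBatching {
  // Illustrative stand-in for a serialized journal entry.
  final case class SerializedWrite(persistenceId: String, payload: Array[Byte])

  // Group writes so each batch's total payload stays under maxBatchBytes
  // (50 KiB mirrors Cassandra's default batch size threshold). A single
  // write larger than the limit still gets a batch of its own.
  def groupBySize(writes: Seq[SerializedWrite],
                  maxBatchBytes: Long = 50L * 1024): Seq[Seq[SerializedWrite]] =
    writes.foldLeft((Vector.empty[Vector[SerializedWrite]], 0L)) {
      case ((batches, currentSize), w) =>
        val size = w.payload.length.toLong
        batches match {
          case init :+ last if currentSize + size <= maxBatchBytes =>
            (init :+ (last :+ w), currentSize + size) // fits: extend last batch
          case _ =>
            (batches :+ Vector(w), size)              // start a new batch
        }
    }._1
}
```

Note this only bounds payload bytes, not serialization overhead per statement, so a real limit would need some headroom below the server-side threshold.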

Improve test

Hey, how about using ccm and scassandra for testing?

  • ccm would enable cross-version testing and testing against a real cluster.
  • scassandra is especially aimed at edge-case testing, e.g. configuring the delay of a query response.

If needed, I could create a PR as a preview.

CassandraIntegrationSpec uses deprecated PersistentView

akka-persistence-cassandra/src/test/scala/akka/persistence/cassandra/journal/CassandraIntegrationSpec.scala:108: trait PersistentView in package persistence is deprecated: use Persistence Query instead
[warn]   class ViewA(val viewId: String, val persistenceId: String, probe: ActorRef) extends PersistentView {
[warn]                                                                                       ^
[warn] one warning found

Repo not searchable

Unfortunately this repo is a fork, which means it is not searchable by default until it gets more stars than the parent fork (AFAIK).
Maybe the parent fork could be deleted (though I do not know whether that would solve the problem), or this repo could be deleted and re-created without the fork link?
