
okumin / akka-persistence-sql-async

115 stars · 11 watchers · 26 forks · 114 KB

A journal and snapshot store plugin for akka-persistence using RDBMS.

License: Apache License 2.0

Scala 99.92% Shell 0.08%
akka-persistence scala mysql postgresql akka

akka-persistence-sql-async's People

Contributors

alexguzun, lbialy, okumin, rawyler, xuwei-k

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

akka-persistence-sql-async's Issues

PostgreSQL message/record size

In "com.github.mauricio" %% "mysql-async" % "0.2.16" dependency there is a limit for a record size set to 16MB. The internal config parameter name is maximumMessageSize.
If the size is reached the snapshots cannot be read. An exception is thrown:
com.github.mauricio.async.db.postgresql.exceptions.MessageTooLongException: Message of type 68 has size 16948250, higher than the limit 16777216
Unfortunately there is no way to change the limit without modifying the code (which I did temporarily).

It would be useful to be able to set that limit in application.conf (via an additional configuration key).
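
For illustration, a minimal sketch of where the limit lives in the driver, assuming the Configuration case class of the com.github.mauricio async drivers (this is not something the plugin currently exposes, and the connection values below are placeholders):

import com.github.mauricio.async.db.Configuration

// maximumMessageSize is a constructor parameter of the driver's Configuration
// (default 16777216 bytes, matching the limit in the exception above), so an
// application.conf key would have to be forwarded into it roughly here.
val driverConfig = Configuration(
  username = "user",
  host = "localhost",
  port = 5432,
  password = Some("secret"),
  database = Some("akka_persistence"),
  maximumMessageSize = 64 * 1024 * 1024 // e.g. raise the 16 MB default to 64 MB
)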

Best regards
areczeq

Support for multiple Database configurations

Is there any way to achieve the following?

Use akka-persistence-sql-async for two kinds of persistent actors.
One type is in MySQL database A and the other is in MySQL database B.
They require different configuration details.
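
For what it's worth, Akka's per-actor plugin selection might cover this. A minimal sketch, assuming Akka 2.4+ and two hypothetical config sections ("my-app.journal-b" / "my-app.snapshot-b") that duplicate the akka-persistence-sql-async settings but point at database B:

import akka.persistence.PersistentActor

class ActorInDatabaseB extends PersistentActor {
  override def persistenceId: String = "actor-in-db-b"

  // Point this actor at the second journal/snapshot configuration; actors that
  // do not override these keep using the default plugin (database A).
  override def journalPluginId: String = "my-app.journal-b"
  override def snapshotPluginId: String = "my-app.snapshot-b"

  override def receiveRecover: Receive = { case _ => }
  override def receiveCommand: Receive = {
    case event => persist(event) { _ => sender() ! "persisted" }
  }
}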

Publish for Scala 2.12

Hi,

I am trying to use this library as a backend for my akka app, which is based on Scala 2.12. Would it be possible to publish this repository for Scala 2.12?

Best,
Hyun Min

SSL Connection

Hi... Is there a property that allows us to connect to a database on another host that requires SSL? We're trying to connect to a Heroku Postgres database and we're running into that issue.

Thanks

Metadata and Journal not being written atomically

I've encountered an issue where entries are being written to the metadata table but no corresponding entry is made in the journal table. I'm not sure why the journal write is failing specifically, but this seems to happen under (not particularly high) load when I start pushing our data into the system. Individual writes seem to complete successfully.

My initial look at the code suggests that nothing ever calls begin/commit/rollback on TxAsyncDBSession to cause a transaction to actually be created, but I'm not familiar enough with the libraries in question to determine whether this is the case.

This is occurring with Akka 2.4.2 using Sharding against a PostgreSQL instance running locally in a Docker container.

The errors I'm getting look like:

Persistence failure when replaying events for persistenceId [Item-1234]. Last known sequence number [0]
akka.pattern.CircuitBreaker$$anon$1: Circuit Breaker Timed out.

The write failures are also circuit breaker timeouts, the cause of which I haven't yet determined. Regardless, it seems that on failure the database is left in an inconsistent state.
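
Not the plugin's actual code, but for reference this is roughly how scalikejdbc-async is driven transactionally (assuming its AsyncDB.localTx API, an already-initialized connection pool, and hypothetical table/column names). If the metadata and journal inserts are not wrapped like this, a failure between them would explain the inconsistent state:

import scalikejdbc._
import scalikejdbc.async._
import scala.concurrent.{ExecutionContext, Future}

object AtomicWriteSketch {
  def writeAtomically(persistenceId: String, seqNr: Long, payload: Array[Byte])
                     (implicit ec: ExecutionContext): Future[Unit] =
    AsyncDB.localTx { implicit tx =>
      for {
        _ <- sql"INSERT INTO metadata (persistence_id, sequence_nr) VALUES ($persistenceId, $seqNr)".update.future()
        _ <- sql"INSERT INTO journal (persistence_id, sequence_nr, message) VALUES ($persistenceId, $seqNr, $payload)".update.future()
      } yield () // both rows commit together, or the transaction rolls back
    }
}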

Don't swallow journal serialization errors.

If serializers fail for some reason, the error is simply ignored. This is whack behaviour.

Here is the culprit: this collect filters out failures.

This also seems to result in the $batch expression on line 53 evaluating to the empty string when the result contains only failures, producing invalid SQL syntax.

It would be super nice to get some proper error handling in here, or at least to log the cause exception to stderr.
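
A sketch of the general pattern (plain Scala, not the plugin's code): keep the Failure cases instead of collecting only the successes, so a serialization error surfaces rather than producing an empty batch:

import scala.util.{Failure, Success, Try}

object SerializationSketch {
  def serializeAll(payloads: Seq[AnyRef], serialize: AnyRef => Try[Array[Byte]]): Try[Seq[Array[Byte]]] = {
    val results = payloads.map(serialize)
    results.collectFirst { case Failure(cause) => cause } match {
      case Some(cause) =>
        // Fail the whole batch with the underlying exception instead of silently dropping events.
        Failure(new IllegalStateException(s"${results.count(_.isFailure)} event(s) failed to serialize", cause))
      case None =>
        Success(results.collect { case Success(bytes) => bytes })
    }
  }
}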

How to turn off the debug logging?

My logs are full of debug SQL output like the following. How do I turn it off?

10:54:10.023 [system-akka.persistence.dispatchers.default-plugin-dispatcher-5] DEBUG s.async.AsyncConnectionPool$ - Borrowed a new connection from scalikejdbc.async.internal.mysql.MySQLConnectionPoolImpl@1ed13381
[WARN] [01/08/2016 10:54:10.024] [system-akka.persistence.dispatchers.default-plugin-dispatcher-6] [akka.serialization.Serialization(akka://system)] Using the default Java serializer for class [actor.PersistenceBasicObj$ExampleState] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
10:54:10.031 [system-akka.persistence.dispatchers.default-plugin-dispatcher-6] DEBUG s.async.AsyncConnectionPool$ - Borrowed a new connection from scalikejdbc.async.internal.mysql.MySQLConnectionPoolImpl@1ed13381
10:54:10.038 [system-akka.persistence.dispatchers.default-plugin-dispatcher-5] DEBUG scalikejdbc.async.TxAsyncDBSession - [SQL Execution] 'INSERT INTO test_journal (persistence_key, sequence_nr, message) VALUES (?, ?, ?)' with (2,11,[B@6d8b440c)
10:54:10.043 [system-akka.persistence.dispatchers.default-plugin-dispatcher-6] DEBUG scalikejdbc.async.TxAsyncDBSession - [SQL Execution] 'INSERT INTO test_snapshot (persistence_key, sequence_nr, created_at, snapshot) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE created_at = ?, snapshot = ?' with (2,10,1452221650023,[B@beaa9fc,1452221650023,[B@beaa9fc)
10:54:10.046 [system-akka.persistence.dispatchers.default-plugin-dispatcher-5] DEBUG s.async.AsyncConnectionPool$ - Borrowed a new connection from scalikejdbc.async.internal.mysql.MySQLConnectionPoolImpl@1ed13381
10:54:10.055 [system-akka.persistence.dispatchers.default-plugin-dispatcher-5] DEBUG scalikejdbc.async.TxAsyncDBSession - [SQL Execution] 'INSERT INTO test_journal (persistence_key, sequence_nr, message) VALUES (?, ?, ?)' with (2,12,[B@7fb8247f)
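
I am not sure of the exact switch for the async sessions, but two things seem worth trying: raise the log level for the "scalikejdbc.async" loggers to INFO in your logging backend (e.g. logback.xml), and/or disable scalikejdbc's SQL logging globally, roughly like this:

import scalikejdbc.{GlobalSettings, LoggingSQLAndTimeSettings}

// Turns off the "[SQL Execution] ..." statements if the async sessions honour
// scalikejdbc's global setting; the "Borrowed a new connection" lines come from
// the scalikejdbc.async loggers and are controlled by the logging backend itself.
GlobalSettings.loggingSQLAndTime = LoggingSQLAndTimeSettings(enabled = false)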

An exception causes bad SQL to be generated

If you throw an exception in a PersistentActor, there seems to be a chance that asyncWriteMessages will be handed an empty sequence of AtomicWrites, in which case the generated SQL is invalid. The LevelDB journal doesn't batch like this journal implementation, so it doesn't suffer from the same issue.

Should be a simple fix.
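
Something along these lines (a sketch against the Akka 2.4 journal API, not the actual plugin code) would avoid generating SQL for an empty batch:

import akka.persistence.AtomicWrite
import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.Try

object EmptyBatchGuardSketch {
  // writeBatch stands in for the existing code that builds the multi-row INSERT.
  def asyncWriteMessages(messages: Seq[AtomicWrite],
                         writeBatch: Seq[AtomicWrite] => Future[Seq[Try[Unit]]]): Future[Seq[Try[Unit]]] =
    if (messages.isEmpty) Future.successful(Nil) // nothing to persist, so emit no SQL at all
    else writeBatch(messages)
}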

Taking a snapshot and deleting old messages causes highest sequence nr to be reset

(Observed on Postgres)

When messages are written to the journal, they get monotonically increasing sequence numbers, dictated by akka-persistence following a call to asyncReadHighestSequenceNr.

If a snapshot is taken, and a delete-messages command is then sent to delete all messages up to that snapshot, the snapshot will carry the sequence_nr of the latest event incorporated into it, but the journal will be empty.

Now when akka-persistence calls asyncReadHighestSequenceNr it will receive an answer of 0 and start writing events again at 1. But the snapshot is ahead of this in terms of sequence_nr, so when a recovery occurs some events will not be replayed, even though they are newer than the snapshot.

It seems that asyncReadHighestSequenceNr should have a memory of the sequence numbers that have been used, even when events have been deleted from the journal, in order for the expected behaviour of snapshotting and deleting old messages to work.
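
As a sketch of that suggestion (hypothetical table and column names, not the plugin's actual schema), the read could take the maximum over the journal and a metadata row that survives deletions:

import scalikejdbc._
import scalikejdbc.async._
import scala.concurrent.{ExecutionContext, Future}

object HighestSequenceNrSketch {
  def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long)
                                (implicit ec: ExecutionContext): Future[Long] =
    AsyncDB.withPool { implicit s =>
      // Deleting journal rows can no longer move the answer backwards, because the
      // metadata row keeps the last sequence number that was ever handed out.
      sql"""SELECT GREATEST(
              COALESCE((SELECT MAX(sequence_nr) FROM journal  WHERE persistence_id = $persistenceId), 0),
              COALESCE((SELECT MAX(sequence_nr) FROM metadata WHERE persistence_id = $persistenceId), 0)
            ) AS highest"""
        .map(_.long("highest")).single.future().map(_.getOrElse(fromSequenceNr))
    }
}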

See for example the behaviour of the Cassandra persistence plugin:

https://github.com/krasserm/akka-persistence-cassandra/blob/master/src/main/scala/akka/persistence/cassandra/journal/CassandraRecovery.scala#L28

Update homepage table instructions

Hi.

The table creation scripts in the documentation specify BLOB for the snapshot and journal payload columns.

MySQL's maximum size for a BLOB is 64 KB, and MySQL also does not enable strict mode by default. This means that writing anything larger than that into a BLOB truncates it "silently" (a warning should be raised, but nothing appeared in my logs; it could be my configuration, I'm unsure), which then fails on recovery.

Maybe add a caveat recommending MEDIUMBLOB (16 MB max) when planning to store a lot in a snapshot.

Help for migration

Hi,

I'm just wondering if there is any guide for migrating from 0.2.x to 0.3.1.
To be more specific, my concern is the sequence_nr column in the metadata table. Is it the highest sequence number from the journal table?

SSL mode is not being parsed

Hi,
I'm struggling to set up a connection to a Postgresql database that requires ssl.
I've configured the JDBC URL as: jdbc:postgresql://localhost:5432/datahub?sslmode=require
However, the app fails with the following error:

[info] com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException: ErrorMessage(fields=Map(Line -> 481, File -> auth.c, SQLSTATE -> 28000, Routine -> ClientAuthentication, Message -> SSL connection is required. Please specify SSL options and retry., Severity -> FATAL))
[info]         at com.github.mauricio.async.db.postgresql.PostgreSQLConnection.onError(PostgreSQLConnection.scala:175)
[info]         at com.github.mauricio.async.db.postgresql.codec.PostgreSQLConnectionHandler.channelRead0(PostgreSQLConnectionHandler.scala:206)

This is strange, because the implementation of URLParser seems correct. I've created a small test class with this line of code:

val configuration = com.github.mauricio.async.db.postgresql.util.URLParser.parse("postgresql://localhost:5432/datahub?sslmode=require")
^ this will parse the ssl mode correctly.

I've run a debugger session in IntelliJ and put a breakpoint in com.github.mauricio.async.db.postgresql.util.URLParser, in the assembleConfiguration method.
But it seems this method is never invoked.

I put another breakpoint in PostgreSQLConnectionHandler, in the initChannel method (line 89), where I can view the parsed configuration. It turns out that database = Some(datahub?sslmode=require) and SSLConfiguration = (disable,None).

Could it be that some other parse method is being used? Can this be configured somewhere?

I spent quite some time debugging this, but I am unsure where to look further.
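
In case it helps reproduce this, a small diagnostic sketch built on the test above (the Configuration field names are as observed in this debugging session):

import com.github.mauricio.async.db.postgresql.util.URLParser
import scala.util.Try

object SslModeParseCheck extends App {
  val urls = Seq(
    "postgresql://localhost:5432/datahub?sslmode=require",
    "jdbc:postgresql://localhost:5432/datahub?sslmode=require")
  // Expectation to verify: only the first form ends up with ssl mode "require",
  // while the jdbc-prefixed form leaves the query string inside the database name.
  urls.foreach { url =>
    println(s"$url -> " + Try(URLParser.parse(url)).map(c => (c.database, c.ssl)))
  }
}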

Versions:
val akkaVersion = "2.5.3"
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.okumin" %% "akka-persistence-sql-async" % "0.4.0",
"com.github.mauricio" %% "postgresql-async" % "0.2.21",

New release

Hey @okumin, I've been trying to use this extension along with Quill. Since I'm forced to use Mauricio's postgresql-async 0.2.20 (due to the Quill dependency), and akka-persistence-sql-async uses scalikejdbc-async 0.5.5, which depends on postgresql-async 0.2.16, I get:

java.lang.NoSuchMethodError: com.github.mauricio.async.db.Configuration$.$lessinit$greater$default$6()Ljava/nio/charset/Charset;
        at scalikejdbc.async.AsyncConnectionPool.<init>(AsyncConnectionPool.scala:33)
        at scalikejdbc.async.internal.postgresql.PostgreSQLConnectionPoolImpl.<init>(PostgreSQLConnectionPoolImpl.scala:37)
        at scalikejdbc.async.AsyncConnectionPoolFactory$.apply(AsyncConnectionPoolFactory.scala:43)
        at scalikejdbc.async.AsyncConnectionPool$.add(AsyncConnectionPool.scala:88)
        at akka.persistence.common.ScalikeJDBCExtension.<init>(ScalikeJDBCExtension.scala:20)
        at akka.persistence.common.ScalikeJDBCExtension$.createExtension(ScalikeJDBCExtension.scala:8)
        at akka.persistence.common.ScalikeJDBCExtension$.createExtension(ScalikeJDBCExtension.scala:6)
        at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:737)
        at akka.actor.ExtensionId$class.apply(Extension.scala:79)
        at akka.persistence.common.ScalikeJDBCExtension$.apply(ScalikeJDBCExtension.scala:6)

Is it possible for you to update this library to use a newer scalikejdbc-async from the 0.6.x branch, which supports the latest postgresql-async? I'll try to work on the update myself and send a PR, but I'm getting some weird errors during project import in IntelliJ IDEA and I'll have to set up databases for the tests.
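
In case it is useful in the meantime, a build.sbt workaround sketch (the version numbers are placeholders, and since 0.2.16 -> 0.2.20 is binary-incompatible, which is exactly what the NoSuchMethodError above shows, forcing one copy only helps if that copy happens to satisfy both scalikejdbc-async and Quill):

// Force a single postgresql-async version on the classpath...
dependencyOverrides += "com.github.mauricio" %% "postgresql-async" % "0.2.20"

// ...or drop the copy pulled in transitively and rely on the explicitly pinned one.
libraryDependencies += ("com.okumin" %% "akka-persistence-sql-async" % "0.4.0")
  .excludeAll(ExclusionRule(organization = "com.github.mauricio"))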

Serialize as Json

Is it possible to serialize the payload as JSON using this persistence plugin? I think the plugin would need to honor any configured serialization plugins? Thanks!
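
On the Akka side, a JSON payload normally means binding a custom serializer to the event types via akka.actor.serializers / serialization-bindings in application.conf; a minimal sketch (the event type and the hand-rolled JSON below are placeholders for a real JSON library), with the open question being whether this plugin routes payloads through Akka's Serialization extension at all:

import akka.serialization.SerializerWithStringManifest
import java.nio.charset.StandardCharsets.UTF_8

final case class AccountOpened(owner: String) // hypothetical event

class JsonEventSerializer extends SerializerWithStringManifest {
  override val identifier: Int = 467289 // any stable, application-unique id
  override def manifest(o: AnyRef): String = o.getClass.getName

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case AccountOpened(owner) => s"""{"owner":"$owner"}""".getBytes(UTF_8)
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    require(manifest == classOf[AccountOpened].getName, s"unknown manifest: $manifest")
    AccountOpened(new String(bytes, UTF_8).split("\"")(3)) // crude parse, illustration only
  }
}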

Incompatible with Akka 2.4-M2 persistence API

Looks like some breaking changes were introduced in the 2.4-M2 persistence API.

I have an updated branch that is compatible with 2.4-M2, but it will break binary compatibility with all the versions before that. How would you want to handle it? Publish a separate artifact?
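
For reference, the headline incompatibility as I understand it is the journal write path (signatures from memory, worth double-checking against the 2.4-M2 sources):

import akka.persistence.AtomicWrite
import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.Try

trait JournalWriteApiSketch {
  // Akka 2.3.x: def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit]
  // Akka 2.4-M2: writes arrive batched as AtomicWrite, and each batch reports its own Try.
  def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]]
}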
