okumin / akka-persistence-sql-async
A journal and snapshot store plugin for akka-persistence using RDBMS.
License: Apache License 2.0
The "com.github.mauricio" %% "mysql-async" % "0.2.16" dependency imposes a 16 MB limit on record size. The internal configuration parameter is named maximumMessageSize.
If a snapshot exceeds that size, it can no longer be read and an exception is thrown:
com.github.mauricio.async.db.postgresql.exceptions.MessageTooLongException: Message of type 68 has size 16948250, higher than the limit 16777216
Unfortunately, there is no way to change the limit without modifying the code (which I have done temporarily).
It would be useful to be able to set that limit in application.conf via an additional configuration key.
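For illustration, such a key might look like the sketch below. Note that max-message-size is purely hypothetical; no such setting exists in the plugin or the drivers today, where maximumMessageSize is hard-coded.

```hocon
# Hypothetical sketch only: this key does not exist in the current plugin.
# The idea is to forward a user-supplied limit to the driver's
# maximumMessageSize instead of the hard-coded 16 MB (16777216 bytes).
akka-persistence-sql-async.journal {
  max-message-size = 33554432  # 32 MB, for example
}
```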
Best regards
areczeq
Is there any way to achieve the following?
Use akka-persistence-sql-async for two kinds of persistent actors.
One type lives in MySQL database A and the other in MySQL database B.
They require different configuration details.
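Stock akka-persistence lets a persistent actor select its journal via journalPluginId, so one possible approach is to declare the plugin configuration twice under different paths. A sketch with illustrative paths and credentials; the class name is assumed from the plugin's package layout, and this also assumes the plugin tolerates being instantiated once per configuration path:

```hocon
# Sketch (names illustrative). Each persistent actor then selects its
# journal with: override def journalPluginId = "journal-b"
journal-a {
  class = "akka.persistence.journal.sqlasync.MySQLAsyncWriteJournal"
  url = "jdbc:mysql://host-a/database_a"
  user = "user_a"
  password = "secret_a"
}
journal-b {
  class = "akka.persistence.journal.sqlasync.MySQLAsyncWriteJournal"
  url = "jdbc:mysql://host-b/database_b"
  user = "user_b"
  password = "secret_b"
}
```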
Hi,
I am trying to use this library as a backend for my akka app, which is based on Scala 2.12. Would it be possible to publish this repository for Scala 2.12?
Best,
Hyun Min
Hi... Is there a property that allows us to connect to a database on another host that requires SSL? We're trying to connect to a Heroku Postgres database and we're having that issue.
Thanks
I've encountered an issue where entries are being written to the metadata table but no corresponding entry is made in the journal table. I'm not sure why the journal write is failing specifically but this seems to happen under (not particularly high) load when I start pushing our data into the system. Individual writes seem to complete successfully.
My very initial look at the code suggests that nothing ever calls begin/commit/rollback on TxAsyncDBSession, so a transaction is never actually created, but I'm not familiar enough with the libraries in question to say for certain.
This is occurring with Akka 2.4.2 using Sharding against a PostgreSQL instance running locally in a Docker container.
The errors I'm getting look like:
Persistence failure when replaying events for persistenceId [Item-1234]. Last known sequence number [0]
akka.pattern.CircuitBreaker$$anon$1: Circuit Breaker Timed out.
The write failures are also circuit breaker timeouts, the cause of which I haven't yet determined. Regardless it seems that on failure the database is left in an inconsistent state.
If serializers fail for some reason, the error is simply ignored. This is whack behaviour.
Here is the culprit: this collect filters out failures.
This also seems to result in the $batch expression on line 53 evaluating to the empty string when result contains only failures, producing invalid SQL syntax.
It would be super nice to get some proper error handling in here, or at least to log the cause exception to stderr.
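A minimal sketch of the kind of handling requested, assuming the results are a Seq[Try[...]] as the collect suggests (the names here are illustrative, not the plugin's actual code):

```scala
import scala.util.{Failure, Success, Try}

// Illustrative only: partition serialization results instead of silently
// collect-ing the successes, so failures are at least logged.
def partitionAndLog[A](results: Seq[Try[A]], log: akka.event.LoggingAdapter): Seq[A] = {
  val (failures, successes) = results.partition(_.isFailure)
  failures.foreach {
    case Failure(e) => log.error(e, "Serialization failed; message will be dropped")
    case _          => // unreachable: failures contains only Failure values
  }
  successes.collect { case Success(a) => a }
}
```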
Debug SQL output like the following keeps being printed; how can I turn it off?
10:54:10.023 [system-akka.persistence.dispatchers.default-plugin-dispatcher-5] DEBUG s.async.AsyncConnectionPool$ - Borrowed a new connection from scalikejdbc.async.internal.mysql.MySQLConnectionPoolImpl@1ed13381
[WARN] [01/08/2016 10:54:10.024] [system-akka.persistence.dispatchers.default-plugin-dispatcher-6] [akka.serialization.Serialization(akka://system)] Using the default Java serializer for class [actor.PersistenceBasicObj$ExampleState] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
10:54:10.031 [system-akka.persistence.dispatchers.default-plugin-dispatcher-6] DEBUG s.async.AsyncConnectionPool$ - Borrowed a new connection from scalikejdbc.async.internal.mysql.MySQLConnectionPoolImpl@1ed13381
10:54:10.038 [system-akka.persistence.dispatchers.default-plugin-dispatcher-5] DEBUG scalikejdbc.async.TxAsyncDBSession - [SQL Execution] 'INSERT INTO test_journal (persistence_key, sequence_nr, message) VALUES (?, ?, ?)' with (2,11,[B@6d8b440c)
10:54:10.043 [system-akka.persistence.dispatchers.default-plugin-dispatcher-6] DEBUG scalikejdbc.async.TxAsyncDBSession - [SQL Execution] 'INSERT INTO test_snapshot (persistence_key, sequence_nr, created_at, snapshot) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE created_at = ?, snapshot = ?' with (2,10,1452221650023,[B@beaa9fc,1452221650023,[B@beaa9fc)
10:54:10.046 [system-akka.persistence.dispatchers.default-plugin-dispatcher-5] DEBUG s.async.AsyncConnectionPool$ - Borrowed a new connection from scalikejdbc.async.internal.mysql.MySQLConnectionPoolImpl@1ed13381
10:54:10.055 [system-akka.persistence.dispatchers.default-plugin-dispatcher-5] DEBUG scalikejdbc.async.TxAsyncDBSession - [SQL Execution] 'INSERT INTO test_journal (persistence_key, sequence_nr, message) VALUES (?, ?, ?)' with (2,12,[B@7fb8247f)
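Assuming Logback is the SLF4J backend (the output format above suggests it), raising the scalikejdbc loggers above DEBUG in logback.xml should silence these lines:

```xml
<!-- logback.xml: suppress scalikejdbc-async connection/SQL debug output -->
<logger name="scalikejdbc" level="INFO"/>
<logger name="scalikejdbc.async" level="INFO"/>
```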
You changed the pass configuration key to password here (5c9d964#diff-a4441f1c142401995aaa7a4923038bab), and that looks like a good change. But the currently published Maven version still contains the pass version. Perhaps you could release a 0.1.1 version with this change?
If you throw an exception in a PersistentActor, there seems to be a chance that asyncWriteMessages will be handed an empty sequence of AtomicWrites, in which case the generated SQL is invalid. The LevelDB journal doesn't batch like this journal implementation does, so it doesn't suffer from the same issue.
Should be a simple fix.
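The simple fix might look like the sketch below; the method signature comes from the akka-persistence journal SPI, while writeBatch is a hypothetical stand-in for the plugin's real batching code:

```scala
import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Try
import akka.persistence.AtomicWrite

// Illustrative guard: never generate SQL for an empty batch.
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
  if (messages.isEmpty) Future.successful(Nil)
  else writeBatch(messages) // hypothetical helper performing the real INSERT

def writeBatch(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = ???
```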
password is more common, and it's strange to see a pass key beside the password keys of other configuration blocks.
(Observed on Postgres)
When messages are written to the journal, they get monotonically increasing sequence numbers, dictated by akka-persistence following a call to asyncReadHighestSequenceNr.
If a snapshot is taken, and a delete-messages command is then sent to delete all messages up to that snapshot, the snapshot will carry the sequence_nr of the latest event incorporated into it, but the journal will be empty.
Now when akka-persistence calls asyncReadHighestSequenceNr, it will receive an answer of 0 and start writing events again at 1. But the snapshot is ahead of this in terms of sequence_nr, so when a recovery occurs, some events will not be replayed even though they are newer than the snapshot.
It seems that asyncReadHighestSequenceNr should remember the sequence numbers that have been used, even after events have been deleted from the journal, in order for the expected behaviour of snapshotting and deleting old messages to work.
See, for example, the behaviour of the Cassandra persistence plugin.
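One common approach is to record the highest issued sequence number in a table that message deletion never touches. A sketch with illustrative table and column names:

```sql
-- Sketch (names illustrative): remember the highest issued sequence_nr
-- per persistence id, independent of journal deletions.
CREATE TABLE metadata (
  persistence_key BIGINT NOT NULL PRIMARY KEY,
  sequence_nr     BIGINT NOT NULL
);

-- asyncReadHighestSequenceNr would then take the max of both sources:
SELECT GREATEST(
  COALESCE((SELECT MAX(sequence_nr) FROM journal  WHERE persistence_key = ?), 0),
  COALESCE((SELECT sequence_nr      FROM metadata WHERE persistence_key = ?), 0)
);
```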
Hi.
The table creation scripts in the docs specify BLOB for both snapshots and journal.
MySQL's default maximum size for a BLOB is 64 KB, and MySQL also does not enable strict mode by default. This means that writing more than that as a blob truncates it "silently" (an alert should be raised, but nothing appeared in my logs; it could be my configuration, I'm unsure), failing on recovery.
Maybe add a caveat suggesting MEDIUMBLOB (16 MB max) when planning to store a lot in a snapshot.
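For reference, a sketch of the adjusted snapshot DDL, with column names taken from the SQL visible in the debug logs earlier in this page (the exact documented schema may differ); MEDIUMBLOB raises the per-value limit from 64 KB to 16 MB:

```sql
-- Sketch: MEDIUMBLOB instead of BLOB avoids silent 64 KB truncation.
CREATE TABLE snapshot (
  persistence_key BIGINT     NOT NULL,
  sequence_nr     BIGINT     NOT NULL,
  created_at      BIGINT     NOT NULL,
  snapshot        MEDIUMBLOB NOT NULL,
  PRIMARY KEY (persistence_key, sequence_nr)
);
-- Enabling strict mode turns silent truncation into a hard error:
-- SET GLOBAL sql_mode = 'STRICT_ALL_TABLES';
```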
Hi,
I'm just wondering if there is any guide to migrate from 0.2.x to 0.3.1.
To be more specific, my concern is the sequence_nr column in the metadata table. Is it the biggest sequence number from the journal table?
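If the metadata column does track the highest journal sequence number per persistence id, a migration backfill might look like this sketch; the table and column names are assumed, so verify them against the actual 0.3.x schema before running anything:

```sql
-- Hypothetical backfill sketch for a 0.2.x -> 0.3.x migration.
INSERT INTO metadata (persistence_key, sequence_nr)
SELECT persistence_key, MAX(sequence_nr)
FROM journal
GROUP BY persistence_key;
```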
Hi,
I'm struggling to set up a connection to a Postgresql database that requires ssl.
I've configured the JDBC URL as: jdbc:postgresql://localhost:5432/datahub?sslmode=require
However, the app fails with the following error:
[info] com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException: ErrorMessage(fields=Map(Line -> 481, File -> auth.c, SQLSTATE -> 28000, Routine -> ClientAuthentication, Message -> SSL connection is required. Please specify SSL options and retry., Severity -> FATAL))
[info]   at com.github.mauricio.async.db.postgresql.PostgreSQLConnection.onError(PostgreSQLConnection.scala:175)
[info]   at com.github.mauricio.async.db.postgresql.codec.PostgreSQLConnectionHandler.channelRead0(PostgreSQLConnectionHandler.scala:206)
This is strange, because the implementation of URLParser seems correct. I've created a small test class with this line of code:
val configuration = com.github.mauricio.async.db.postgresql.util.URLParser.parse("postgresql://localhost:5432/datahub?sslmode=require")
^ this will parse the ssl mode correctly.
I've run a debugger session in IntelliJ and put a breakpoint on com.github.mauricio.async.db.postgresql.util.URLParser in the assembleConfiguration.
But it seems this method is never invoked.
I put another breakpoint in PostgreSQLConnectionHandler, method initChannel (line 89) where I can view the parsed configuration. It turns out that database = Some(datahub?sslmode=require) and SSLConfiguration = (disable,None).
Can it be, that some other Parse method is being used? Can this be configured somewhere?
I spent quite some time debugging this, but I'm unsure where to look further.
Versions:
val akkaVersion = "2.5.3"
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.okumin" %% "akka-persistence-sql-async" % "0.4.0",
"com.github.mauricio" %% "postgresql-async" % "0.2.21",
Hey @okumin, I've been trying to use this extension along with Quill. Since I'm forced to use Mauricio's postgresql-async @ 0.2.20 (due to the Quill dependency), and akka-persistence-sql-async uses scalikejdbc-async @ 0.5.5, which depends on postgresql-async @ 0.2.16, I get:
java.lang.NoSuchMethodError: com.github.mauricio.async.db.Configuration$.$lessinit$greater$default$6()Ljava/nio/charset/Charset;
at scalikejdbc.async.AsyncConnectionPool.<init>(AsyncConnectionPool.scala:33)
at scalikejdbc.async.internal.postgresql.PostgreSQLConnectionPoolImpl.<init>(PostgreSQLConnectionPoolImpl.scala:37)
at scalikejdbc.async.AsyncConnectionPoolFactory$.apply(AsyncConnectionPoolFactory.scala:43)
at scalikejdbc.async.AsyncConnectionPool$.add(AsyncConnectionPool.scala:88)
at akka.persistence.common.ScalikeJDBCExtension.<init>(ScalikeJDBCExtension.scala:20)
at akka.persistence.common.ScalikeJDBCExtension$.createExtension(ScalikeJDBCExtension.scala:8)
at akka.persistence.common.ScalikeJDBCExtension$.createExtension(ScalikeJDBCExtension.scala:6)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:737)
at akka.actor.ExtensionId$class.apply(Extension.scala:79)
at akka.persistence.common.ScalikeJDBCExtension$.apply(ScalikeJDBCExtension.scala:6)
Is it possible for you to update this library to use a newer scalikejdbc-async from the 0.6.x branch, which supports the latest postgresql-async? I'll try to work on the update on my own and send a PR, but I get some weird errors during project import in IntelliJ IDEA, and I'll have to set up databases for tests.
Is it possible to serialize the payload as JSON using this persistence plugin? I think the plugin would need to honor any configured serialization plugins. Thanks!
Looks like some breaking changes were introduced in the 2.4-M2 persistence API.
I have an updated branch that is compatible with 2.4-M2, but it will break binary compatibility with all the versions before that. How would you want to handle it? Publish a separate artifact?