akkadotnet / akka.persistence.redis

Redis storage for Akka.NET Persistence

License: Apache License 2.0

Batchfile 0.04% F# 5.32% C# 86.99% PowerShell 6.25% Shell 1.40%
akka akka-persistence akkadotnet redis

akka.persistence.redis's People

Contributors

aaronontheweb, alexvaluyskiy, arkatufus, danthar, dependabot-preview[bot], dependabot[bot], eaba, horusiath, igorfedchenko, mhbuck, nafeezabrar, reckface, sean-gilliam


akka.persistence.redis's Issues

Default database 0 overrides StackExchange.Redis defaultDatabase in configuration string

Version Information
Version of Akka.NET? 1.4.31
Which Akka.NET Modules? Akka.Persistence.Redis

Describe the bug
The Akka.Persistence.Redis documentation describes configuration-string as using the StackExchange.Redis configuration string format. One option in StackExchange.Redis is defaultDatabase, which selects the database the library should use by default.

This configuration setting is overridden by the Akka.Persistence.Redis setting database, which is undocumented (at least in the README) and defaults to 0.

As such, attempting to set the database using the configuration string does not work: it always results in database 0 being used, and it is not immediately clear why.

I suggest that the database setting be documented. Additionally, if database is unspecified, it shouldn't override the defaultDatabase given in the configuration string (though I can see this might inadvertently break existing deployments that specified a non-zero defaultDatabase but are, in fact, using database 0).
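A minimal sketch of the clash, assuming the plugin's HOCON key is simply database (as described above):

akka.persistence.journal.redis {
  # defaultDatabase=2 is silently ignored here...
  configuration-string = "localhost:6379,defaultDatabase=2"
  # ...because this undocumented setting always wins and defaults to 0
  database = 0
}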

Support the use of password-protected servers

Hi,

Given that we are offering TLS support in akka.remote in 1.5, would it not be a good idea to support TLS connections to secure Redis servers that require a password?

I've looked at this a bit, and it appears that the underlying library we are using for Redis access doesn't support connections requiring a password. See this issue for more details.

I'm proposing we change to the StackExchange.Redis library; it also supports protocol buffers, as other persistence plugins do, so we would get uniformity across the board.
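For reference, StackExchange.Redis accepts a password (and TLS) directly in its configuration string - a sketch with a hypothetical host:

configuration-string = "myredis.example.com:6380,password=s3cret,ssl=True,abortConnect=False"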

I'm happy to do the work if this is something that would be wanted.
Cheers
Sean.

redis zrevrangebyscore


"zrevrangebyscore" command is now deprecated. it can make some issues in production with massive requests

Please check this link:
redis/redis#6680

It can be replaced by ZRANGE with the REV and BYSCORE arguments when migrating or writing new code.
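For example, with a hypothetical sorted-set key journal:events, the migration is a one-to-one rewrite (max and min keep their order; REV and BYSCORE replace the command name):

ZREVRANGEBYSCORE journal:events 100 0
ZRANGE journal:events 100 0 BYSCORE REV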

Akka.Persistence.Redis v1.4.16 - the "we removed Akka.Persistence.Query support" thread

In the course of investigating #109, we came to some conclusions about the use of Akka.Persistence.Query in Akka.Persistence.Redis:

  1. It's not all that straightforward to implement in clustered Redis scenarios (including Twemproxy): individual entities need to be persisted to a single Redis node inside the cluster for routing to function correctly, tags need to be stored on a single node as well, and there's a very good chance these two nodes will not be the same. Therefore, we can't guarantee that the IPersistentRepresentation and the Tagged event referencing it can be written in the same atomic transaction. That's a bit of a problem.
  2. It's non-performant - the Akka.Persistence.Redis tests that leverage Akka.Persistence.Query took half an hour to run on a single node. Once we removed Akka.Persistence.Query functionality from the RedisJournal and removed the tests that covered it, the entire single node test suite ran in under 60 seconds. You can see some benchmark values here: #114 (comment)
  3. Akka.Persistence.Query isn't a great fit for Redis - if you want flexible read / write queries, use a RDBMS or MongoDb. If you want blazing fast reads / writes for rapid-fire event sourcing, use this plugin.

We're opening this issue to gather data from users adversely affected by this change - if you were previously using Akka.Persistence.Query with Akka.Persistence.Redis and need that functionality back, please let us know and tell us a bit about your use case here!

Fix ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete spec

Akka.Persistence.TCK should rewrite the spec as follows (ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_opening.cs):
[Fact]
public virtual void ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_opening()
{
    var queries = Sys.ReadJournalFor<RocksDbReadJournal>(RocksDbReadJournal.Identifier);

    var expectedList = new List<string> { "a", "b", "c" };
    var actualList = new List<string>();

    Setup("a", 1);
    Setup("b", 1);
    Setup("c", 1);

    var greenSrc = queries.CurrentPersistenceIds();
    var probe = greenSrc.RunWith(this.SinkProbe<string>(), _materializer);
    probe.Request(2);
    actualList.AddRange(probe.ExpectNextN(2));
    probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));

    Setup("d", 1);

    probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
    probe.Request(5);
    actualList.AddRange(probe.ExpectNextN(1));
    probe.ExpectComplete();

    // events could be unordered
    Assert.True(!actualList.Except(expectedList).Any()); // should not have "d"
}

Persistence.Query support

I really want to see this feature in the Redis plugin, but we should wait for Persistence.Query to stabilize in the SQL plugins.

Fix RedisCurrentEventsByPersistenceIdSpec specs

  • CurrentEventsByPersistenceId_should_return_remaining_values_after_partial_journal_cleanup
  • CurrentEventsByPersistenceId_should_return_empty_stream_for_journal_from_SequenceNr_greater_tha...
  • CurrentEventsByPersistenceId_should_not_see_new_events_after_completion

Assess performance and integrity of Redis data storage + serialization

Need to assess the following before we begin making any changes:

  1. Is Akka.Persistence.Redis making the best use of its underlying data structures? I.e., how many I/O writes per logical write are we using right now?
  2. Is Akka.Persistence.Redis serializing its data in accordance with Akka.NET best practices?
  3. Is Akka.Persistence.Redis creating identifiers in Redis that will play nicely with Redis Cluster, per #93

Akka.Persistence.Query: add support for CurrentPersistenceIds

Is your feature request related to a problem? Please describe.
While adding some integration tests for our Akka.Cluster.Sharding persistence cleanup tool, I ran into a small problem on this PR: petabridge/Akka.Cluster.Sharding.RepairTool#7 - Akka.Persistence.Redis' IReadJournal does not support ICurrentPersistenceIds().

I know we removed it as part of #126 because we thought it'd be too expensive to implement in clustered Redis scenarios, but I wonder if that's true.

Describe the solution you'd like
The "all events" and "events by tag" queries are out of the question - they're expensive because they can't effectively be cleaned up in a clustered environment without some kind of inter-node index and we're not database architects so we're not going to build one. Users can use a relational database for that.

But Akka.Persistence requires us to keep track of all used PersistentIds even after the entities have had all of their data purged - we're required to keep those records in perpetuity, and therefore cleanup is not an issue.

So, I suppose we can try implementing the ICurrentPersistentIds and IPersistentIds queries in one of two ways:

  1. Scatter-gather to all of the nodes in the cluster each time we need to run one of those queries and just use the built-in key indices we have now. That's O(n), where n = entity count (see the sketch after this list).
  2. Create a special table that only really needs to be replicated to a single node, have it contain a hashset of all of the entity ids, and use a Redis channel to receive notifications when that set is updated. I don't really like this idea because it's a single point of failure.
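A rough C# sketch of option 1 using StackExchange.Redis - a minimal illustration, where the "journal.identifiers.*" key pattern is hypothetical (the plugin's actual key layout may differ):

using System.Collections.Generic;
using StackExchange.Redis;

public static class PersistenceIdScanner
{
    // Option 1: scatter-gather. Key scans are per-node, so fan out to every
    // master in the cluster and merge the results. O(n) in entity count.
    public static IEnumerable<string> CurrentPersistenceIds(ConnectionMultiplexer muxer)
    {
        foreach (var endpoint in muxer.GetEndPoints())
        {
            var server = muxer.GetServer(endpoint);
            if (!server.IsConnected || server.IsReplica)
                continue; // only connected masters hold the authoritative key set

            // hypothetical pattern for the per-node persistence-id index
            foreach (var key in server.Keys(pattern: "journal.identifiers.*"))
                yield return key.ToString();
        }
    }
}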

Describe alternatives you've considered
We could also keep not doing this, but since we're taking a dependency on ICurrentPersistentIds to help Cluster.Sharding users clean up, I'd like to do our best to support it.

Persistence failure when replaying events

We have an Akka.Cluster deployment where one of our nodes is hosted as a Topshelf service. The node has a single instance and uses Akka.Persistence.Redis for actor state recovery in case of service restarts/upgrades. The deployment script successfully performs these steps: stop -> uninstall -> reinstall new version -> start the service. We can see that the journals are present with the correct state. However, after the service is started, we see this error in NLog, causing state loss.

Persistence failure when replaying events for persistenceId [workerSupervisorActor]. Last known sequence number [0]

In Redis, we can see "PersistenceId": "workerSupervisorActor", "SequenceNr": 1.
However, the error says Last known sequence number [0], which suggests the call to RedisJournal.ReadHighestSequenceNrAsync() is failing.

The connection string to Azure Redis Cache looks like this:
xxx.redis.cache.windows.net:6380,password=xxx=,ssl=True,abortConnect=False,connectTimeout=10000,connectRetry=3

We're using these NuGet packages:
Akka.Persistence: 1.1.1.28-beta
Akka.Persistence.Redis: 0.1.0-beta
StackExchange.Redis: 1.1.603

Unable to Use Multiple Named Redis journal/snapshot-store Configurations

The obvious example of this is wanting your sharding persistence in a different key-space (or database) from the rest of your journal/snapshot persistence.

After reading the following from this blog about getting sharding working, I started trying to separate my sharding persistence from my main business logic.

Akka.Cluster.Sharding will use the default event journal and snapshot plugin if not told to do otherwise. This means that you may end up with your business and sharding events mixed up with each other inside the same collections/tables. While it's not mandatory, it's good to configure cluster sharding to operate on separate collections.

When running with SQL Server for my persistence, the example configuration from the blog works fine. So I tried a similar pattern with this Redis plugin, but it seemed to just use the default config (specifically, the same key-space).

Below is an example of the configuration I tried.

cluster.sharding {
    journal-plugin-id = "akka.persistence.journal.sharding"
    snapshot-plugin-id = "akka.persistence.snapshot-store.sharding"
}
persistence {
    journal {
        plugin = "akka.persistence.journal.redis"
        redis {
            class = "Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"
            configuration-string = "example"
            plugin-dispatcher = "akka.actor.default-dispatcher"
            key-prefix = "akka:persistence:journal"
        }
        sharding {
            class = "Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"
            configuration-string = "example"
            plugin-dispatcher = "akka.actor.default-dispatcher"
            key-prefix = "akka:persistence:sharding:journal"
        }
    }
    snapshot-store {
        plugin = "akka.persistence.snapshot-store.redis"
        redis {
            class = "Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"
            configuration-string = "example"
            plugin-dispatcher = "akka.actor.default-dispatcher"
            key-prefix = "akka:persistence:snapshot"
        }                    
        sharding {
            class = "Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis"
            configuration-string = "example"
            plugin-dispatcher = "akka.actor.default-dispatcher"
            key-prefix = "akka:persistence:sharding:snapshot"
        }
    }
}

What I was hoping to do is have my system's journal and snapshot data stored using the key prefixes akka:persistence:journal and akka:persistence:snapshot respectively, and my sharding data stored using akka:persistence:sharding:journal and akka:persistence:sharding:snapshot.
Instead, both my system's data and all the sharding data are stored using the two default key-prefixes - akka:persistence:journal and akka:persistence:snapshot.

Can we get this updated so that it's configurable in the same way as the Akka.Persistence.SqlServer package?

CircuitBreaker configuration

From time to time, we observe errors when there are problems connecting to Redis:
Recovery failed due to: Akka.Pattern.OpenCircuitException: Circuit Breaker is open; calls are failing fast at Akka.Pattern.Open.<Invoke>d__21.MoveNext() on message ().

I have found that there are fallback settings that are used if nothing is defined in the plugin config:

journal-plugin-fallback {
  ...
  circuit-breaker {
    max-failures = 10
    call-timeout = 10s
    reset-timeout = 30s
  }
}

Is it possible to configure a circuit-breaker in the following section?

journal {
  plugin = "akka.persistence.journal.redis"
  redis {
    class = "Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"
    key-prefix = "akka:shmaka:persistence:"
    configuration-string = "localhost"
  }
}
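In Akka.NET, a journal plugin's config is normally merged with journal-plugin-fallback, so nesting a circuit-breaker block inside the redis section should be honored - a sketch reusing the config above (timeout values are illustrative):

akka.persistence.journal {
  plugin = "akka.persistence.journal.redis"
  redis {
    class = "Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"
    key-prefix = "akka:shmaka:persistence:"
    configuration-string = "localhost"
    # overrides the journal-plugin-fallback defaults shown above
    circuit-breaker {
      max-failures = 10
      call-timeout = 20s
      reset-timeout = 60s
    }
  }
}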

Github links in documentation are outdated

It looks like StackExchange/StackExchange.Redis moved its mainline branch from master to main, so older links now 404.

I've got a local branch with an update - it's minor, but at least it's correct.

highestSequenceNr will leak for actors with finite lifetime

I'll describe the case.
I have an actor (PersistentFSM) that represents an order. While the order is active, everything is fine, but as soon as the order is closed (either way), the actor is removed from the system and will never come back (an archived order is quite another story).

All I can do is call DeleteMessages(long.MaxValue) and DeleteSnapshots(SnapshotSelectionCriteria.Latest) in the actor removal procedure to remove obsolete data from storage. But highestSequenceNr will still be left in storage to satisfy the JournalSpec.Journal_should_not_reset_HighestSequenceNr_after_journal_cleanup test.
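For illustration, the removal procedure looks roughly like this (a sketch; CleanupAndStop is a hypothetical helper inside the persistent actor):

protected void CleanupAndStop()
{
    DeleteMessages(long.MaxValue);                     // purge all journaled events
    DeleteSnapshots(SnapshotSelectionCriteria.Latest); // purge the latest snapshot
    // highestSequenceNr remains in storage by design, per
    // JournalSpec.Journal_should_not_reset_HighestSequenceNr_after_journal_cleanup
    Context.Stop(Self);
}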

Example HOCON settings are missing the plugin key

Being a noob, I ran into issues while trying to get started with Akka.Persistence using Redis. It turns out I had to add plugin = "akka.persistence.journal.redis" to the journal settings and plugin = "akka.persistence.snapshot-store.redis" to the snapshot-store settings.
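A minimal working sketch of the settings (the two plugin = ... lines are the ones the examples were missing):

akka.persistence {
  journal {
    plugin = "akka.persistence.journal.redis"  # was missing from the example
    redis {
      class = "Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"
      configuration-string = "localhost"
    }
  }
  snapshot-store {
    plugin = "akka.persistence.snapshot-store.redis"  # was missing from the example
    redis {
      class = "Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis"
      configuration-string = "localhost"
    }
  }
}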

Recovery failure on deserializing snapshot

Version Information
Version of Akka.NET? 1.5.13
Which Akka.NET Modules? Akka.Persistence.Redis, Akka.Serialization.Hyperion

Describe the bug
Rarely, an actor fails to start due to a snapshot recovery failure, and this exception occurs:

---> System.Runtime.Serialization.SerializationException: Failed to deserialize object of type [oms.models.ActorSnapshots.UserActorState] from the stream. Cause: Failed to deserialize object of type [System.Collections.Generic.List`1[domain.Entities.OrderBaseEntity]] from the stream. Cause: Failed to deserialize object of type [domain.Entities.OrderSellEntity] from the stream. Cause: Unable to cast object of type 'domain.Entities.TradeItem' to type 'domain.Enums.OrderValidityEnum'.

To Reproduce
It's not reproducible.

Expected behavior
The snapshot state should deserialize successfully. This is the persisted state of the actor:

public class UserActorState
{
    public List<OrderBaseEntity> Orders { get; set; } = new();
}

public abstract class OrderBaseEntity : BaseEntity
{
    public sealed override string Id { get; set; }
    public string ParentId { get; set; }
    public string ReferenceId { get; set; } = string.Empty; // algo parentId
    public long Hon { get; set; }
    public string SymbolIsin { get; set; } 
    public string SymbolName { get; set; }
    public OrderValidityEnum Validity { get; set; }  
    public DateTime? ValidityDate { get; set; } 
    public decimal Price { get; set; } 
    public long Quantity { get; set; } 
    public virtual OrderSide Side { get; protected set; } 
    public string CustomerIsin { get; set; } 
    public OrderState OrderState { get; set; }
    public OrderFrom OrderFrom { get; set; }
    public DateTime? ModifiedAt { get; set; }
    public DateTime? ParentCreationDate { get; set; }
    public long ParentBlock { get; set; }
    public bool IsModifyToUpperCost { get; set; } = false;
    public long ParentRemainedQuantity { get; set; }
    public int OrderErrorCode { get; set; }
    public string OrderErrorText { get; set; }
    public OrderActionEnum OrderAction { get; set; } = OrderActionEnum.Add;
    public int? HidePrice { get; set; }
    public byte? SettlementDelay { get; set; }
    public int CSize { get; protected set; } = 1; // option
    public List<TradeItem> Trades { get; set; } = new();
}

public abstract class BaseEntity
{
    protected BaseEntity()
    {
    }

    protected BaseEntity(string identifier)
    {
        Id = identifier;
    }

    public virtual DateTime CreateDateTime { get; set; } = DateTime.Now;

    [Key, MaxLength(36)]
    public virtual string Id { get; set; } = Ulid.NewUlid().ToString();


    private List<IDomainEvent> _domainEvents;

    [JsonIgnore]
    public IReadOnlyCollection<IDomainEvent> DomainEvents => _domainEvents?.AsReadOnly();
}

public class TradeItem
{
    public TradeItem(DateTime dateTime,
        int tradeNumber,
        string orderId,
        string symbolIsin,
        OrderSide side,
        int? hidePrice,
        long quantity,
        int price,
        bool isCancel)
    {
        DateTime = dateTime;
        TradeNumber = tradeNumber;
        OrderId = orderId;
        SymbolIsin = symbolIsin;
        Side = side;
        HidePrice = hidePrice;
        Quantity = quantity;
        Price = price;
        IsCancel = isCancel;
    }
    public DateTime DateTime { get; set; }
    public int TradeNumber { get; set; }
    public string OrderId { get; set; }
    public string SymbolIsin { get; set; }
    public OrderSide Side { get; set; }
    public int? HidePrice { get; set; }
    public long Quantity { get; set; }
    public int Price { get; set; }
    public bool IsCancel { get; set; } = false;
}

public class OrderBuyEntity : OrderBaseEntity
{
   public override OrderSide Side { get; protected set; } = OrderSide.Buy;
}

public class OrderSellEntity : OrderBaseEntity
{
   public override OrderSide Side { get; protected set; } = OrderSide.Sell;
}

public enum OrderValidityEnum
{
    DAY = 0,
    GOOD_TILL_DATE = 1
}

Actual behavior
Sometimes the snapshot state fails to recover.

Environment
Docker, .NET 8

Additional context
I've configured the Hyperion serializer like this:

akka {
  actor {
    provider = cluster
    serializers {
      hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
    }
    serialization-bindings {
      "System.Object" = hyperion
    }
  }
}

Redis cluster mode support

Does this plugin support Redis in cluster mode?
I have tried to set up a Redis cluster with 3 masters and 1 replica for each master (6 nodes, 1 replica each).
With this cluster configuration, I am getting an error:
Persisting failed due to: StackExchange.Redis.RedisCommandException: Multi-key operations must involve a single slot; keys can use 'hash tags' to help this, i.e. '{/users/12345}/account' and '{/users/12345}/contacts' will always be in the same slot
at StackExchange.Redis.ServerSelectionStrategy.Select(Message message)

Library versions are:
Akka.Persistence.Redis v1.0.0
StackExchange.Redis v1.2.6

Sentinel support

Hey,

Do you support Sentinel for Redis?

Something like:

redis {
  sentinel = true
  sentinel-master = "mymaster"  // master name
  sentinels = [{ host: "localhost", port: 26379 }]  // list of sentinel addresses
}
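For what it's worth, newer StackExchange.Redis versions can discover the master through Sentinel via the serviceName option in the connection string, which might avoid the need for dedicated HOCON keys - a sketch, assuming the plugin passes the string through unchanged:

configuration-string = "localhost:26379,localhost:26380,serviceName=mymaster"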

Move towards Redis streams

There's a new Redis data structure coming: Streams. I think they are an ideal fit for event journals.

Essentially, streams come with two new commands: XADD and XREAD.

XADD <stream-id> * <key-1> <value-1> <key-2> <value-2> (...etc)

Essentially, XADD appends to a continuous stream of events given a stream-id (in our case, the persistence-id). The * asks Redis to auto-generate the entry id; after it, the user may provide a variable number of key-value pairs. A single XADD is essentially the equivalent of a single event in an event stream.

XREAD COUNT <max> STREAMS <stream-id> <last-id>

This allows us to read events from the stream-id, given the last known id: 0 is the beginning of the stream, but beyond that the ids don't match the ones generated by the event journal. This may be a potential pain point.
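A concrete sketch, using a hypothetical persistence-id user-1 as the stream key (field names are illustrative):

XADD journal:user-1 * manifest "ItemAdded" payload "..."
XREAD COUNT 100 STREAMS journal:user-1 0

The XADD appends one event with two fields; the XREAD then returns up to 100 events from the beginning of the stream (last-id 0).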

Persistence AsyncWriteJournal is not really async

I've encountered a very strange issue while executing performance tests in my application.
I am using ClusterSharding and PersistentFSM for the main actors that represent business entities. For storage, I use the Akka.Persistence.Redis plugin (which is expected to be fast). I also configured TaskDispatcher as the default one.

Unfortunately, performance was much lower than I expected. Investigation showed that every actor in the system had an empty message queue except for /system/akka.persistence.journal.redis, which had more than a hundred messages (I am still investigating, but as of now, entity actors generate a rather large number of events to persist). Every time I measured, a WriteMessages call was in progress.

That is strange, as it seems to me from the code that AsyncWriteJournal.HandleWriteMessages should spawn an async task and proceed to other messages. So it should be fast.

I used dotTrace to look at it a little bit closer.
[dotTrace screenshot]

Now I am sure that everything is processed strictly on a single thread, so my entire multithreaded application depends on it, as all actors are waiting in its queue to persist.

Persistense failed due to "Collection was modified; enumeration operation may not execute"

Hi there, I'm facing a random exception under a highly loaded persistent actor.

Persistense failed due to: System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
at System.ThrowHelper.ThrowInvalidOperationException(ExceptionResource resource)
at System.Collections.Generic.Dictionary`2.Enumerator.MoveNext()
at Hyperion.SerializerFactories.DefaultDictionarySerializerFactory.<>c__DisplayClass3_0.<BuildSerializer>b__1(Stream stream, Object obj, SerializerSession session)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Extensions.StreamEx.WriteObject(Stream stream, Object value, Type valueType, ValueSerializer valueSerializer, Boolean preserveObjectReferences, SerializerSession session)
at lambda_method(Closure , Stream , Object , SerializerSession )
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Extensions.StreamEx.WriteObject(Stream stream, Object value, Type valueType, ValueSerializer valueSerializer, Boolean preserveObjectReferences, SerializerSession session)
at lambda_method(Closure , Stream , Object , SerializerSession )
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Serializer.Serialize(Object obj, Stream stream)
at Akka.Serialization.HyperionSerializer.ToBinary(Object obj)
at Akka.Persistence.Serialization.PersistenceMessageSerializer.GetPersistentPayload(Object obj)
at Akka.Persistence.Serialization.PersistenceMessageSerializer.GetPersistentMessage(IPersistentRepresentation persistent)
at Akka.Persistence.Serialization.PersistenceMessageSerializer.ToBinary(Object obj)
at Akka.Persistence.Redis.JournalHelper.PersistentToBytes(IPersistentRepresentation message)
at Akka.Persistence.Redis.Journal.RedisJournal.Extract(IPersistentRepresentation pr)
at Akka.Persistence.Redis.Journal.RedisJournal.<WriteBatchAsync>d__12.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Persistence.Redis.Journal.RedisJournal.<WriteMessagesAsync>d__11.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()

The package versions used are:

  <package id="Akka" version="1.3.1" targetFramework="net461" />
  <package id="Akka.Persistence" version="1.3.1" targetFramework="net461" />
  <package id="Akka.Persistence.Query" version="1.3.1" targetFramework="net461" />
  <package id="Akka.Persistence.Redis" version="1.0.0-beta1" targetFramework="net461" />
  <package id="Akka.Serialization.Hyperion" version="1.3.1-beta52" targetFramework="net461" />
  <package id="Akka.Streams" version="1.3.1" targetFramework="net461" />
  <package id="DotNetty.Buffers" version="0.4.6" targetFramework="net461" />
  <package id="DotNetty.Codecs" version="0.4.6" targetFramework="net461" />
  <package id="DotNetty.Common" version="0.4.6" targetFramework="net461" />
  <package id="DotNetty.Handlers" version="0.4.6" targetFramework="net461" />
  <package id="DotNetty.Transport" version="0.4.6" targetFramework="net461" />
  <package id="Google.Protobuf" version="3.3.0" targetFramework="net461" />
  <package id="Google.ProtocolBuffers" version="2.4.1.555" targetFramework="net461" />
  <package id="Hyperion" version="0.9.6" targetFramework="net461" />

In my scenario, I have a routine workflow that persists objects to the journal and, every 100 records, creates a snapshot and cleans up the journal and the previous snapshot:

        protected void ProcessingJobState()
        {
            ...
            Command<TJob>(job =>
            {
                Persist(job, journaledEvent =>
                {
                    PendingJobs.Enqueue(job);

                    _persistentJournalSize++;
                    if (_persistentJournalSize < PersistentJournalMaxSize)
                    {
                        return;
                    }

                    _logger.Warning("Going to SaveSnapshot");

                    _persistentJournalSize = 0;
                    SaveSnapshot(PendingJobs.ToList());
                });
            });
            ...
            /*
             * Manage snapshots here
             */
            Command<SaveSnapshotSuccess>(success =>
            {
                _logger.Warning(success.ToString());
                
                var metadataSequenceNr = success.Metadata.SequenceNr;
                
                DeleteMessages(metadataSequenceNr);
                DeleteSnapshots(new SnapshotSelectionCriteria(metadataSequenceNr - 1));
            });
            Command<SaveSnapshotFailure>(failure => { _logger.Error(failure.ToString()); });
            Command<DeleteMessagesFailure>(failure => { _logger.Error(failure.ToString()); });
            Command<DeleteSnapshotsFailure>(failure => { _logger.Error(failure.ToString()); });
            Command<DeleteMessagesSuccess>(success => { _logger.Warning(success.ToString()); });
            Command<DeleteSnapshotsSuccess>(success => { _logger.Warning(success.ToString()); });
        }

And the regular logs look like this:

Going to SaveSnapshot
Going to SaveSnapshot
Going to SaveSnapshot
Going to SaveSnapshot
Going to SaveSnapshot
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42557, timestamp: 2017-09-27>>
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42657, timestamp: 2017-09-27>>
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42757, timestamp: 2017-09-27>>
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42857, timestamp: 2017-09-27>>
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42957, timestamp: 2017-09-27>>
DeleteMessagesSuccess<toSequenceNr: 42557>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42556, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
DeleteMessagesSuccess<toSequenceNr: 42657>
DeleteMessagesSuccess<toSequenceNr: 42757>
DeleteMessagesSuccess<toSequenceNr: 42857>
DeleteMessagesSuccess<toSequenceNr: 42957>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42656, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42756, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42856, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42956, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
Going to SaveSnapshot
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 43057, timestamp: 2017-09-27>>
DeleteMessagesSuccess<toSequenceNr: 43057>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 43056, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
Going to SaveSnapshot
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 43157, timestamp: 2017-09-27>>
DeleteMessagesSuccess<toSequenceNr: 43157>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 43156, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
Persistense failed due to: ...

The actual moment of the "Persistense failed" error varies from test to test, and sometimes it occurs right after a Going to SaveSnapshot message.

The message rate during the tests is about 60/s. Redis runs on the local machine.
