async-raft / async-raft

An implementation of the Raft distributed consensus protocol using the Tokio framework.

Home Page: https://async-raft.github.io/async-raft

License: Apache License 2.0

Rust 100.00%
raft consensus rust async tokio

async-raft's Introduction

async raft

An implementation of the Raft distributed consensus protocol using the Tokio framework. Please ⭐ on github!



Blazing fast Rust, a modern consensus protocol, and a reliable async runtime — this project intends to provide a consensus backbone for the next generation of distributed data storage systems (SQL, NoSQL, KV, Streaming, Graph ... or maybe something more exotic).

The guide is the best place to get started, followed by the docs for more in-depth details.

This crate differs from other Raft implementations in that:

  • It is fully reactive and embraces the async ecosystem. It is driven by actual Raft events taking place in the system as opposed to being driven by a tick operation. Batching of messages during replication is still used whenever possible for maximum throughput.
  • Storage and network integration is well defined via two traits RaftStorage & RaftNetwork. This provides applications maximum flexibility in being able to choose their storage and networking mediums. See the storage & network chapters of the guide for more details.
  • All interaction with the Raft node is well defined via a single public Raft type, which is used to spawn the Raft async task, and to interact with that task. The API for this system is clear and concise. See the raft chapter in the guide.
  • Log replication is fully pipelined and batched for optimal performance. Log replication also uses a congestion control mechanism to help keep nodes up-to-date as efficiently as possible.
  • It fully supports dynamic cluster membership changes according to the Raft spec, including leader stepdown and non-voter syncing. See the dynamic membership chapter in the guide.
  • Details on initial cluster formation, and how to effectively do so from an application's perspective, are discussed in the cluster formation chapter in the guide.
  • Automatic log compaction with snapshots, as well as snapshot streaming from the leader node to follower nodes is fully supported and configurable.
  • The entire code base is instrumented with tracing. This can be used for standard logging, or for distributed tracing, and the verbosity can be statically configured at compile time to completely remove all instrumentation below the configured level.

This implementation strictly adheres to the Raft spec (pdf warning), and all data models use the same nomenclature found in the spec for better understandability. This implementation of Raft has integration tests covering all aspects of a Raft cluster's lifecycle including: cluster formation, dynamic membership changes, snapshotting, writing data to a live cluster and more.

If you are building an application using this Raft implementation, open an issue and let me know! I would love to add your project's name & logo to a users list in this project.

contributing

Check out the CONTRIBUTING.md guide for more details on getting started with contributing to this project.

license

async-raft is licensed under the terms of the MIT License or the Apache License 2.0, at your option.


NOTE: occurrences of the section symbol § throughout this project are references to specific sections of the Raft spec.

async-raft's People

Contributors

diggsey, drmingdrmer, iamazy, justinlatimer, kper, linsinn, marinpostma, monoid, psiace, sunli829, thedodd, xu-cheng


async-raft's Issues

Bug with get_log_entries function

The current prototype is get_log_entries(&self, start: u64, stop: u64), where start and stop bound a half-open interval whose upper bound is excluded. In the MemStore implementation, this is translated directly into a range. As a result, when start = stop, MemStore returns an empty vec. This is a problem because the replication module calls it with start = stop, gets nothing back, and triggers a snapshot.

I ran into this bug while implementing Raft storage myself, using the same reasoning as MemStore. The fix in MemStore is straightforward, but I'd like to suggest changing the prototype of get_log_entries to get_log_entries<R: RangeBounds<u64>>(&self, range: R). In my view, this would remove any ambiguity.

I have a branch with both solutions implemented locally, and I can open a PR with whichever solution is preferred.
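To make the ambiguity concrete, here is a minimal sketch of the RangeBounds-based signature over an in-memory log. MemLog and its contents are hypothetical stand-ins, not the real MemStore; only the shape of get_log_entries follows the proposal above.

```rust
use std::collections::BTreeMap;
use std::ops::RangeBounds;

/// Hypothetical in-memory log keyed by log index (not the real MemStore).
pub struct MemLog {
    entries: BTreeMap<u64, String>,
}

impl MemLog {
    /// Build a log holding indexes 1..=n with placeholder payloads.
    pub fn new(n: u64) -> Self {
        let entries = (1..=n).map(|i| (i, format!("entry-{}", i))).collect();
        MemLog { entries }
    }

    /// The caller spells out the bounds, so `a..b` and `a..=b`
    /// can no longer be confused at the call site.
    pub fn get_log_entries<R: RangeBounds<u64>>(&self, range: R) -> Vec<String> {
        self.entries.range(range).map(|(_, v)| v.clone()).collect()
    }
}
```

With this shape, a caller that wants exactly one entry writes get_log_entries(5..=5), while get_log_entries(5..5) is visibly an empty request.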

Add dedicated integration test for lagging->line-rate recovery under load

Here is a rough idea of what this test should do:

  • It should be quite similar to the other tests which are currently in place.
  • It should isolate a follower node, then write data to the cluster so that the node falls behind into the lagging state, but not into the snapshotting state.
  • It should then spawn a task which will continue to write data to the cluster.
  • Just after it is spawned, the test should remove the isolated node from isolation so that it can begin the lagging state recovery process.
  • The spawned client should be instructed to stop after a few seconds, and then the test should assert that the lagging node has caught up and has the expected state.

`snapshot_dir` in `Config`?

It looks like the snapshot_dir field of Config might actually be a detail of the Storage implementation - I couldn't find another place it's used in the Raft implementation, it's just passed by the user when constructing their storage. Could this be left up to the storage implementer?

Single-node Raft needs to resume as leader after crash.

When a single-node Raft crashes and has state on disk, it needs to resume as leader. Currently in such conditions after a crash, it will stay in candidate state.

Pretty simple bug. The initialization routine was not taking into account the possibility of single-node cluster.
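As a rough illustration of the missing case, the sketch below picks a startup state from the persisted membership. The State enum, the initial_state function, and the rule that a pristine node waits as a non-voter are all assumptions made for this example; the issue does not say whether the real fix goes through an election round first.

```rust
use std::collections::BTreeSet;

/// Hypothetical startup states for this sketch.
#[derive(Debug, PartialEq)]
pub enum State {
    NonVoter,
    Follower,
    Leader,
}

/// Decide the state a node starts in after a restart.
pub fn initial_state(id: u64, members: &BTreeSet<u64>, has_persisted_state: bool) -> State {
    // Assumption: a pristine node waits to be initialized or added.
    if !has_persisted_state || !members.contains(&id) {
        return State::NonVoter;
    }
    if members.len() == 1 {
        // Single-node cluster: nobody else can win an election,
        // so the node resumes as leader.
        State::Leader
    } else {
        State::Follower
    }
}
```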

Nodes never seem to enter NonVoter state

Hello,

I'm seeing an issue with the initialization of a new Raft cluster using actix-raft. My application follows this procedure:

  1. Start an actix-web server for node-to-node communication
  2. Use Consul to register the node in Consul using an agent local to the instance of my software
  3. Sleep for a while (for dev, since all nodes are local, I use 5 seconds)
  4. Ask Consul for all available nodes that provide my service
  5. Ask each node if a cluster already exists.
  6. If a cluster does exist, do nothing (wait for the cluster to sense the new node and add it)
  7. If a cluster doesn't exist, create one by sending an InitWithConfig message to the Raft actor.

If my understanding of the docs is correct, this implementation represents fairly standard practice for a Raft cluster and an actix-raft cluster in particular. However, when I run a test cluster, I get (from each node) an error explaining that calling InitWithConfig is not allowed because the node is in the Follower state with index 0.

I understand why this is a problem, but I don't understand why a brand new node (totally blank initial state) is (seemingly) going directly into the follower state. I suspect I am doing something wrong, but it is not clear what.

I should say I have been able to hack around this issue by reporting an InitialConfig that contains the members of the cluster I am trying to form and by not sending the InitWithConfig message. Presumably this has the effect of tricking actix-raft into believing that there is already a cluster. This "solution" seems to present other problems, though, when it comes to dynamically altering the cluster configuration.

Any insights here would be awesome. As always, thanks in advance!

Finalize snapshot algorithm.

  • The Raft node must guarantee that the CreateSnapshot interface will never be called multiple overlapping times from the same Raft node, and it will not be called when an InstallSnapshot (the Raft node is currently installing a snapshot from the leader) operation is in progress.
  • finish up implementation of InstallSnapshot RPC handler.

Can't stop Raft.

Thank you for the great library, but I found a problem. After I call Raft::shutdown, Raft does not stop and keeps outputting the following message.

async_raft::core error reporting metrics error=channel closed id=1 
async_raft::core error reporting metrics error=channel closed id=1 
async_raft::core error reporting metrics error=channel closed id=2 
async_raft::core error reporting metrics error=channel closed id=0 
async_raft::core error reporting metrics error=channel closed id=2 
async_raft::core error reporting metrics error=channel closed id=0 

How is `Snapshot: AsyncWrite` utilized?

There must be something obvious I'm missing, but could you point me to where and why Snapshot has this bound? Is the library making assumptions somewhere and writing to the snapshot outside of the RaftStorage interface (which, as far as I can tell from the trait implementation, does not use this bound)?

Joint consensus not implemented correctly

I believe votes are counted incorrectly during joint consensus.

Firstly, during this phase, votes should be counted separately for the two clusters. Quote from the spec:

Agreement (for elections and entry commitment) requires separate majorities from both the old and new configurations.

Currently the code only tracks a single vote counter in the candidate state.

Secondly, this is not so clearly explained, but I believe a candidate which will be removed after a membership change, which has received the log entry containing the membership change, should only count the vote for itself in the "old" vote counter, but not the "new" vote counter.
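A minimal sketch of the two-counter tally described above, assuming node IDs are u64. JointTally is a hypothetical type for this example and is not part of the crate. Because a vote only counts toward configurations that contain the voter, a candidate that is absent from the new configuration automatically counts its own vote only in the old one.

```rust
use std::collections::BTreeSet;

/// Hypothetical vote tally for a candidate during joint consensus.
pub struct JointTally {
    old_members: BTreeSet<u64>,
    new_members: BTreeSet<u64>,
    granted: BTreeSet<u64>,
}

impl JointTally {
    pub fn new(old_members: BTreeSet<u64>, new_members: BTreeSet<u64>) -> Self {
        JointTally { old_members, new_members, granted: BTreeSet::new() }
    }

    /// Record a granted vote, including the candidate's vote for itself.
    pub fn grant(&mut self, node: u64) {
        self.granted.insert(node);
    }

    /// Count only the votes cast by members of the given configuration.
    fn has_majority(&self, members: &BTreeSet<u64>) -> bool {
        let votes = members.intersection(&self.granted).count();
        votes > members.len() / 2
    }

    /// The election is won only with separate majorities in both configurations.
    pub fn is_elected(&self) -> bool {
        self.has_majority(&self.old_members) && self.has_majority(&self.new_members)
    }
}
```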

Setup clippy integration in Actions CI

Not going to use rustfmt (I still can't bring myself around to be ok with it quite yet ...). However, we definitely need to get Clippy configured in CI.

trouble running tests

I could not find specific compile instructions in the documentation, did I miss something?

Running cargo test causes the following compiler error when it gets to memstore

   ...
   Compiling tracing-subscriber v0.2.11
   Compiling memstore v0.1.0 (C:\Users\nejat\repos\rust-foss\async-raft\memstore)
error: reached the type-length limit while instantiating `std::future::from_generator::<[s...]>>, ()}]>}]>>, ()}]>}]>>, ()}]>`
  --> C:\Users\nejat\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\src\libcore\future\mod.rs:59:1
   |
59 | / pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return>
60 | | where
61 | |     T: Generator<ResumeTy, Yield = ()>,
62 | | {
...  |
85 | |     GenFuture(gen)
86 | | }
   | |_^
   |
   = note: consider adding a `#![type_length_limit="2170465"]` attribute to your crate

error: aborting due to previous error

error: could not compile `async-raft`.

I have tried it with the following three targets, both master and 0.5.0

Windows 10

stable-x86_64-pc-windows-gnu
stable-x86_64-pc-windows-msvc

Ubuntu 20.04 Distro WSL2

stable-x86_64-unknown-linux-gnu

Thanks for the help

Crate imports Tokio 0.1

async-raft depends on tracing-futures 0.2.4, but this dependency somehow pulls in tokio 0.1.

An example of implementing using actix-raft

Hi,
I'm considering rewriting a Go project in Rust, and I'm looking for a server implementation using actix-raft for reference.
For example, do you know an example of implementing a simple key-value store using actix-raft?

`client_read` and `client_write` should share a Mutex lock

When client_read and client_write are used without a Mutex lock, unexpected issues appear. It took me a very long time to realize this. May I suggest including a Mutex lock inside these two functions? Alternatively, the documentation should emphasize that the application needs to ensure there are no concurrent client_read and client_write calls.

Fallback conflict-resolution method uses the wrong term

When the conflict optimization is not used (conflict_opt is None) the replication stream decrements its match_index. After doing so, it should also fetch the corresponding match_term from the storage layer.

Failing to do this will result in sending an incorrect prev_log_term in future AppendEntries requests, which will then be rejected, causing the replication to become stuck.
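The step being described can be sketched as follows. Cursor, term_at and step_back are hypothetical names, and the log is modeled as a plain slice of terms rather than the storage layer.

```rust
/// Hypothetical replication cursor for one follower.
pub struct Cursor {
    pub match_index: u64,
    pub match_term: u64,
}

/// `log_terms[i]` holds the term of the entry at index `i + 1`.
/// Index 0 means "no entry" and has term 0.
pub fn term_at(log_terms: &[u64], index: u64) -> u64 {
    if index == 0 {
        0
    } else {
        log_terms[(index - 1) as usize]
    }
}

/// Step back one entry after a rejected AppendEntries request.
pub fn step_back(cursor: &mut Cursor, log_terms: &[u64]) {
    cursor.match_index = cursor.match_index.saturating_sub(1);
    // Without this lookup, match_term would still describe the old index,
    // and the next request would carry a stale prev_log_term.
    cursor.match_term = term_at(log_terms, cursor.match_index);
}
```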

Time based state changes

How might one change the replicated state as a result of timed events?

For example, let's say I want to partition data among some client applications. This might work by having each client send a heartbeat request to the raft cluster every few seconds. If a client fails to send a heartbeat within a certain time, I want to reassign its partitions to other clients and update the replicated state to reflect this change.

However, I don't want every node to be looking for "timed out" clients because they'd probably conflict. I don't want nodes to "own" the clients they talk to because then if the node goes down it won't be able to revoke the partitions from the clients it was connected to.

My guess would be that I should send "timeout" changes from the leader, but the only way to know if the node is the leader seems to be via the metrics trait. Is that how you would recommend achieving this?
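One way to picture the "only the leader looks for timeouts" idea is the sketch below. Everything in it is an assumption made for illustration: is_leader is whatever the application derives from the metrics, and timed_out_clients is a hypothetical helper, not an API of this crate. The leader would then submit the returned IDs as ordinary write requests so the reassignment is replicated like any other change.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Return the clients whose last heartbeat is older than `timeout`.
/// Non-leaders always get an empty list, so only one node acts on timeouts.
pub fn timed_out_clients(
    is_leader: bool,
    last_heartbeat: &HashMap<String, Instant>,
    now: Instant,
    timeout: Duration,
) -> Vec<String> {
    if !is_leader {
        return Vec::new();
    }
    let mut expired: Vec<String> = last_heartbeat
        .iter()
        .filter(|(_, seen)| now.saturating_duration_since(**seen) > timeout)
        .map(|(id, _)| id.clone())
        .collect();
    // Sort for a deterministic order regardless of HashMap iteration.
    expired.sort();
    expired
}
```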

On a related note: the system currently follows a strict request/response model for clients. What if I want to push information to a client in response to a state change?

Is it possible to call `Storage::get_membership_config` directly here?

I have stored the latest membership in storage, so that I can have better performance.

pending_config = self.core.storage.get_log_entries(stale_logs_start, stale_logs_stop).await


Since commit_index is always initialized to 0, I need to load all the logs every time I start raft.

self.commit_index = 0;


Please don't mind if I misunderstand this code. 🙂

Small bug in heartbeat timeouts.

When a heartbeat timeout is prematurely executed (which has caused a lot of problems previously), I forgot to reschedule another timeout to be executed. Simple issue, already have the fix. Will release as part of 0.3.1.

Needed updates for 0.2.0

A few issues here:

  • need to revisit the bounds on RaftStorage. Experimenting with synchronous actors. Will quite likely need to update constraints.
  • the RaftStorage::new constructor seems a bit unusable. Applications will need their own constructor patterns. It would probably be best to just communicate the need to supply the parent node's ID & snapshot dir in the docs as opposed to requiring this for implementation.

Optimization: Cache replicated entries for faster lookup for applying to state machine.

todo

  • Update the append entries algorithm for followers so that recent entries are buffered up to a specific threshold so that the process of applying logs to the state machine will not need to fetch those logs from storage a majority of the time.
  • Update the append entries handler so that the process of applying logs to the state machine is no longer directly involved. Right now, applying logs to the state machine is conditionally called when it becomes safe to do so. This needs to be moved to be its own async task.
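The buffering idea in the first item could look roughly like this. RecentEntries is a hypothetical type, payloads are plain strings, and entries are assumed to arrive with consecutive indexes; a miss means the caller falls back to storage.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded buffer of the most recently replicated entries.
pub struct RecentEntries {
    capacity: usize,
    entries: VecDeque<(u64, String)>,
}

impl RecentEntries {
    pub fn new(capacity: usize) -> Self {
        // Keep at least one slot so the eviction logic stays simple.
        RecentEntries { capacity: capacity.max(1), entries: VecDeque::new() }
    }

    /// Append an entry, evicting the oldest one once the buffer is full.
    pub fn push(&mut self, index: u64, payload: String) {
        if self.entries.len() == self.capacity {
            self.entries.pop_front();
        }
        self.entries.push_back((index, payload));
    }

    /// Look up an entry by log index; `None` means "fetch it from storage".
    pub fn get(&self, index: u64) -> Option<&String> {
        let first = self.entries.front()?.0;
        let offset = index.checked_sub(first)? as usize;
        self.entries
            .get(offset)
            .filter(|(i, _)| *i == index)
            .map(|(_, payload)| payload)
    }
}
```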

Docs

  • do a full review of all docstrings for types & modules.
  • add a simple graphic to an overview section of the main README to visually demonstrate the layout of this system.
  • add docs to the metrics module and type.
  • add docs on how metrics system works. Pumps out metrics on a configurable interval. State changes & cluster membership changes are always immediately published.
  • put together a networking guide on how to get started with building a networking layer.
  • put together a storage guide on how to implement an application's RaftStorage layer.
  • update readme image to use the image from the github pages mdbook & add links to guide as needed.
  • add docs examples on how to instantiate and start the various actix actor components (Raft, RaftStorage, RaftNetwork &c).
    • include docs on how to implement such types.
  • add a section to the guide on putting everything together. See #17.
  • build mdBook as a more in-depth guide on using this system. Host on pages.
  • add docs on the config change process & the fact that an alternative failsafe is used by this implementation for the third reconfiguration issue, removed node disruptions, described at the end of §6, and which is an improvement over the recommendation in the spec. This implementation does not accept RPCs from members which are not part of its configuration. The only time this will become an issue is:
    • the leader which is committing the terminal configuration which removes some set of nodes dies after it has only replicated to a minority of the cluster.
    • a member of that minority becomes leader and stops communicating with the old nodes.
    • if the new leader is not able to commit a new entry to the rest of the cluster before the old nodes timeout, it is possible that one of the old nodes will start an election and become the leader.
    • if this happens, that node will simply commit the terminal config change which removes it from the cluster and step down.
    • such conditions are rare, and are easily accounted for without any additional complexity in implementation.
  • add docs to general application building section about client interactions. Per the Raft spec in §8, to ensure that client interactions are not applied more than once when a failure causes the client to retry, applications must track client IDs and use serial numbers on each request. The RaftStorage implementation may use this information in the client request log and reject the request from within the RaftStorage handler using an application-specific error. The application's client may observe this error and treat it as an overall success. This is an application-level responsibility; Raft simply provides the mechanism to implement it.
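The client ID and serial number tracking from the last item can be sketched with a toy state machine. Counter, Applied and the string client IDs are hypothetical; a real application would do this inside its own RaftStorage implementation and return its own error type for duplicates.

```rust
use std::collections::HashMap;

/// Outcome of applying one client request.
#[derive(Debug, PartialEq)]
pub enum Applied {
    /// The request was new and has been applied; carries the new value.
    Fresh(u64),
    /// The request was a retry; carries the response recorded for the
    /// client's latest applied request.
    Duplicate(u64),
}

/// Hypothetical state machine: a counter plus per-client bookkeeping.
#[derive(Default)]
pub struct Counter {
    value: u64,
    /// client id -> (last serial applied, response to that request)
    last_seen: HashMap<String, (u64, u64)>,
}

impl Counter {
    pub fn apply(&mut self, client: &str, serial: u64, add: u64) -> Applied {
        if let Some(&(last_serial, response)) = self.last_seen.get(client) {
            if serial <= last_serial {
                // Already applied: report it instead of applying it again.
                return Applied::Duplicate(response);
            }
        }
        self.value += add;
        self.last_seen.insert(client.to_string(), (serial, self.value));
        Applied::Fresh(self.value)
    }
}
```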

Decouple snapshots from the filesystem

As we discussed previously, I'm using an in-memory store as I don't need persistence, but I would like to be able to use the snapshot/log-compaction feature to avoid the memory usage growing indefinitely and to make it easier for new nodes to catch up with the leader.

Unfortunately, the current snapshot feature seems intricately coupled with the filesystem, in that actix-raft directly streams results from a file on disk. While I think it would be nice to keep this as a "drop-in" snapshot implementation, I think this should be an implementation detail of the Storage type. Furthermore, I think it's a little odd how some of the APIs operate at a "chunk" level, whereas some operate via file names.

What I would like to see is for CreateSnapshot and InstallSnapshot to each be split into two parts, such that the second part could be shared between them. We'd have something like:

  • CreateSnapshot
    This would take a log range and would save a corresponding snapshot somewhere (filesystem, memory, wherever). It would return an opaque snapshot ID.

  • SupplySnapshot
    This would take no arguments. It would return a new opaque snapshot ID and something implementing AsyncWrite. Instead of the current "chunk"-based API, this crate should simply write the snapshot data into the AsyncWrite object, calling flush where necessary so that there's no need for the per-chunk callback.

  • InstallSnapshot
    This would take a log range and an opaque snapshot ID produced by a call to CreateSnapshot or SupplySnapshot from the same node, and would implement the logic to compact the log. To aid this, the "path" in EntrySnapshotPointer would be replaced with the opaque snapshot ID. The storage implementation could also use this point to delete other snapshots (as actix-raft should guarantee not to call either of the other two methods whilst InstallSnapshot is in progress).

Finally, the directory path from the initial config would be dropped, as it will no longer be necessary.

Feature Request: add `Raft::client_current_leader` API

In addition to the existing Raft::client_read API, may I suggest adding a Raft::client_current_leader API like the following:

pub async fn client_current_leader(&self) -> Result<NodeId, ClientCurrentLeaderError> {
    match self.client_read().await {
        Ok(()) => Ok(cur_node_id),
        Err(ClientReadError::ForwardToLeader(Some(id))) => Ok(id),
        Err(ClientReadError::ForwardToLeader(None)) => Err(ClientCurrentLeaderError::LeaderUnknown),
        Err(ClientReadError::RaftError(e)) => Err(ClientCurrentLeaderError::RaftError(e)),
    }
}

This would be helpful to certain applications, where the leader is required to do certain preprocessing before submitting the real Raft write request.

Also, it may be a good idea to have another, similar API that returns the current leader without sending any Raft requests, for performance reasons. Something like:

pub async fn client_current_presumptive_leader(&self) -> Result<NodeId, ClientCurrentLeaderError> {
    if self.is_leader() {
        Ok(cur_node_id)
    } else if let Some(leader_id) = self.leader_id {
        Ok(leader_id)
    } else {
        Err(ClientCurrentLeaderError::LeaderUnknown)
    }
}

When using this API, it is the application's responsibility to check later who the real leader is.

Become runtime-agnostic by using tokio's alternatives

Since we must have a runtime, there are two ways to become runtime-agnostic. The first is to use conditional compilation and write code for each runtime. The second is to use smol or async-std, which are compatible with each other and have a feature flag to enable tokio compatibility. In that case, the crate could be called by others without any configuration.

My opinion is to use smol and other small async libraries.

Pros

  • More compatibility, of course.
  • Smaller size, more flexibility, and possibly performance improvements in some areas.
  • More useful internal API, eg. Task in smol or async-std are cancelable.

Cons

  • Unexpected performance issues; at least, I don't know how performance would be when mixing smol with tokio.
  • Most of these libraries are created/maintained by @stjepang, though how much that matters depends on what you think.

Lagging replication streams will grow indefinitely

I may be misreading, but it appears that lagging replication streams will accumulate new messages indefinitely. Items are added to the buffered_outbound field when in the Lagging state:

    fn handle(&mut self, msg: RSReplicate<D>, ctx: &mut Self::Context) -> Self::Result {
        // Always update line commit & index info first so that this value can be used in all AppendEntries RPCs.
        self.line_commit = msg.line_commit;
        self.line_index = msg.entry.index;

        // Get a mutable reference to an inner buffer if permitted by current state, else return.
        match &mut self.state {
            // NOTE: exceeding line rate buffer size is accounted for in the `handle_append_entries_response` handler.
            RSState::LineRate(inner) => inner.buffered_outbound.push(msg.entry),
            RSState::Lagging(inner) => inner.buffered_outbound.push(msg.entry), // <-- Here
            _ => return Ok(()),
        };

        self.drive_state(ctx);
        Ok(())
    }

But the buffered output field is only truncated when in the LineRate state:

            // If running at line rate, and our buffered outbound requests have accumulated too
            // much, we need to purge and transition to a lagging state. The target is not able to
            // replicate data fast enough.
            if let RSState::LineRate(inner) = &self.state {
                if inner.buffered_outbound.len() > (self.config.max_payload_entries as usize) {
                    return Box::new(self.transition_to_lagging(ctx)); // <-- Here
                }
            }

It's unclear why the Lagging state needs to store buffered_outbound messages at all, since it will fetch messages from the storage layer as needed.

(BTW, I've been writing my own Raft implementation on top of act-zero using this crate as a reference where the paper is unclear, and it's really quite enjoyable.)

Leader blocked when apply_entry_to_state_machine takes too long

In my use case, applying a log entry may take a while, and I found that this prevents the leader from making progress, causing it to time out and trigger new elections. Here is what I have found so far:

  • in LeaderState::run, the select! blocks on handle_replica_event (after the log entry has been committed)
  • handle_replica_event calls handle_update_match_index, which repeatedly calls client_request_post_commit for each entry to be applied.

During all this time, the loop in run is blocked performing this task and can't perform its other tasks, such as sending heartbeats. This causes election timeouts and, for some reason that I don't yet completely understand (probably in my implementation), the entry is not applied on the leader node but is applied on the other nodes.

The log application process should not block the leader from operating normally. A possible fix would be to offload the work performed by client_request_post_commit to another thread and, instead of calling it directly, have handle_update_match_index enqueue entries to be applied. I am currently experimenting with this solution.
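The enqueue-and-offload approach proposed above can be sketched with a plain worker thread and channels from the standard library. Entry and spawn_applier are hypothetical names for this example; an async implementation would more likely use a spawned task and an async channel, so treat this only as an illustration of the hand-off.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical committed log entry.
pub struct Entry {
    pub index: u64,
    pub payload: String,
}

/// Spawn a worker that applies entries in order and reports each applied index.
/// The caller only enqueues entries, so its own loop is never blocked by `apply`.
pub fn spawn_applier<F>(mut apply: F) -> (Sender<Entry>, Receiver<u64>, JoinHandle<()>)
where
    F: FnMut(&Entry) + Send + 'static,
{
    let (entry_tx, entry_rx) = channel::<Entry>();
    let (applied_tx, applied_rx) = channel::<u64>();
    let handle = thread::spawn(move || {
        // The loop ends once every Sender<Entry> has been dropped.
        for entry in entry_rx {
            apply(&entry); // may be slow; nobody is waiting on it inline
            if applied_tx.send(entry.index).is_err() {
                break; // the receiver went away, so stop applying
            }
        }
    });
    (entry_tx, applied_rx, handle)
}
```

The leader loop would keep the Sender and poll the Receiver for applied indexes between its other duties, such as sending heartbeats.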

Question about message size within actix-raft (and Raft in general).

Hi everyone! If this is not an appropriate venue for the following question, please let me know.

TLDR: Is there a limit (hard or otherwise) to the size that Raft messages for committing log updates can be? I want to write a Raft application using actix-raft whose log updates are anywhere from a few bytes to hundreds of megabytes large; is this possible?

Long version:

I'd like to implement a simple file duplication system to replicate a folder across several different computers. For context: standard network/distributed file systems like nfs and gluster almost work for my use case but fail in a critical way: they are too slow. My application requires shared files to be read quickly by software running on the nodes that comprise my distributed system. (Actually, gluster fails even more because of some of the shortcomings associated with user-space filesystems.)

I've been looking to implement a pretty simple file replication system that just makes sure a given folder is appropriately synced up. Consensus is pretty important because if a file is visible to node A, then it must be visible to the other nodes otherwise we get into trouble (replication doesn't have to be immediate, but it should happen eventually). As a consequence, I've been looking at Raft for a few days and I am pretty eager to try a toy implementation with actix-raft. I believe I have a handle on most aspects of the project, but I have one lingering question that I can't seem to find an answer to online.

The file sizes will range from a few bytes to a few hundred megabytes. The way I see this system working is that if I write a file to any of the synced directories, the replicator running for that directory will sense a change (using inotify) and will use Raft to spread the new file around and reach a consensus so that, eventually, all functioning replicators (this is what I am terming the software that will use actix-raft) on the various nodes will have picked up the change and written the file locally. Now, without the use of some third-party storage mechanism (e.g. Azure/Azurite, S3/Minio, etc), it seems that to make this work with Raft, the messages that Raft uses internally to form/disseminate consensus will have to contain the relevant files themselves. My questions then are:

  1. Have I misunderstood anything about the appropriate use of Raft in the scenario I've described?
  2. Are there hard limits on message size? Do the semantics of Raft allow or disallow what I am proposing?

Thanks in advance for any guidance here.

Implement PreVote & CheckQuorum optimizations

This is already a well-known and fairly well-discussed issue in the Raft community. Per @shikhar's comment below, let's implement the PreVote & CheckQuorum optimizations. (EDIT: the original content of this issue was describing a few issues/optimizations which @shikhar identified as being better expressed as PreVote & CheckQuorum in the comment below).

Ultimately, the PreVote phase along with the standard Raft algorithm where candidates will be rejected if a node has received a heartbeat from the leader within the minimum election timeout, should prove to increase the robustness and stability of actual stable leaders.

This coupled with the CheckQuorum optimization should then address the other side of the coin where leaders which should be deposed are indeed deposed and will not block election progress under partial network partitions.

PreVote

  • update spec PDF to reference the updated spec https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf
  • implement PreVote algorithm. This should actually just be a minor refactoring related to the Candidate state.
    • introduce a new is_pre_vote field to the VoteRequest RPC. Nodes will handle such RPCs according to the PreVote algorithm.
    • Once a node enters the Candidate state, before incrementing its term, it should perform a request vote round using the is_pre_vote=true setting on the RPCs.
    • If the pre-vote succeeds, then it should increment its term and proceed with a normal vote.
    • The RPC handler for vote requests should be updated to handle the is_pre_vote field in the RPC.
    • Placeholder code is already present in the RPC handler where this needs to be implemented.
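The handler side of the steps above might look like the sketch below. The Node struct, its heard_from_leader_recently flag, and the assumption that a pre-vote request carries the term the candidate would move to are all illustrative; only the is_pre_vote field name comes from the list above. The key property is that a pre-vote is answered without changing any state.

```rust
pub struct VoteRequest {
    pub term: u64,
    pub candidate_id: u64,
    pub last_log_index: u64,
    pub last_log_term: u64,
    pub is_pre_vote: bool,
}

/// Hypothetical receiver-side state.
pub struct Node {
    pub current_term: u64,
    pub voted_for: Option<u64>,
    pub last_log_index: u64,
    pub last_log_term: u64,
    /// True if a leader heartbeat arrived within the minimum election timeout.
    pub heard_from_leader_recently: bool,
}

impl Node {
    /// The candidate's log must be at least as up to date as ours.
    fn log_is_current(&self, req: &VoteRequest) -> bool {
        (req.last_log_term, req.last_log_index) >= (self.last_log_term, self.last_log_index)
    }

    pub fn handle_vote_request(&mut self, req: &VoteRequest) -> bool {
        if req.is_pre_vote {
            // Pre-vote: answer the question only; term and vote stay untouched.
            return req.term > self.current_term
                && !self.heard_from_leader_recently
                && self.log_is_current(req);
        }
        if req.term < self.current_term {
            return false;
        }
        if req.term > self.current_term {
            self.current_term = req.term;
            self.voted_for = None;
        }
        let can_vote = self.voted_for.map_or(true, |id| id == req.candidate_id);
        if can_vote && self.log_is_current(req) {
            self.voted_for = Some(req.candidate_id);
            true
        } else {
            false
        }
    }
}
```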

CheckQuorum

  • add config item check_quorum_missed_heartbeats_threshold: the number of consecutive missed heartbeats allowed before a leader will consider its connection to a follower node severed.
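A small sketch of how such a threshold could drive a quorum check on the leader. QuorumCheck is a hypothetical type; only the meaning of the threshold (consecutive missed heartbeats allowed) is taken from the item above. A leader that finds has_quorum() false would step down.

```rust
use std::collections::HashMap;

/// Hypothetical leader-side bookkeeping for CheckQuorum.
pub struct QuorumCheck {
    threshold: u32,
    /// follower id -> consecutive missed heartbeats
    missed: HashMap<u64, u32>,
}

impl QuorumCheck {
    pub fn new(followers: &[u64], threshold: u32) -> Self {
        QuorumCheck {
            threshold,
            missed: followers.iter().map(|&id| (id, 0)).collect(),
        }
    }

    /// Record the outcome of one heartbeat to a follower.
    pub fn record_heartbeat(&mut self, follower: u64, acked: bool) {
        if let Some(count) = self.missed.get_mut(&follower) {
            *count = if acked { 0 } else { *count + 1 };
        }
    }

    /// True while the leader plus its reachable followers form a majority.
    pub fn has_quorum(&self) -> bool {
        let reachable = self
            .missed
            .values()
            .filter(|&&m| m <= self.threshold)
            .count();
        let cluster_size = self.missed.len() + 1; // followers plus the leader
        reachable + 1 > cluster_size / 2
    }
}
```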

Latest Spec for Membership Changes

  • update Raft API to adopt the AddServer & RemoveServer RPC along with the simplified dynamic membership system described in the latest spec. Probably open a separate issue for this.
  • the algorithm used for adding non-voters should probably be updated to match that of the latest spec.
    • May want to preserve some of the current functionality for adding non-voters. We can add a remove non-voter method as well. This would provide a more robust read-only replica type functionality with deep integration in Raft.
    • Might need to make the add non-voter functionality idempotent in the sense that if called multiple times, it will just do the same thing: wait for the node to be synced. That way if leadership changes, the endpoint can just be called again with the same effect.
    • Update the README to describe this functionality once it is in place.

Snapshot per Latest Spec

  • only real change here is that the InstallSnapshot RPC is now described as having a field for the latest config. This is definitely easier than having to decode the config from the snapshot.

Add metrics system.

These metrics will show the current state of the Raft node over some time resolution. These will be useful for applications to expose Prometheus integrations and the like.

Periodic snapshots/compaction.

Snapshot creation needs to be triggered based on configuration & distance from last snapshot, not just based on leader detecting that a node is far behind.

  • This will just be a simple mechanism which compares the distance of the last_log_index from the last snapshot point, and will trigger a compaction as needed.
  • Needs state representation to ensure it does not overlap/conflict with receiving a snapshot from the leader (which will take precedence).
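
The two points above can be sketched as a single check. The `SnapshotState` enum, the function name, and the `threshold` parameter are assumptions made for illustration; only `last_log_index` comes from the description.

```rust
/// Hypothetical state used to keep local compaction from overlapping
/// with a snapshot being streamed from the leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotState {
    Idle,
    Compacting,
    ReceivingFromLeader,
}

/// Returns true when enough log entries have accumulated since the last
/// snapshot and no other snapshot activity is in progress.
pub fn should_trigger_compaction(
    last_log_index: u64,
    last_snapshot_index: u64,
    threshold: u64,
    state: SnapshotState,
) -> bool {
    // Any in-flight snapshot work, local or from the leader, blocks a new compaction.
    state == SnapshotState::Idle
        && last_log_index.saturating_sub(last_snapshot_index) >= threshold
}
```

With a threshold of 5000, a node whose last snapshot covers index 1000 would start compacting once its log reaches index 6000, unless it is already compacting or receiving a snapshot.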

Actor system eventually fails

Hi!

I've been testing my Raft application and I saw some strange behavior related to my VoteRequest handler and the SaveHardState handler. I started stripping the code down to see where the issue was and, to further simplify things, I even started testing with just one node (though Raft expects 2, so the single node does attempt to become leader). For the purposes of this post, I will show my very simplified VoteRequest handler:

impl Handler<messages::VoteRequest> for AppNetwork {
    type Result = ResponseActFuture<Self, messages::VoteResponse, ()>;

    fn handle(&mut self, msg: messages::VoteRequest, _ctx: &mut Self::Context) -> Self::Result {
        let f = wrap_future(
            async move {
                println!("VoteRequest target: {}", msg.target);
                time::delay_for(std::time::Duration::from_millis(50)).await;
                println!("Done sleeping");
                Err(())
            }
            .boxed_local()
            .compat()
        );

        Box::new(f)
    }
}

Tada! It doesn't do anything: it just fails after a bit. Since the node thinks there is another node it should talk to, this code does get triggered. Initially, everything is fine: the node keeps trying at the vote as it should and the output looks like this:

VoteRequest target: 1
Done sleeping
VoteRequest target: 1
Done sleeping
VoteRequest target: 1
Done sleeping
VoteRequest target: 1
Done sleeping
...

My understanding is that this should basically go on forever since I never bring up the second node. But, eventually, the handler starts getting hung up on the sleep statement, so that the output looks like this:

VoteRequest target: 1
VoteRequest target: 1
VoteRequest target: 1
VoteRequest target: 1
VoteRequest target: 1
VoteRequest target: 1
VoteRequest target: 1
...

When we get here, the system never seems to recover.

At some point in my debugging, I thought it might be possible that another vote was getting started even though a future already existed for a previous vote request, causing the extant future to get dropped and never completed. This doesn't explain why the system doesn't recover, but on a whim I changed the min/max vote request timeouts. Initially I tried (2000, 3000) and now I am working with (500, 1000). This doesn't seem to change anything: the problem persists.

Any thoughts on what might be happening here?

Many thanks!

Inconsistency with Raft spec

From the spec:

To prevent this problem, servers disregard RequestVote RPCs when they believe a current leader exists. Specifically, if a server receives a RequestVote RPC within the minimum election timeout of hearing from a current leader, it does not update its term or grant its vote.

The code does not appear to implement this behaviour.

On the other hand, implementing this behaviour seems like it could cause problems: if one node in a cluster temporarily loses connectivity then its "term" may increase beyond the rest of the cluster. When connectivity returns, this node will never be able to rejoin the cluster: it will always refuse AppendEntriesRequests from the leader due to its own "term" being ahead, but it will always fail to win VoteRequests because other nodes will still be receiving heartbeats from the leader.

I can only assume that servers are expected to update themselves with the "term" in a VoteRequest, even if that VoteRequest is going to be ignored due to this reason. However, this would seem to defeat the point of this change, which is intended to prevent ex-members from disrupting the cluster unnecessarily.
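
For reference, the rule quoted from the spec reduces to a small time check. The sketch below is illustrative only: the function and parameter names are hypothetical, and it makes no attempt to resolve the rejoin problem described above.

```rust
use std::time::{Duration, Instant};

/// Returns true when a RequestVote RPC should be disregarded because this
/// server heard from a current leader within the minimum election timeout.
/// `last_leader_contact` is `None` if no leader has been heard from yet.
pub fn should_disregard_vote_request(
    last_leader_contact: Option<Instant>,
    now: Instant,
    min_election_timeout: Duration,
) -> bool {
    match last_leader_contact {
        Some(contact) => now.saturating_duration_since(contact) < min_election_timeout,
        None => false,
    }
}
```

When this returns `true`, the quoted rule says the server neither updates its term nor grants its vote.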

Discussion: Optimization for large AppendEntries RPC

This is more of a feature request for a possible optimization.

Currently, the AppendEntries RPC has a timeout guard using heartbeat_interval from the Config. In my use case, the latency between nodes is small, so ideally we should use a small heartbeat_interval for better performance. At the same time, the size of AppData and AppDataResponse can be quite large, which requires a very large heartbeat_interval. Otherwise, the system will report "timeout while sending AppendEntries RPC to target" and cause a view change and leader election. I think this results in suboptimal performance.

May I suggest introducing a new append_entries_timeout config? With it, we would use append_entries_timeout as the timeout for a normal AppendEntries RPC and heartbeat_interval as the timeout for a heartbeat RPC.
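
A rough sketch of how the proposed setting could be selected per RPC follows. The `TimeoutConfig` struct and `rpc_timeout` method are hypothetical and are not the crate's `Config`. The sketch assumes both values are in milliseconds, that a heartbeat is an AppendEntries RPC carrying no entries, and that the new setting falls back to heartbeat_interval when unset.

```rust
use std::time::Duration;

/// Hypothetical subset of the configuration relevant to this proposal.
pub struct TimeoutConfig {
    /// Heartbeat interval in milliseconds.
    pub heartbeat_interval: u64,
    /// Proposed timeout, in milliseconds, for AppendEntries RPCs that carry entries.
    pub append_entries_timeout: Option<u64>,
}

impl TimeoutConfig {
    /// Pick the timeout for an AppendEntries RPC carrying `entry_count` entries.
    pub fn rpc_timeout(&self, entry_count: usize) -> Duration {
        let millis = if entry_count == 0 {
            // An empty AppendEntries RPC is a heartbeat.
            self.heartbeat_interval
        } else {
            // Fall back to the heartbeat interval to keep the current behavior.
            self.append_entries_timeout.unwrap_or(self.heartbeat_interval)
        };
        Duration::from_millis(millis)
    }
}
```

Making the new field optional would leave existing configurations unchanged unless the setting is provided.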

Refactor snapshot handler.

Code quality in this module (src/raft/install_snapshot.rs) is not so great. Async/await will help a lot once it is ready, and once actix is compatible with futures 0.3.

Message passing latency?

Looking at the design of actix-raft, I wonder what the rationale for using this many individual actors is. IIUC there are at least 10 messages involved for every client request, with metrics even more. Benchmarking actix actors I found that message passing between actors isn't that cheap, and should only be used where necessary (e.g. decoupling blocking operations).

How about making actors optional and allowing implementation of the components via simple traits (too)?
