There are also other examples that should answer your questions: https://github.com/tikv/raft-rs/tree/master/examples
I'm unsure how to trigger tick in a thread-safe manner.
You can use locks or a message queue; either is fine as long as the Rust compiler doesn't complain.
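The message-queue option can be sketched with only the standard library. A timer thread never touches the node; it just sends `Tick` events, and the one thread that owns the node does all the mutation, so no lock is needed. This is a hypothetical sketch: `Event`, `spawn_ticker` and `run_node` are illustrative names, and a counter stands in for calling `RawNode::tick`.

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical events delivered to the thread that owns the Raft node.
#[derive(Debug)]
pub enum Event {
    Tick,
    Shutdown,
}

/// Timer thread: it only *sends* `Tick` events and never touches the node.
pub fn spawn_ticker(tx: Sender<Event>, interval: Duration, count: usize) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..count {
            thread::sleep(interval);
            if tx.send(Event::Tick).is_err() {
                return; // the node thread has exited
            }
        }
        let _ = tx.send(Event::Shutdown);
    })
}

/// The single owner of the (stand-in) node. Every mutation happens here.
pub fn run_node(rx: Receiver<Event>) -> usize {
    let mut ticks = 0; // stands in for `raw_node.tick()`
    for event in rx {
        match event {
            Event::Tick => ticks += 1,
            Event::Shutdown => break,
        }
    }
    ticks
}
```

In a real node, Raft messages from peers would be additional `Event` variants on the same channel, so ticks and steps are naturally serialized.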
Will it advance the inner timer
There is no inner timer; you have to call `tick` to let `RawNode` know that time has passed. If I/O has to happen in the thread that drives Raft, I'm afraid it will delay the timer and may cause unexpected elections.
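To illustrate why only `tick` calls matter, here is a simplified, hypothetical model of a tick-driven election timer. raft-rs keeps comparable counters inside `Raft` and, as far as I know, randomizes the election timeout within a range based on `election_tick`; this sketch uses a fixed timeout and the illustrative name `ElectionTimer`.

```rust
/// Simplified model: "time" is just the number of `tick()` calls.
pub struct ElectionTimer {
    election_tick: usize,
    elapsed: usize,
}

impl ElectionTimer {
    pub fn new(election_tick: usize) -> Self {
        ElectionTimer { election_tick, elapsed: 0 }
    }

    /// Called once per tick. Returns true when a follower would start an election.
    pub fn tick(&mut self) -> bool {
        self.elapsed += 1;
        if self.elapsed >= self.election_tick {
            self.elapsed = 0;
            true
        } else {
            false
        }
    }

    /// A heartbeat from the leader (delivered via `step`) resets the counter.
    pub fn on_heartbeat(&mut self) {
        self.elapsed = 0;
    }
}
```

Wall-clock time never enters the model: a slow loop delays both ticks and heartbeat handling, and if queued ticks are then processed before the queued heartbeats, the timer can fire even though the leader is healthy.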
BTW, should we discuss this kind of question in Discussions, or just keep using issues?
Both are OK. This is a small project that doesn't need complicated rules. :)
from raft-rs.
Thank you!
Both are OK
I'd prefer Discussions, which is why I checked whether that works for you :D
There is no inner timer
Yeah, I see there is only a tick counter, though I haven't checked whether other methods increase those counters internally. As you said, if only `tick` advances the counter (the effective timer) and heavy I/O delays that logic, the Raft state can become unexpectedly unstable.
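One general way to keep heavy I/O from delaying `tick` is to move persistence onto a separate worker thread: the Raft thread only hands off write jobs and later collects completions. The std-only sketch below is hypothetical (`WriteJob` and `spawn_io_worker` are illustrative names, and a log index stands in for the entries); it is not a description of how TiKV or raft-rs does this.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical write job: entries up to `last_index` must become durable.
pub struct WriteJob {
    pub last_index: u64,
}

/// Spawns an I/O worker. The Raft thread sends jobs and later receives the
/// index that became durable, instead of blocking on disk itself.
pub fn spawn_io_worker() -> (Sender<WriteJob>, Receiver<u64>, thread::JoinHandle<()>) {
    let (job_tx, job_rx) = channel::<WriteJob>();
    let (done_tx, done_rx) = channel::<u64>();
    let handle = thread::spawn(move || {
        for job in job_rx {
            // A real worker would write to disk and fsync here.
            if done_tx.send(job.last_index).is_err() {
                break; // the Raft thread is gone
            }
        }
    });
    (job_tx, done_rx, handle)
}
```

The catch is that anything that must wait for durability (for example, vote responses) can only be sent after the matching completion arrives, so the Raft thread has to track what is still in flight.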
May I ask how TiKV handles or works around this risk?
locks or use message queue
You're right that `tick` takes a `&mut` reference rather than relying on interior mutability, so the Rust compiler should help here.
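For the lock-based option, the `&mut self` receiver is what forces exclusive access: a bare node cannot be ticked from several threads, but an `Arc<Mutex<_>>` can. This is a minimal sketch in which the hypothetical `Node` stands in for `RawNode` and only mirrors the `&mut self` shape of `tick`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in for `RawNode`: `tick` takes `&mut self`, like the real method.
pub struct Node {
    pub ticks: u64,
}

impl Node {
    pub fn tick(&mut self) {
        self.ticks += 1;
    }
}

/// Ticks a shared node from several threads. Each call takes the lock,
/// so the `&mut` borrow required by `tick` is exclusive.
pub fn tick_from_threads(node: Arc<Mutex<Node>>, threads: usize, per_thread: u64) {
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let node = Arc::clone(&node);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    node.lock().unwrap().tick();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```

The trade-off: if another thread holds the lock while doing slow work (such as handling a `Ready` with disk I/O), the ticker blocks too, which is the same delay problem discussed above.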
other examples
I skimmed them, but those examples seem fairly hand-crafted: they preconfigure roles and state to some extent. My test case runs a cluster of Raft nodes in different processes/hosts, and right now I'm trying to make a three-node campaign work stably.
I'll come back with more concrete questions if I have any. For now, the main loop looks like this:
pub fn start(mut self) -> anyhow::Result<()> {
    // simulate self.node.campaign
    self.tx_message.send(RaftMessage {
        msg_type: MessageType::MsgHup as i32,
        ..Default::default()
    })?;
    loop {
        self.select_any();
        match self.shutdown.try_recv() {
            Err(TryRecvError::Empty) => {
                // no shutdown signal so far
            }
            Ok(()) | Err(TryRecvError::Disconnected) => {
                info!(self.logger, "shutting down because server stopped");
                return Ok(());
            }
        }
        for raft_msg in self.rx_message.try_iter() {
            if raw_node::is_local_msg(raft_msg.msg_type()) {
                self.node.raft.step(raft_msg)?;
            } else {
                self.node.step(raft_msg)?;
            }
        }
        self.on_ready();
    }
}
I'm now working out how to handle and advance the Raft core state, and which state I should hold in the server layer.
FYI, I can now run a three-node cluster with a stable leader. Below is the main loop I'd like to share. I may publish the source once an MVP is built.
For questions above:
call several steps in batch and then call ready
It seems correct, although I don't know whether some `Ready` state must be handled to update the Raft store before stepping more messages. That ordering should be considered and coordinated in the server.
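On the ordering question, the key constraint is that messages which promise durability (such as vote responses) must not leave before the new entries and hard state are persisted. The sketch below uses hypothetical stand-in types (`MockReady`, `Action`) and follows the order of the Ready-handling example in the raft-rs crate documentation as I recall it (snapshot and hard-state handling omitted); check the docs for the exact sequence.

```rust
/// What the server does, recorded in order.
#[derive(Debug, PartialEq)]
pub enum Action {
    Send(String),
    Apply(u64),
    Persist(u64),
}

/// Hypothetical, simplified stand-in for a `Ready` batch.
pub struct MockReady {
    pub messages: Vec<String>,           // may be sent right away
    pub committed: Vec<u64>,             // already durable from earlier batches
    pub entries: Vec<u64>,               // must be persisted now
    pub persisted_messages: Vec<String>, // must wait until `entries` are durable
}

pub fn handle_ready(ready: MockReady) -> Vec<Action> {
    let mut actions = Vec::new();
    // 1. Messages that do not depend on local persistence.
    actions.extend(ready.messages.into_iter().map(Action::Send));
    // 2. Apply committed entries to the state machine.
    actions.extend(ready.committed.into_iter().map(Action::Apply));
    // 3. Persist new entries (real code would also persist the hard state).
    actions.extend(ready.entries.into_iter().map(Action::Persist));
    // 4. Only now release messages that depend on step 3.
    actions.extend(ready.persisted_messages.into_iter().map(Action::Send));
    actions
}
```

In raft-rs this split corresponds to `take_messages` versus `take_persisted_messages`, which the main loop in this thread already distinguishes.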
In the main loop below, on each iteration the node:
- Ticks (though there is an issue, crossbeam-rs/crossbeam#989, about its timer semantics).
- Processes local messages. ("Local" still needs a proper definition, and I'm unsure whether it differs from `is_local_msg` in raft-rs; so far it means messages sent from the same server.)
- Processes remote messages.
- Handles a possible `Ready`.
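As an alternative to a `crossbeam_channel::tick` receiver, the loop can block on `std::sync::mpsc::Receiver::recv_timeout` and decide after each wakeup how many ticks are due. The `Ticker` helper below is hypothetical; it only computes deadlines and is deterministic, which makes it easy to test.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper for driving `tick()` from a `recv_timeout` loop.
pub struct Ticker {
    interval: Duration,
    next: Instant,
}

impl Ticker {
    pub fn new(start: Instant, interval: Duration) -> Self {
        Ticker { interval, next: start + interval }
    }

    /// How long the loop may block before the next tick is due.
    pub fn timeout(&self, now: Instant) -> Duration {
        self.next.saturating_duration_since(now)
    }

    /// Number of ticks that became due since the last call. A slow iteration
    /// yields more than one, so no tick is silently dropped.
    pub fn due(&mut self, now: Instant) -> u32 {
        let mut n = 0;
        while now >= self.next {
            n += 1;
            self.next += self.interval;
        }
        n
    }
}
```

A loop would call `rx.recv_timeout(ticker.timeout(Instant::now()))`, step whatever arrived, then run `node.tick()` `ticker.due(Instant::now())` times before handling `Ready`. Catching up with a burst of ticks after a stall can itself trigger an election; capping the count at one per iteration trades timing accuracy for stability.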
difference between the three or four variants of advance
It seems `advance` is enough for now. When I work on the store part, I may need the other variants.
pub struct RaftNode {
    node: RawNode<MemStorage>,
    state: NodeState,
    shutdown: Receiver<()>,
    tx_message: Sender<RaftMessage>,
    rx_message: Receiver<RaftMessage>,
    #[allow(dead_code)] // held so that receiving on `rx_api` never fails with a disconnect error
    tx_api: Sender<ServiceMessage>,
    rx_api: Receiver<ServiceMessage>,
    tick: Receiver<Instant>,
    peers: PeerProxyMap,
    logger: Logger,
}

pub struct NodeInitContext {
    pub id: u64,
    pub peers: Vec<Peer>,
    pub shutdown: Receiver<()>,
    pub tx_message: Sender<RaftMessage>,
    pub rx_message: Receiver<RaftMessage>,
    pub tx_api: Sender<ServiceMessage>,
    pub rx_api: Receiver<ServiceMessage>,
    pub logger: Logger,
}

#[derive(Debug)]
struct NodeState {
    role: StateRole,
}

impl RaftNode {
    pub fn new(context: NodeInitContext) -> RaftNode {
        let NodeInitContext {
            id,
            peers,
            shutdown,
            tx_message,
            rx_message,
            tx_api,
            rx_api,
            logger,
        } = context;
        let config = {
            let mut config = Config::new(id);
            config.priority = (1 << id) as i64;
            config.election_tick = 10;
            config.heartbeat_tick = 1;
            config.max_size_per_msg = 1024 * 1024 * 1024;
            config.validate().unwrap();
            config
        };
        let voters = peers.iter().map(|p| p.id).collect::<Vec<_>>();
        let storage = MemStorage::new_with_conf_state((voters, vec![]));
        let logger = logger.new(o!("tag" => format!("peer_{id}")));
        let node = RawNode::new(&config, storage, &logger).unwrap();
        let peers = PeerProxyMap::new(peers, logger.clone());
        let state = NodeState {
            role: StateRole::Follower,
        };
        // One tick every 100 ms, so election_tick = 10 is roughly one second.
        let tick = crossbeam_channel::tick(Duration::from_millis(100));
        RaftNode {
            node,
            state,
            peers,
            tx_message,
            rx_message,
            tx_api,
            rx_api,
            tick,
            shutdown,
            logger,
        }
    }

    pub fn do_main(self) {
        let logger = self.logger.clone();
        if let Err(err) = self.start() {
            error!(logger, "raft node shutdown improperly"; "err" => ?err);
        }
    }

    fn start(mut self) -> anyhow::Result<()> {
        // simulate self.node.campaign
        self.tx_message.send(RaftMessage::campaign())?;
        loop {
            // Block until at least one channel is ready, then poll each one below.
            self.select_any();
            match self.shutdown.try_recv() {
                Err(TryRecvError::Empty) => {
                    // no shutdown signal so far
                }
                Ok(()) | Err(TryRecvError::Disconnected) => {
                    info!(self.logger, "shutting down because server stopped");
                    return Ok(());
                }
            }
            // At most one tick per iteration.
            if self.tick.try_recv().is_ok() {
                self.node.tick();
            }
            for msg in self.rx_message.try_iter() {
                let msg = msg.0;
                // RawNode::step returns an error for local message types,
                // so those are stepped on the inner Raft directly.
                if suraft::raw_node::is_local_msg(msg.msg_type()) {
                    self.node.raft.step(msg)?;
                } else {
                    self.node.step(msg)?;
                }
            }
            for msg in self.rx_api.try_iter() {
                self.node.step(msg.0)?;
            }
            self.on_ready();
        }
    }

    fn on_ready(&mut self) {
        if !self.node.has_ready() {
            return;
        }
        trace!(self.logger, "on_ready");
        let mut ready = self.node.ready();
        // trace!(self.logger, "{:#?}", ready);

        // Track role changes reported through the soft state.
        if let Some(ss) = ready.ss() {
            if ss.raft_state != self.state.role {
                info!(
                    self.logger,
                    "changing raft node role from {:?} to {:?}", self.state.role, ss.raft_state
                );
                self.state.role = ss.raft_state;
            }
        }

        // Messages that may be sent before anything is persisted.
        for msg in ready.take_messages() {
            match msg.msg_type() {
                eraftpb::MessageType::MsgAppend => {
                    // NOTE: appends are looped back into the local queue here,
                    // not sent to the peer `msg.to`.
                    self.tx_message.send(RaftMessage(msg)).unwrap();
                }
                eraftpb::MessageType::MsgHeartbeat => {
                    let peer = self.peers.get_peer(msg.to).unwrap();
                    std::thread::spawn(move || peer.tell(msg));
                }
                msg => {
                    error!(self.logger, "unimplemented {:?}", msg);
                    unimplemented!("{:?}", msg)
                }
            }
        }

        // Messages that must only be sent after this Ready has been persisted.
        for msg in ready.take_persisted_messages() {
            match msg.msg_type() {
                eraftpb::MessageType::MsgRequestVote
                | eraftpb::MessageType::MsgRequestVoteResponse
                | eraftpb::MessageType::MsgHeartbeatResponse => {
                    let peer = self.peers.get_peer(msg.to).unwrap();
                    std::thread::spawn(move || peer.tell(msg));
                }
                msg => {
                    error!(self.logger, "unimplemented {:?}", msg);
                    unimplemented!("{:?}", msg)
                }
            }
        }

        // NOTE: entries, hard state and committed entries in `ready` are not
        // persisted or applied yet, and the `LightReady` returned by `advance`
        // is ignored.
        self.node.advance(ready);
    }

    fn select_any(&self) {
        let mut select = Select::new();
        select.recv(&self.shutdown);
        select.recv(&self.rx_message);
        select.recv(&self.rx_api);
        select.recv(&self.tick);
        select.ready();
    }
}