
Comments (3)

BusyJay commented on July 29, 2024

There are also other examples that should answer your questions: https://github.com/tikv/raft-rs/tree/master/examples

> I'm unsure how to trigger tick in a thread-safe manner.

You can use locks or a message queue; either is OK as long as the Rust compiler doesn't complain.
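A minimal sketch of the message-queue option, using only std::sync::mpsc. Node, Event and run are hypothetical stand-ins (Node::tick plays the role of RawNode::tick); the point is that only the thread that owns the node ever calls tick, while other threads just send it events.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for RawNode: only the tick counter matters here.
struct Node {
    ticks: u64,
}

impl Node {
    // Like RawNode::tick, this needs `&mut self`, so only the owner can call it.
    fn tick(&mut self) {
        self.ticks += 1;
    }
}

/// Hypothetical events delivered to the thread that owns the node.
enum Event {
    Tick,
    Shutdown,
}

/// Runs a ticker thread that sends `n_ticks` ticks, then returns how many
/// ticks the driver thread applied to the node.
fn run(n_ticks: u64, interval: Duration) -> u64 {
    let (tx, rx) = mpsc::channel::<Event>();

    // Ticker thread: never touches the node, it only sends events.
    let ticker = thread::spawn(move || {
        for _ in 0..n_ticks {
            thread::sleep(interval);
            if tx.send(Event::Tick).is_err() {
                return; // the driver is gone
            }
        }
        let _ = tx.send(Event::Shutdown);
    });

    // Driver loop: the only place that holds `&mut Node`.
    let mut node = Node { ticks: 0 };
    for event in rx {
        match event {
            Event::Tick => node.tick(),
            Event::Shutdown => break,
        }
    }
    ticker.join().unwrap();
    node.ticks
}

fn main() {
    let ticks = run(5, Duration::from_millis(10));
    println!("node observed {ticks} ticks");
}
```

The same shape extends naturally: raft messages from peers can be another Event variant on the same queue, so all mutation of the node stays on one thread.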

> Will it advance the inner timer?

There is no inner timer; you have to call tick to let RawNode know that time has passed. If IO has to happen in the thread that drives Raft, I'm afraid it will delay the ticks and can cause unexpected elections.
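To illustrate "no inner timer": a hypothetical logical election timer whose clock moves only when tick() is called, no matter how much wall-clock time passes between calls. This is a simplified sketch, not raft-rs's code; in particular, raft-rs randomizes the actual election timeout, which the sketch leaves out by using a fixed timeout.

```rust
/// Hypothetical logical election timer: time only advances on `tick()`.
struct ElectionTimer {
    elapsed: u32,
    timeout: u32,
}

impl ElectionTimer {
    fn new(timeout: u32) -> Self {
        ElectionTimer { elapsed: 0, timeout }
    }

    /// Advance logical time by one tick; returns true when the timeout fires.
    fn tick(&mut self) -> bool {
        self.elapsed += 1;
        if self.elapsed >= self.timeout {
            self.elapsed = 0; // a real node would start campaigning here
            true
        } else {
            false
        }
    }

    /// Hearing from the leader resets the logical clock.
    fn reset(&mut self) {
        self.elapsed = 0;
    }
}

fn main() {
    let mut timer = ElectionTimer::new(10);
    // Nine ticks never fire, however long the driver waits between them.
    for _ in 0..9 {
        assert!(!timer.tick());
    }
    assert!(timer.tick()); // the 10th tick fires
    timer.reset();
    assert!(!timer.tick());
    println!("ok");
}
```

The flip side is the concern raised here: if the driving thread is blocked, it neither ticks nor processes heartbeats, so its view of time and the leader's view drift apart.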

> BTW, shall we discuss this kind of question in Discussions, or just reuse issues?

Both are OK. This is a small project that doesn't need complicated rules. :)

from raft-rs.

tisonkun commented on July 29, 2024

Thank you!

> Both are OK

I'd prefer Discussions, hence asking whether that works for you :D

> There is no inner timer

Yeah. I see there is only a tick counter, but I didn't check whether other methods increase those counters internally. As you said, if only tick advances the counter (effectively the timer) and heavy IO delays that logic, the Raft state can become unexpectedly unstable.
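A rough back-of-the-envelope helper for how long a stall is tolerable. It assumes, as I understand raft-rs's defaults, that the randomized election timeout is drawn from [election_tick, 2 * election_tick) ticks, and it plugs in the 100 ms ticker and election_tick = 10 used in the main loop later in this thread. Both function names are hypothetical, and the check is only approximate (it ignores when the last heartbeat actually landed).

```rust
use std::time::Duration;

/// Wall-clock window in which a follower that hears nothing may time out,
/// assuming one tick per `tick_interval` and a randomized timeout in
/// [election_tick, 2 * election_tick) ticks.
fn election_window(tick_interval: Duration, election_tick: u32) -> (Duration, Duration) {
    (tick_interval * election_tick, tick_interval * (2 * election_tick))
}

/// Conservative check: a leader loop stalled (e.g. by blocking IO) for at
/// least the lower bound sends no heartbeats, so some follower may campaign.
fn stall_may_trigger_election(stall: Duration, tick_interval: Duration, election_tick: u32) -> bool {
    stall >= election_window(tick_interval, election_tick).0
}

fn main() {
    let tick = Duration::from_millis(100);
    let (lo, hi) = election_window(tick, 10);
    println!("followers may campaign after {lo:?} to {hi:?} of silence");
    println!(
        "a 1.5 s stall may trigger an election: {}",
        stall_may_trigger_election(Duration::from_millis(1500), tick, 10)
    );
}
```

With those values, any stall of roughly a second or more on the leader's driving thread is already in the risky range.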

May I ask how TiKV handles or works around this risk?

> locks or use message queue

You're right: tick takes a mutable reference rather than relying on interior mutability, so the Rust compiler should help here.
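A sketch of the lock option with a hypothetical Node whose tick takes &mut self, like RawNode::tick: wrapping it in Arc<Mutex<_>> is the only way the compiler lets several threads reach it. One caveat: if the Ready handling holds the same lock while doing IO, ticks wait for it, which is exactly the delay discussed above.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for RawNode: like RawNode::tick, `tick` needs `&mut self`.
struct Node {
    ticks: u64,
}

impl Node {
    fn tick(&mut self) {
        self.ticks += 1;
    }
}

/// Ticks one shared node from `threads` threads, `per_thread` times each,
/// and returns the final tick count.
fn tick_concurrently(threads: usize, per_thread: u64) -> u64 {
    let node = Arc::new(Mutex::new(Node { ticks: 0 }));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let node = Arc::clone(&node);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // `&mut Node` is only reachable through the guard; calling
                    // tick through a plain shared `&Node` would not compile.
                    node.lock().unwrap().tick();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = node.lock().unwrap().ticks;
    total
}

fn main() {
    println!("ticks = {}", tick_concurrently(4, 25)); // ticks = 100
}
```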

> other examples

I skimmed them, but those examples seem quite handcrafted: they preconfigure roles and state to some extent. My test case runs a Raft cluster with nodes in different processes/hosts, and now I'm trying to make a three-node campaign work stably.

I'll come back with more concrete questions if I have any. For now, the main loop looks like this:

    pub fn start(mut self) -> anyhow::Result<()> {
        // simulate self.node.campaign
        self.tx_message.send(RaftMessage {
            msg_type: MessageType::MsgHup as i32,
            ..Default::default()
        })?;

        loop {
            self.select_any();

            match self.shutdown.try_recv() {
                Err(TryRecvError::Empty) => {
                    // no shutdown signal so far
                }
                Ok(()) | Err(TryRecvError::Disconnected) => {
                    info!(self.logger, "shutting down because server stopped");
                    return Ok(());
                }
            }

            for raft_msg in self.rx_message.try_iter() {
                if raw_node::is_local_msg(raft_msg.msg_type()) {
                    self.node.raft.step(raft_msg)?;
                } else {
                    self.node.step(raft_msg)?;
                }
            }

            self.on_ready();
        }
    }
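On the is_local_msg branch: as far as I can tell, RawNode::step returns an error (StepLocalMsg) for local message types, which is why MsgHup goes to node.raft.step here; RawNode::campaign is the public shortcut for the MsgHup case. The sketch below mirrors the list of local types in raft-rs's raw_node::is_local_msg as I remember it; the MessageType enum and Route are hypothetical stand-ins, so check the list against the raft-rs version in use.

```rust
/// Hypothetical mirror of the eraftpb message types relevant here.
#[derive(Debug, Clone, Copy, PartialEq)]
enum MessageType {
    MsgHup,
    MsgBeat,
    MsgUnreachable,
    MsgSnapStatus,
    MsgCheckQuorum,
    MsgAppend,
    MsgRequestVote,
    MsgHeartbeat,
}

/// Mirrors raft-rs's `raw_node::is_local_msg` (list as I understand it).
fn is_local_msg(t: MessageType) -> bool {
    matches!(
        t,
        MessageType::MsgHup
            | MessageType::MsgBeat
            | MessageType::MsgUnreachable
            | MessageType::MsgSnapStatus
            | MessageType::MsgCheckQuorum
    )
}

/// Where the main loop sends a message.
#[derive(Debug, PartialEq)]
enum Route {
    /// Step directly into the inner Raft (what `node.raft.step` does).
    InnerRaft,
    /// Go through RawNode::step, which rejects local message types.
    RawNode,
}

fn route(t: MessageType) -> Route {
    if is_local_msg(t) {
        Route::InnerRaft
    } else {
        Route::RawNode
    }
}

fn main() {
    for t in [MessageType::MsgHup, MessageType::MsgAppend, MessageType::MsgHeartbeat] {
        println!("{t:?} -> {:?}", route(t));
    }
}
```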

I'm now working out how to handle and advance the Raft core state, and which state I should hold in the server layer.


tisonkun commented on July 29, 2024

FYI, I can now run a three-node cluster with a stable leader. Below is the main loop I'd like to share. I may publish the source once an MVP is built.

For the questions above:

> call several steps in batch and then call ready

It seems correct, although I don't know whether some Ready state must be handled to update the Raft store before stepping more messages. That ordering has to be considered and coordinated in the server.

In the main loop below, each iteration runs these steps in order:

  1. Tick (note the issue on the timer semantics: crossbeam-rs/crossbeam#989).
  2. Process local messages ("local" still needs a proper definition, and I'm unsure whether it differs from is_local_msg in raft-rs; so far, it means messages sent from the same server).
  3. Process remote messages.
  4. Handle a possible Ready.
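The four steps above can be sketched with std channels and a hypothetical Node that just records what happened. The property the sketch shows is that every message available in this iteration is stepped first, and then at most one Ready is handled for the whole batch; treating every tick or step as producing a Ready is a simplification.

```rust
use std::sync::mpsc::{self, Receiver};

/// Hypothetical stand-in for RawNode that records what the loop did.
#[derive(Default)]
struct Node {
    log: Vec<String>,
    pending: bool, // simplified: any tick or step leaves a Ready pending
}

impl Node {
    fn tick(&mut self) {
        self.log.push("tick".to_string());
        self.pending = true;
    }
    fn step(&mut self, msg: &str) {
        self.log.push(format!("step {msg}"));
        self.pending = true;
    }
    fn has_ready(&self) -> bool {
        self.pending
    }
    fn handle_ready(&mut self) {
        self.log.push("ready".to_string());
        self.pending = false;
    }
}

/// One loop iteration: tick, drain local, drain remote, then handle Ready.
fn run_once(node: &mut Node, tick: &Receiver<()>, local: &Receiver<String>, remote: &Receiver<String>) {
    // 1. Tick if the timer fired (non-blocking).
    if tick.try_recv().is_ok() {
        node.tick();
    }
    // 2. Drain local messages as a batch.
    for m in local.try_iter() {
        node.step(&m);
    }
    // 3. Drain remote messages as a batch.
    for m in remote.try_iter() {
        node.step(&m);
    }
    // 4. Handle at most one Ready covering everything stepped above.
    if node.has_ready() {
        node.handle_ready();
    }
}

fn main() {
    let (tick_tx, tick_rx) = mpsc::channel();
    let (local_tx, local_rx) = mpsc::channel();
    let (remote_tx, remote_rx) = mpsc::channel();
    tick_tx.send(()).unwrap();
    local_tx.send("campaign".to_string()).unwrap();
    remote_tx.send("vote-resp-2".to_string()).unwrap();
    remote_tx.send("vote-resp-3".to_string()).unwrap();

    let mut node = Node::default();
    run_once(&mut node, &tick_rx, &local_rx, &remote_rx);
    println!("{:?}", node.log);
}
```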

> difference between the three or four variants of advance

It seems advance is enough for now. When I work on the storage part, more variants may be needed.
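For reference, a sketch of the Ready handling order as I read it in the raft-rs crate docs: send messages, apply a snapshot, apply committed entries, append entries, persist the hard state, send persisted messages, then call advance, handle the returned LightReady, and call advance_apply. As far as I can tell, the other variants (advance_append, advance_apply, advance_apply_to) split that final step so applying can happen separately from appending. MockReady and handle_ready are hypothetical and only record the order. Note that the on_ready in the loop below does not yet append entries or persist the hard state, which the docs place before advance.

```rust
/// Hypothetical, simplified stand-in for raft-rs's Ready: one flag per part
/// that is non-empty.
#[derive(Default)]
struct MockReady {
    messages: bool,
    snapshot: bool,
    committed_entries: bool,
    entries: bool,
    hard_state: bool,
    persisted_messages: bool,
}

/// Returns the steps taken, in the order described in the raft-rs docs.
fn handle_ready(rd: &MockReady) -> Vec<&'static str> {
    let mut steps = Vec::new();
    if rd.messages {
        steps.push("send messages");
    }
    if rd.snapshot {
        steps.push("apply snapshot");
    }
    if rd.committed_entries {
        steps.push("apply committed entries");
    }
    if rd.entries {
        steps.push("append entries to storage");
    }
    if rd.hard_state {
        steps.push("persist hard state");
    }
    if rd.persisted_messages {
        steps.push("send persisted messages");
    }
    // Always last: advance, handle the returned LightReady, advance_apply.
    steps.push("advance");
    steps.push("handle LightReady (commit index, messages, committed entries)");
    steps.push("advance_apply");
    steps
}

fn main() {
    // e.g. a follower that just granted a vote: new hard state plus a
    // vote response that may only be sent after persisting it
    let rd = MockReady { hard_state: true, persisted_messages: true, ..Default::default() };
    for step in handle_ready(&rd) {
        println!("{step}");
    }
}
```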

pub struct RaftNode {
    node: RawNode<MemStorage>,
    state: NodeState,

    shutdown: Receiver<()>,
    tx_message: Sender<RaftMessage>,
    rx_message: Receiver<RaftMessage>,
    // never used directly; holding a sender keeps rx_api from ever
    // reporting Disconnected
    #[allow(dead_code)]
    tx_api: Sender<ServiceMessage>,
    rx_api: Receiver<ServiceMessage>,

    tick: Receiver<Instant>,

    peers: PeerProxyMap,

    logger: Logger,
}

pub struct NodeInitContext {
    pub id: u64,
    pub peers: Vec<Peer>,
    pub shutdown: Receiver<()>,
    pub tx_message: Sender<RaftMessage>,
    pub rx_message: Receiver<RaftMessage>,
    pub tx_api: Sender<ServiceMessage>,
    pub rx_api: Receiver<ServiceMessage>,
    pub logger: Logger,
}

#[derive(Debug)]
struct NodeState {
    role: StateRole,
}

impl RaftNode {
    pub fn new(context: NodeInitContext) -> RaftNode {
        let NodeInitContext {
            id,
            peers,
            shutdown,
            tx_message,
            rx_message,
            tx_api,
            rx_api,
            logger,
        } = context;

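        let config = {
            let mut config = Config::new(id);
            // election priority: a larger id gets a larger priority here
            config.priority = (1 << id) as i64;
            // timing is counted in ticks; with the 100 ms ticker below,
            // election_tick = 10 gives a randomized timeout of roughly 1-2 s
            config.election_tick = 10;
            // the leader sends a heartbeat on every tick
            config.heartbeat_tick = 1;
            // upper bound on the size of a single append message (1 GiB)
            config.max_size_per_msg = 1024 * 1024 * 1024;
            config.validate().unwrap();
            config
        };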

        let voters = peers.iter().map(|p| p.id).collect::<Vec<_>>();
        let storage = MemStorage::new_with_conf_state((voters, vec![]));
        let logger = logger.new(o!("tag" => format!("peer_{id}")));
        let node = RawNode::new(&config, storage, &logger).unwrap();

        let peers = PeerProxyMap::new(peers, logger.clone());
        let state = NodeState {
            role: StateRole::Follower,
        };

        let tick = crossbeam_channel::tick(Duration::from_millis(100));

        RaftNode {
            node,
            state,
            peers,
            tx_message,
            rx_message,
            tx_api,
            rx_api,
            tick,
            shutdown,
            logger,
        }
    }

    pub fn do_main(self) {
        let logger = self.logger.clone();
        if let Err(err) = self.start() {
            error!(logger, "raft node shutdown improperly"; "err" => ?err);
        }
    }

    fn start(mut self) -> anyhow::Result<()> {
        // simulate self.node.campaign
        self.tx_message.send(RaftMessage::campaign())?;

        loop {
            self.select_any();

            match self.shutdown.try_recv() {
                Err(TryRecvError::Empty) => {
                    // no shutdown signal so far
                }
                Ok(()) | Err(TryRecvError::Disconnected) => {
                    info!(self.logger, "shutting down because server stopped");
                    return Ok(());
                }
            }

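            // crossbeam's tick channel holds at most one pending Instant, so
            // if this loop is delayed, missed ticks are not replayed
            if self.tick.try_recv().is_ok() {
                self.node.tick();
            }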

            for msg in self.rx_message.try_iter() {
                let msg = msg.0;
                if suraft::raw_node::is_local_msg(msg.msg_type()) {
                    self.node.raft.step(msg)?;
                } else {
                    self.node.step(msg)?;
                }
            }

            for msg in self.rx_api.try_iter() {
                self.node.step(msg.0)?;
            }

            self.on_ready();
        }
    }

    fn on_ready(&mut self) {
        if !self.node.has_ready() {
            return;
        }
        trace!(self.logger, "on_ready");

        let mut ready = self.node.ready();
        // trace!(self.logger, "{:#?}", ready);

        if let Some(ss) = ready.ss() {
            if ss.raft_state != self.state.role {
                info!(
                    self.logger,
                    "changing raft node role from {:?} to {:?}", self.state.role, ss.raft_state
                );
                self.state.role = ss.raft_state;
            }
        }

        // messages that can be sent right away
        for msg in ready.take_messages() {
            match msg.msg_type() {
                eraftpb::MessageType::MsgAppend => {
                    self.tx_message.send(RaftMessage(msg)).unwrap();
                }
                eraftpb::MessageType::MsgHeartbeat => {
                    let peer = self.peers.get_peer(msg.to).unwrap();
                    std::thread::spawn(move || peer.tell(msg));
                }
                msg => {
                    error!(self.logger, "unimplemented {:?}", msg);
                    unimplemented!("{:?}", msg)
                }
            }
        }

        // messages that may only be sent after this Ready's state is persisted
        for msg in ready.take_persisted_messages() {
            match msg.msg_type() {
                eraftpb::MessageType::MsgRequestVote
                | eraftpb::MessageType::MsgRequestVoteResponse
                | eraftpb::MessageType::MsgHeartbeatResponse => {
                    let peer = self.peers.get_peer(msg.to).unwrap();
                    std::thread::spawn(move || peer.tell(msg));
                }
                msg => {
                    error!(self.logger, "unimplemented {:?}", msg);
                    unimplemented!("{:?}", msg)
                }
            }
        }

        self.node.advance(ready);
    }

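    fn select_any(&self) {
        // block until at least one channel is ready; start() then polls each
        // of them with the non-blocking try_recv/try_iter
        let mut select = Select::new();
        select.recv(&self.shutdown);
        select.recv(&self.rx_message);
        select.recv(&self.rx_api);
        select.recv(&self.tick);
        select.ready();
    }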
}
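The on_ready above spawns a new thread per outgoing message, so two messages to the same peer can race each other. A hypothetical alternative is one long-lived sender thread per peer, fed by a channel: per-peer order is kept, and the Raft thread only pays for a channel send, which also keeps network IO off the thread that calls tick. Outbox and OutMsg are illustrative names, and the worker loop body stands in for the real peer.tell(msg) call.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical outbound message; the real code would carry an eraftpb::Message.
#[derive(Debug)]
struct OutMsg {
    to: u64,
    payload: String,
}

/// One long-lived sender thread per peer, each fed by its own channel.
struct Outbox {
    senders: HashMap<u64, Sender<OutMsg>>,
    workers: Vec<JoinHandle<Vec<String>>>,
}

impl Outbox {
    fn new(peers: &[u64]) -> Self {
        let mut senders = HashMap::new();
        let mut workers = Vec::new();
        for &id in peers {
            let (tx, rx) = mpsc::channel::<OutMsg>();
            senders.insert(id, tx);
            workers.push(thread::spawn(move || {
                let mut delivered = Vec::new();
                // Stand-in for the network call (e.g. peer.tell(msg)).
                for msg in rx {
                    delivered.push(format!("{}:{}", msg.to, msg.payload));
                }
                delivered
            }));
        }
        Outbox { senders, workers }
    }

    /// Queues a message for its peer; returns false for an unknown peer.
    fn send(&self, msg: OutMsg) -> bool {
        match self.senders.get(&msg.to) {
            Some(tx) => tx.send(msg).is_ok(),
            None => false,
        }
    }

    /// Closes all queues and returns what each worker "delivered", in peer order.
    fn shutdown(self) -> Vec<Vec<String>> {
        drop(self.senders);
        self.workers.into_iter().map(|w| w.join().unwrap()).collect()
    }
}

fn main() {
    let outbox = Outbox::new(&[2, 3]);
    outbox.send(OutMsg { to: 2, payload: "heartbeat".into() });
    outbox.send(OutMsg { to: 3, payload: "heartbeat".into() });
    outbox.send(OutMsg { to: 2, payload: "append".into() });
    for delivered in outbox.shutdown() {
        println!("{delivered:?}");
    }
}
```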
