Skip to content

Commit

Permalink
Introduce Raft log Term and Index types, and make fields private.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 14, 2023
1 parent 18a53d9 commit 888da02
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 71 deletions.
47 changes: 32 additions & 15 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ use ::log::debug;
use serde::{Deserialize, Serialize};
use std::ops::RangeBounds;

/// A log index.
pub type Index = u64;

/// A Raft leadership term.
/// TODO: Consider moving this to the module root.
pub type Term = u64;

/// A replicated log entry
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Entry {
/// The index of the entry.
pub index: u64,
pub index: Index,
/// The term in which the entry was added.
pub term: u64,
pub term: Term,
/// The state machine command. None is used to commit noops during leader election.
pub command: Option<Vec<u8>>,
}
Expand All @@ -39,13 +46,13 @@ pub struct Log {
/// The underlying log store.
pub(super) store: Box<dyn log::Store>,
/// The index of the last stored entry.
pub(super) last_index: u64,
last_index: Index,
/// The term of the last stored entry.
pub(super) last_term: u64,
/// The last entry known to be committed.
pub(super) commit_index: u64,
last_term: Term,
/// The index of the last committed entry.
commit_index: Index,
/// The term of the last committed entry.
pub(super) commit_term: u64,
commit_term: Term,
}

impl Log {
Expand All @@ -72,8 +79,18 @@ impl Log {
Ok(Self { store, last_index, last_term, commit_index, commit_term })
}

/// Returns the last committed index and term.
pub fn get_commit_index(&self) -> (Index, Term) {
(self.commit_index, self.commit_term)
}

/// Returns the last log index and term.
pub fn get_last_index(&self) -> (Index, Term) {
(self.last_index, self.last_term)
}

/// Appends a command to the log, returning the entry.
pub fn append(&mut self, term: u64, command: Option<Vec<u8>>) -> Result<Entry> {
pub fn append(&mut self, term: Term, command: Option<Vec<u8>>) -> Result<Entry> {
let entry = Entry { index: self.last_index + 1, term, command };
debug!("Appending log entry {}: {:?}", entry.index, entry);
self.store.append(bincode::serialize(&entry)?)?;
Expand All @@ -83,7 +100,7 @@ impl Log {
}

/// Commits entries up to and including an index.
pub fn commit(&mut self, index: u64) -> Result<u64> {
pub fn commit(&mut self, index: Index) -> Result<u64> {
let entry = self
.get(index)?
.ok_or_else(|| Error::Internal(format!("Entry {} not found", index)))?;
Expand All @@ -94,12 +111,12 @@ impl Log {
}

/// Fetches an entry at an index
pub fn get(&self, index: u64) -> Result<Option<Entry>> {
pub fn get(&self, index: Index) -> Result<Option<Entry>> {
self.store.get(index)?.map(|v| bincode::deserialize(&v)).transpose()
}

/// Checks if the log contains an entry
pub fn has(&self, index: u64, term: u64) -> Result<bool> {
pub fn has(&self, index: Index, term: Term) -> Result<bool> {
match self.get(index)? {
Some(entry) => Ok(entry.term == term),
None if index == 0 && term == 0 => Ok(true),
Expand All @@ -108,7 +125,7 @@ impl Log {
}

/// Iterates over log entries
pub fn scan(&self, range: impl RangeBounds<u64>) -> Scan {
pub fn scan(&self, range: impl RangeBounds<Index>) -> Scan {
Box::new(
self.store.scan(Range::from(range)).map(|r| r.and_then(|v| bincode::deserialize(&v))),
)
Expand Down Expand Up @@ -140,7 +157,7 @@ impl Log {

/// Truncates the log such that its last item is at most index.
/// Refuses to remove entries that have been applied or committed.
pub fn truncate(&mut self, index: u64) -> Result<u64> {
pub fn truncate(&mut self, index: Index) -> Result<u64> {
debug!("Truncating log from entry {}", index);
let (index, term) = match self.store.truncate(index)? {
0 => (0, 0),
Expand All @@ -159,7 +176,7 @@ impl Log {

/// Loads information about the most recent term known by the log, containing the term number (0
/// if none) and candidate voted for in current term (if any).
pub fn load_term(&self) -> Result<(u64, Option<String>)> {
pub fn load_term(&self) -> Result<(Term, Option<String>)> {
let (term, voted_for) = self
.store
.get_metadata(&Key::TermVote.encode())?
Expand All @@ -171,7 +188,7 @@ impl Log {
}

/// Saves information about the most recent term.
pub fn save_term(&mut self, term: u64, voted_for: Option<&str>) -> Result<()> {
pub fn save_term(&mut self, term: Term, voted_for: Option<&str>) -> Result<()> {
self.store.set_metadata(&Key::TermVote.encode(), bincode::serialize(&(term, voted_for))?)
}
}
Expand Down
20 changes: 5 additions & 15 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,10 @@ impl RoleNode<Candidate> {
fn become_leader(self) -> Result<RoleNode<Leader>> {
info!("Won election for term {}, becoming leader", self.term);
let peers = self.peers.clone();
let last_index = self.log.last_index;
let (last_index, _) = self.log.get_last_index();
let (commit_index, commit_term) = self.log.get_commit_index();
let mut node = self.become_role(Leader::new(peers, last_index))?;
node.send(
Address::Peers,
Event::Heartbeat {
commit_index: node.log.commit_index,
commit_term: node.log.commit_term,
},
)?;
node.send(Address::Peers, Event::Heartbeat { commit_index, commit_term })?;
node.append(None)?;
node.abort_proxied()?;
Ok(node)
Expand Down Expand Up @@ -117,16 +112,11 @@ impl RoleNode<Candidate> {
self.role.election_ticks += 1;
if self.role.election_ticks >= self.role.election_timeout {
info!("Election timed out, starting new election for term {}", self.term + 1);
let (last_index, last_term) = self.log.get_last_index();
self.term += 1;
self.log.save_term(self.term, None)?;
self.role = Candidate::new();
self.send(
Address::Peers,
Event::SolicitVote {
last_index: self.log.last_index,
last_term: self.log.last_term,
},
)?;
self.send(Address::Peers, Event::SolicitVote { last_index, last_term })?;
}
Ok(self.into())
}
Expand Down
15 changes: 7 additions & 8 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ impl RoleNode<Follower> {
/// Transforms the node into a candidate.
fn become_candidate(self) -> Result<RoleNode<Candidate>> {
info!("Starting election for term {}", self.term + 1);
let (last_index, last_term) = self.log.get_last_index();
let mut node = self.become_role(Candidate::new())?;
node.term += 1;
node.log.save_term(node.term, None)?;
node.send(
Address::Peers,
Event::SolicitVote { last_index: node.log.last_index, last_term: node.log.last_term },
)?;
node.send(Address::Peers, Event::SolicitVote { last_index, last_term })?;
Ok(node)
}

Expand Down Expand Up @@ -86,8 +84,8 @@ impl RoleNode<Follower> {
Event::Heartbeat { commit_index, commit_term } => {
if self.is_leader(&msg.from) {
let has_committed = self.log.has(commit_index, commit_term)?;
if has_committed && commit_index > self.log.commit_index {
let old_commit_index = self.log.commit_index;
let (old_commit_index, _) = self.log.get_commit_index();
if has_committed && commit_index > old_commit_index {
self.log.commit(commit_index)?;
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index);
while let Some(entry) = scan.next().transpose()? {
Expand All @@ -104,10 +102,11 @@ impl RoleNode<Follower> {
return Ok(self.into());
}
}
if last_term < self.log.last_term {
let (log_last_index, log_last_term) = self.log.get_last_index();
if last_term < log_last_term {
return Ok(self.into());
}
if last_term == self.log.last_term && last_index < self.log.last_index {
if last_term == log_last_term && last_index < log_last_index {
return Ok(self.into());
}
if let Address::Peer(from) = msg.from {
Expand Down
38 changes: 14 additions & 24 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,26 @@ impl RoleNode<Leader> {

/// Commits any pending log entries.
fn commit(&mut self) -> Result<u64> {
let mut last_indexes = vec![self.log.last_index];
let mut last_indexes = vec![self.log.get_last_index().0];
last_indexes.extend(self.role.peer_last_index.values());
last_indexes.sort_unstable();
last_indexes.reverse();
let quorum_index = last_indexes[self.quorum() as usize - 1];

// We can only safely commit up to an entry from our own term, see figure 8 in Raft paper.
if quorum_index > self.log.commit_index {
let (commit_index, _) = self.log.get_commit_index();
if quorum_index > commit_index {
if let Some(entry) = self.log.get(quorum_index)? {
if entry.term == self.term {
let old_commit_index = self.log.commit_index;
self.log.commit(quorum_index)?;
let mut scan = self.log.scan((old_commit_index + 1)..=self.log.commit_index);
let mut scan = self.log.scan((commit_index + 1)..=quorum_index);
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
}
}
}
Ok(self.log.commit_index)
Ok(quorum_index)
}

/// Replicates the log to a peer.
Expand Down Expand Up @@ -144,27 +144,22 @@ impl RoleNode<Leader> {
}

Event::ClientRequest { id, request: Request::Query(command) } => {
let (commit_index, commit_term) = self.log.get_commit_index();
self.state_tx.send(Instruction::Query {
id,
address: msg.from,
command,
term: self.term,
index: self.log.commit_index,
index: commit_index,
quorum: self.quorum(),
})?;
self.state_tx.send(Instruction::Vote {
term: self.term,
index: self.log.commit_index,
index: commit_index,
address: Address::Local,
})?;
if !self.peers.is_empty() {
self.send(
Address::Peers,
Event::Heartbeat {
commit_index: self.log.commit_index,
commit_term: self.log.commit_term,
},
)?;
self.send(Address::Peers, Event::Heartbeat { commit_index, commit_term })?;
}
}

Expand All @@ -182,12 +177,12 @@ impl RoleNode<Leader> {
leader: self.id.clone(),
term: self.term,
node_last_index: self.role.peer_last_index.clone(),
commit_index: self.log.commit_index,
commit_index: self.log.get_commit_index().0,
apply_index: 0,
storage: self.log.store.to_string(),
storage_size: self.log.store.size(),
});
status.node_last_index.insert(self.id.clone(), self.log.last_index);
status.node_last_index.insert(self.id.clone(), self.log.get_last_index().0);
self.state_tx.send(Instruction::Status { id, address: msg.from, status })?
}

Expand Down Expand Up @@ -216,13 +211,8 @@ impl RoleNode<Leader> {
self.role.heartbeat_ticks += 1;
if self.role.heartbeat_ticks >= HEARTBEAT_INTERVAL {
self.role.heartbeat_ticks = 0;
self.send(
Address::Peers,
Event::Heartbeat {
commit_index: self.log.commit_index,
commit_term: self.log.commit_term,
},
)?;
let (commit_index, commit_term) = self.log.get_commit_index();
self.send(Address::Peers, Event::Heartbeat { commit_index, commit_term })?;
}
}
Ok(self.into())
Expand Down Expand Up @@ -260,7 +250,7 @@ mod tests {
id: "a".into(),
peers: peers.clone(),
term: 3,
role: Leader::new(peers, log.last_index),
role: Leader::new(peers, log.get_last_index().0),
log,
node_tx,
state_tx,
Expand Down
19 changes: 10 additions & 9 deletions src/raft/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@ impl Node {
node_tx: mpsc::UnboundedSender<Message>,
) -> Result<Self> {
let applied_index = state.applied_index();
if applied_index > log.commit_index {
let (commit_index, _) = log.get_commit_index();
if applied_index > commit_index {
return Err(Error::Internal(format!(
"State machine applied index {} greater than log committed index {}",
applied_index, log.commit_index
applied_index, commit_index
)));
}

let (state_tx, state_rx) = mpsc::unbounded_channel();
let mut driver = Driver::new(state_rx, node_tx.clone());
if log.commit_index > applied_index {
info!("Replaying log entries {} to {}", applied_index + 1, log.commit_index);
driver.replay(&mut *state, log.scan((applied_index + 1)..=log.commit_index))?;
if commit_index > applied_index {
info!("Replaying log entries {} to {}", applied_index + 1, commit_index);
driver.replay(&mut *state, log.scan((applied_index + 1)..=commit_index))?;
};
tokio::spawn(driver.drive(state));

Expand All @@ -81,7 +82,7 @@ impl Node {
};
if node.peers.is_empty() {
info!("No peers specified, starting as leader");
let last_index = node.log.last_index;
let (last_index, _) = node.log.get_last_index();
Ok(node.become_role(Leader::new(vec![], last_index))?.into())
} else {
Ok(node.into())
Expand Down Expand Up @@ -274,17 +275,17 @@ mod tests {
}

pub fn committed(self, index: u64) -> Self {
assert_eq!(index, self.log().commit_index, "Unexpected committed index");
assert_eq!(index, self.log().get_commit_index().0, "Unexpected committed index");
self
}

pub fn last(self, index: u64) -> Self {
assert_eq!(index, self.log().last_index, "Unexpected last index");
assert_eq!(index, self.log().get_last_index().0, "Unexpected last index");
self
}

pub fn entry(self, entry: Entry) -> Self {
assert!(entry.index <= self.log().last_index, "Index beyond last entry");
assert!(entry.index <= self.log().get_last_index().0, "Index beyond last entry");
assert_eq!(entry, self.log().get(entry.index).unwrap().unwrap());
self
}
Expand Down

0 comments on commit 888da02

Please sign in to comment.