Skip to content

Commit

Permalink
Proposed merger at high concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jun 4, 2022
1 parent 8e9ebb5 commit b30b711
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 104 deletions.
72 changes: 72 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,75 @@ impl Status {
self.leader_id == self.id
}
}

pub(crate) enum ReplyChan {
One(Sender<RaftResponse>),
More(Vec<Sender<RaftResponse>>),
}

#[derive(Serialize, Deserialize)]
pub(crate) enum Proposals {
One(Vec<u8>),
More(Vec<Vec<u8>>),
}


pub(crate) struct Merger {
proposals: Vec<Vec<u8>>,
chans: Vec<Sender<RaftResponse>>,
start_collection_time: i64,
}


impl Merger {
pub fn new() -> Self {
Self {
proposals: Vec::new(),
chans: Vec::new(),
start_collection_time: 0,
}
}

#[inline]
pub fn add(&mut self, proposal: Vec<u8>, chan: Sender<RaftResponse>) {
self.proposals.push(proposal);
self.chans.push(chan);
}

#[inline]
pub fn len(&self) -> usize {
self.proposals.len()
}

#[inline]
pub fn take(&mut self) -> Option<(Proposals, ReplyChan)> {
let max = 50; //@TODO configurable, 50
let len = self.len();
let len = if len > max { max } else { len };
if len > 0 && (len == max || self.timeout()) {
let data = if len == 1 {
match (self.proposals.pop(), self.chans.pop()) {
(Some(proposal), Some(chan)) => {
Some((Proposals::One(proposal), ReplyChan::One(chan)))
}
_ => unreachable!()
}
} else {
let mut proposals = self.proposals.drain(0..len).collect::<Vec<_>>();
let mut chans = self.chans.drain(0..len).collect::<Vec<_>>();
proposals.reverse();
chans.reverse();
Some((Proposals::More(proposals), ReplyChan::More(chans)))
};
self.start_collection_time = chrono::Local::now().timestamp_millis();
data
} else {
None
}
}

#[inline]
fn timeout(&self) -> bool {
chrono::Local::now().timestamp_millis() > (self.start_collection_time + 200) //@TODO configurable, 200 millisecond
}
}
Loading

0 comments on commit b30b711

Please sign in to comment.