Skip to content

Commit

Permalink
A0-3186: Simplify DoublingDelayScheduler (#344)
Browse files Browse the repository at this point in the history
Removed unnecessary channel communication between `add_task` and `next_task`.
Added unit tests for `DoublingDelayScheduler`
  • Loading branch information
woocash2 authored Sep 6, 2023
1 parent d60d5b0 commit 381ad9f
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "../README.md"
description = "AlephBFT is an asynchronous and Byzantine fault tolerant consensus protocol aimed at ordering arbitrary messages (transactions). It has been designed to continuously operate even in the harshest conditions: with no bounds on message-delivery delays and in the presence of malicious actors. This makes it an excellent fit for blockchain-related applications."

[dependencies]
aleph-bft-rmc = { path = "../rmc", version = "0.9" }
aleph-bft-rmc = { path = "../rmc", version = "0.10" }
aleph-bft-types = { path = "../types", version = "0.8" }
anyhow = "1.0"
async-trait = "0.1"
Expand Down
4 changes: 2 additions & 2 deletions rmc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft-rmc"
version = "0.9.0"
version = "0.10.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "cryptography"]
Expand All @@ -23,4 +23,4 @@ log = "0.4"
[dev-dependencies]
aleph-bft-mock = { path = "../mock" }
rand = "0.8"
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread"] }
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "time"] }
159 changes: 115 additions & 44 deletions rmc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use codec::{Decode, Encode};
use core::fmt::Debug;
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
future::pending,
FutureExt, StreamExt,
};
use futures_timer::Delay;
Expand All @@ -17,8 +18,8 @@ use std::{
collections::{BinaryHeap, HashMap},
fmt::Formatter,
hash::Hash,
time,
time::Duration,
ops::{Add, Div, Mul},
time::{Duration, Instant},
};

/// Abstraction of a task-scheduling logic
Expand Down Expand Up @@ -61,22 +62,21 @@ pub enum Task<H: Signable, MK: MultiKeychain> {
#[derive(Clone, Debug, Eq, PartialEq)]
struct ScheduledTask<T> {
task: T,
delay: time::Duration,
delay: Duration,
}

impl<T> ScheduledTask<T> {
fn new(task: T, delay: time::Duration) -> Self {
fn new(task: T, delay: Duration) -> Self {
ScheduledTask { task, delay }
}
}

#[derive(Ord, PartialOrd, Eq, PartialEq)]
struct IndexedInstant(time::Instant, usize);
struct IndexedInstant(Instant, usize);

impl IndexedInstant {
fn now(i: usize) -> Self {
let curr_time = time::Instant::now();
IndexedInstant(curr_time, i)
fn at(instant: Instant, i: usize) -> Self {
IndexedInstant(instant, i)
}
}

Expand All @@ -87,11 +87,9 @@ impl IndexedInstant {
/// `initial_delay`, and each following delay for that task is two times longer than the previous
/// one.
pub struct DoublingDelayScheduler<T> {
initial_delay: time::Duration,
initial_delay: Duration,
scheduled_instants: BinaryHeap<Reverse<IndexedInstant>>,
scheduled_tasks: Vec<ScheduledTask<T>>,
on_new_task_tx: UnboundedSender<T>,
on_new_task_rx: UnboundedReceiver<T>,
}

impl<T> Debug for DoublingDelayScheduler<T> {
Expand All @@ -105,53 +103,53 @@ impl<T> Debug for DoublingDelayScheduler<T> {
}

impl<T> DoublingDelayScheduler<T> {
pub fn new(initial_delay: time::Duration) -> Self {
let (on_new_task_tx, on_new_task_rx) = unbounded();
DoublingDelayScheduler {
pub fn new(initial_delay: Duration) -> Self {
DoublingDelayScheduler::with_tasks(vec![], initial_delay)
}

pub fn with_tasks(initial_tasks: Vec<T>, initial_delay: Duration) -> Self {
let mut scheduler = DoublingDelayScheduler {
initial_delay,
scheduled_instants: BinaryHeap::new(),
scheduled_tasks: Vec::new(),
on_new_task_tx,
on_new_task_rx,
};
if initial_tasks.is_empty() {
return scheduler;
}
let delta = initial_delay.div((initial_tasks.len()) as u32); // safety: len is non-zero
for (i, task) in initial_tasks.into_iter().enumerate() {
scheduler.add_task_after(task, delta.mul(i as u32));
}
scheduler
}

fn add_task_after(&mut self, task: T, delta: Duration) {
let i = self.scheduled_tasks.len();
let instant = Instant::now().add(delta);
let indexed_instant = IndexedInstant::at(instant, i);
self.scheduled_instants.push(Reverse(indexed_instant));
let scheduled_task = ScheduledTask::new(task, self.initial_delay);
self.scheduled_tasks.push(scheduled_task);
}
}

#[async_trait]
impl<T: Send + Sync + Clone> TaskScheduler<T> for DoublingDelayScheduler<T> {
fn add_task(&mut self, task: T) {
self.on_new_task_tx
.unbounded_send(task)
.expect("We own the the rx, so this can't fail");
self.add_task_after(task, Duration::ZERO);
}

async fn next_task(&mut self) -> Option<T> {
let mut delay: futures::future::Fuse<_> = match self.scheduled_instants.peek() {
match self.scheduled_instants.peek() {
Some(&Reverse(IndexedInstant(instant, _))) => {
let now = time::Instant::now();
if now > instant {
Delay::new(Duration::new(0, 0)).fuse()
} else {
Delay::new(instant - now).fuse()
}
}
None => futures::future::Fuse::terminated(),
};
// wait until either the scheduled time of the peeked task or a next call of add_task
futures::select! {
_ = delay => {},
task = self.on_new_task_rx.next() => {
if let Some(task) = task {
let i = self.scheduled_tasks.len();
let indexed_instant = IndexedInstant::now(i);
self.scheduled_instants.push(Reverse(indexed_instant));
let scheduled_task = ScheduledTask::new(task, self.initial_delay);
self.scheduled_tasks.push(scheduled_task);
} else {
return None;
let now = Instant::now();
if now < instant {
Delay::new(instant - now).await;
}
}
None => pending().await,
}

let Reverse(IndexedInstant(instant, i)) = self
.scheduled_instants
.pop()
Expand All @@ -163,7 +161,6 @@ impl<T: Send + Sync + Clone> TaskScheduler<T> for DoublingDelayScheduler<T> {
.push(Reverse(IndexedInstant(instant + scheduled_task.delay, i)));

scheduled_task.delay *= 2;

Some(task)
}
}
Expand Down Expand Up @@ -326,7 +323,7 @@ impl<H: Signable + Hash + Eq + Clone + Debug, MK: MultiKeychain> ReliableMultica

#[cfg(test)]
mod tests {
use crate::{DoublingDelayScheduler, Message, ReliableMulticast};
use crate::{DoublingDelayScheduler, Message, ReliableMulticast, TaskScheduler};
use aleph_bft_crypto::{Multisigned, NodeCount, NodeIndex, Signed};
use aleph_bft_mock::{BadSigning, Keychain, PartialMultisignature, Signable, Signature};
use futures::{
Expand All @@ -336,7 +333,12 @@ mod tests {
FutureExt, StreamExt,
};
use rand::Rng;
use std::{collections::HashMap, pin::Pin, time::Duration};
use std::{
collections::HashMap,
ops::{Add, Mul},
pin::Pin,
time::{Duration, Instant},
};

type TestMessage = Message<Signable, Signature, PartialMultisignature>;

Expand Down Expand Up @@ -545,4 +547,73 @@ mod tests {
assert_eq!(multisignatures[0].as_signable(), &hash);
}
}

#[tokio::test]
async fn scheduler_yields_proper_order_of_tasks() {
let mut scheduler = DoublingDelayScheduler::new(Duration::from_millis(25));

scheduler.add_task(0);
tokio::time::sleep(Duration::from_millis(2)).await;
scheduler.add_task(1);

let task = scheduler.next_task().await;
assert_eq!(task, Some(0));
let task = scheduler.next_task().await;
assert_eq!(task, Some(1));
let task = scheduler.next_task().await;
assert_eq!(task, Some(0));
let task = scheduler.next_task().await;
assert_eq!(task, Some(1));

tokio::time::sleep(Duration::from_millis(2)).await;
scheduler.add_task(2);

let task = scheduler.next_task().await;
assert_eq!(task, Some(2));
let task = scheduler.next_task().await;
assert_eq!(task, Some(2));
let task = scheduler.next_task().await;
assert_eq!(task, Some(0));
let task = scheduler.next_task().await;
assert_eq!(task, Some(1));
let task = scheduler.next_task().await;
assert_eq!(task, Some(2));
}

#[tokio::test]
async fn scheduler_properly_handles_initial_bunch_of_tasks() {
let tasks = (0..5).collect();
let before = Instant::now();
let mut scheduler = DoublingDelayScheduler::with_tasks(tasks, Duration::from_millis(25));

for i in 0..5 {
let task = scheduler.next_task().await;
assert_eq!(task, Some(i));
let now = Instant::now();
// 0, 5, 10, 15, 20
assert!(now - before >= Duration::from_millis(5).mul(i));
}

for i in 0..5 {
let task = scheduler.next_task().await;
assert_eq!(task, Some(i));
let now = Instant::now();
// 25, 30, 35, 40, 45
assert!(
now - before
>= Duration::from_millis(5)
.mul(i)
.add(Duration::from_millis(25))
);
}
}

#[tokio::test]
async fn asking_empty_scheduler_for_next_task_blocks() {
let mut scheduler: DoublingDelayScheduler<u32> =
DoublingDelayScheduler::new(Duration::from_millis(25));
let future = tokio::time::timeout(Duration::from_millis(30), scheduler.next_task());
let result = future.await;
assert!(result.is_err()); // elapsed
}
}

0 comments on commit 381ad9f

Please sign in to comment.