Skip to content

Commit

Permalink
Fix sink flush deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Mar 7, 2024
1 parent f86cfd4 commit 1e1605c
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 32 deletions.
5 changes: 3 additions & 2 deletions dozer-core/src/executor/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ pub trait Node {
}

impl<T: ReceiverLoop + Debug> Node for T {
fn run(mut self) -> Result<(), ExecutionError> {
self.receiver_loop(self.initial_epoch_id())
fn run(self) -> Result<(), ExecutionError> {
let initial_epoch_id = self.initial_epoch_id();
self.receiver_loop(initial_epoch_id)
}
}
81 changes: 54 additions & 27 deletions dozer-core/src/executor/receiver_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ pub trait ReceiverLoop: Name {
) -> Result<(), ExecutionError>;

/// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
fn receiver_loop(&mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> {
fn receiver_loop(mut self, initial_epoch_id: u64) -> Result<(), ExecutionError>
where
Self: Sized,
{
let receivers = self.receivers();
debug_assert!(
!receivers.is_empty(),
Expand Down Expand Up @@ -105,7 +108,7 @@ pub(crate) fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Selec

#[cfg(test)]
mod tests {
use std::{mem::swap, sync::Arc, time::SystemTime};
use std::{cell::RefCell, mem::swap, rc::Rc, sync::Arc, time::SystemTime};

use crossbeam::channel::{unbounded, Sender};
use dozer_types::{
Expand All @@ -117,15 +120,20 @@ mod tests {

use super::*;

struct TestReceiverLoop {
receivers: Vec<Receiver<ExecutorOperation>>,
#[derive(Clone)]
struct TestReceiverLoopState {
ops: Vec<(usize, TableOperation)>,
commits: Vec<Epoch>,
snapshotting_started: Vec<String>,
snapshotting_done: Vec<(String, Option<OpIdentifier>)>,
num_terminations: usize,
}

struct TestReceiverLoop {
receivers: Vec<Receiver<ExecutorOperation>>,
state: Rc<RefCell<TestReceiverLoopState>>,
}

impl Name for TestReceiverLoop {
fn name(&self) -> Cow<str> {
Cow::Borrowed("TestReceiverLoop")
Expand All @@ -148,25 +156,28 @@ mod tests {
}

fn on_op(&mut self, index: usize, op: TableOperation) -> Result<(), ExecutionError> {
self.ops.push((index, op));
self.state.borrow_mut().ops.push((index, op));
Ok(())
}

fn on_commit(&mut self, epoch: Epoch) -> Result<(), ExecutionError> {
self.commits.push(epoch);
self.state.borrow_mut().commits.push(epoch);
Ok(())
}

fn on_terminate(&mut self) -> Result<(), ExecutionError> {
self.num_terminations += 1;
self.state.borrow_mut().num_terminations += 1;
Ok(())
}

fn on_snapshotting_started(
&mut self,
connection_name: String,
) -> Result<(), ExecutionError> {
self.snapshotting_started.push(connection_name);
self.state
.borrow_mut()
.snapshotting_started
.push(connection_name);
Ok(())
}

Expand All @@ -175,41 +186,55 @@ mod tests {
connection_name: String,
state: Option<OpIdentifier>,
) -> Result<(), ExecutionError> {
self.snapshotting_done.push((connection_name, state));
self.state
.borrow_mut()
.snapshotting_done
.push((connection_name, state));
Ok(())
}
}

impl TestReceiverLoop {
fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
fn new(
num_receivers: usize,
) -> (
TestReceiverLoop,
Vec<Sender<ExecutorOperation>>,
Rc<RefCell<TestReceiverLoopState>>,
) {
let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
let state = Rc::new(RefCell::new(TestReceiverLoopState {
ops: vec![],
commits: vec![],
snapshotting_started: vec![],
snapshotting_done: vec![],
num_terminations: 0,
}));
(
TestReceiverLoop {
receivers,
ops: vec![],
commits: vec![],
snapshotting_started: vec![],
snapshotting_done: vec![],
num_terminations: 0,
state: state.clone(),
},
senders,
state,
)
}
}

#[test]
fn receiver_loop_stops_on_terminate() {
let (mut test_loop, senders) = TestReceiverLoop::new(2);
let (test_loop, senders, state) = TestReceiverLoop::new(2);
let test_loop = Box::new(test_loop);
senders[0].send(ExecutorOperation::Terminate).unwrap();
senders[1].send(ExecutorOperation::Terminate).unwrap();
test_loop.receiver_loop(0).unwrap();
assert_eq!(test_loop.num_terminations, 1);
assert_eq!(state.borrow().num_terminations, 1);
}

#[test]
fn receiver_loop_forwards_snapshotting_done() {
let connection_name = "test_connection".to_string();
let (mut test_loop, senders) = TestReceiverLoop::new(2);
let (test_loop, senders, state) = TestReceiverLoop::new(2);
senders[0]
.send(ExecutorOperation::SnapshottingDone {
connection_name: connection_name.clone(),
Expand All @@ -219,12 +244,13 @@ mod tests {
senders[0].send(ExecutorOperation::Terminate).unwrap();
senders[1].send(ExecutorOperation::Terminate).unwrap();
test_loop.receiver_loop(0).unwrap();
assert_eq!(test_loop.snapshotting_done, vec![(connection_name, None)])
let snapshotting_done = state.borrow().snapshotting_done.clone();
assert_eq!(snapshotting_done, vec![(connection_name, None)])
}

#[test]
fn receiver_loop_forwards_op() {
let (mut test_loop, senders) = TestReceiverLoop::new(2);
let (test_loop, senders, state) = TestReceiverLoop::new(2);
let record = Record::new(vec![Field::Int(1)]);
senders[0]
.send(ExecutorOperation::Op {
Expand All @@ -240,7 +266,7 @@ mod tests {
senders[1].send(ExecutorOperation::Terminate).unwrap();
test_loop.receiver_loop(0).unwrap();
assert_eq!(
test_loop.ops,
state.borrow().ops,
vec![(
0,
TableOperation::without_id(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE,)
Expand All @@ -250,7 +276,7 @@ mod tests {

#[test]
fn receiver_loop_increases_epoch_id() {
let (mut test_loop, senders) = TestReceiverLoop::new(2);
let (test_loop, senders, state) = TestReceiverLoop::new(2);
let mut source_states = SourceStates::default();
source_states.insert(
NodeHandle::new(None, "0".to_string()),
Expand Down Expand Up @@ -290,16 +316,17 @@ mod tests {
senders[1].send(ExecutorOperation::Terminate).unwrap();
test_loop.receiver_loop(0).unwrap();

assert_eq!(test_loop.commits[0].common_info.id, 0);
assert_eq!(test_loop.commits[0].decision_instant, decision_instant);
assert_eq!(test_loop.commits[1].common_info.id, 1);
assert_eq!(test_loop.commits[1].decision_instant, decision_instant);
let state = state.borrow();
assert_eq!(state.commits[0].common_info.id, 0);
assert_eq!(state.commits[0].decision_instant, decision_instant);
assert_eq!(state.commits[1].common_info.id, 1);
assert_eq!(state.commits[1].decision_instant, decision_instant);
}

#[test]
#[should_panic]
fn receiver_loop_panics_on_inconsistent_commit_epoch() {
let (mut test_loop, senders) = TestReceiverLoop::new(2);
let (test_loop, senders, _) = TestReceiverLoop::new(2);
let mut source_states = SourceStates::new();
source_states.insert(
NodeHandle::new(None, "0".to_string()),
Expand Down
15 changes: 12 additions & 3 deletions dozer-core/src/executor/sink_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,16 @@ impl ReceiverLoop for SinkNode {
Cow::Owned(self.node_handles[index].to_string())
}

fn receiver_loop(&mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> {
fn receiver_loop(mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> {
// This is just copied from ReceiverLoop
let receivers = self.receivers();
let should_flush_receiver = {
// Take the receiver. This is fine, as long as we exclusively use the
// returned receiver and not the one in `self`.
let (_, mut tmp_recv) = crossbeam::channel::bounded(0);
swap(&mut self.should_flush_receiver, &mut tmp_recv);
tmp_recv
};
debug_assert!(
!receivers.is_empty(),
"Processor or sink must have at least 1 incoming edge"
Expand All @@ -223,15 +230,17 @@ impl ReceiverLoop for SinkNode {
.send(self.max_flush_interval)
.unwrap();
let mut sel = init_select(&receivers);
let flush_idx = sel.recv(&should_flush_receiver);
loop {
if self.should_flush_receiver.try_recv().is_ok() {
let index = sel.ready();
if index == flush_idx {
should_flush_receiver.recv().unwrap();
if let Some(epoch) = self.last_op_if_commit.take() {
self.flush(epoch)?;
} else {
self.flush_scheduled_on_next_commit = true;
}
}
let index = sel.ready();
let op = receivers[index]
.recv()
.map_err(|_| ExecutionError::CannotReceiveFromChannel)?;
Expand Down

0 comments on commit 1e1605c

Please sign in to comment.