diff --git a/dozer-core/src/executor/node.rs b/dozer-core/src/executor/node.rs index e7d645da0d..2e841a5a1d 100644 --- a/dozer-core/src/executor/node.rs +++ b/dozer-core/src/executor/node.rs @@ -11,7 +11,8 @@ pub trait Node { } impl Node for T { - fn run(mut self) -> Result<(), ExecutionError> { - self.receiver_loop(self.initial_epoch_id()) + fn run(self) -> Result<(), ExecutionError> { + let initial_epoch_id = self.initial_epoch_id(); + self.receiver_loop(initial_epoch_id) } } diff --git a/dozer-core/src/executor/receiver_loop.rs b/dozer-core/src/executor/receiver_loop.rs index e3cae72a11..f4f3bf7b8c 100644 --- a/dozer-core/src/executor/receiver_loop.rs +++ b/dozer-core/src/executor/receiver_loop.rs @@ -33,7 +33,10 @@ pub trait ReceiverLoop: Name { ) -> Result<(), ExecutionError>; /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times. - fn receiver_loop(&mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> { + fn receiver_loop(mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> + where + Self: Sized, + { let receivers = self.receivers(); debug_assert!( !receivers.is_empty(), @@ -105,7 +108,7 @@ pub(crate) fn init_select(receivers: &Vec>) -> Selec #[cfg(test)] mod tests { - use std::{mem::swap, sync::Arc, time::SystemTime}; + use std::{cell::RefCell, mem::swap, rc::Rc, sync::Arc, time::SystemTime}; use crossbeam::channel::{unbounded, Sender}; use dozer_types::{ @@ -117,8 +120,8 @@ mod tests { use super::*; - struct TestReceiverLoop { - receivers: Vec>, + #[derive(Clone)] + struct TestReceiverLoopState { ops: Vec<(usize, TableOperation)>, commits: Vec, snapshotting_started: Vec, @@ -126,6 +129,11 @@ mod tests { num_terminations: usize, } + struct TestReceiverLoop { + receivers: Vec>, + state: Rc>, + } + impl Name for TestReceiverLoop { fn name(&self) -> Cow { Cow::Borrowed("TestReceiverLoop") @@ -148,17 +156,17 @@ mod tests { } fn on_op(&mut self, index: usize, op: TableOperation) -> Result<(), ExecutionError> { - self.ops.push((index, op)); + self.state.borrow_mut().ops.push((index, op)); Ok(()) } fn on_commit(&mut self, epoch: Epoch) -> Result<(), ExecutionError> { - self.commits.push(epoch); + self.state.borrow_mut().commits.push(epoch); Ok(()) } fn on_terminate(&mut self) -> Result<(), ExecutionError> { - self.num_terminations += 1; + self.state.borrow_mut().num_terminations += 1; Ok(()) } @@ -166,7 +174,10 @@ mod tests { &mut self, connection_name: String, ) -> Result<(), ExecutionError> { - self.snapshotting_started.push(connection_name); + self.state + .borrow_mut() + .snapshotting_started + .push(connection_name); Ok(()) } @@ -175,41 +186,55 @@ mod tests { connection_name: String, state: Option, ) -> Result<(), ExecutionError> { - self.snapshotting_done.push((connection_name, state)); + self.state + .borrow_mut() + .snapshotting_done + .push((connection_name, state)); Ok(()) } } impl TestReceiverLoop { - fn new(num_receivers: usize) -> (TestReceiverLoop, Vec>) { + fn new( + num_receivers: usize, + ) -> ( + TestReceiverLoop, + Vec>, + Rc>, + ) { let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip(); + let state = Rc::new(RefCell::new(TestReceiverLoopState { + ops: vec![], + commits: vec![], + snapshotting_started: vec![], + snapshotting_done: vec![], + num_terminations: 0, + })); ( TestReceiverLoop { receivers, - ops: vec![], - commits: vec![], - snapshotting_started: vec![], - snapshotting_done: vec![], - num_terminations: 0, + state: state.clone(), }, senders, + state, ) } } #[test] fn receiver_loop_stops_on_terminate() { - let (mut test_loop, senders) = TestReceiverLoop::new(2); + let (test_loop, senders, state) = TestReceiverLoop::new(2); + let test_loop = Box::new(test_loop); senders[0].send(ExecutorOperation::Terminate).unwrap(); senders[1].send(ExecutorOperation::Terminate).unwrap(); test_loop.receiver_loop(0).unwrap(); - assert_eq!(test_loop.num_terminations, 1); + assert_eq!(state.borrow().num_terminations, 1); } #[test] fn receiver_loop_forwards_snapshotting_done() { let connection_name = "test_connection".to_string(); - let (mut test_loop, senders) = TestReceiverLoop::new(2); + let (test_loop, senders, state) = TestReceiverLoop::new(2); senders[0] .send(ExecutorOperation::SnapshottingDone { connection_name: connection_name.clone(), @@ -219,12 +244,13 @@ mod tests { senders[0].send(ExecutorOperation::Terminate).unwrap(); senders[1].send(ExecutorOperation::Terminate).unwrap(); test_loop.receiver_loop(0).unwrap(); - assert_eq!(test_loop.snapshotting_done, vec![(connection_name, None)]) + let snapshotting_done = state.borrow().snapshotting_done.clone(); + assert_eq!(snapshotting_done, vec![(connection_name, None)]) } #[test] fn receiver_loop_forwards_op() { - let (mut test_loop, senders) = TestReceiverLoop::new(2); + let (test_loop, senders, state) = TestReceiverLoop::new(2); let record = Record::new(vec![Field::Int(1)]); senders[0] .send(ExecutorOperation::Op { @@ -240,7 +266,7 @@ mod tests { senders[1].send(ExecutorOperation::Terminate).unwrap(); test_loop.receiver_loop(0).unwrap(); assert_eq!( - test_loop.ops, + state.borrow().ops, vec![( 0, TableOperation::without_id(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE,) @@ -250,7 +276,7 @@ mod tests { #[test] fn receiver_loop_increases_epoch_id() { - let (mut test_loop, senders) = TestReceiverLoop::new(2); + let (test_loop, senders, state) = TestReceiverLoop::new(2); let mut source_states = SourceStates::default(); source_states.insert( NodeHandle::new(None, "0".to_string()), @@ -290,16 +316,17 @@ mod tests { senders[1].send(ExecutorOperation::Terminate).unwrap(); test_loop.receiver_loop(0).unwrap(); - assert_eq!(test_loop.commits[0].common_info.id, 0); - assert_eq!(test_loop.commits[0].decision_instant, decision_instant); - assert_eq!(test_loop.commits[1].common_info.id, 1); - assert_eq!(test_loop.commits[1].decision_instant, decision_instant); + let state = state.borrow(); + assert_eq!(state.commits[0].common_info.id, 0); + assert_eq!(state.commits[0].decision_instant, decision_instant); + assert_eq!(state.commits[1].common_info.id, 1); + assert_eq!(state.commits[1].decision_instant, decision_instant); } #[test] #[should_panic] fn receiver_loop_panics_on_inconsistent_commit_epoch() { - let (mut test_loop, senders) = TestReceiverLoop::new(2); + let (test_loop, senders, _) = TestReceiverLoop::new(2); let mut source_states = SourceStates::new(); source_states.insert( NodeHandle::new(None, "0".to_string()), diff --git a/dozer-core/src/executor/sink_node.rs b/dozer-core/src/executor/sink_node.rs index 028975aacd..1d3a07792e 100644 --- a/dozer-core/src/executor/sink_node.rs +++ b/dozer-core/src/executor/sink_node.rs @@ -207,9 +207,16 @@ impl ReceiverLoop for SinkNode { Cow::Owned(self.node_handles[index].to_string()) } - fn receiver_loop(&mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> { + fn receiver_loop(mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> { // This is just copied from ReceiverLoop let receivers = self.receivers(); + let should_flush_receiver = { + // Take the receiver. This is fine, as long as we exclusively use the + // returned receiver and not the one in `self`. + let (_, mut tmp_recv) = crossbeam::channel::bounded(0); + swap(&mut self.should_flush_receiver, &mut tmp_recv); + tmp_recv + }; debug_assert!( !receivers.is_empty(), "Processor or sink must have at least 1 incoming edge" @@ -223,15 +230,17 @@ impl ReceiverLoop for SinkNode { .send(self.max_flush_interval) .unwrap(); let mut sel = init_select(&receivers); + let flush_idx = sel.recv(&should_flush_receiver); loop { - if self.should_flush_receiver.try_recv().is_ok() { + let index = sel.ready(); + if index == flush_idx { + should_flush_receiver.recv().unwrap(); if let Some(epoch) = self.last_op_if_commit.take() { self.flush(epoch)?; } else { self.flush_scheduled_on_next_commit = true; } } - let index = sel.ready(); let op = receivers[index] .recv() .map_err(|_| ExecutionError::CannotReceiveFromChannel)?;