diff --git a/dozer-core/src/executor/sink_node.rs b/dozer-core/src/executor/sink_node.rs index 5cae3485e4..b8f3c62e04 100644 --- a/dozer-core/src/executor/sink_node.rs +++ b/dozer-core/src/executor/sink_node.rs @@ -18,8 +18,7 @@ use tokio::sync::broadcast; use crate::{ builder_dag::NodeKind, epoch::Epoch, error_manager::ErrorManager, errors::ExecutionError, - event::Event, executor::receiver_loop::init_select, executor_operation::ExecutorOperation, - node::Sink, + event::Event, executor_operation::ExecutorOperation, node::Sink, }; use super::execution_dag::ExecutionDag; @@ -192,6 +191,61 @@ impl Name for SinkNode { } } +struct Select<'a> { + op_receivers: &'a [Receiver], + flush_receiver: &'a Receiver<()>, + inner: crossbeam::channel::Select<'a>, + flush_idx: usize, +} + +enum ReceiverMsg { + Op(usize, ExecutorOperation), + Flush, +} + +impl<'a> Select<'a> { + fn new( + op_receivers: &'a [Receiver], + flush_receiver: &'a Receiver<()>, + ) -> Self { + let mut inner = crossbeam::channel::Select::new(); + for recv in op_receivers { + let _ = inner.recv(recv); + } + let flush_idx = inner.recv(flush_receiver); + Self { + inner, + flush_idx, + op_receivers, + flush_receiver, + } + } + + fn remove(&mut self, idx: usize) { + self.inner.remove(idx); + } + + fn reinit(&mut self) { + self.inner = crossbeam::channel::Select::new(); + for recv in self.op_receivers { + let _ = self.inner.recv(recv); + } + self.flush_idx = self.inner.recv(self.flush_receiver); + } + + fn recv(&mut self) -> Result { + let msg = self.inner.select(); + let index = msg.index(); + let res = if index == self.flush_idx { + msg.recv(self.flush_receiver).map(|_| ReceiverMsg::Flush) + } else { + msg.recv(&self.op_receivers[index]) + .map(|op| ReceiverMsg::Op(index, op)) + }; + res.map_err(|_| ExecutionError::CannotReceiveFromChannel) + } +} + impl ReceiverLoop for SinkNode { fn initial_epoch_id(&self) -> u64 { self.initial_epoch_id @@ -229,22 +283,16 @@ impl ReceiverLoop for SinkNode { self.flush_scheduler_sender .send(self.max_flush_interval) .unwrap(); - let mut sel = init_select(&receivers); - let flush_idx = sel.recv(&should_flush_receiver); + let mut sel = Select::new(&receivers, &should_flush_receiver); loop { - let index = sel.ready(); - if index == flush_idx { - should_flush_receiver.recv().unwrap(); + let ReceiverMsg::Op(index, op) = sel.recv()? else { if let Some(epoch) = self.last_op_if_commit.take() { self.flush(epoch)?; } else { self.flush_scheduled_on_next_commit = true; } continue; - } - let op = receivers[index] - .recv() - .map_err(|_| ExecutionError::CannotReceiveFromChannel)?; + }; match op { ExecutorOperation::Op { op } => { @@ -259,7 +307,7 @@ impl ReceiverLoop for SinkNode { self.on_commit(epoch)?; epoch_id += 1; commits_received = 0; - sel = init_select(&receivers); + sel.reinit(); } } ExecutorOperation::Terminate => {