Skip to content

Commit

Permalink
Abstract sink node select and fix reinit bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Mar 7, 2024
1 parent 9dabcc9 commit 8c723f5
Showing 1 changed file with 60 additions and 12 deletions.
72 changes: 60 additions & 12 deletions dozer-core/src/executor/sink_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use tokio::sync::broadcast;

use crate::{
builder_dag::NodeKind, epoch::Epoch, error_manager::ErrorManager, errors::ExecutionError,
event::Event, executor::receiver_loop::init_select, executor_operation::ExecutorOperation,
node::Sink,
event::Event, executor_operation::ExecutorOperation, node::Sink,
};

use super::execution_dag::ExecutionDag;
Expand Down Expand Up @@ -192,6 +191,61 @@ impl Name for SinkNode {
}
}

struct Select<'a> {
op_receivers: &'a [Receiver<ExecutorOperation>],
flush_receiver: &'a Receiver<()>,
inner: crossbeam::channel::Select<'a>,
flush_idx: usize,
}

enum ReceiverMsg {
Op(usize, ExecutorOperation),
Flush,
}

impl<'a> Select<'a> {
fn new(
op_receivers: &'a [Receiver<ExecutorOperation>],
flush_receiver: &'a Receiver<()>,
) -> Self {
let mut inner = crossbeam::channel::Select::new();
for recv in op_receivers {
let _ = inner.recv(recv);
}
let flush_idx = inner.recv(flush_receiver);
Self {
inner,
flush_idx,
op_receivers,
flush_receiver,
}
}

fn remove(&mut self, idx: usize) {
self.inner.remove(idx);
}

fn reinit(&mut self) {
self.inner = crossbeam::channel::Select::new();
for recv in self.op_receivers {
let _ = self.inner.recv(recv);
}
self.flush_idx = self.inner.recv(self.flush_receiver);
}

fn recv(&mut self) -> Result<ReceiverMsg, ExecutionError> {
let msg = self.inner.select();
let index = msg.index();
let res = if index == self.flush_idx {
msg.recv(self.flush_receiver).map(|_| ReceiverMsg::Flush)
} else {
msg.recv(&self.op_receivers[index])
.map(|op| ReceiverMsg::Op(index, op))
};
res.map_err(|_| ExecutionError::CannotReceiveFromChannel)
}
}

impl ReceiverLoop for SinkNode {
fn initial_epoch_id(&self) -> u64 {
self.initial_epoch_id
Expand Down Expand Up @@ -229,22 +283,16 @@ impl ReceiverLoop for SinkNode {
self.flush_scheduler_sender
.send(self.max_flush_interval)
.unwrap();
let mut sel = init_select(&receivers);
let flush_idx = sel.recv(&should_flush_receiver);
let mut sel = Select::new(&receivers, &should_flush_receiver);
loop {
let index = sel.ready();
if index == flush_idx {
should_flush_receiver.recv().unwrap();
let ReceiverMsg::Op(index, op) = sel.recv()? else {
if let Some(epoch) = self.last_op_if_commit.take() {
self.flush(epoch)?;
} else {
self.flush_scheduled_on_next_commit = true;
}
continue;
}
let op = receivers[index]
.recv()
.map_err(|_| ExecutionError::CannotReceiveFromChannel)?;
};

match op {
ExecutorOperation::Op { op } => {
Expand All @@ -259,7 +307,7 @@ impl ReceiverLoop for SinkNode {
self.on_commit(epoch)?;
epoch_id += 1;
commits_received = 0;
sel = init_select(&receivers);
sel.reinit();
}
}
ExecutorOperation::Terminate => {
Expand Down

0 comments on commit 8c723f5

Please sign in to comment.