From fdf7cb3a64d013ad21c4e13566bf5e026002cd5f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 13 Mar 2023 15:49:11 +0800 Subject: [PATCH 1/3] map watermark Signed-off-by: Bugen Zhao --- src/stream/src/executor/dispatch.rs | 37 +++++++++++++++++++++-------- src/stream/src/executor/mod.rs | 9 +++++++ 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index d8d569c382902..f115985bf63ba 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -446,14 +446,20 @@ pub trait Dispatcher: Debug + 'static { #[derive(Debug)] pub struct RoundRobinDataDispatcher { outputs: Vec, + output_indices: Vec, cur: usize, dispatcher_id: DispatcherId, } impl RoundRobinDataDispatcher { - pub fn new(outputs: Vec, dispatcher_id: DispatcherId) -> Self { + pub fn new( + outputs: Vec, + output_indices: Vec, + dispatcher_id: DispatcherId, + ) -> Self { Self { outputs, + output_indices, cur: 0, dispatcher_id, } @@ -465,6 +471,7 @@ impl Dispatcher for RoundRobinDataDispatcher { fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> { async move { + let chunk = chunk.reorder_columns(&self.output_indices); self.outputs[self.cur].send(Message::Chunk(chunk)).await?; self.cur += 1; self.cur %= self.outputs.len(); @@ -484,9 +491,11 @@ impl Dispatcher for RoundRobinDataDispatcher { fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> { async move { - // always broadcast watermark - for output in &mut self.outputs { - output.send(Message::Watermark(watermark.clone())).await?; + if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { + // always broadcast watermark + for output in &mut self.outputs { + output.send(Message::Watermark(watermark.clone())).await?; + } } Ok(()) } @@ -569,9 +578,11 @@ impl Dispatcher for HashDataDispatcher { fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> { async move { - // always broadcast watermark - for output in &mut self.outputs { - output.send(Message::Watermark(watermark.clone())).await?; + if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { + // always broadcast watermark + for output in &mut self.outputs { + output.send(Message::Watermark(watermark.clone())).await?; + } } Ok(()) } @@ -751,8 +762,11 @@ impl Dispatcher for BroadcastDispatcher { fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> { async move { - for output in self.outputs.values_mut() { - output.send(Message::Watermark(watermark.clone())).await?; + if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { + // always broadcast watermark + for output in self.outputs.values_mut() { + output.send(Message::Watermark(watermark.clone())).await?; + } } Ok(()) } @@ -851,7 +865,10 @@ impl Dispatcher for SimpleDispatcher { .exactly_one() .expect("expect exactly one output"); - output.send(Message::Watermark(watermark)).await + if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { + output.send(Message::Watermark(watermark)).await?; + } + Ok(()) } } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index c24f0e3e71ceb..57858819808e9 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -615,6 +615,15 @@ impl Watermark { }) } + /// Transform the watermark with the given output indices. If this watermark is not in the + /// output, return `None`. + pub fn transform_with_indices(self, output_indices: &[usize]) -> Option { + output_indices + .iter() + .position(|p| *p == self.col_idx) + .map(|new_col_idx| self.with_idx(new_col_idx)) + } + pub fn to_protobuf(&self) -> ProstWatermark { ProstWatermark { column: Some(ProstInputRef { From c907a563e9de9f8cddc53421d7b0948faa86ef15 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 13 Mar 2023 16:45:48 +0800 Subject: [PATCH 2/3] dry Signed-off-by: Bugen Zhao --- src/stream/src/executor/backfill.rs | 5 +---- src/stream/src/executor/chain.rs | 5 +---- src/stream/src/executor/rearranged_chain.rs | 5 +---- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 338963018cba5..bd0831457e45b 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -398,10 +398,7 @@ where } fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option { - upstream_indices - .iter() - .position(|&idx| idx == watermark.col_idx) - .map(|idx| watermark.with_idx(idx)) + watermark.transform_with_indices(upstream_indices) } fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option { diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs index b899d45f80eab..1311e1b927dfc 100644 --- a/src/stream/src/executor/chain.rs +++ b/src/stream/src/executor/chain.rs @@ -53,10 +53,7 @@ fn mapping_chunk(chunk: StreamChunk, upstream_indices: &[usize]) -> StreamChunk } fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option { - upstream_indices - .iter() - .position(|&idx| idx == watermark.col_idx) - .map(|idx| watermark.with_idx(idx)) + watermark.transform_with_indices(upstream_indices) } impl ChainExecutor { diff --git a/src/stream/src/executor/rearranged_chain.rs b/src/stream/src/executor/rearranged_chain.rs index fd3cf9591bab9..c307c0d43d99b 100644 --- a/src/stream/src/executor/rearranged_chain.rs +++ b/src/stream/src/executor/rearranged_chain.rs @@ -71,10 +71,7 @@ fn mapping(upstream_indices: &[usize], msg: Message) -> Option { } fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option { - upstream_indices - .iter() - .position(|&idx| idx == watermark.col_idx) - .map(|idx| watermark.with_idx(idx)) + watermark.transform_with_indices(upstream_indices) } #[derive(Debug)] From 9b9c8f9ec0de4a28c804be9a18620f3a75db6b58 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 13 Mar 2023 18:23:43 +0800 Subject: [PATCH 3/3] fix unit test Signed-off-by: Bugen Zhao --- src/stream/src/executor/integration_tests.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 73ddc2b8e40c5..eb9d70679b78e 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -124,7 +124,9 @@ async fn test_merger_sum_aggr() { let dispatcher = DispatchExecutor::new( receiver_op, vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new( - inputs, 0, + inputs, + vec![0], + 0, ))], 0, ctx,