From 063f58e123c6eead17f0a0da01099e2d92d3ed35 Mon Sep 17 00:00:00 2001 From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com> Date: Fri, 22 Sep 2023 16:08:57 +0800 Subject: [PATCH] refactor(watermark): avoid sending min watermark (#12462) --- e2e_test/streaming/watermark.slt | 2 +- .../executor/backfill/no_shuffle_backfill.rs | 3 +- src/stream/src/executor/watermark_filter.rs | 156 ++++++++++-------- src/stream/src/from_proto/chain.rs | 1 + src/stream/src/from_proto/watermark_filter.rs | 1 + 5 files changed, 94 insertions(+), 69 deletions(-) diff --git a/e2e_test/streaming/watermark.slt b/e2e_test/streaming/watermark.slt index f145a3a46e28..4bbb5e1d8388 100644 --- a/e2e_test/streaming/watermark.slt +++ b/e2e_test/streaming/watermark.slt @@ -21,7 +21,7 @@ statement ok insert into t values ('2023-05-06 16:56:01', 1); skipif in-memory -sleep 5s +sleep 10s skipif in-memory query TI diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index f796ff0dcb69..b6241acdea56 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -104,12 +104,13 @@ where pk_indices: PkIndices, metrics: Arc, chunk_size: usize, + executor_id: u64, ) -> Self { Self { info: ExecutorInfo { schema, pk_indices, - identity: "BackfillExecutor".to_owned(), + identity: format!("BackfillExecutor {:X}", executor_id), }, upstream_table, upstream, diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 5f2a92e4e7f7..7b6a1e47f28f 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -58,8 +58,13 @@ impl WatermarkFilterExecutor { event_time_col_idx: usize, ctx: ActorContextRef, table: StateTable, + executor_id: u64, ) -> Self { - let info = input.info(); + let info = ExecutorInfo { + schema: input.info().schema, + pk_indices: input.info().pk_indices, + identity: format!("WatermarkFilterExecutor {:X}", executor_id), + }; Self { input, @@ -119,16 +124,17 @@ impl WatermarkFilterExecutor { yield Message::Barrier(first_barrier); // Initiate and yield the first watermark. - let mut current_watermark = - Self::get_global_max_watermark(&table, watermark_type.clone()).await?; + let mut current_watermark = Self::get_global_max_watermark(&table).await?; - let mut last_checkpoint_watermark = watermark_type.min_value(); + let mut last_checkpoint_watermark = None; - yield Message::Watermark(Watermark::new( - event_time_col_idx, - watermark_type.clone(), - current_watermark.clone(), - )); + if let Some(watermark) = current_watermark.clone() { + yield Message::Watermark(Watermark::new( + event_time_col_idx, + watermark_type.clone(), + watermark.clone(), + )); + } // If the input is idle let mut idle_input = true; @@ -152,11 +158,16 @@ impl WatermarkFilterExecutor { .await; // Build the expression to calculate watermark filter. - let watermark_filter_expr = Self::build_watermark_filter_expr( - watermark_type.clone(), - event_time_col_idx, - current_watermark.clone(), - )?; + let watermark_filter_expr = current_watermark + .clone() + .map(|watermark| { + Self::build_watermark_filter_expr( + watermark_type.clone(), + event_time_col_idx, + watermark, + ) + }) + .transpose()?; // NULL watermark should not be considered. let max_watermark = watermark_array @@ -166,41 +177,53 @@ impl WatermarkFilterExecutor { if let Some(max_watermark) = max_watermark { // Assign a new watermark. - current_watermark = cmp::max_by( - current_watermark, + current_watermark = Some(current_watermark.map_or( max_watermark.into_scalar_impl(), - DefaultOrd::default_cmp, - ); + |watermark| { + cmp::max_by( + watermark, + max_watermark.into_scalar_impl(), + DefaultOrd::default_cmp, + ) + }, + )); } - let pred_output = watermark_filter_expr - .eval_infallible(chunk.data_chunk(), |err| { - ctx.on_compute_error(err, &info.identity) - }) - .await; + if let Some(expr) = watermark_filter_expr { + let pred_output = expr + .eval_infallible(chunk.data_chunk(), |err| { + ctx.on_compute_error(err, &info.identity) + }) + .await; - if let Some(output_chunk) = FilterExecutor::filter(chunk, pred_output)? { - yield Message::Chunk(output_chunk); - }; + if let Some(output_chunk) = FilterExecutor::filter(chunk, pred_output)? { + yield Message::Chunk(output_chunk); + }; + } else { + // No watermark + yield Message::Chunk(chunk); + } - idle_input = false; - yield Message::Watermark(Watermark::new( - event_time_col_idx, - watermark_type.clone(), - current_watermark.clone(), - )); + if let Some(watermark) = current_watermark.clone() { + idle_input = false; + yield Message::Watermark(Watermark::new( + event_time_col_idx, + watermark_type.clone(), + watermark, + )); + } } Message::Watermark(watermark) => { if watermark.col_idx == event_time_col_idx { tracing::warn!("WatermarkFilterExecutor received a watermark on the event it is filtering."); let watermark = watermark.val; - if current_watermark.default_cmp(&watermark).is_lt() { - current_watermark = watermark; + if let Some(cur_watermark) = current_watermark.clone() && cur_watermark.default_cmp(&watermark).is_lt() { + current_watermark = Some(watermark.clone()); idle_input = false; yield Message::Watermark(Watermark::new( event_time_col_idx, watermark_type.clone(), - current_watermark.clone(), + watermark, )); } } else { @@ -215,9 +238,7 @@ impl WatermarkFilterExecutor { // Take the global max watermark when scaling happens. if previous_vnode_bitmap != vnode_bitmap { - current_watermark = - Self::get_global_max_watermark(&table, watermark_type.clone()) - .await?; + current_watermark = Self::get_global_max_watermark(&table).await?; } } @@ -226,12 +247,14 @@ impl WatermarkFilterExecutor { { last_checkpoint_watermark = current_watermark.clone(); // Persist the watermark when checkpoint arrives. - let vnodes = table.get_vnodes(); - for vnode in vnodes.iter_vnodes() { - let pk = Some(ScalarImpl::Int16(vnode.to_scalar())); - let row = [pk, Some(current_watermark.clone())]; - // FIXME(yuhao): use upsert. - table.insert(row); + if let Some(watermark) = current_watermark.clone() { + let vnodes = table.get_vnodes(); + for vnode in vnodes.iter_vnodes() { + let pk = Some(ScalarImpl::Int16(vnode.to_scalar())); + let row = [pk, Some(watermark.clone())]; + // This is an upsert. + table.insert(row); + } } table.commit(barrier.epoch).await?; } else { @@ -242,18 +265,24 @@ impl WatermarkFilterExecutor { if idle_input { // Align watermark let global_max_watermark = - Self::get_global_max_watermark(&table, watermark_type.clone()) - .await?; - current_watermark = cmp::max_by( - current_watermark, - global_max_watermark, - DefaultOrd::default_cmp, - ); - yield Message::Watermark(Watermark::new( - event_time_col_idx, - watermark_type.clone(), - current_watermark.clone(), - )); + Self::get_global_max_watermark(&table).await?; + + current_watermark = if let Some(global_max_watermark) = global_max_watermark.clone() && let Some(watermark) = current_watermark.clone(){ + Some(cmp::max_by( + watermark, + global_max_watermark, + DefaultOrd::default_cmp, + )) + } else { + current_watermark.or(global_max_watermark) + }; + if let Some(watermark) = current_watermark.clone() { + yield Message::Watermark(Watermark::new( + event_time_col_idx, + watermark_type.clone(), + watermark, + )); + } } else { idle_input = true; } @@ -280,10 +309,10 @@ impl WatermarkFilterExecutor { ) } + /// If the returned if `Ok(None)`, it means there is no global max watermark. async fn get_global_max_watermark( table: &StateTable, - watermark_type: DataType, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult> { let watermark_iter_futures = (0..VirtualNode::COUNT).map(|vnode| async move { let pk = row::once(Some(ScalarImpl::Int16(vnode as _))); let watermark_row: Option = table.get_row(pk).await?; @@ -307,8 +336,7 @@ impl WatermarkFilterExecutor { let watermark = watermarks .into_iter() .flatten() - .max_by(DefaultOrd::default_cmp) - .unwrap_or_else(|| watermark_type.min_value()); + .max_by(DefaultOrd::default_cmp); Ok(watermark) } @@ -389,6 +417,7 @@ mod tests { 1, ActorContext::create(123), table, + 0, ) .boxed(), tx, @@ -431,13 +460,6 @@ mod tests { }; } - // Init watermark - let watermark = executor.next().await.unwrap().unwrap(); - assert_eq!( - watermark.into_watermark().unwrap(), - watermark!(WATERMARK_TYPE.min_value()), - ); - // push the 1st chunk tx.push_chunk(chunk1); let chunk = executor.next().await.unwrap().unwrap(); diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index 667772fcfdd6..81030526b82f 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -172,6 +172,7 @@ impl ExecutorBuilder for ChainExecutorBuilder { params.pk_indices, stream.streaming_metrics.clone(), params.env.config().developer.chunk_size, + params.executor_id, ) .boxed() } diff --git a/src/stream/src/from_proto/watermark_filter.rs b/src/stream/src/from_proto/watermark_filter.rs index 84b39288c704..62607e053b22 100644 --- a/src/stream/src/from_proto/watermark_filter.rs +++ b/src/stream/src/from_proto/watermark_filter.rs @@ -55,6 +55,7 @@ impl ExecutorBuilder for WatermarkFilterBuilder { event_time_col_idx, params.actor_context, table, + params.executor_id, ) .boxed()) }