Skip to content

Commit

Permalink
refactor(watermark): avoid sending min watermark (risingwavelabs#12462)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Sep 22, 2023
1 parent 4fb087c commit 063f58e
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 69 deletions.
2 changes: 1 addition & 1 deletion e2e_test/streaming/watermark.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ statement ok
insert into t values ('2023-05-06 16:56:01', 1);

skipif in-memory
sleep 5s
sleep 10s

skipif in-memory
query TI
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,13 @@ where
pk_indices: PkIndices,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
executor_id: u64,
) -> Self {
Self {
info: ExecutorInfo {
schema,
pk_indices,
identity: "BackfillExecutor".to_owned(),
identity: format!("BackfillExecutor {:X}", executor_id),
},
upstream_table,
upstream,
Expand Down
156 changes: 89 additions & 67 deletions src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
event_time_col_idx: usize,
ctx: ActorContextRef,
table: StateTable<S>,
executor_id: u64,
) -> Self {
let info = input.info();
let info = ExecutorInfo {
schema: input.info().schema,
pk_indices: input.info().pk_indices,
identity: format!("WatermarkFilterExecutor {:X}", executor_id),
};

Self {
input,
Expand Down Expand Up @@ -119,16 +124,17 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
yield Message::Barrier(first_barrier);

// Initiate and yield the first watermark.
let mut current_watermark =
Self::get_global_max_watermark(&table, watermark_type.clone()).await?;
let mut current_watermark = Self::get_global_max_watermark(&table).await?;

let mut last_checkpoint_watermark = watermark_type.min_value();
let mut last_checkpoint_watermark = None;

yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
current_watermark.clone(),
));
if let Some(watermark) = current_watermark.clone() {
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark.clone(),
));
}

// If the input is idle
let mut idle_input = true;
Expand All @@ -152,11 +158,16 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
.await;

// Build the expression to calculate watermark filter.
let watermark_filter_expr = Self::build_watermark_filter_expr(
watermark_type.clone(),
event_time_col_idx,
current_watermark.clone(),
)?;
let watermark_filter_expr = current_watermark
.clone()
.map(|watermark| {
Self::build_watermark_filter_expr(
watermark_type.clone(),
event_time_col_idx,
watermark,
)
})
.transpose()?;

// NULL watermark should not be considered.
let max_watermark = watermark_array
Expand All @@ -166,41 +177,53 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

if let Some(max_watermark) = max_watermark {
// Assign a new watermark.
current_watermark = cmp::max_by(
current_watermark,
current_watermark = Some(current_watermark.map_or(
max_watermark.into_scalar_impl(),
DefaultOrd::default_cmp,
);
|watermark| {
cmp::max_by(
watermark,
max_watermark.into_scalar_impl(),
DefaultOrd::default_cmp,
)
},
));
}

let pred_output = watermark_filter_expr
.eval_infallible(chunk.data_chunk(), |err| {
ctx.on_compute_error(err, &info.identity)
})
.await;
if let Some(expr) = watermark_filter_expr {
let pred_output = expr
.eval_infallible(chunk.data_chunk(), |err| {
ctx.on_compute_error(err, &info.identity)
})
.await;

if let Some(output_chunk) = FilterExecutor::filter(chunk, pred_output)? {
yield Message::Chunk(output_chunk);
};
if let Some(output_chunk) = FilterExecutor::filter(chunk, pred_output)? {
yield Message::Chunk(output_chunk);
};
} else {
// No watermark yet: there is nothing to filter against, so forward the chunk as is.
yield Message::Chunk(chunk);
}

idle_input = false;
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
current_watermark.clone(),
));
if let Some(watermark) = current_watermark.clone() {
idle_input = false;
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark,
));
}
}
Message::Watermark(watermark) => {
if watermark.col_idx == event_time_col_idx {
tracing::warn!("WatermarkFilterExecutor received a watermark on the event it is filtering.");
let watermark = watermark.val;
if current_watermark.default_cmp(&watermark).is_lt() {
current_watermark = watermark;
if let Some(cur_watermark) = current_watermark.clone() && cur_watermark.default_cmp(&watermark).is_lt() {
current_watermark = Some(watermark.clone());
idle_input = false;
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
current_watermark.clone(),
watermark,
));
}
} else {
Expand All @@ -215,9 +238,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

// Take the global max watermark when scaling happens.
if previous_vnode_bitmap != vnode_bitmap {
current_watermark =
Self::get_global_max_watermark(&table, watermark_type.clone())
.await?;
current_watermark = Self::get_global_max_watermark(&table).await?;
}
}

Expand All @@ -226,12 +247,14 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
{
last_checkpoint_watermark = current_watermark.clone();
// Persist the watermark when checkpoint arrives.
let vnodes = table.get_vnodes();
for vnode in vnodes.iter_vnodes() {
let pk = Some(ScalarImpl::Int16(vnode.to_scalar()));
let row = [pk, Some(current_watermark.clone())];
// FIXME(yuhao): use upsert.
table.insert(row);
if let Some(watermark) = current_watermark.clone() {
let vnodes = table.get_vnodes();
for vnode in vnodes.iter_vnodes() {
let pk = Some(ScalarImpl::Int16(vnode.to_scalar()));
let row = [pk, Some(watermark.clone())];
// This is an upsert.
table.insert(row);
}
}
table.commit(barrier.epoch).await?;
} else {
Expand All @@ -242,18 +265,24 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
if idle_input {
// Align watermark
let global_max_watermark =
Self::get_global_max_watermark(&table, watermark_type.clone())
.await?;
current_watermark = cmp::max_by(
current_watermark,
global_max_watermark,
DefaultOrd::default_cmp,
);
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
current_watermark.clone(),
));
Self::get_global_max_watermark(&table).await?;

current_watermark = if let Some(global_max_watermark) = global_max_watermark.clone() && let Some(watermark) = current_watermark.clone(){
Some(cmp::max_by(
watermark,
global_max_watermark,
DefaultOrd::default_cmp,
))
} else {
current_watermark.or(global_max_watermark)
};
if let Some(watermark) = current_watermark.clone() {
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark,
));
}
} else {
idle_input = true;
}
Expand All @@ -280,10 +309,10 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
)
}

/// If the returned value is `Ok(None)`, it means there is no global max watermark.
async fn get_global_max_watermark(
table: &StateTable<S>,
watermark_type: DataType,
) -> StreamExecutorResult<ScalarImpl> {
) -> StreamExecutorResult<Option<ScalarImpl>> {
let watermark_iter_futures = (0..VirtualNode::COUNT).map(|vnode| async move {
let pk = row::once(Some(ScalarImpl::Int16(vnode as _)));
let watermark_row: Option<OwnedRow> = table.get_row(pk).await?;
Expand All @@ -307,8 +336,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
let watermark = watermarks
.into_iter()
.flatten()
.max_by(DefaultOrd::default_cmp)
.unwrap_or_else(|| watermark_type.min_value());
.max_by(DefaultOrd::default_cmp);

Ok(watermark)
}
Expand Down Expand Up @@ -389,6 +417,7 @@ mod tests {
1,
ActorContext::create(123),
table,
0,
)
.boxed(),
tx,
Expand Down Expand Up @@ -431,13 +460,6 @@ mod tests {
};
}

// Init watermark
let watermark = executor.next().await.unwrap().unwrap();
assert_eq!(
watermark.into_watermark().unwrap(),
watermark!(WATERMARK_TYPE.min_value()),
);

// push the 1st chunk
tx.push_chunk(chunk1);
let chunk = executor.next().await.unwrap().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl ExecutorBuilder for ChainExecutorBuilder {
params.pk_indices,
stream.streaming_metrics.clone(),
params.env.config().developer.chunk_size,
params.executor_id,
)
.boxed()
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/from_proto/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl ExecutorBuilder for WatermarkFilterBuilder {
event_time_col_idx,
params.actor_context,
table,
params.executor_id,
)
.boxed())
}
Expand Down

0 comments on commit 063f58e

Please sign in to comment.