Skip to content

Commit

Permalink
refactor(stream): improve arrangement backfill logs (#16860)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored May 21, 2024
1 parent 43f78cd commit 0d421ed
Showing 1 changed file with 4 additions and 20 deletions.
24 changes: 4 additions & 20 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
tracing::debug!("Arrangement Backfill Executor started");
tracing::debug!("backfill executor started");
// The primary key columns, in the output columns of the upstream_table scan.
// Table scan scans a subset of the columns of the upstream table.
let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
Expand Down Expand Up @@ -486,7 +486,6 @@ where
.await?;

tracing::trace!(
actor = self.actor_id,
barrier = ?barrier,
"barrier persisted"
);
Expand All @@ -508,9 +507,8 @@ where
if new_rate_limit != rate_limit {
rate_limit = new_rate_limit;
tracing::info!(
id = self.actor_id,
new_rate_limit = ?rate_limit,
"actor rate limit changed",
"rate limit changed",
);
// The builder is emptied above via `DataChunkBuilder::consume_all`.
for (_, builder) in builders {
Expand Down Expand Up @@ -549,22 +547,14 @@ where
}
}

tracing::trace!(
actor = self.actor_id,
"Arrangement Backfill has finished and forward messages directly to the downstream"
);
tracing::debug!("snapshot read finished, wait to commit state on next barrier");

// Update our progress as finished in state table.

// Wait for first barrier to come after backfill is finished.
// So we can update our progress + persist the status.
while let Some(Ok(msg)) = upstream.next().await {
if let Some(msg) = mapping_message(msg, &self.output_indices) {
tracing::trace!(
actor = self.actor_id,
message = ?msg,
"backfill_finished_wait_for_barrier"
);
// If not finished then we need to update state, otherwise no need.
if let Message::Barrier(barrier) = &msg {
if is_completely_finished {
Expand Down Expand Up @@ -594,10 +584,6 @@ where

self.progress
.finish(barrier.epoch.curr, total_snapshot_processed_rows);
tracing::trace!(
epoch = ?barrier.epoch,
"Updated CreateMaterializedTracker"
);
yield msg;
break;
}
Expand All @@ -608,9 +594,7 @@ where
}
}

tracing::trace!(
"Arrangement Backfill has already finished and forward messages directly to the downstream"
);
tracing::debug!("backfill finished");

// After progress finished + state persisted,
// we can forward messages directly to the downstream,
Expand Down

0 comments on commit 0d421ed

Please sign in to comment.