diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index c69992fb679e1..7fc4cfa27e553 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -333,30 +333,12 @@ where break; } else { // Process barrier: - // - consume upstream buffer chunk // - switch snapshot - - // Consume upstream buffer chunk - // If no current_pos, means we did not process any snapshot - // yet. In that case - // we can just ignore the upstream buffer chunk, but still need to clean it. - if let Some(current_pos) = ¤t_pos { - for chunk in upstream_chunk_buffer.drain(..) { - cur_barrier_upstream_processed_rows += - chunk.cardinality() as u64; - yield Message::Chunk(mapping_chunk( - mark_chunk( - chunk, - current_pos, - &pk_in_output_indices, - pk_order, - ), - &self.output_indices, - )); - } - } else { - upstream_chunk_buffer.clear() - } + // Upstream updates should only be read after the + // Nth barrier. + // Otherwise they would get filtered out, + // if they are larger than current pos, + // and we will lose them. self.metrics .backfill_snapshot_read_row_count @@ -366,14 +348,6 @@ where ]) .inc_by(cur_barrier_snapshot_processed_rows); - self.metrics - .backfill_upstream_output_row_count - .with_label_values(&[ - upstream_table_id.to_string().as_str(), - self.actor_id.to_string().as_str(), - ]) - .inc_by(cur_barrier_upstream_processed_rows); - // Update snapshot read epoch. snapshot_read_epoch = barrier.epoch.prev;