Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): support row count for arrangement backfill #14836

Merged
merged 12 commits into from
Jan 30, 2024
2 changes: 1 addition & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ profile:
steps:
- use: minio
api-requests-max: 30
api-requests-deadline: 2s
api-requests-deadline: 3s
- use: etcd
unsafe-no-fsync: true
- use: meta-node
Expand Down
18 changes: 11 additions & 7 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ where
let mut snapshot_read_epoch;

// Keep track of rows from the snapshot.
let mut total_snapshot_processed_rows: u64 = 0;
let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();

// Arrangement Backfill Algorithm:
//
Expand Down Expand Up @@ -278,9 +278,8 @@ where
// mark.
for chunk in upstream_chunk_buffer.drain(..) {
let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows +=
cur_barrier_upstream_processed_rows +=
chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
yield Message::Chunk(mapping_chunk(
chunk,
&self.output_indices,
Expand All @@ -290,6 +289,8 @@ where
break 'backfill_loop;
}
Some((vnode, chunk)) => {
let chunk_cardinality = chunk.cardinality() as u64;

// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
Expand All @@ -298,9 +299,9 @@ where
&chunk,
&pk_in_output_indices,
&mut backfill_state,
chunk_cardinality,
)?;

let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
let chunk = Message::Chunk(mapping_chunk(
Expand Down Expand Up @@ -354,6 +355,7 @@ where
})
})) {
if let Some(chunk) = chunk {
let chunk_cardinality = chunk.cardinality() as u64;
// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
Expand All @@ -362,9 +364,9 @@ where
&chunk,
&pk_in_output_indices,
&mut backfill_state,
chunk_cardinality,
)?;

let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
Expand Down Expand Up @@ -585,8 +587,10 @@ where
let backfill_progress = backfill_state.get_progress(&vnode)?;
let current_pos = match backfill_progress {
BackfillProgressPerVnode::NotStarted => None,
BackfillProgressPerVnode::Completed(current_pos)
| BackfillProgressPerVnode::InProgress(current_pos) => Some(current_pos.clone()),
BackfillProgressPerVnode::Completed { current_pos, .. }
| BackfillProgressPerVnode::InProgress { current_pos, .. } => {
Some(current_pos.clone())
}
};

let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
Expand Down
Loading
Loading