fix(backfill): make tombstone iteration progress across all vnodes per epoch #17266

Closed · wants to merge 25 commits

Commits (25)
1621f45
add frontend changes
kwannoel Jun 14, 2024
493a47e
decode pk_inclusive flag, add it to in-memory progress
kwannoel Jun 14, 2024
38239ab
check inclusive flag when computing bounds
kwannoel Jun 14, 2024
157a6cf
rename pk_inclusive to yielded + include logic when considering updat…
kwannoel Jun 14, 2024
8463b7b
handle state encoding and committing
kwannoel Jun 14, 2024
381e81c
assert that all pk must be yielded downstream before marking as finished
kwannoel Jun 14, 2024
51b9966
iterate at least 1 record per vnode
kwannoel Jun 14, 2024
0be7fde
add append one row
kwannoel Jun 14, 2024
f7d3973
handle backwards compat
kwannoel Jun 15, 2024
3248c73
convert to mutable bitset instead
kwannoel Jun 15, 2024
519db4c
fix no shuffle should be exclusive
kwannoel Jun 15, 2024
d5260b4
no need to assert current pos is yielded
kwannoel Jun 15, 2024
4955a52
fix
kwannoel Jun 15, 2024
e5529ed
expose missing records
kwannoel Jun 16, 2024
2934095
check if yield + no bump position is causing the error
kwannoel Jun 16, 2024
a31f61f
bump timeout
kwannoel Jun 16, 2024
5618d48
revert + skip snapshot read for completed partition
kwannoel Jun 16, 2024
c552c97
cannot finish progress
kwannoel Jun 16, 2024
0072373
Revert "cannot finish progress"
kwannoel Jun 16, 2024
f51e190
optimize
kwannoel Jun 16, 2024
d79205f
fix
kwannoel Jun 16, 2024
cbb96e1
fix
kwannoel Jun 17, 2024
36d0539
remove skip buffer
kwannoel Jun 17, 2024
f91cb57
no need to yield
kwannoel Jun 17, 2024
db6dbb8
just do a snapshot read
kwannoel Jun 17, 2024
7 changes: 4 additions & 3 deletions ci/scripts/run-backfill-tests.sh
@@ -126,7 +126,7 @@ test_snapshot_and_upstream_read() {

# Lots of upstream tombstone, backfill should still proceed.
test_backfill_tombstone() {
echo "--- e2e, test_backfill_tombstone"
echo "--- e2e, test_backfill_tombstone, streaming_use_arrangement_backfill=$1"
risedev ci-start $CLUSTER_PROFILE
risedev psql -c "
CREATE TABLE tomb (v1 int)
@@ -151,7 +151,7 @@ test_backfill_tombstone() {
done
' 1>deletes.log 2>&1 &

risedev psql -c "CREATE MATERIALIZED VIEW m1 as select * from tomb;"
risedev psql -c "SET STREAMING_USE_ARRANGEMENT_BACKFILL=$1; CREATE MATERIALIZED VIEW m1 as select * from tomb;"
echo "--- Kill cluster"
kill_cluster
wait
@@ -284,7 +284,8 @@ test_backfill_snapshot_with_wider_rows() {
main() {
set -euo pipefail
test_snapshot_and_upstream_read
test_backfill_tombstone
test_backfill_tombstone "true"
test_backfill_tombstone "false"
test_replication_with_column_pruning
test_sink_backfill_recovery

4 changes: 2 additions & 2 deletions ci/workflows/main-cron.yml
@@ -692,7 +692,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 22
timeout_in_minutes: 35
retry: *auto-retry

- label: "e2e standalone binary test"
@@ -872,7 +872,7 @@ steps:
key: "e2e-sqlserver-sink-tests"
command: "ci/scripts/e2e-sqlserver-sink-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-sqlserver-sink-tests"
|| build.env("CI_STEPS") =~ /(^|,)e2e-sqlserver-sink-tests?(,|$$)/
depends_on:
7 changes: 5 additions & 2 deletions e2e_test/backfill/runtime/validate_rows_arrangement.slt
@@ -1,4 +1,7 @@
query I
select (select count(*) from arrangement_backfill) = (select count(*) from t);
select v1 from arrangement_backfill where v1 not in (select v1 from t);
----
t

query I
select v1 from t where v1 not in (select v1 from arrangement_backfill);
----
7 changes: 7 additions & 0 deletions src/common/src/buffer/bitmap.rs
@@ -39,6 +39,7 @@
use std::iter::{self, TrustedLen};
use std::ops::{BitAnd, BitAndAssign, BitOr, BitOrAssign, BitXor, Not, Range, RangeInclusive};

use fixedbitset::FixedBitSet;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::common::PbBuffer;
@@ -460,6 +461,12 @@ impl Bitmap {
count_ones: range.len(),
}
}

/// Used to convert to a mutable bitmap.
pub fn to_fixed_bitset(&self) -> FixedBitSet {
let iter = self.iter_ones();
FixedBitSet::from_iter(iter)
}
}

impl From<usize> for Bitmap {
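The conversion above hands back a mutable `FixedBitSet`, which the arrangement backfill executor uses further down to remember which vnodes still owe a record in the current epoch. A minimal, self-contained sketch of that pattern, assuming only the `fixedbitset` crate (an illustration, not RisingWave code):

```rust
use fixedbitset::FixedBitSet;

fn main() {
    // Suppose this executor owns vnodes 1, 3 and 5 (indices as produced by `iter_ones`).
    let mut remaining: FixedBitSet = [1usize, 3, 5].into_iter().collect();

    // Rows arrive from a snapshot stream merged across vnodes, each tagged with its vnode.
    for vnode in [3usize, 3, 1, 5] {
        if remaining.contains(vnode) {
            // First row seen for this vnode in this epoch: record its pk as progress,
            // then stop expecting more rows from it for now.
            remaining.set(vnode, false);
            if remaining.count_ones(..) == 0 {
                // Every owned vnode advanced by at least one record; stop early.
                break;
            }
        }
    }
    assert_eq!(remaining.count_ones(..), 0);
}
```

Once every bit is cleared, the executor can stop draining the snapshot for this epoch, which corresponds to the early `break` in the arrangement backfill diff below.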
32 changes: 16 additions & 16 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -122,16 +122,19 @@ impl StreamTableScan {

/// Build catalog for backfill state
///
/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// Schema: | vnode | pk ... | `yielded` | `backfill_finished` | `row_count` |
///
/// key: | vnode |
/// value: | pk ... | `backfill_finished` | `row_count` |
/// value: | pk ... | `yielded` | `backfill_finished` | `row_count` |
///
/// When we update the backfill progress,
/// we update it for all vnodes.
///
/// `pk` refers to the upstream pk which we use to track the backfill progress.
///
/// `yielded` is a boolean which indicates whether the backfill has also yielded this pk
/// downstream, or has only recorded progress up to this pk, in which case the pk can
/// still be yielded later.
///
/// `vnode` is the corresponding vnode of the upstream's distribution key.
/// It should also match the vnode of the backfill executor.
///
@@ -140,21 +143,10 @@ impl StreamTableScan {
/// `row_count` is a count of rows which indicates the # of rows per executor.
/// We used to track this in memory.
/// But for backfill persistence we have to also persist it.
///
/// FIXME(kwannoel):
/// - Across all vnodes, the values are the same.
/// - e.g. | vnode | pk ... | `backfill_finished` | `row_count` |
/// | 1002 | Int64(1) | t | 10 |
/// | 1003 | Int64(1) | t | 10 |
/// | 1003 | Int64(1) | t | 10 |
///
/// Eventually we should track progress per vnode, to support scaling with both mview and
/// the corresponding `no_shuffle_backfill`.
/// However this is not high priority, since we are working on supporting arrangement backfill,
/// which already has this capability.
pub fn build_backfill_state_catalog(
&self,
state: &mut BuildFragmentGraphState,
is_arrangement_backfill: bool,
) -> TableCatalog {
let mut catalog_builder = TableCatalogBuilder::default();
let upstream_schema = &self.core.get_table_columns();
@@ -170,6 +162,12 @@
catalog_builder.add_column(&Field::from(col));
}

// `yielded` column
// Only present for arrangement backfill.
if is_arrangement_backfill {
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "yielded"));
Review comment (Collaborator):
Do we need to ensure backward compatibility after adding this new column? In other words, is it possible that an arrangement backfill is triggered in an old version and resumed in the new version?

Reply (kwannoel, Contributor, Author):
True, I will handle it later.

Reply (kwannoel, Contributor, Author, Jun 15, 2024):
Handled in 66f44cb.

We only need to handle the deserialization path, because backfill jobs should fall into one of 3 categories:

  1. Finished -> They won't write to the state table again, so there is no need to handle the encoding path. They still need to be read from the state table, so the decoding path must be handled.
  2. Not finished -> They will be cleaned up after the upgrade, so no backwards-compat handling is needed at all.
  3. Background ddl -> They won't be cleaned up after the upgrade, and will panic. We can let users recreate these jobs. This is unlikely to be encountered, since stream job creation should not take a long time; but if it is, recreating the stream job is an easy solution.

}

// `backfill_finished` column
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));

@@ -288,8 +286,10 @@ impl StreamTableScan {
column_ids: upstream_column_ids.clone(),
};

let is_arrangement_backfill = self.stream_scan_type == StreamScanType::ArrangementBackfill;

let catalog = self
.build_backfill_state_catalog(state)
.build_backfill_state_catalog(state, is_arrangement_backfill)
.to_internal_table_prost();

// For backfill, we first read pk + output_indices from upstream.
Expand All @@ -307,7 +307,7 @@ impl StreamTableScan {
})
.collect_vec();

let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
let arrangement_table = if is_arrangement_backfill {
let upstream_table_catalog = self.get_upstream_state_table();
Some(upstream_table_catalog.to_internal_table_prost())
} else {
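To make the backward-compatibility plan from the review thread above concrete, here is a minimal decoding sketch that tolerates state rows written before the `yielded` column existed. Every type and function name here is a hypothetical stand-in rather than the actual RisingWave serde code; the only assumption carried over from this PR is the value layout | pk ... | yielded | backfill_finished | row_count |, with old rows lacking the `yielded` value:

```rust
#[derive(Clone, Debug, PartialEq)]
enum Datum {
    Int(i64),
    Bool(bool),
}

#[derive(Debug, PartialEq)]
struct Progress {
    current_pos: Vec<Datum>,
    yielded: bool,
    finished: bool,
    row_count: i64,
}

/// `values` is the per-vnode state row without the vnode key column.
/// Old layout: | pk ... | backfill_finished | row_count |
/// New layout: | pk ... | yielded | backfill_finished | row_count |
fn decode_progress(values: &[Datum], pk_len: usize) -> Progress {
    let has_yielded = values.len() == pk_len + 3;
    let current_pos = values[..pk_len].to_vec();
    let mut rest = values[pk_len..].iter();
    let yielded = if has_yielded {
        matches!(rest.next(), Some(Datum::Bool(true)))
    } else {
        // Old-format row: the recorded pk was necessarily already yielded downstream.
        true
    };
    let finished = matches!(rest.next(), Some(Datum::Bool(true)));
    let row_count = match rest.next() {
        Some(Datum::Int(n)) => *n,
        _ => 0,
    };
    Progress { current_pos, yielded, finished, row_count }
}

fn main() {
    // A row persisted by an old version has no `yielded` value and defaults to yielded = true.
    let old = [Datum::Int(42), Datum::Bool(false), Datum::Int(10)];
    // A row persisted by the new version carries the flag explicitly.
    let new = [Datum::Int(42), Datum::Bool(false), Datum::Bool(false), Datum::Int(10)];
    assert!(decode_progress(&old, 1).yielded);
    assert!(!decode_progress(&new, 1).yielded);
}
```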
103 changes: 79 additions & 24 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use either::Either;
@@ -21,6 +20,7 @@ use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bail;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::RowExt;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
@@ -109,6 +109,7 @@
let upstream_table_id = self.upstream_table.table_id();
let mut upstream_table = self.upstream_table;
let vnodes = upstream_table.vnodes().clone();
let vnode_bitset = vnodes.to_fixed_bitset();
let mut rate_limit = self.rate_limit;

// These builders will build data chunks.
@@ -139,7 +140,8 @@
let first_epoch = first_barrier.epoch;
self.state_table.init_epoch(first_barrier.epoch);

let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?;
let progress_per_vnode =
get_progress_per_vnode(&self.state_table, pk_in_output_indices.len()).await?;

let is_completely_finished = progress_per_vnode.iter().all(|(_, p)| {
matches!(
@@ -344,6 +346,12 @@
if !has_snapshot_read && !paused && rate_limit_ready {
debug_assert!(builders.values().all(|b| b.is_empty()));
let (_, snapshot) = backfill_stream.into_inner();
let mut remaining_vnodes = vnode_bitset.clone();
// Records which were used purely to update the current pos,
// to persist the tombstone iteration progress.
// They need to be buffered in case the snapshot read completes,
// since then we must yield them downstream.

#[for_await]
for msg in snapshot {
let Either::Right(msg) = msg else {
@@ -358,20 +366,31 @@
break;
}
Some((vnode, row)) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Message::Chunk(Self::handle_snapshot_chunk(
chunk,
// FIXME(kwannoel):
// Perhaps we should introduce a custom stream combinator,
// so we can iterate on individual streams here.
// Otherwise here we will continue iterating across all vnodes,
// even if some vnode is already done.
// We could actually drop the snapshot iteration stream
// for that vnode once we have read at least 1 record from it.
let vnode_idx = vnode.to_index();
if remaining_vnodes.contains(vnode_idx) {
let new_pos = row.project(&pk_in_output_indices);
assert_eq!(new_pos.len(), pk_in_output_indices.len());
backfill_state.update_progress(
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?);
new_pos.to_owned_row(),
false,
0,
)?;

remaining_vnodes.set(vnode_idx, false);
if remaining_vnodes.is_empty() {
break;
}
} else {
continue;
}

break;
}
}
}
@@ -388,11 +407,45 @@
};

// Process barrier:
// - snapshot read if we forced tombstone iteration progress.
// - consume snapshot rows left in builder.
// - consume upstream buffer chunk
// - handle mutations
// - switch snapshot

// snapshot read if we forced tombstone iteration progress.
if snapshot_read_complete {
let snapshot = Self::make_snapshot_stream(
&upstream_table,
backfill_state.clone(),
paused,
&rate_limiter,
);
#[for_await]
for msg in snapshot {
let msg = msg?;
match msg {
None => {
break;
}
Some((vnode, row)) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Message::Chunk(Self::handle_snapshot_chunk(
chunk,
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?);
}
}
}
}
}

// consume snapshot rows left in builder.
// NOTE(kwannoel): `zip_eq_debug` does not work here,
// we encounter "higher-ranked lifetime error".
@@ -562,7 +615,7 @@
// (there's no epoch before the first epoch).
for vnode in upstream_table.vnodes().iter_vnodes() {
backfill_state
.finish_progress(vnode, upstream_table.pk_indices().len());
.finish_progress(vnode, upstream_table.pk_indices().len())?;
}

persist_state_per_vnode(
@@ -674,9 +727,6 @@
/// The `StreamChunk` is the chunk that contains the rows from the vnode.
/// If it's `None`, it means the vnode has no more rows for this snapshot read.
///
/// The `snapshot_read_epoch` is supplied as a parameter for `state_table`.
/// It is required to ensure we read a fully-checkpointed snapshot the **first time**.
///
/// The rows from upstream snapshot read will be buffered inside the `builder`.
/// If snapshot is dropped before its rows are consumed,
/// remaining data in `builder` must be flushed manually.
@@ -690,15 +740,20 @@
let mut iterators = vec![];
for vnode in upstream_table.vnodes().iter_vnodes() {
let backfill_progress = backfill_state.get_progress(&vnode)?;
let current_pos = match backfill_progress {
BackfillProgressPerVnode::NotStarted => None,
BackfillProgressPerVnode::Completed { current_pos, .. }
| BackfillProgressPerVnode::InProgress { current_pos, .. } => {
Some(current_pos.clone())
let (current_pos, exclusive) = match backfill_progress {
BackfillProgressPerVnode::NotStarted => (None, false),
BackfillProgressPerVnode::Completed { .. } => {
continue;
}
BackfillProgressPerVnode::InProgress {
current_pos,
yielded,
..
} => (Some(current_pos.clone()), *yielded),
};

let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
let range_bounds =
compute_bounds(upstream_table.pk_indices(), exclusive, current_pos.clone());
if range_bounds.is_none() {
continue;
}
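The `exclusive` flag threaded into `compute_bounds` above is the heart of the fix: a pk that was recorded only to advance tombstone-iteration progress (not yet yielded) must be included when the scan resumes, while a pk that was already yielded downstream must be skipped. A minimal sketch of that bound selection, with assumed types and an assumed function name rather than the real `compute_bounds` signature:

```rust
use std::ops::Bound;

/// Pick the lower bound for a vnode's next snapshot range scan.
/// `exclusive == true`  => the stored pk was already yielded downstream; resume after it.
/// `exclusive == false` => the stored pk only records tombstone-iteration progress;
///                         resume at it so the row itself is not lost.
fn resume_bound(current_pos: Option<Vec<i64>>, exclusive: bool) -> Bound<Vec<i64>> {
    match current_pos {
        None => Bound::Unbounded, // vnode has not started yet: scan from the beginning
        Some(pos) if exclusive => Bound::Excluded(pos),
        Some(pos) => Bound::Included(pos),
    }
}

fn main() {
    assert_eq!(resume_bound(Some(vec![7]), true), Bound::Excluded(vec![7]));
    assert_eq!(resume_bound(Some(vec![7]), false), Bound::Included(vec![7]));
    assert_eq!(resume_bound(None, true), Bound::Unbounded);
}
```

The no-shuffle backfill path below keeps passing `true`, so its resume bound stays exclusive, matching its previous behavior.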
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -673,7 +673,7 @@
epoch: u64,
current_pos: Option<OwnedRow>,
) {
let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
let range_bounds = compute_bounds(upstream_table.pk_indices(), true, current_pos);
let range_bounds = match range_bounds {
None => {
yield None;
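Finally, a simplified stand-in for the merged per-vnode snapshot read that `make_snapshot_stream` builds in the arrangement backfill diff above: completed vnodes are skipped outright, each remaining vnode contributes one range scan starting at its own resume bound, and every emitted row is tagged with its vnode so progress can be tracked per vnode. This sketch uses plain in-memory iterators and hypothetical types, not the executor's async stream machinery:

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum VnodeProgress {
    NotStarted,
    InProgress { current_pos: i64, yielded: bool },
    Completed,
}

/// Build one (vnode, row) scan per non-completed vnode and chain them.
/// The real executor interleaves the per-vnode streams; chaining is enough to show the shape.
fn snapshot_rows(
    data: &BTreeMap<usize, Vec<i64>>, // vnode -> rows sorted by pk
    progress: &BTreeMap<usize, VnodeProgress>,
) -> Vec<(usize, i64)> {
    let mut out = Vec::new();
    for (&vnode, rows) in data {
        match progress[&vnode] {
            VnodeProgress::Completed => continue, // nothing left to read for this vnode
            VnodeProgress::NotStarted => out.extend(rows.iter().map(|&r| (vnode, r))),
            VnodeProgress::InProgress { current_pos, yielded } => out.extend(
                rows.iter()
                    // yielded => resume strictly after current_pos, else include it
                    .filter(|&&r| if yielded { r > current_pos } else { r >= current_pos })
                    .map(|&r| (vnode, r)),
            ),
        }
    }
    out
}

fn main() {
    let data = BTreeMap::from([
        (1, vec![10, 20, 30]),
        (2, vec![11, 21]),
        (3, vec![12]),
        (4, vec![13]),
    ]);
    let progress = BTreeMap::from([
        (1, VnodeProgress::InProgress { current_pos: 20, yielded: true }),
        (2, VnodeProgress::InProgress { current_pos: 21, yielded: false }),
        (3, VnodeProgress::Completed),
        (4, VnodeProgress::NotStarted),
    ]);
    // vnode 1 resumes after 20, vnode 2 resumes at 21, vnode 3 is skipped, vnode 4 starts fresh.
    assert_eq!(snapshot_rows(&data, &progress), vec![(1, 30), (2, 21), (4, 13)]);
}
```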