Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): recover no_shuffle_backfill #12493

Merged
merged 9 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
# Hence keeping it in case we ever need to debug backfill again.

# USAGE:
# Start a rw cluster then run this script.
# ```sh
# ./risedev d
# cargo make ci-start ci-backfill
# ./ci/scripts/run-backfill-tests.sh
# ```

Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/sql/backfill/insert.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ insert into t1
SELECT
generate_series,
'{"orders": {"id": 1, "price": "2.30", "customer_id": 2}}'::jsonb
FROM generate_series(1, 200000);
FROM generate_series(1, 100000);
FLUSH;
11 changes: 9 additions & 2 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,17 @@ impl CreateMviewProgressTracker {
) -> Option<TrackingCommand> {
let actor = progress.chain_actor_id;
let Some(epoch) = self.actor_map.get(&actor).copied() else {
panic!(
"no tracked progress for actor {}, is it already finished?",
// On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
// even if backfill is finished on recovery.
// This is because we don't know if only this actor is finished,
// OR the entire stream job is finished.
// For the first case, we must notify meta.
// For the second case, we can still notify meta, but ignore it here.
tracing::info!(
"no tracked progress for actor {}, the stream job could already be finished",
actor
);
return None;
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
};

let new_state = if progress.done {
Expand Down
223 changes: 146 additions & 77 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use futures::{pin_mut, stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::OwnedRow;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::Datum;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::EpochPair;
Expand All @@ -34,8 +35,8 @@ use risingwave_storage::StateStore;
use crate::common::table::state_table::StateTable;
use crate::executor::backfill::utils;
use crate::executor::backfill::utils::{
check_all_vnode_finished, compute_bounds, construct_initial_finished_state, get_new_pos,
get_row_count_state, iter_chunks, mapping_chunk, mapping_message, mark_chunk, owned_row_iter,
compute_bounds, construct_initial_finished_state, get_new_pos, iter_chunks, mapping_chunk,
mapping_message, mark_chunk, owned_row_iter,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
Expand All @@ -44,6 +45,19 @@ use crate::executor::{
};
use crate::task::{ActorId, CreateMviewProgress};

/// vnode, `is_finished`, `row_count`, all occupy 1 column each.
const METADATA_STATE_LEN: usize = 3;

/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
#[derive(Debug, Eq, PartialEq)]
pub struct BackfillState {
    // Backfill position in the upstream table's storage pk order.
    // `None` means backfill starts from the beginning (nothing persisted yet).
    current_pos: Option<OwnedRow>,
    // Previously persisted state row (without a concrete vnode in slot 0),
    // kept so the next flush can compute a delta against it.
    // `None` when no state row was found on recovery.
    old_state: Option<Vec<Datum>>,
    // Whether backfill had already completed before recovery.
    is_finished: bool,
    // Number of snapshot rows processed so far, used for progress reporting.
    row_count: u64,
}

/// An implementation of the [RFC: Use Backfill To Let Mv On Mv Stream Again](https://github.com/risingwavelabs/rfcs/pull/13).
/// `BackfillExecutor` is used to create a materialized view on another materialized view.
///
Expand Down Expand Up @@ -128,9 +142,7 @@ where
// The primary key columns, in the output columns of the upstream_table scan.
let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();

// schema: | vnode | pk ... | backfill_finished | row_count |
// +1 for vnode, +1 for backfill_finished, +1 for row_count.
let state_len = pk_in_output_indices.len() + 3;
let state_len = pk_in_output_indices.len() + METADATA_STATE_LEN;

let pk_order = self.upstream_table.pk_serializer().get_order_types();

Expand All @@ -145,16 +157,13 @@ where
state_table.init_epoch(first_barrier.epoch);
}

let is_finished = if let Some(state_table) = self.state_table.as_ref() {
let is_finished = check_all_vnode_finished(state_table, state_len).await?;
if is_finished {
assert!(!first_barrier.is_newly_added(self.actor_id));
}
is_finished
} else {
// Maintain backwards compatibility with no state table
!first_barrier.is_newly_added(self.actor_id)
};
let BackfillState {
mut current_pos,
is_finished,
row_count,
mut old_state,
} = Self::recover_backfill_state(self.state_table.as_ref(), pk_in_output_indices.len())
.await?;

let mut builder =
DataChunkBuilder::new(self.upstream_table.schema().data_types(), self.chunk_size);
Expand Down Expand Up @@ -191,17 +200,9 @@ where
// | f | f | t |
let to_backfill = !is_finished && !is_snapshot_empty;

// Current position of the upstream_table storage primary key.
// `None` means it starts from the beginning.
let mut current_pos: Option<OwnedRow> = None;

// Use these to persist state.
// They contain the backfill position,
// as well as the progress.
// However, they do not contain the vnode key at index 0.
// That is filled in when we flush the state table.
// Use this buffer to construct state,
// which will then be persisted.
let mut current_state: Vec<Datum> = vec![None; state_len];
let mut old_state: Option<Vec<Datum>> = None;

// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);
Expand All @@ -220,13 +221,7 @@ where
let mut snapshot_read_epoch = init_epoch;

// Keep track of rows from the snapshot.
let mut total_snapshot_processed_rows: u64 =
if let Some(state_table) = self.state_table.as_ref() {
get_row_count_state(state_table, state_len).await?
} else {
// Maintain backwards compatibility with no state_table.
0
};
let mut total_snapshot_processed_rows: u64 = row_count;

// Backfill Algorithm:
//
Expand Down Expand Up @@ -415,13 +410,6 @@ where
total_snapshot_processed_rows,
);

tracing::trace!(
actor = self.actor_id,
epoch = ?barrier.epoch,
?current_pos,
total_snapshot_processed_rows,
"Backfill position persisted"
);
// Persist state on barrier
Self::persist_state(
barrier.epoch,
Expand All @@ -434,64 +422,91 @@ where
)
.await?;

tracing::trace!(
epoch = ?barrier.epoch,
?current_pos,
total_snapshot_processed_rows,
"Backfill state persisted"
);

yield Message::Barrier(barrier);

// We will switch snapshot at the start of the next iteration of the backfill loop.
}
}

tracing::trace!(
actor = self.actor_id,
"Backfill has already finished and forward messages directly to the downstream"
);
tracing::trace!("Backfill has finished, waiting for barrier");

// Wait for first barrier to come after backfill is finished.
// So we can update our progress + persist the status.
while let Some(Ok(msg)) = upstream.next().await {
if let Some(msg) = mapping_message(msg, &self.output_indices) {
// If not finished then we need to update state, otherwise no need.
if let Message::Barrier(barrier) = &msg && !is_finished {
// If snapshot was empty, we do not need to backfill,
// but we still need to persist the finished state.
// We currently persist it on the second barrier here rather than first.
// This is because we can't update state table in first epoch,
// since it expects to have been initialized in previous epoch
// (there's no epoch before the first epoch).
if is_snapshot_empty {
current_pos =
Some(construct_initial_finished_state(pk_in_output_indices.len()))
}
if let Message::Barrier(barrier) = &msg {
if is_finished {
// In the event that some actors already finished,
// on recovery we want to recover the backfill progress for them too,
// so that we can provide an accurate progress estimate.
// We therefore call `update` here, before finally completing the backfill progress.
self.progress.update(
barrier.epoch.curr,
snapshot_read_epoch,
total_snapshot_processed_rows,
);
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
} else {
// If snapshot was empty, we do not need to backfill,
// but we still need to persist the finished state.
// We currently persist it on the second barrier here rather than first.
// This is because we can't update state table in first epoch,
// since it expects to have been initialized in previous epoch
// (there's no epoch before the first epoch).
if is_snapshot_empty {
current_pos =
Some(construct_initial_finished_state(pk_in_output_indices.len()))
}

// We will update current_pos at least once,
// since snapshot read has to be non-empty,
// Or snapshot was empty and we construct a placeholder state.
debug_assert_ne!(current_pos, None);
// We will update current_pos at least once,
// since snapshot read has to be non-empty,
// Or snapshot was empty and we construct a placeholder state.
debug_assert_ne!(current_pos, None);

Self::persist_state(
barrier.epoch,
&mut self.state_table,
true,
&current_pos,
total_snapshot_processed_rows,
&mut old_state,
&mut current_state,
)
.await?;
tracing::trace!(
epoch = ?barrier.epoch,
?current_pos,
total_snapshot_processed_rows,
"Backfill position persisted after completion"
);
}

// For both backfill finished before recovery,
// and backfill which just finished, we need to update mview tracker,
// it does not persist this information.
self.progress.finish(barrier.epoch.curr);
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
tracing::trace!(
actor = self.actor_id,
epoch = ?barrier.epoch,
?current_pos,
total_snapshot_processed_rows,
"Backfill position persisted after completion"
"Updated CreateMaterializedTracker"
);
Self::persist_state(
barrier.epoch,
&mut self.state_table,
true,
&current_pos,
total_snapshot_processed_rows,
&mut old_state,
&mut current_state,
)
.await?;
self.progress.finish(barrier.epoch.curr);
yield msg;
break;
}
yield msg;
}
}

tracing::trace!(
"Backfill has already finished and forward messages directly to the downstream"
);

// After progress finished + state persisted,
// we can forward messages directly to the downstream,
// as backfill is finished.
Expand All @@ -500,14 +515,68 @@ where
#[for_await]
for msg in upstream {
if let Some(msg) = mapping_message(msg?, &self.output_indices) {
if let Some(state_table) = self.state_table.as_mut() && let Message::Barrier(barrier) = &msg {
state_table.commit_no_data_expected(barrier.epoch);
}
yield msg;
}
}
}

/// Restore [`BackfillState`] from the backfill state table on recovery.
///
/// Backwards compatibility: executors from old clusters have no state table;
/// for those, backfill must already be finished (otherwise nothing could
/// have been persisted), so a finished state is returned.
async fn recover_backfill_state(
    state_table: Option<&StateTable<S>>,
    pk_len: usize,
) -> StreamExecutorResult<BackfillState> {
    let state_table = match state_table {
        Some(table) => table,
        None => {
            // If no state table, but backfill is present, it must be from an old cluster.
            // In that case backfill must be finished, otherwise it won't have been persisted.
            return Ok(BackfillState {
                current_pos: None,
                is_finished: true,
                row_count: 0,
                old_state: None,
            });
        }
    };

    // Decode the state stored under every owned vnode. All vnode partitions
    // must hold the same state (no scale-in supported), so we take the first
    // one and assert the rest agree with it.
    let mut recovered: Option<BackfillState> = None;
    for vnode in state_table.vnodes().iter_vnodes_scalar() {
        let key: &[Datum] = &[Some(vnode.into())];
        let row = state_table.get_row(key).await?;
        let state = Self::deserialize_backfill_state(row, pk_len);
        match &recovered {
            None => recovered = Some(state),
            Some(first) => assert_eq!(&state, first),
        }
    }
    // The executor always owns at least one vnode (original code unwrapped the
    // first vnode unconditionally), so this panics only on a broken invariant.
    Ok(recovered.expect("backfill state table must own at least one vnode"))
}

fn deserialize_backfill_state(row: Option<OwnedRow>, pk_len: usize) -> BackfillState {
let Some(row) = row else {
return BackfillState {
current_pos: None,
is_finished: false,
row_count: 0,
old_state: None,
};
};
let row = row.into_inner();
let mut old_state = vec![None; pk_len + METADATA_STATE_LEN];
old_state[1..row.len() + 1].clone_from_slice(&row);
let current_pos = Some((&row[0..pk_len]).into_owned_row());
let is_finished = row[pk_len].clone().map_or(false, |d| d.into_bool());
let row_count = row
.get(pk_len + 1)
.cloned()
.unwrap_or(None)
.map_or(0, |d| d.into_int64() as u64);
BackfillState {
current_pos,
is_finished,
row_count,
old_state: Some(old_state),
}
}

/// Snapshot read the upstream mv.
/// The rows from upstream snapshot read will be buffered inside the `builder`.
/// If snapshot is dropped before its rows are consumed,
Expand Down
Loading
Loading