diff --git a/.gitignore b/.gitignore index 5f54a467b21b7..81e8609f3334e 100644 --- a/.gitignore +++ b/.gitignore @@ -83,7 +83,7 @@ e2e_test/iceberg/metastore_db **/*.sqlite **/*.sqlite-journal -*.slt.temp +*.slt*.temp .direnv/ diff --git a/e2e_test/source_inline/kafka/shared_source.slt.serial b/e2e_test/source_inline/kafka/shared_source.slt.serial index f9c91fe9b6123..3397f90f081da 100644 --- a/e2e_test/source_inline/kafka/shared_source.slt.serial +++ b/e2e_test/source_inline/kafka/shared_source.slt.serial @@ -28,10 +28,10 @@ sleep 2s system ok internal_table.mjs --name mv_before_produce --type sourcebackfill ---- -0,"""Finished""" -1,"""Finished""" -2,"""Finished""" -3,"""Finished""" +0,"{""num_consumed_rows"": 0, ""state"": ""Finished"", ""target_offset"": null}" +1,"{""num_consumed_rows"": 0, ""state"": ""Finished"", ""target_offset"": null}" +2,"{""num_consumed_rows"": 0, ""state"": ""Finished"", ""target_offset"": null}" +3,"{""num_consumed_rows"": 0, ""state"": ""Finished"", ""target_offset"": null}" system ok @@ -39,6 +39,9 @@ cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0 0 {"v1": 1, "v2": "a"} 1 {"v1": 2, "v2": "b"} 2 {"v1": 3, "v2": "c"} +2 {"v1": 3, "v2": "c"} +3 {"v1": 4, "v2": "d"} +3 {"v1": 4, "v2": "d"} 3 {"v1": 4, "v2": "d"} EOF @@ -83,10 +86,10 @@ internal_table.mjs --name s0 --type source system ok internal_table.mjs --name mv_1 --type sourcebackfill ---- -0,"{""SourceCachingUp"": ""0""}" -1,"{""SourceCachingUp"": ""0""}" -2,"{""SourceCachingUp"": ""0""}" -3,"{""SourceCachingUp"": ""0""}" +0,"{""num_consumed_rows"": 1, ""state"": {""SourceCachingUp"": ""0""}, ""target_offset"": ""0""}" +1,"{""num_consumed_rows"": 1, ""state"": {""SourceCachingUp"": ""0""}, ""target_offset"": ""0""}" +2,"{""num_consumed_rows"": 2, ""state"": {""SourceCachingUp"": ""1""}, ""target_offset"": ""1""}" +3,"{""num_consumed_rows"": 3, ""state"": {""SourceCachingUp"": ""2""}, ""target_offset"": ""2""}" # This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor. @@ -101,26 +104,35 @@ sleep 2s query ?? rowsort select v1, v2 from s0; ---- -1 a -2 b -3 c -4 d +1 a +2 b +3 c +3 c +4 d +4 d +4 d query ?? rowsort select v1, v2 from mv_1; ---- -1 a -2 b -3 c -4 d +1 a +2 b +3 c +3 c +4 d +4 d +4 d query ?? rowsort select v1, v2 from mv_2; ---- -1 a -2 b -3 c -4 d +1 a +2 b +3 c +3 c +4 d +4 d +4 d system ok cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0 @@ -138,33 +150,39 @@ internal_table.mjs --name s0 --type source ---- 0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" 1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +2,"{""split_info"": {""partition"": 2, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +3,"{""split_info"": {""partition"": 3, ""start_offset"": 3, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" query ?? rowsort select v1, v2 from s0; ---- -1 a -1 aa -2 b -2 bb -3 c -3 cc -4 d -4 dd +1 a +1 aa +2 b +2 bb +3 c +3 c +3 cc +4 d +4 d +4 d +4 dd query ?? rowsort select v1, v2 from mv_1; ---- -1 a -1 aa -2 b -2 bb -3 c -3 cc -4 d -4 dd +1 a +1 aa +2 b +2 bb +3 c +3 c +3 cc +4 d +4 d +4 d +4 dd # start_offset changed to 1 @@ -173,18 +191,18 @@ internal_table.mjs --name s0 --type source ---- 0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" 1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +2,"{""split_info"": {""partition"": 2, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +3,"{""split_info"": {""partition"": 3, ""start_offset"": 3, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" # Transition from SourceCachingUp to Finished after consuming one upstream message. system ok internal_table.mjs --name mv_1 --type sourcebackfill ---- -0,"""Finished""" -1,"""Finished""" -2,"""Finished""" -3,"""Finished""" +0,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}" +1,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}" +2,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""1""}" +3,"{""num_consumed_rows"": 4, ""state"": ""Finished"", ""target_offset"": ""2""}" system ok @@ -203,26 +221,26 @@ sleep 3s query ?? rowsort select v1, count(*) from s0 group by v1; ---- -1 12 -2 12 -3 12 -4 12 +1 12 +2 12 +3 13 +4 14 query ?? rowsort select v1, count(*) from mv_1 group by v1; ---- -1 12 -2 12 -3 12 -4 12 +1 12 +2 12 +3 13 +4 14 query ?? rowsort select v1, count(*) from mv_before_produce group by v1; ---- -1 12 -2 12 -3 12 -4 12 +1 12 +2 12 +3 13 +4 14 # start_offset changed to 11 @@ -231,8 +249,8 @@ internal_table.mjs --name s0 --type source ---- 0,"{""split_info"": {""partition"": 0, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" 1,"{""split_info"": {""partition"": 1, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -2,"{""split_info"": {""partition"": 2, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +2,"{""split_info"": {""partition"": 2, ""start_offset"": 12, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +3,"{""split_info"": {""partition"": 3, ""start_offset"": 13, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" # # Note: the parallelism depends on the risedev profile. diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index bd166f8082c4c..987e0827da965 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -54,9 +54,21 @@ pub enum BackfillState { SourceCachingUp(String), Finished, } -pub type BackfillStates = HashMap; +pub type BackfillStates = HashMap; -impl BackfillState { +/// Only `state` field is the real state for fail-over. +/// Other fields are for observability (but we still need to persist them). +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct BackfillStateWithProgress { + pub state: BackfillState, + pub num_consumed_rows: u64, + /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling. + /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it) + /// so that we can finish backfilling even when upstream doesn't emit any data. + pub target_offset: Option, +} + +impl BackfillStateWithProgress { pub fn encode_to_json(self) -> JsonbVal { serde_json::to_value(self).unwrap().into() } @@ -108,13 +120,13 @@ struct BackfillStage { /// /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`). splits: Vec, - /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling. - /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it) - /// so that we can finish backfilling even when upstream doesn't emit any data. - target_offsets: HashMap>, } impl BackfillStage { + fn total_backfilled_rows(&self) -> u64 { + self.states.values().map(|s| s.num_consumed_rows).sum() + } + fn debug_assert_consistent(&self) { if cfg!(debug_assertions) { let all_splits: HashSet<_> = @@ -123,10 +135,6 @@ impl BackfillStage { self.states.keys().cloned().collect::>(), all_splits ); - assert_eq!( - self.target_offsets.keys().cloned().collect::>(), - all_splits - ); } } @@ -134,7 +142,7 @@ impl BackfillStage { fn get_latest_unfinished_splits(&self) -> StreamExecutorResult> { let mut unfinished_splits = Vec::new(); for split in &self.splits { - let state = self.states.get(split.id().as_ref()).unwrap(); + let state = &self.states.get(split.id().as_ref()).unwrap().state; match state { BackfillState::Backfilling(Some(offset)) => { let mut updated_split = split.clone(); @@ -152,7 +160,8 @@ impl BackfillStage { fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool { let mut vis = false; let state = self.states.get_mut(split_id).unwrap(); - match state { + let state_inner = &mut state.state; + match state_inner { BackfillState::Backfilling(None) => { // backfilling for this split is not started yet. Ignore this row } @@ -163,12 +172,12 @@ impl BackfillStage { } Ordering::Equal => { // backfilling for this split is finished just right. - *state = BackfillState::Finished; + *state_inner = BackfillState::Finished; } Ordering::Greater => { // backfilling for this split produced more data than current source's progress. // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset. - *state = BackfillState::SourceCachingUp(backfill_offset.clone()); + *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone()); } } } @@ -178,11 +187,11 @@ impl BackfillStage { // Source caught up, but doesn't contain the last backfilled row. // This may happen e.g., if Kafka performed compaction. vis = true; - *state = BackfillState::Finished; + *state_inner = BackfillState::Finished; } Ordering::Equal => { // Source just caught up with backfilling. - *state = BackfillState::Finished; + *state_inner = BackfillState::Finished; } Ordering::Greater => { // Source is still behind backfilling. @@ -194,11 +203,11 @@ impl BackfillStage { // This split's backfilling is finished, we are waiting for other splits } } - if matches!(state, BackfillState::Backfilling(_)) { - *self.target_offsets.get_mut(split_id).unwrap() = Some(offset.to_string()); + if matches!(state_inner, BackfillState::Backfilling(_)) { + state.target_offset = Some(offset.to_string()); } if vis { - debug_assert_eq!(*state, BackfillState::Finished); + debug_assert_eq!(*state_inner, BackfillState::Finished); } vis } @@ -206,10 +215,11 @@ impl BackfillStage { /// Updates backfill states and returns whether the row backfilled from external system is visible. fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool { let state = self.states.get_mut(split_id).unwrap(); - match state { + state.num_consumed_rows += 1; + let state_inner = &mut state.state; + match state_inner { BackfillState::Backfilling(_old_offset) => { - let target_offset = self.target_offsets.get(split_id).unwrap(); - if let Some(target_offset) = target_offset + if let Some(target_offset) = &state.target_offset && compare_kafka_offset(offset, target_offset).is_ge() { // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up @@ -221,9 +231,9 @@ impl BackfillStage { // // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will // keep backfilling. - *state = BackfillState::SourceCachingUp(offset.to_string()); + *state_inner = BackfillState::SourceCachingUp(offset.to_string()); } else { - *state = BackfillState::Backfilling(Some(offset.to_string())); + *state_inner = BackfillState::Backfilling(Some(offset.to_string())); } true } @@ -336,22 +346,20 @@ impl SourceBackfillExecutorInner { self.backfill_state_store.init_epoch(barrier.epoch); let mut backfill_states: BackfillStates = HashMap::new(); - for split in &owned_splits { let split_id = split.id(); let backfill_state = self .backfill_state_store .try_recover_from_state_store(&split_id) .await? - .unwrap_or(BackfillState::Backfilling(None)); + .unwrap_or(BackfillStateWithProgress { + state: BackfillState::Backfilling(None), + num_consumed_rows: 0, + target_offset: None, // init with None + }); backfill_states.insert(split_id, backfill_state); } let mut backfill_stage = BackfillStage { - // init with None - target_offsets: backfill_states - .keys() - .map(|split_id| (split_id.clone(), None)) - .collect(), states: backfill_states, splits: owned_splits, }; @@ -368,14 +376,14 @@ impl SourceBackfillExecutorInner { .instrument_await("source_build_reader") .await?; for (split_id, info) in &backfill_info { + let state = backfill_stage.states.get_mut(split_id).unwrap(); match info { BackfillInfo::NoDataToBackfill => { - *backfill_stage.states.get_mut(split_id).unwrap() = BackfillState::Finished; + state.state = BackfillState::Finished; } BackfillInfo::HasDataToBackfill { latest_offset } => { // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value. - *backfill_stage.target_offsets.get_mut(split_id).unwrap() = - Some(latest_offset.clone()); + state.target_offset = Some(latest_offset.clone()); } } } @@ -586,8 +594,10 @@ impl SourceBackfillExecutorInner { // progress based on the number of consumed rows and an estimated total number of rows from hummock. // For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%. tracing::debug!("progress finish"); - let epoch = barrier.epoch; - self.progress.finish(epoch, 114514); + self.progress.finish( + barrier.epoch, + backfill_stage.total_backfilled_rows(), + ); // yield barrier after reporting progress yield Message::Barrier(barrier); @@ -671,16 +681,9 @@ impl SourceBackfillExecutorInner { } } - let mut splits: HashSet = backfill_stage.states.keys().cloned().collect(); + let mut states = backfill_stage.states; // Make sure `Finished` state is persisted. - self.backfill_state_store - .set_states( - splits - .iter() - .map(|s| (s.clone(), BackfillState::Finished)) - .collect(), - ) - .await?; + self.backfill_state_store.set_states(states.clone()).await?; // All splits finished backfilling. Now we only forward the source data. #[for_await] @@ -700,7 +703,7 @@ impl SourceBackfillExecutorInner { ); self.apply_split_change_forward_stage( actor_splits, - &mut splits, + &mut states, true, ) .await?; @@ -708,7 +711,7 @@ impl SourceBackfillExecutorInner { Mutation::Update(UpdateMutation { actor_splits, .. }) => { self.apply_split_change_forward_stage( actor_splits, - &mut splits, + &mut states, false, ) .await?; @@ -743,7 +746,7 @@ impl SourceBackfillExecutorInner { fn should_report_finished(&self, states: &BackfillStates) -> bool { states.values().all(|state| { matches!( - state, + state.state, BackfillState::Finished | BackfillState::SourceCachingUp(_) ) }) @@ -763,13 +766,13 @@ impl SourceBackfillExecutorInner { async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult { Ok(states .values() - .all(|state| matches!(state, BackfillState::Finished)) + .all(|state| matches!(state.state, BackfillState::Finished)) && self .backfill_state_store .scan() .await? .into_iter() - .all(|state| matches!(state, BackfillState::Finished))) + .all(|state| matches!(state.state, BackfillState::Finished))) } /// For newly added splits, we do not need to backfill and can directly forward from upstream. @@ -823,8 +826,15 @@ impl SourceBackfillExecutorInner { match backfill_state { None => { // Newly added split. We don't need to backfill. - // Note that this branch is different from the initial barrier (BackfillState::Backfilling(None) there). - target_state.insert(split_id, BackfillState::Finished); + // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there). + target_state.insert( + split_id, + BackfillStateWithProgress { + state: BackfillState::Finished, + num_consumed_rows: 0, + target_offset: None, + }, + ); } Some(backfill_state) => { // Migrated split. Backfill if unfinished. @@ -858,17 +868,6 @@ impl SourceBackfillExecutorInner { } stage.states = target_state; stage.splits = target_splits; - let old_target_offsets = std::mem::take(&mut stage.target_offsets); - stage.target_offsets = stage - .states - .keys() - .map(|split_id| { - ( - split_id.clone(), - old_target_offsets.get(split_id).cloned().flatten(), - ) - }) - .collect(); stage.debug_assert_consistent(); Ok(split_changed) } @@ -878,12 +877,12 @@ impl SourceBackfillExecutorInner { async fn apply_split_change_forward_stage( &mut self, split_assignment: &HashMap>, - splits: &mut HashSet, + states: &mut BackfillStates, should_trim_state: bool, ) -> StreamExecutorResult<()> { self.source_split_change_count.inc(); if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() { - self.update_state_if_changed_forward_stage(target_splits, splits, should_trim_state) + self.update_state_if_changed_forward_stage(target_splits, states, should_trim_state) .await?; } @@ -893,7 +892,7 @@ impl SourceBackfillExecutorInner { async fn update_state_if_changed_forward_stage( &mut self, target_splits: Vec, - current_splits: &mut HashSet, + states: &mut BackfillStates, should_trim_state: bool, ) -> StreamExecutorResult<()> { let target_splits: HashSet = target_splits @@ -902,23 +901,25 @@ impl SourceBackfillExecutorInner { .collect(); let mut split_changed = false; + let mut newly_added_splits = vec![]; // Checks added splits for split_id in &target_splits { - if !current_splits.contains(split_id) { + if !states.contains_key(split_id) { split_changed = true; let backfill_state = self .backfill_state_store .try_recover_from_state_store(split_id) .await?; - match backfill_state { + match &backfill_state { None => { // Newly added split. We don't need to backfill! + newly_added_splits.push(split_id.clone()); } Some(backfill_state) => { // Migrated split. It should also be finished since we are in forwarding stage. - match backfill_state { + match backfill_state.state { BackfillState::Finished => {} _ => { return Err(anyhow::anyhow!( @@ -930,11 +931,19 @@ impl SourceBackfillExecutorInner { } } } + states.insert( + split_id.clone(), + backfill_state.unwrap_or(BackfillStateWithProgress { + state: BackfillState::Finished, + num_consumed_rows: 0, + target_offset: None, + }), + ); } } // Checks dropped splits - for existing_split_id in current_splits.iter() { + for existing_split_id in states.keys() { if !target_splits.contains(existing_split_id) { tracing::info!("split dropping detected: {}", existing_split_id); split_changed = true; @@ -947,19 +956,31 @@ impl SourceBackfillExecutorInner { "apply split change" ); - let dropped_splits = - current_splits.extract_if(|split_id| !target_splits.contains(split_id)); + let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id)); if should_trim_state { // trim dropped splits' state - self.backfill_state_store.trim_state(dropped_splits).await?; + self.backfill_state_store + .trim_state(dropped_splits.map(|(k, _v)| k)) + .await?; } + // For migrated splits, and existing splits, we do not need to update + // state store, but only for newly added splits. self.backfill_state_store .set_states( - target_splits + newly_added_splits .into_iter() - .map(|split_id| (split_id, BackfillState::Finished)) + .map(|split_id| { + ( + split_id, + BackfillStateWithProgress { + state: BackfillState::Finished, + num_consumed_rows: 0, + target_offset: None, + }, + ) + }) .collect(), ) .await?; diff --git a/src/stream/src/executor/source/source_backfill_state_table.rs b/src/stream/src/executor/source/source_backfill_state_table.rs index 3579aff2ec4fb..b0ca8d363b9d7 100644 --- a/src/stream/src/executor/source/source_backfill_state_table.rs +++ b/src/stream/src/executor/source/source_backfill_state_table.rs @@ -23,7 +23,7 @@ use risingwave_connector::source::SplitId; use risingwave_pb::catalog::PbTable; use risingwave_storage::StateStore; -use super::source_backfill_executor::{BackfillState, BackfillStates}; +use super::source_backfill_executor::{BackfillStateWithProgress, BackfillStates}; use crate::common::table::state_table::StateTable; use crate::executor::error::StreamExecutorError; use crate::executor::StreamExecutorResult; @@ -56,7 +56,7 @@ impl BackfillStateTableHandler { } /// XXX: we might get stale data for other actors' writes, but it's fine? - pub async fn scan(&self) -> StreamExecutorResult> { + pub async fn scan(&self) -> StreamExecutorResult> { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self @@ -70,7 +70,7 @@ impl BackfillStateTableHandler { let row = item?.into_owned_row(); let state = match row.datum_at(1) { Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { - BackfillState::restore_from_json(jsonb_ref.to_owned_scalar())? + BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())? } _ => unreachable!(), }; @@ -80,7 +80,11 @@ impl BackfillStateTableHandler { Ok(ret) } - async fn set(&mut self, key: SplitId, state: BackfillState) -> StreamExecutorResult<()> { + async fn set( + &mut self, + key: SplitId, + state: BackfillStateWithProgress, + ) -> StreamExecutorResult<()> { let row = [ Some(Self::string_to_scalar(key.as_ref())), Some(ScalarImpl::Jsonb(state.encode_to_json())), @@ -126,13 +130,13 @@ impl BackfillStateTableHandler { pub async fn try_recover_from_state_store( &mut self, split_id: &SplitId, - ) -> StreamExecutorResult> { + ) -> StreamExecutorResult> { Ok(self .get(split_id) .await? .map(|row| match row.datum_at(1) { Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { - BackfillState::restore_from_json(jsonb_ref.to_owned_scalar()) + BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar()) } _ => unreachable!(), })