diff --git a/Cargo.lock b/Cargo.lock index 0c8b66655c0ec..3719b79b47149 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8882,6 +8882,7 @@ dependencies = [ "easy-ext", "either", "enum-as-inner", + "expect-test", "fail", "function_name", "futures", diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index c770f60a916bd..229907ab3cee8 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -94,6 +94,7 @@ workspace-hack = { path = "../workspace-hack" } [dev-dependencies] assert_matches = "1" +expect-test = "1.4" maplit = "1.0.2" rand = "0.8" risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 3308ae43287f6..0fecc15c2e4db 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -93,7 +93,7 @@ impl ScaleService for ScaleServiceImpl { let actor_splits = self .source_manager - .get_actor_splits() + .list_assignments() .await .into_iter() .map(|(actor_id, splits)| { diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 5f881e8d33138..59f54471d5206 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -84,8 +84,8 @@ pub struct ReplaceTablePlan { pub init_split_assignment: SplitAssignment, } -/// [`Command`] is the action of [`crate::barrier::GlobalBarrierManager`]. For different commands, -/// we'll build different barriers to send, and may do different stuffs after the barrier is +/// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands, +/// it will build different barriers to send, and may do different stuffs after the barrier is /// collected. #[derive(Debug, Clone, strum::Display)] pub enum Command { @@ -156,8 +156,8 @@ pub enum Command { /// of the Merge executors are changed additionally. ReplaceTable(ReplaceTablePlan), - /// `SourceSplitAssignment` generates Plain(Mutation::Splits) for pushing initialized splits or - /// newly added splits. + /// `SourceSplitAssignment` generates a `Splits` barrier for pushing initialized splits or + /// changed splits. SourceSplitAssignment(SplitAssignment), /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 0fc30da991d45..313dfce976410 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -104,7 +104,7 @@ impl GlobalBarrierManager { // clean up source connector dirty changes. self.source_manager - .drop_source_change(&to_drop_table_fragments) + .drop_source_fragments(&to_drop_table_fragments) .await; Ok(()) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 0eaecd0b1c039..0dae4206cc4a8 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1091,7 +1091,7 @@ impl ScaleController { let actor_splits = self .source_manager - .reallocate_splits(*fragment_id, &prev_actor_ids, &curr_actor_ids) + .migrate_splits(*fragment_id, &prev_actor_ids, &curr_actor_ids) .await?; fragment_stream_source_actor_splits.insert(*fragment_id, actor_splits); diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index bd5fd96db32d0..e1fca4cc64e38 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; -use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_connector::dispatch_source_prop; @@ -109,6 +108,8 @@ impl ConnectorSourceWorker

{ Ok(()) } + /// On creation, connection to the external source service will be established, but `splits` + /// will not be updated until `tick` is called. pub async fn create( connector_client: &Option, source: &Source, @@ -250,7 +251,12 @@ impl SourceManagerCore { } } - async fn diff(&self) -> MetaResult { + /// Checks whether the external source metadata has changed, + /// and re-assigns splits if there's a diff. + /// + /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`, + /// after the mutation barrier has been collected. + async fn reassign_splits(&self) -> MetaResult { let mut split_assignment: SplitAssignment = HashMap::new(); for (source_id, handle) in &self.managed_sources { @@ -261,47 +267,48 @@ impl SourceManagerCore { } }; - if let Some(discovered_splits) = handle.discovered_splits().await { - for fragment_id in fragment_ids { - let actor_ids = match self - .fragment_manager - .get_running_actors_of_fragment(*fragment_id) - .await - { - Ok(actor_ids) => actor_ids, - Err(err) => { - tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string()); - continue; - } - }; - - let prev_actor_splits: HashMap<_, _> = actor_ids - .into_iter() - .map(|actor_id| { - ( - actor_id, - self.actor_splits - .get(&actor_id) - .cloned() - .unwrap_or_default(), - ) - }) - .collect(); - - if discovered_splits.is_empty() { - tracing::warn!("No splits discovered for source {}", source_id); - } + let Some(discovered_splits) = handle.discovered_splits().await else { + return Ok(split_assignment); + }; + if discovered_splits.is_empty() { + tracing::warn!("No splits discovered for source {}", source_id); + } - if let Some(change) = diff_splits( - *fragment_id, - prev_actor_splits, - &discovered_splits, - SplitDiffOptions { - enable_scale_in: handle.enable_scale_in, - }, - ) { - split_assignment.insert(*fragment_id, change); + for fragment_id in fragment_ids { + let actor_ids = match self + .fragment_manager + .get_running_actors_of_fragment(*fragment_id) + .await + { + Ok(actor_ids) => actor_ids, + Err(err) => { + tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string()); + continue; } + }; + + let prev_actor_splits: HashMap<_, _> = actor_ids + .into_iter() + .map(|actor_id| { + ( + actor_id, + self.actor_splits + .get(&actor_id) + .cloned() + .unwrap_or_default(), + ) + }) + .collect(); + + if let Some(new_assignment) = reassign_splits( + *fragment_id, + prev_actor_splits, + &discovered_splits, + SplitDiffOptions { + enable_scale_in: handle.enable_scale_in, + }, + ) { + split_assignment.insert(*fragment_id, new_assignment); } } } @@ -309,7 +316,7 @@ impl SourceManagerCore { Ok(split_assignment) } - pub fn apply_source_change( + fn apply_source_change( &mut self, source_fragments: Option>>, split_assignment: Option, @@ -343,7 +350,7 @@ impl SourceManagerCore { } } - pub fn drop_source_change( + fn drop_source_fragments( &mut self, source_fragments: HashMap>, actor_splits: &HashSet, @@ -369,12 +376,9 @@ impl SourceManagerCore { self.actor_splits.remove(actor_id); } } - - pub fn get_actor_splits(&self) -> HashMap> { - self.actor_splits.clone() - } } +/// Note: the `PartialEq` and `Ord` impl just compares the number of splits. #[derive(Debug)] struct ActorSplitsAssignment { actor_id: ActorId, @@ -415,7 +419,11 @@ impl Default for SplitDiffOptions { } } -fn diff_splits( +/// Reassigns splits if there are new splits or dropped splits, +/// i.e., `actor_splits` and `discovered_splits` differ. +/// +/// - `fragment_id`: just for logging +fn reassign_splits( fragment_id: FragmentId, actor_splits: HashMap>, discovered_splits: &BTreeMap, @@ -552,16 +560,17 @@ impl SourceManager { }) } - pub async fn drop_source_change(&self, table_fragments_vec: &[TableFragments]) { + /// For dropping MV. + pub async fn drop_source_fragments(&self, table_fragments: &[TableFragments]) { let mut core = self.core.lock().await; // Extract the fragments that include source operators. - let source_fragments = table_fragments_vec + let source_fragments = table_fragments .iter() .flat_map(|table_fragments| table_fragments.stream_source_fragments()) .collect::>(); - let fragments = table_fragments_vec + let fragments = table_fragments .iter() .flat_map(|table_fragments| &table_fragments.fragments) .collect::>(); @@ -573,9 +582,10 @@ impl SourceManager { .map(|actor| actor.get_actor_id()) .collect::>(); - core.drop_source_change(source_fragments, &dropped_actors); + core.drop_source_fragments(source_fragments, &dropped_actors); } + /// Updates states after split change (`post_collect` barrier) or scaling (`post_apply_reschedule`). pub async fn apply_source_change( &self, source_fragments: Option>>, @@ -586,11 +596,13 @@ impl SourceManager { core.apply_source_change(source_fragments, split_assignment, dropped_actors); } - // After introducing the remove function for split, there may be a very occasional split removal - // during scaling, in which case we need to use the old splits for reallocation instead of the - // latest splits (which may be missing), so that we can resolve the split removal in the next - // command. - pub async fn reallocate_splits( + /// Migrates splits from previous actors to the new actors for a rescheduled fragment. + /// + /// Very occasionally split removal may happen + /// during scaling, in which case we need to use the old splits for reallocation instead of the + /// latest splits (which may be missing), so that we can resolve the split removal in the next + /// command. + pub async fn migrate_splits( &self, fragment_id: FragmentId, prev_actor_ids: &[ActorId], @@ -601,20 +613,15 @@ impl SourceManager { let prev_splits = prev_actor_ids .iter() .flat_map(|actor_id| core.actor_splits.get(actor_id).unwrap()) - .cloned() - .collect_vec(); + .map(|split| (split.id(), split.clone())) + .collect(); let empty_actor_splits = curr_actor_ids .iter() .map(|actor_id| (*actor_id, vec![])) .collect(); - let prev_splits = prev_splits - .into_iter() - .map(|split| (split.id(), split)) - .collect(); - - let diff = diff_splits( + let diff = reassign_splits( fragment_id, empty_actor_splits, &prev_splits, @@ -626,7 +633,8 @@ impl SourceManager { Ok(diff) } - pub async fn pre_allocate_splits(&self, table_id: &TableId) -> MetaResult { + /// Allocates splits to actors for a newly created source executor. + pub async fn allocate_splits(&self, table_id: &TableId) -> MetaResult { let core = self.core.lock().await; let table_fragments = core .fragment_manager @@ -670,7 +678,7 @@ impl SourceManager { .map(|actor| (actor.actor_id, vec![])) .collect(); - if let Some(diff) = diff_splits( + if let Some(diff) = reassign_splits( fragment_id, empty_actor_splits, &splits, @@ -694,7 +702,6 @@ impl SourceManager { self.env.connector_client(), source, &mut core.managed_sources, - true, self.metrics.clone(), ) .await?; @@ -702,7 +709,17 @@ impl SourceManager { Ok(()) } - /// Used on startup. Failed sources will not block meta startup. + /// unregister connector worker for source. + pub async fn unregister_sources(&self, source_ids: Vec) { + let mut core = self.core.lock().await; + for source_id in source_ids { + if let Some(handle) = core.managed_sources.remove(&source_id) { + handle.handle.abort(); + } + } + } + + /// Used on startup ([`Self::new`]). Failed sources will not block meta startup. fn create_source_worker_async( connector_client: Option, source: Source, @@ -761,12 +778,13 @@ impl SourceManager { Ok(()) } - /// Used when registering new sources. + /// Used when registering new sources (`Self::register_source`). + /// + /// It will call `ConnectorSourceWorker::tick()` to fetch split metadata once before returning. async fn create_source_worker( connector_client: Option, source: &Source, managed_sources: &mut HashMap, - force_tick: bool, metrics: Arc, ) -> MetaResult<()> { tracing::info!("spawning new watcher for source {}", source.id); @@ -789,23 +807,19 @@ impl SourceManager { ) .await?; - // don't force tick in process of recovery. One source down should not lead to meta - // recovery failure. - if force_tick { - // if fail to fetch meta info, will refuse to create source - - // todo: make the timeout configurable, longer than `properties.sync.call.timeout` - // in kafka - tokio::time::timeout(Self::DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick()) - .await - .map_err(|_e| { - anyhow!( - "failed to fetch meta info for source {}, error: timeout {}", - source.id, - Self::DEFAULT_SOURCE_TICK_TIMEOUT.as_secs() - ) - })??; - } + // if fail to fetch meta info, will refuse to create source + + // todo: make the timeout configurable, longer than `properties.sync.call.timeout` + // in kafka + tokio::time::timeout(Self::DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick()) + .await + .map_err(|_e| { + anyhow!( + "failed to fetch meta info for source {}, error: timeout {}", + source.id, + Self::DEFAULT_SOURCE_TICK_TIMEOUT.as_secs() + ) + })??; tokio::spawn(async move { worker.run(sync_call_rx).await }) }); @@ -823,16 +837,6 @@ impl SourceManager { Ok(()) } - /// unregister connector worker for source. - pub async fn unregister_sources(&self, source_ids: Vec) { - let mut core = self.core.lock().await; - for source_id in source_ids { - if let Some(handle) = core.managed_sources.remove(&source_id) { - handle.handle.abort(); - } - } - } - pub async fn list_assignments(&self) -> HashMap> { let core = self.core.lock().await; core.actor_splits.clone() @@ -840,14 +844,20 @@ impl SourceManager { /// Checks whether the external source metadata has changed, and sends a split assignment command /// if it has. + /// + /// This is also how a newly created `SourceExecutor` is initialized. + /// (force `tick` in `Self::create_source_worker`) + /// + /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change` + /// to update states in `SourceManager`. async fn tick(&self) -> MetaResult<()> { - let diff = { + let split_assignment = { let core_guard = self.core.lock().await; - core_guard.diff().await? + core_guard.reassign_splits().await? }; - if !diff.is_empty() { - let command = Command::SourceSplitAssignment(diff); + if !split_assignment.is_empty() { + let command = Command::SourceSplitAssignment(split_assignment); tracing::info!(command = ?command, "pushing down split assignment command"); self.barrier_scheduler.run_command(command).await?; } @@ -869,10 +879,6 @@ impl SourceManager { } } } - - pub async fn get_actor_splits(&self) -> HashMap> { - self.core.lock().await.get_actor_splits() - } } pub fn build_actor_connector_splits( @@ -918,7 +924,7 @@ mod tests { use serde::{Deserialize, Serialize}; use crate::model::{ActorId, FragmentId}; - use crate::stream::source_manager::{diff_splits, SplitDiffOptions}; + use crate::stream::source_manager::{reassign_splits, SplitDiffOptions}; #[derive(Debug, Copy, Clone, Serialize, Deserialize)] struct TestSplit { @@ -988,7 +994,7 @@ mod tests { .flat_map(|splits| splits.iter().map(|split| split.id())) .collect(); - let diff = diff_splits( + let diff = reassign_splits( FragmentId::default(), actor_splits, &discovered_splits, @@ -1028,7 +1034,7 @@ mod tests { enable_scale_in: true, }; - let diff = diff_splits( + let diff = reassign_splits( FragmentId::default(), actor_splits, &discovered_splits, @@ -1040,30 +1046,47 @@ mod tests { } #[test] - fn test_diff_splits() { + fn test_reassign_splits() { + fn check( + actor_splits: HashMap>, + discovered_splits: BTreeMap, + expected: expect_test::Expect, + ) { + let diff = reassign_splits( + FragmentId::default(), + actor_splits, + &discovered_splits, + Default::default(), + ) + .map(BTreeMap::from_iter); // ensure deterministic result + expected.assert_debug_eq(&diff); + } + let actor_splits = HashMap::new(); let discovered_splits: BTreeMap = BTreeMap::new(); - assert!(diff_splits( - FragmentId::default(), + check( actor_splits, - &discovered_splits, - Default::default() - ) - .is_none()); + discovered_splits, + expect_test::expect![[r#" + None + "#]], + ); let actor_splits = (0..3).map(|i| (i, vec![])).collect(); let discovered_splits: BTreeMap = BTreeMap::new(); - let diff = diff_splits( - FragmentId::default(), + check( actor_splits, - &discovered_splits, - Default::default(), - ) - .unwrap(); - assert_eq!(diff.len(), 3); - for splits in diff.values() { - assert!(splits.is_empty()) - } + discovered_splits, + expect_test::expect![[r#" + Some( + { + 0: [], + 1: [], + 2: [], + }, + ) + "#]], + ); let actor_splits = (0..3).map(|i| (i, vec![])).collect(); let discovered_splits: BTreeMap = (0..3) @@ -1072,20 +1095,31 @@ mod tests { (split.id(), split) }) .collect(); - - let diff = diff_splits( - FragmentId::default(), + check( actor_splits, - &discovered_splits, - Default::default(), - ) - .unwrap(); - assert_eq!(diff.len(), 3); - for splits in diff.values() { - assert_eq!(splits.len(), 1); - } - - check_all_splits(&discovered_splits, &diff); + discovered_splits, + expect_test::expect![[r#" + Some( + { + 0: [ + TestSplit { + id: 2, + }, + ], + 1: [ + TestSplit { + id: 0, + }, + ], + 2: [ + TestSplit { + id: 1, + }, + ], + }, + ) + "#]], + ); let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect(); let discovered_splits: BTreeMap = (0..5) @@ -1094,46 +1128,82 @@ mod tests { (split.id(), split) }) .collect(); - - let diff = diff_splits( - FragmentId::default(), + check( actor_splits, - &discovered_splits, - Default::default(), - ) - .unwrap(); - assert_eq!(diff.len(), 3); - for splits in diff.values() { - let len = splits.len(); - assert!(len == 1 || len == 2); - } - - check_all_splits(&discovered_splits, &diff); + discovered_splits, + expect_test::expect![[r#" + Some( + { + 0: [ + TestSplit { + id: 0, + }, + ], + 1: [ + TestSplit { + id: 1, + }, + TestSplit { + id: 3, + }, + ], + 2: [ + TestSplit { + id: 2, + }, + TestSplit { + id: 4, + }, + ], + }, + ) + "#]], + ); let mut actor_splits: HashMap> = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect(); actor_splits.insert(3, vec![]); actor_splits.insert(4, vec![]); - let discovered_splits: BTreeMap = (0..5) .map(|i| { let split = TestSplit { id: i }; (split.id(), split) }) .collect(); - - let diff = diff_splits( - FragmentId::default(), + check( actor_splits, - &discovered_splits, - Default::default(), - ) - .unwrap(); - assert_eq!(diff.len(), 5); - for splits in diff.values() { - assert_eq!(splits.len(), 1); - } - - check_all_splits(&discovered_splits, &diff); + discovered_splits, + expect_test::expect![[r#" + Some( + { + 0: [ + TestSplit { + id: 0, + }, + ], + 1: [ + TestSplit { + id: 1, + }, + ], + 2: [ + TestSplit { + id: 2, + }, + ], + 3: [ + TestSplit { + id: 3, + }, + ], + 4: [ + TestSplit { + id: 4, + }, + ], + }, + ) + "#]], + ); } } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index b3a05dd21c759..c60db7a0f8d52 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -513,10 +513,8 @@ impl GlobalStreamManager { let dummy_table_id = table_fragments.table_id(); - let init_split_assignment = self - .source_manager - .pre_allocate_splits(&dummy_table_id) - .await?; + let init_split_assignment = + self.source_manager.allocate_splits(&dummy_table_id).await?; replace_table_command = Some(ReplaceTablePlan { old_table_fragments: context.old_table_fragments, @@ -536,7 +534,7 @@ impl GlobalStreamManager { let table_id = table_fragments.table_id(); - let init_split_assignment = self.source_manager.pre_allocate_splits(&table_id).await?; + let init_split_assignment = self.source_manager.allocate_splits(&table_id).await?; let command = Command::CreateStreamingJob { table_fragments, @@ -587,10 +585,7 @@ impl GlobalStreamManager { let dummy_table_id = table_fragments.table_id(); - let init_split_assignment = self - .source_manager - .pre_allocate_splits(&dummy_table_id) - .await?; + let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?; if let Err(err) = self .barrier_scheduler @@ -633,7 +628,7 @@ impl GlobalStreamManager { .await?; self.source_manager - .drop_source_change(&table_fragments_vec) + .drop_source_fragments(&table_fragments_vec) .await; self.barrier_scheduler