From 93adb82326f76d7737ef98bfdae3dfb9384cc4e7 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 26 Mar 2024 15:46:54 +0800 Subject: [PATCH] feat(storage): register and unregister member table id at commit epoch (#15852) --- src/meta/node/src/server.rs | 20 --- src/meta/src/barrier/command.rs | 36 +++-- src/meta/src/barrier/mod.rs | 23 +++- src/meta/src/barrier/recovery.rs | 113 ++++++++------- src/meta/src/barrier/schedule.rs | 10 +- src/meta/src/controller/catalog.rs | 8 +- src/meta/src/controller/streaming_job.rs | 21 ++- .../manager/compaction_group_manager.rs | 40 ++---- src/meta/src/hummock/manager/mod.rs | 130 +++++++++++++++--- src/meta/src/hummock/manager/worker.rs | 34 ++--- .../src/hummock/mock_hummock_meta_client.rs | 1 + src/meta/src/manager/notification.rs | 1 - src/meta/src/model/stream.rs | 13 ++ src/meta/src/rpc/ddl_controller.rs | 5 +- src/meta/src/rpc/ddl_controller_v2.rs | 6 +- src/meta/src/stream/stream_manager.rs | 97 +++++-------- 16 files changed, 319 insertions(+), 239 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index b4c4576d93ff7..fd822c52bb8f9 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -19,7 +19,6 @@ use anyhow::Context; use either::Either; use etcd_client::ConnectOptions; use futures::future::join_all; -use itertools::Itertools; use otlp_embedded::TraceServiceServer; use regex::Regex; use risingwave_common::config::MetaBackend; @@ -562,31 +561,12 @@ pub async fn start_service_as_election_leader( metadata_manager.clone(), barrier_scheduler.clone(), source_manager.clone(), - hummock_manager.clone(), stream_rpc_manager, scale_controller.clone(), ) .unwrap(), ); - let all_state_table_ids = match &metadata_manager { - MetadataManager::V1(mgr) => mgr - .catalog_manager - .list_tables() - .await - .into_iter() - .map(|t| t.id) - .collect_vec(), - MetadataManager::V2(mgr) => mgr - .catalog_controller - .list_all_state_table_ids() - .await? - .into_iter() - .map(|id| id as u32) - .collect_vec(), - }; - hummock_manager.purge(&all_state_table_ids).await; - // Initialize services. let backup_manager = BackupManager::new( env.clone(), diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 545faaa5847a4..c9554b4ecfaa1 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -41,7 +41,7 @@ use thiserror_ext::AsReport; use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo}; use super::trace::TracedEpoch; use crate::barrier::GlobalBarrierManagerContext; -use crate::manager::{DdlType, LocalNotification, MetadataManager, WorkerId}; +use crate::manager::{DdlType, MetadataManager, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; use crate::MetaResult; @@ -141,7 +141,11 @@ pub enum Command { /// Barriers from the actors to be dropped will STILL be collected. /// After the barrier is collected, it notifies the local stream manager of compute nodes to /// drop actors, and then delete the table fragments info from meta store. - DropStreamingJobs(Vec), + DropStreamingJobs { + actors: Vec, + unregistered_table_fragment_ids: HashSet, + unregistered_state_table_ids: HashSet, + }, /// `CreateStreamingJob` command generates a `Add` barrier by given info. /// @@ -214,7 +218,7 @@ impl Command { Command::Plain(_) => None, Command::Pause(_) => None, Command::Resume(_) => None, - Command::DropStreamingJobs(actors) => Some(CommandActorChanges { + Command::DropStreamingJobs { actors, .. } => Some(CommandActorChanges { to_add: Default::default(), to_remove: actors.iter().cloned().collect(), }), @@ -401,7 +405,7 @@ impl CommandContext { })) } - Command::DropStreamingJobs(actors) => Some(Mutation::Stop(StopMutation { + Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation { actors: actors.clone(), })), @@ -793,9 +797,22 @@ impl CommandContext { .await; } - Command::DropStreamingJobs(actors) => { + Command::DropStreamingJobs { + actors, + unregistered_state_table_ids, + .. + } => { // Tell compute nodes to drop actors. self.clean_up(actors.clone()).await?; + + let unregistered_state_table_ids = unregistered_state_table_ids + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(); + self.barrier_manager_context + .hummock_manager + .unregister_table_ids(&unregistered_state_table_ids) + .await?; } Command::CancelStreamingJob(table_fragments) => { @@ -815,12 +832,9 @@ impl CommandContext { let mut table_ids = table_fragments.internal_table_ids(); table_ids.push(table_id); self.barrier_manager_context - .env - .notification_manager() - .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock( - table_ids, - )) - .await; + .hummock_manager + .unregister_table_ids(&table_ids) + .await?; match &self.barrier_manager_context.metadata_manager { MetadataManager::V1(mgr) => { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 17031bc12404c..cb453a4608fd3 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -58,7 +58,7 @@ use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; use crate::barrier::state::BarrierManagerState; -use crate::hummock::{CommitEpochInfo, HummockManagerRef}; +use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, @@ -819,7 +819,7 @@ impl GlobalBarrierManagerContext { assert!(state.node_to_collect.is_empty()); let resps = state.resps; let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps); + let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps, &command_ctx); if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await { for notifier in notifiers { notifier.notify_collection_failed(e.clone()); @@ -1106,6 +1106,7 @@ pub type BarrierManagerRef = GlobalBarrierManagerContext; fn collect_commit_epoch_info( resps: Vec, + command_ctx: &CommandContext, ) -> (CommitEpochInfo, Vec) { let mut sst_to_worker: HashMap = HashMap::new(); let mut synced_ssts: Vec = vec![]; @@ -1125,6 +1126,23 @@ fn collect_commit_epoch_info( table_watermarks.push(resp.table_watermarks); progresses.extend(resp.create_mview_progress); } + let new_table_fragment_info = if let Command::CreateStreamingJob { + table_fragments, .. + } = &command_ctx.command + { + Some(NewTableFragmentInfo { + table_id: table_fragments.table_id(), + mv_table_id: table_fragments.mv_table_id().map(TableId::new), + internal_table_ids: table_fragments + .internal_table_ids() + .into_iter() + .map(TableId::new) + .collect(), + }) + } else { + None + }; + let info = CommitEpochInfo::new( synced_ssts, merge_multiple_new_table_watermarks( @@ -1144,6 +1162,7 @@ fn collect_commit_epoch_info( .collect_vec(), ), sst_to_worker, + new_table_fragment_info, ); (info, progresses) } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 6ecb89aeeb370..3fb1fd0b77f10 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -21,7 +21,6 @@ use anyhow::{anyhow, Context}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::config::DefaultParallelism; -use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_meta_model_v2::StreamingParallelism; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::table_fragments::State; @@ -47,7 +46,7 @@ use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext}; use crate::controller::catalog::ReleaseContext; -use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, MetadataManager, WorkerId}; +use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId}; use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; use crate::MetaResult; @@ -91,41 +90,19 @@ impl GlobalBarrierManagerContext { .collect(); debug!("clean dirty table fragments: {:?}", to_drop_streaming_ids); - let unregister_table_ids = mgr + let _unregister_table_ids = mgr .fragment_manager .drop_table_fragments_vec(&to_drop_streaming_ids) .await?; - // unregister compaction group for dirty table fragments. - self.env - .notification_manager() - .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock( - unregister_table_ids, - )) - .await; - // clean up source connector dirty changes. self.source_manager .drop_source_fragments(&to_drop_table_fragments) .await; } MetadataManager::V2(mgr) => { - let ReleaseContext { - state_table_ids, - source_ids, - .. - } = mgr.catalog_controller.clean_dirty_creating_jobs().await?; - - // unregister compaction group for cleaned state tables. - self.env - .notification_manager() - .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock( - state_table_ids - .into_iter() - .map(|id| id as StateTableId) - .collect_vec(), - )) - .await; + let ReleaseContext { source_ids, .. } = + mgr.catalog_controller.clean_dirty_creating_jobs().await?; // unregister cleaned sources. self.source_manager @@ -137,6 +114,27 @@ impl GlobalBarrierManagerContext { Ok(()) } + async fn purge_state_table_from_hummock(&self) -> MetaResult<()> { + let all_state_table_ids = match &self.metadata_manager { + MetadataManager::V1(mgr) => mgr + .catalog_manager + .list_tables() + .await + .into_iter() + .map(|t| t.id) + .collect_vec(), + MetadataManager::V2(mgr) => mgr + .catalog_controller + .list_all_state_table_ids() + .await? + .into_iter() + .map(|id| id as u32) + .collect_vec(), + }; + self.hummock_manager.purge(&all_state_table_ids).await?; + Ok(()) + } + async fn recover_background_mv_progress(&self) -> MetaResult<()> { match &self.metadata_manager { MetadataManager::V1(_) => self.recover_background_mv_progress_v1().await, @@ -317,27 +315,30 @@ impl GlobalBarrierManagerContext { let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled(); let applied = !dropped_actors.is_empty() || !cancelled.is_empty(); if !cancelled.is_empty() { - match &self.metadata_manager { + let unregister_table_ids = match &self.metadata_manager { MetadataManager::V1(mgr) => { - let unregister_table_ids = mgr - .fragment_manager + mgr.fragment_manager .drop_table_fragments_vec(&cancelled) - .await?; - self.env - .notification_manager() - .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock( - unregister_table_ids, - )) - .await; + .await? } MetadataManager::V2(mgr) => { + let mut unregister_table_ids = Vec::new(); for job_id in cancelled { - mgr.catalog_controller + let (_, table_ids_to_unregister) = mgr + .catalog_controller .try_abort_creating_streaming_job(job_id.table_id as _, true) .await?; + unregister_table_ids.extend(table_ids_to_unregister); } + unregister_table_ids + .into_iter() + .map(|table_id| table_id as u32) + .collect() } - } + }; + self.hummock_manager + .unregister_table_ids(&unregister_table_ids) + .await?; } Ok(applied) } @@ -364,22 +365,8 @@ impl GlobalBarrierManager { .abort_and_mark_blocked("cluster is under recovering"); tracing::info!("recovery start!"); - self.context - .clean_dirty_streaming_jobs() - .await - .expect("clean dirty streaming jobs"); - - self.context.sink_manager.reset().await; let retry_strategy = Self::get_retry_strategy(); - // Mview progress needs to be recovered. - tracing::info!("recovering mview progress"); - self.context - .recover_background_mv_progress() - .await - .expect("recover mview progress should not fail"); - tracing::info!("recovered mview progress"); - // We take retry into consideration because this is the latency user sees for a cluster to // get recovered. let recovery_timer = self.context.metrics.recovery_latency.start_timer(); @@ -387,6 +374,24 @@ impl GlobalBarrierManager { let new_state = tokio_retry::Retry::spawn(retry_strategy, || { async { let recovery_result: MetaResult<_> = try { + self.context + .clean_dirty_streaming_jobs() + .await + .context("clean dirty streaming jobs")?; + + self.context + .purge_state_table_from_hummock() + .await + .context("purge state table from hummock")?; + + // Mview progress needs to be recovered. + tracing::info!("recovering mview progress"); + self.context + .recover_background_mv_progress() + .await + .context("recover mview progress should not fail")?; + tracing::info!("recovered mview progress"); + // This is a quick path to accelerate the process of dropping and canceling streaming jobs. let _ = self .context @@ -449,6 +454,8 @@ impl GlobalBarrierManager { warn!(error = %err.as_report(), "reset compute nodes failed"); })?; + self.context.sink_manager.reset().await; + if self .context .pre_apply_drop_cancel(&self.scheduled_barriers) diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 662a1006baae9..af0badc0710d0 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -90,7 +90,7 @@ impl ScheduledQueue { if let QueueStatus::Blocked(reason) = &self.status && !matches!( scheduled.command, - Command::DropStreamingJobs(_) | Command::CancelStreamingJob(_) + Command::DropStreamingJobs { .. } | Command::CancelStreamingJob(_) ) { return Err(MetaError::unavailable(reason)); @@ -423,15 +423,15 @@ impl ScheduledBarriers { pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> (Vec, HashSet) { let mut queue = self.inner.queue.lock(); assert_matches!(queue.status, QueueStatus::Blocked(_)); - let (mut drop_table_ids, mut cancel_table_ids) = (vec![], HashSet::new()); + let (mut dropped_actors, mut cancel_table_ids) = (vec![], HashSet::new()); while let Some(Scheduled { notifiers, command, .. }) = queue.queue.pop_front() { match command { - Command::DropStreamingJobs(actor_ids) => { - drop_table_ids.extend(actor_ids); + Command::DropStreamingJobs { actors, .. } => { + dropped_actors.extend(actors); } Command::CancelStreamingJob(table_fragments) => { let table_id = table_fragments.table_id(); @@ -446,7 +446,7 @@ impl ScheduledBarriers { notify.notify_finished(); }); } - (drop_table_ids, cancel_table_ids) + (dropped_actors, cancel_table_ids) } /// Whether the barrier(checkpoint = true) should be injected. diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 7429b6098613f..d2d89dab586e0 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -83,6 +83,7 @@ pub struct CatalogController { #[derive(Clone, Default)] pub struct ReleaseContext { + pub(crate) streaming_job_ids: Vec, /// Dropped state table list, need to unregister from hummock. pub(crate) state_table_ids: Vec, /// Dropped source list, need to unregister from source manager. @@ -274,7 +275,7 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs).await?; + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?; // The schema and objects in the database will be delete cascade. let res = Object::delete_by_id(database_id).exec(&txn).await?; @@ -299,6 +300,7 @@ impl CatalogController { .await; Ok(( ReleaseContext { + streaming_job_ids: streaming_jobs, state_table_ids, source_ids, connections, @@ -2051,7 +2053,8 @@ impl CatalogController { let (source_fragments, removed_actors) = resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs).await?; + let fragment_mappings = + get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?; // Find affect users with privileges on all this objects. let to_update_user_ids: Vec = UserPrivilege::find() @@ -2155,6 +2158,7 @@ impl CatalogController { Ok(( ReleaseContext { + streaming_job_ids: to_drop_streaming_jobs, state_table_ids: to_drop_state_table_ids, source_ids: to_drop_source_ids, connections: vec![], diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index a44f7273694b7..2b05f5f54ef2e 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -395,7 +395,7 @@ impl CatalogController { &self, job_id: ObjectId, is_cancelled: bool, - ) -> MetaResult { + ) -> MetaResult<(bool, Vec)> { let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -405,7 +405,7 @@ impl CatalogController { id = job_id, "streaming job not found when aborting creating, might be cleaned by recovery" ); - return Ok(true); + return Ok((true, Vec::new())); } if !is_cancelled { @@ -420,7 +420,7 @@ impl CatalogController { id = job_id, "streaming job is created in background and still in creating status" ); - return Ok(false); + return Ok((false, Vec::new())); } } } @@ -433,6 +433,13 @@ impl CatalogController { .all(&txn) .await?; + let mv_table_id: Option = Table::find_by_id(job_id) + .select_only() + .column(table::Column::TableId) + .into_tuple() + .one(&txn) + .await?; + let associated_source_id: Option = Table::find_by_id(job_id) .select_only() .column(table::Column::OptionalAssociatedSourceId) @@ -444,7 +451,7 @@ impl CatalogController { Object::delete_by_id(job_id).exec(&txn).await?; if !internal_table_ids.is_empty() { Object::delete_many() - .filter(object::Column::Oid.is_in(internal_table_ids)) + .filter(object::Column::Oid.is_in(internal_table_ids.iter().cloned())) .exec(&txn) .await?; } @@ -453,7 +460,11 @@ impl CatalogController { } txn.commit().await?; - Ok(true) + let mut state_table_ids = internal_table_ids; + + state_table_ids.extend(mv_table_id.into_iter()); + + Ok((true, state_table_ids)) } pub async fn post_collect_table_fragments( diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 5665ef5bc9973..e7e737405fc37 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -56,7 +56,6 @@ use crate::model::{ BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, ValTransaction, }; use crate::storage::MetaStore; -use crate::stream::CreateStreamingJobOption; impl HummockManager { pub(super) async fn build_compaction_group_manager( @@ -102,12 +101,12 @@ impl HummockManager { .clone() } + #[cfg(test)] /// Registers `table_fragments` to compaction groups. pub async fn register_table_fragments( &self, mv_table: Option, mut internal_tables: Vec, - create_stream_job_option: CreateStreamingJobOption, ) -> Result> { let mut pairs = vec![]; if let Some(mv_table) = mv_table { @@ -117,22 +116,14 @@ impl HummockManager { // materialized_view pairs.push(( mv_table, - if create_stream_job_option.new_independent_compaction_group { - CompactionGroupId::from(StaticCompactionGroupId::NewCompactionGroup) - } else { - CompactionGroupId::from(StaticCompactionGroupId::MaterializedView) - }, + CompactionGroupId::from(StaticCompactionGroupId::MaterializedView), )); } // internal states for table_id in internal_tables { pairs.push(( table_id, - if create_stream_job_option.new_independent_compaction_group { - CompactionGroupId::from(StaticCompactionGroupId::NewCompactionGroup) - } else { - CompactionGroupId::from(StaticCompactionGroupId::StateDefault) - }, + CompactionGroupId::from(StaticCompactionGroupId::StateDefault), )); } self.register_table_ids(&pairs).await?; @@ -158,7 +149,7 @@ impl HummockManager { /// The caller should ensure `table_fragments_list` remain unchanged during `purge`. /// Currently `purge` is only called during meta service start ups. #[named] - pub async fn purge(&self, valid_ids: &[u32]) { + pub async fn purge(&self, valid_ids: &[u32]) -> Result<()> { let registered_members = get_member_table_ids(&read_lock!(self, versioning).await.current_version); let to_unregister = registered_members @@ -167,7 +158,7 @@ impl HummockManager { .collect_vec(); // As we have released versioning lock, the version that `to_unregister` is calculated from // may not be the same as the one used in unregister_table_ids. It is OK. - self.unregister_table_ids_fail_fast(&to_unregister).await; + self.unregister_table_ids(&to_unregister).await } /// The implementation acquires `versioning` lock. @@ -271,7 +262,7 @@ impl HummockManager { } #[named] - async fn unregister_table_ids(&self, table_ids: &[StateTableId]) -> Result<()> { + pub async fn unregister_table_ids(&self, table_ids: &[StateTableId]) -> Result<()> { if table_ids.is_empty() { return Ok(()); } @@ -1001,7 +992,6 @@ mod tests { use crate::hummock::test_utils::setup_compute_env; use crate::hummock::HummockManager; use crate::model::TableFragments; - use crate::stream::CreateStreamingJobOption; #[tokio::test] async fn test_inner() { @@ -1103,9 +1093,6 @@ mod tests { .register_table_fragments( Some(table_fragment_1.table_id().table_id), table_fragment_1.internal_table_ids(), - CreateStreamingJobOption { - new_independent_compaction_group: false, - }, ) .await .unwrap(); @@ -1114,9 +1101,6 @@ mod tests { .register_table_fragments( Some(table_fragment_2.table_id().table_id), table_fragment_2.internal_table_ids(), - CreateStreamingJobOption { - new_independent_compaction_group: false, - }, ) .await .unwrap(); @@ -1131,28 +1115,24 @@ mod tests { // Test purge_stale_members: table fragments compaction_group_manager .purge(&table_fragment_2.all_table_ids().collect_vec()) - .await; + .await + .unwrap(); assert_eq!(registered_number().await, 4); - compaction_group_manager.purge(&[]).await; + compaction_group_manager.purge(&[]).await.unwrap(); assert_eq!(registered_number().await, 0); - // Test `StaticCompactionGroupId::NewCompactionGroup` in `register_table_fragments` assert_eq!(group_number().await, 2); compaction_group_manager .register_table_fragments( Some(table_fragment_1.table_id().table_id), table_fragment_1.internal_table_ids(), - CreateStreamingJobOption { - new_independent_compaction_group: true, - }, ) .await .unwrap(); assert_eq!(registered_number().await, 4); - assert_eq!(group_number().await, 3); + assert_eq!(group_number().await, 2); - // Test `StaticCompactionGroupId::NewCompactionGroup` in `unregister_table_fragments` compaction_group_manager .unregister_table_fragments_vec(&[table_fragment_1]) .await; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index c889ac01d17a0..f1639ea37210c 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -29,6 +29,7 @@ use futures::stream::{BoxStream, FuturesUnordered}; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use parking_lot::Mutex; +use risingwave_common::catalog::TableId; use risingwave_common::config::default::compaction_config; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::monitor::rwlock::MonitoredRwLock; @@ -37,8 +38,7 @@ use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_hummock_sdk::compact::{compact_task_to_string, statistics_compact_task}; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ build_version_delta_after_version, get_compaction_group_ids, - get_table_compaction_group_id_mapping, try_get_compaction_group_id_by_table_id, - BranchedSstInfo, HummockLevelsExt, + get_table_compaction_group_id_mapping, BranchedSstInfo, HummockLevelsExt, }; use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{ @@ -60,9 +60,10 @@ use risingwave_pb::hummock::subscribe_compaction_event_response::{ Event as ResponseEvent, PullTaskAck, }; use risingwave_pb::hummock::{ - CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, HummockPinnedSnapshot, - HummockPinnedVersion, HummockSnapshot, HummockVersionStats, IntraLevelDelta, - PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableSchema, + CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, GroupMetaChange, + HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats, + IntraLevelDelta, PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, + TableOption, TableSchema, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use rw_futures_util::{pending_on_none, select_all}; @@ -91,7 +92,7 @@ use crate::hummock::sequence::next_compaction_task_id; use crate::hummock::{CompactorManagerRef, TASK_NORMAL}; #[cfg(any(test, feature = "test"))] use crate::manager::{ClusterManagerRef, FragmentManagerRef}; -use crate::manager::{MetaSrvEnv, MetadataManager, TableId, META_NODE_ID}; +use crate::manager::{MetaSrvEnv, MetadataManager, META_NODE_ID}; use crate::model::{ BTreeMapEntryTransaction, BTreeMapEntryTransactionWrapper, BTreeMapTransaction, BTreeMapTransactionWrapper, ClusterId, MetadataModel, MetadataModelError, ValTransaction, @@ -169,7 +170,7 @@ pub struct HummockManager { // 2. When partition count <=1, compactor will still use table_id as the cutting boundary of sst // 3. Modify the special configuration item hybrid_vnode_count = 0 to remove the table_id in hybrid cg and no longer perform alignment cutting. group_to_table_vnode_partition: - parking_lot::RwLock>>, + parking_lot::RwLock>>, } pub type HummockManagerRef = Arc; @@ -242,22 +243,41 @@ pub static CANCEL_STATUS_SET: LazyLock> = LazyLock::new(|| { .collect() }); +#[derive(Debug, Clone)] +pub struct NewTableFragmentInfo { + pub table_id: TableId, + pub mv_table_id: Option, + pub internal_table_ids: Vec, +} + +impl NewTableFragmentInfo { + pub fn state_table_ids(&self) -> impl Iterator + '_ { + self.mv_table_id + .iter() + .chain(self.internal_table_ids.iter()) + .cloned() + } +} + pub struct CommitEpochInfo { pub sstables: Vec, - pub new_table_watermarks: HashMap, + pub new_table_watermarks: HashMap, pub sst_to_context: HashMap, + pub new_table_fragment_info: Option, } impl CommitEpochInfo { pub fn new( sstables: Vec, - new_table_watermarks: HashMap, + new_table_watermarks: HashMap, sst_to_context: HashMap, + new_table_fragment_info: Option, ) -> Self { Self { sstables, new_table_watermarks, sst_to_context, + new_table_fragment_info, } } @@ -270,6 +290,7 @@ impl CommitEpochInfo { sstables.into_iter().map(Into::into).collect(), HashMap::new(), sst_to_context, + None, ) } } @@ -1566,6 +1587,7 @@ impl HummockManager { mut sstables, new_table_watermarks, sst_to_context, + new_table_fragment_info, } = commit_info; let mut versioning_guard = write_lock!(self, versioning).await; let _timer = start_measure_real_process_timer!(self); @@ -1589,7 +1611,7 @@ impl HummockManager { add_prost_table_stats_map(&mut table_stats_change, &std::mem::take(&mut s.table_stats)); } - let old_version = &versioning.current_version; + let old_version: &HummockVersion = &versioning.current_version; let mut new_version_delta = create_trx_wrapper!( self.sql_meta_store(), BTreeMapEntryTransactionWrapper, @@ -1601,8 +1623,78 @@ impl HummockManager { ); new_version_delta.max_committed_epoch = epoch; new_version_delta.new_table_watermarks = new_table_watermarks; - let mut new_hummock_version = old_version.clone(); - new_hummock_version.id = new_version_delta.id; + + let mut table_compaction_group_mapping = old_version.build_compaction_group_info(); + + // Add new table + if let Some(new_fragment_table_info) = new_table_fragment_info { + if !new_fragment_table_info.internal_table_ids.is_empty() { + if let Some(levels) = old_version + .levels + .get(&(StaticCompactionGroupId::StateDefault as u64)) + { + for table_id in &new_fragment_table_info.internal_table_ids { + if levels.member_table_ids.contains(&table_id.table_id) { + return Err(Error::CompactionGroup(format!( + "table {} already in group {}", + table_id, + StaticCompactionGroupId::StateDefault as u64 + ))); + } + } + } + + let group_deltas = &mut new_version_delta + .group_deltas + .entry(StaticCompactionGroupId::StateDefault as u64) + .or_default() + .group_deltas; + group_deltas.push(GroupDelta { + delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { + table_ids_add: new_fragment_table_info + .internal_table_ids + .iter() + .map(|table_id| table_id.table_id) + .collect(), + ..Default::default() + })), + }); + + for table_id in &new_fragment_table_info.internal_table_ids { + table_compaction_group_mapping + .insert(*table_id, StaticCompactionGroupId::StateDefault as u64); + } + } + + if let Some(table_id) = new_fragment_table_info.mv_table_id { + if let Some(levels) = old_version + .levels + .get(&(StaticCompactionGroupId::MaterializedView as u64)) + { + if levels.member_table_ids.contains(&table_id.table_id) { + return Err(Error::CompactionGroup(format!( + "table {} already in group {}", + table_id, + StaticCompactionGroupId::MaterializedView as u64 + ))); + } + } + let group_deltas = &mut new_version_delta + .group_deltas + .entry(StaticCompactionGroupId::MaterializedView as u64) + .or_default() + .group_deltas; + group_deltas.push(GroupDelta { + delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { + table_ids_add: vec![table_id.table_id], + ..Default::default() + })), + }); + let _ = table_compaction_group_mapping + .insert(table_id, StaticCompactionGroupId::MaterializedView as u64); + } + } + let mut incorrect_ssts = vec![]; let mut new_sst_id_number = 0; for ExtendedSstableInfo { @@ -1622,13 +1714,10 @@ impl HummockManager { if !is_sst_belong_to_group_declared { let mut group_table_ids: BTreeMap<_, Vec> = BTreeMap::new(); for table_id in sst.get_table_ids() { - match try_get_compaction_group_id_by_table_id( - &versioning.current_version, - *table_id, - ) { + match table_compaction_group_mapping.get(&TableId::new(*table_id)) { Some(compaction_group_id) => { group_table_ids - .entry(compaction_group_id) + .entry(*compaction_group_id) .or_default() .push(*table_id); } @@ -1729,6 +1818,7 @@ impl HummockManager { group_deltas.push(group_delta); } + let mut new_hummock_version = old_version.clone(); // Create a new_version, possibly merely to bump up the version id and max_committed_epoch. new_hummock_version.apply_version_delta(new_version_delta.deref()); @@ -3255,11 +3345,11 @@ pub enum TableAlignRule { // The table_id is not optimized for alignment. NoOptimization, // Move the table_id to a separate compaction group. Currently, the system only supports separate compaction with one table. - SplitToDedicatedCg((CompactionGroupId, BTreeMap)), + SplitToDedicatedCg((CompactionGroupId, BTreeMap)), // In the current group, partition the table's data according to the granularity of the vnode. - SplitByVnode((TableId, u32)), + SplitByVnode((StateTableId, u32)), // In the current group, partition the table's data at the granularity of the table. - SplitByTable(TableId), + SplitByTable(StateTableId), } fn drop_sst( diff --git a/src/meta/src/hummock/manager/worker.rs b/src/meta/src/hummock/manager/worker.rs index 7f29acd23e76b..838693e99ede7 100644 --- a/src/meta/src/hummock/manager/worker.rs +++ b/src/meta/src/hummock/manager/worker.rs @@ -92,27 +92,21 @@ impl HummockManager { } async fn handle_local_notification(&self, notification: LocalNotification) { - match notification { - LocalNotification::WorkerNodeDeleted(worker_node) => { - if worker_node.get_type().unwrap() == WorkerType::Compactor { - self.compactor_manager.remove_compactor(worker_node.id); - } - self.release_contexts(vec![worker_node.id]) - .await - .unwrap_or_else(|err| { - panic!( - "Failed to release hummock context {}, error={}", - worker_node.id, - err.as_report() - ) - }); - tracing::info!("Released hummock context {}", worker_node.id); - sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC"); - } - LocalNotification::UnregisterTablesFromHummock(table_ids) => { - self.unregister_table_ids_fail_fast(&table_ids).await; + if let LocalNotification::WorkerNodeDeleted(worker_node) = notification { + if worker_node.get_type().unwrap() == WorkerType::Compactor { + self.compactor_manager.remove_compactor(worker_node.id); } - _ => {} + self.release_contexts(vec![worker_node.id]) + .await + .unwrap_or_else(|err| { + panic!( + "Failed to release hummock context {}, error={}", + worker_node.id, + err.as_report() + ) + }); + tracing::info!("Released hummock context {}", worker_node.id); + sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC"); } } } diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 65f115e08d332..74c760e25776a 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -111,6 +111,7 @@ impl MockHummockMetaClient { sstables.into_iter().map(Into::into).collect(), new_table_watermarks, sst_to_worker, + None, ), ) .await diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index e3e8ec6b629f4..166f35655aa3c 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -48,7 +48,6 @@ pub enum LocalNotification { SystemParamsChange(SystemParamsReader), FragmentMappingsUpsert(Vec), FragmentMappingsDelete(Vec), - UnregisterTablesFromHummock(Vec), } #[derive(Debug)] diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index ef55f78493f85..fb78c5f9ca112 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -571,6 +571,19 @@ impl TableFragments { actor_map } + pub fn mv_table_id(&self) -> Option { + if self + .fragments + .values() + .flat_map(|f| f.state_table_ids.iter()) + .any(|table_id| *table_id == self.table_id.table_id) + { + Some(self.table_id.table_id) + } else { + None + } + } + /// Returns the internal table ids without the mview table. pub fn internal_table_ids(&self) -> Vec { self.fragments diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 3921a3222eb26..006ca20839902 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1469,10 +1469,7 @@ impl DdlController { create_type: stream_job.create_type(), ddl_type: stream_job.into(), replace_table_job_info, - // TODO: https://github.com/risingwavelabs/risingwave/issues/14793 - option: CreateStreamingJobOption { - new_independent_compaction_group: false, - }, + option: CreateStreamingJobOption {}, }; // 4. Mark tables as creating, including internal tables and the table of the stream job. diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 0ad4b78cdefd7..46838e596fdef 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -104,7 +104,7 @@ impl DdlController { self.env.event_log_manager_ref().add_event_logs(vec![ risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event), ]); - let aborted = mgr + let (aborted, _) = mgr .catalog_controller .try_abort_creating_streaming_job(job_id as _, false) .await?; @@ -389,6 +389,7 @@ impl DdlController { } let ReleaseContext { + streaming_job_ids, state_table_ids, source_ids, connections, @@ -431,7 +432,8 @@ impl DdlController { self.stream_manager .drop_streaming_jobs_v2( removed_actors.into_iter().map(|id| id as _).collect(), - state_table_ids.into_iter().map(|id| id as _).collect(), + streaming_job_ids, + state_table_ids, ) .await; diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index fa16b039236b6..a5efe5ef2fd8f 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -15,10 +15,10 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; -use futures::future::{join_all, BoxFuture}; +use futures::future::join_all; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::compaction_group::StateTableId; +use risingwave_meta_model_v2::ObjectId; use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; @@ -29,8 +29,7 @@ use tracing::Instrument; use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy}; use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager}; -use crate::hummock::HummockManagerRef; -use crate::manager::{DdlType, LocalNotification, MetaSrvEnv, MetadataManager, StreamingJob}; +use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob}; use crate::model::{ActorId, MetadataModel, TableFragments, TableParallelism}; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -39,7 +38,7 @@ pub type GlobalStreamManagerRef = Arc; #[derive(Default)] pub struct CreateStreamingJobOption { - pub new_independent_compaction_group: bool, + // leave empty as a place holder for future option if there is any } /// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job. @@ -190,8 +189,6 @@ pub struct GlobalStreamManager { /// Creating streaming job info. creating_job_info: CreatingStreamingJobInfoRef, - hummock_manager: HummockManagerRef, - pub scale_controller: ScaleControllerRef, pub stream_rpc_manager: StreamRpcManager, @@ -203,7 +200,6 @@ impl GlobalStreamManager { metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler, source_manager: SourceManagerRef, - hummock_manager: HummockManagerRef, stream_rpc_manager: StreamRpcManager, scale_controller: ScaleControllerRef, ) -> MetaResult { @@ -212,7 +208,6 @@ impl GlobalStreamManager { metadata_manager, barrier_scheduler, source_manager, - hummock_manager, creating_job_info: Arc::new(CreatingStreamingJobInfo::default()), scale_controller, stream_rpc_manager, @@ -239,9 +234,8 @@ impl GlobalStreamManager { let stream_manager = self.clone(); let fut = async move { - let mut revert_funcs = vec![]; let res = stream_manager - .create_streaming_job_impl(&mut revert_funcs, table_fragments, ctx) + .create_streaming_job_impl( table_fragments, ctx) .await; match res { Ok(_) => { @@ -251,9 +245,6 @@ impl GlobalStreamManager { .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}")); } Err(err) => { - for revert_func in revert_funcs.into_iter().rev() { - revert_func.await; - } let _ = sender .send(CreatingState::Failed { reason: err.clone(), @@ -387,7 +378,6 @@ impl GlobalStreamManager { async fn create_streaming_job_impl( &self, - revert_funcs: &mut Vec>, table_fragments: TableFragments, CreateStreamingJobContext { dispatchers, @@ -395,41 +385,15 @@ impl GlobalStreamManager { building_locations, existing_locations, definition, - mv_table_id, - internal_tables, create_type, ddl_type, replace_table_job_info, - option, + .. }: CreateStreamingJobContext, ) -> MetaResult<()> { let mut replace_table_command = None; let mut replace_table_id = None; - // Register to compaction group beforehand. - let hummock_manager_ref = self.hummock_manager.clone(); - let registered_table_ids = hummock_manager_ref - .register_table_fragments( - mv_table_id, - internal_tables.keys().copied().collect(), - option, - ) - .await?; - debug_assert_eq!( - registered_table_ids.len(), - table_fragments.internal_table_ids().len() + mv_table_id.map_or(0, |_| 1) - ); - let notification_manager_ref = self.env.notification_manager_ref(); - revert_funcs.push(Box::pin(async move { - if create_type == CreateType::Foreground { - notification_manager_ref - .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock( - registered_table_ids, - )) - .await; - } - })); - self.build_actors(&table_fragments, &building_locations, &existing_locations) .await?; @@ -560,23 +524,30 @@ impl GlobalStreamManager { pub async fn drop_streaming_jobs_v2( &self, removed_actors: Vec, - state_table_ids: Vec, + streaming_job_ids: Vec, + state_table_ids: Vec, ) { - if !removed_actors.is_empty() { + if !removed_actors.is_empty() + || !streaming_job_ids.is_empty() + || !state_table_ids.is_empty() + { let _ = self .barrier_scheduler - .run_command(Command::DropStreamingJobs(removed_actors)) + .run_command(Command::DropStreamingJobs { + actors: removed_actors, + unregistered_table_fragment_ids: streaming_job_ids + .into_iter() + .map(|job_id| TableId::new(job_id as _)) + .collect(), + unregistered_state_table_ids: state_table_ids + .into_iter() + .map(|table_id| TableId::new(table_id as _)) + .collect(), + }) .await .inspect_err(|err| { tracing::error!(error = ?err.as_report(), "failed to run drop command"); }); - - self.env - .notification_manager() - .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock( - state_table_ids, - )) - .await; } } @@ -594,7 +565,7 @@ impl GlobalStreamManager { // Drop table fragments directly. let unregister_table_ids = mgr .fragment_manager - .drop_table_fragments_vec(&table_ids.into_iter().collect()) + .drop_table_fragments_vec(&table_ids.iter().cloned().collect()) .await?; // Issues a drop barrier command. @@ -604,20 +575,19 @@ impl GlobalStreamManager { .collect_vec(); let _ = self .barrier_scheduler - .run_command(Command::DropStreamingJobs(dropped_actors)) + .run_command(Command::DropStreamingJobs { + actors: dropped_actors, + unregistered_table_fragment_ids: table_ids.into_iter().collect(), + unregistered_state_table_ids: unregister_table_ids + .into_iter() + .map(TableId::new) + .collect(), + }) .await .inspect_err(|err| { tracing::error!(error = ?err.as_report(), "failed to run drop command"); }); - // Unregister from compaction group afterwards. - self.env - .notification_manager() - .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock( - unregister_table_ids, - )) - .await; - Ok(()) } @@ -1025,7 +995,7 @@ mod tests { scheduled_barriers, env.clone(), metadata_manager.clone(), - hummock_manager.clone(), + hummock_manager, source_manager.clone(), sink_manager, meta_metrics.clone(), @@ -1038,7 +1008,6 @@ mod tests { metadata_manager, barrier_scheduler.clone(), source_manager.clone(), - hummock_manager, stream_rpc_manager, scale_controller.clone(), )?;