From 075c90d7e3b2953283e188bd636b0ea05b268314 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Thu, 21 Nov 2024 16:17:12 +0800
Subject: [PATCH] refactor: rename dummy table id to tmp table id (#19509)

---
 src/meta/src/barrier/command.rs           |  2 +-
 src/meta/src/controller/streaming_job.rs  | 16 +++++------
 src/meta/src/rpc/ddl_controller.rs        | 36 ++++++++++--------------
 src/meta/src/stream/stream_manager.rs     | 23 ++++++++-------
 4 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index d2dd3058544c4..f19feadabed56 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -101,7 +101,7 @@ pub struct ReplaceTablePlan {
     /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
     pub streaming_job: StreamingJob,
     /// The temporary dummy table fragments id of new table fragment
-    pub dummy_id: u32,
+    pub tmp_id: u32,
 }
 
 impl ReplaceTablePlan {
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index a908704129c75..d5ee31efae246 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -910,13 +910,13 @@ impl CatalogController {
             Some(ReplaceTablePlan {
                 streaming_job,
                 merge_updates,
-                dummy_id,
+                tmp_id,
                 ..
             }) => {
                 let incoming_sink_id = job_id;
 
                 let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner(
-                    dummy_id as ObjectId,
+                    tmp_id as ObjectId,
                     merge_updates,
                     None,
                     Some(incoming_sink_id as _),
@@ -965,7 +965,7 @@ impl CatalogController {
 
     pub async fn finish_replace_streaming_job(
         &self,
-        dummy_id: ObjectId,
+        tmp_id: ObjectId,
         streaming_job: StreamingJob,
         merge_updates: Vec<PbMergeUpdate>,
         table_col_index_mapping: Option<ColIndexMapping>,
@@ -977,7 +977,7 @@ impl CatalogController {
         let txn = inner.db.begin().await?;
 
         let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner(
-            dummy_id,
+            tmp_id,
             merge_updates,
             table_col_index_mapping,
             creating_sink_id,
@@ -1008,7 +1008,7 @@ impl CatalogController {
     }
 
     pub async fn finish_replace_streaming_job_inner(
-        dummy_id: ObjectId,
+        tmp_id: ObjectId,
         merge_updates: Vec<PbMergeUpdate>,
         table_col_index_mapping: Option<ColIndexMapping>,
         creating_sink_id: Option<SinkId>,
@@ -1066,7 +1066,7 @@ impl CatalogController {
                 fragment::Column::FragmentId,
                 fragment::Column::StateTableIds,
             ])
-            .filter(fragment::Column::JobId.eq(dummy_id))
+            .filter(fragment::Column::JobId.eq(tmp_id))
             .into_tuple()
             .all(txn)
             .await?;
@@ -1091,7 +1091,7 @@ impl CatalogController {
             .await?;
         Fragment::update_many()
             .col_expr(fragment::Column::JobId, SimpleExpr::from(job_id))
-            .filter(fragment::Column::JobId.eq(dummy_id))
+            .filter(fragment::Column::JobId.eq(tmp_id))
             .exec(txn)
             .await?;
 
@@ -1190,7 +1190,7 @@ impl CatalogController {
         }
 
         // 3. remove dummy object.
-        Object::delete_by_id(dummy_id).exec(txn).await?;
+        Object::delete_by_id(tmp_id).exec(txn).await?;
 
         // 4. update catalogs and notify.
         let mut relations = vec![];
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 8ef2ec8dc4c96..5b6b7033719c4 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -659,7 +659,7 @@ impl DdlController {
     // Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.
     pub(crate) async fn inject_replace_table_job_for_table_sink(
         &self,
-        dummy_id: u32,
+        tmp_id: u32,
         mgr: &MetadataManager,
         stream_ctx: StreamContext,
         sink: Option<&Sink>,
@@ -669,13 +669,7 @@ impl DdlController {
         fragment_graph: StreamFragmentGraph,
     ) -> MetaResult<(ReplaceTableContext, TableFragments)> {
         let (mut replace_table_ctx, mut table_fragments) = self
-            .build_replace_table(
-                stream_ctx,
-                streaming_job,
-                fragment_graph,
-                None,
-                dummy_id as _,
-            )
+            .build_replace_table(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _)
             .await?;
 
         let mut union_fragment_id = None;
@@ -1192,7 +1186,7 @@ impl DdlController {
         let table = streaming_job.table().unwrap();
 
         tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
-        let dummy_id = self
+        let tmp_id = self
             .metadata_manager
             .catalog_controller
             .create_job_catalog_for_replace(
@@ -1206,7 +1200,7 @@ impl DdlController {
 
         let (ctx, table_fragments) = self
             .inject_replace_table_job_for_table_sink(
-                dummy_id,
+                tmp_id,
                 &self.metadata_manager,
                 stream_ctx,
                 None,
@@ -1238,7 +1232,7 @@ impl DdlController {
             .metadata_manager
             .catalog_controller
             .finish_replace_streaming_job(
-                dummy_id as _,
+                tmp_id as _,
                 streaming_job,
                 merge_updates,
                 None,
@@ -1253,7 +1247,7 @@ impl DdlController {
             tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
             let _ = self.metadata_manager
                 .catalog_controller
-                .try_abort_replacing_streaming_job(dummy_id as _)
+                .try_abort_replacing_streaming_job(tmp_id as _)
                 .await
                 .inspect_err(|err| {
                     tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
@@ -1340,7 +1334,7 @@ impl DdlController {
         let StreamingJob::Table(_, table, ..) = &streaming_job else {
             unreachable!("unexpected job: {streaming_job:?}")
         };
-        let dummy_id = self
+        let tmp_id = self
             .metadata_manager
             .catalog_controller
             .create_job_catalog_for_replace(
@@ -1362,7 +1356,7 @@ impl DdlController {
                 &streaming_job,
                 fragment_graph,
                 table_col_index_mapping.clone(),
-                dummy_id as _,
+                tmp_id as _,
             )
             .await?;
 
@@ -1437,7 +1431,7 @@ impl DdlController {
             .metadata_manager
             .catalog_controller
             .finish_replace_streaming_job(
-                dummy_id,
+                tmp_id,
                 streaming_job,
                 merge_updates,
                 table_col_index_mapping,
@@ -1452,7 +1446,7 @@ impl DdlController {
             tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace table");
             let _ = self.metadata_manager
                 .catalog_controller
-                .try_abort_replacing_streaming_job(dummy_id)
+                .try_abort_replacing_streaming_job(tmp_id)
                 .await.inspect_err(|err| {
                     tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing table");
                 });
@@ -1651,7 +1645,7 @@ impl DdlController {
         };
 
         let table = streaming_job.table().unwrap();
-        let dummy_id = self
+        let tmp_id = self
             .metadata_manager
             .catalog_controller
             .create_job_catalog_for_replace(
@@ -1665,7 +1659,7 @@ impl DdlController {
 
         let (context, table_fragments) = self
             .inject_replace_table_job_for_table_sink(
-                dummy_id,
+                tmp_id,
                 &self.metadata_manager,
                 stream_ctx,
                 Some(s),
@@ -1718,7 +1712,7 @@ impl DdlController {
         stream_job: &StreamingJob,
         mut fragment_graph: StreamFragmentGraph,
         table_col_index_mapping: Option<ColIndexMapping>,
-        dummy_table_id: TableId,
+        tmp_table_id: TableId,
     ) -> MetaResult<(ReplaceTableContext, TableFragments)> {
         let id = stream_job.id();
         let expr_context = stream_ctx.to_expr_context();
@@ -1828,7 +1822,7 @@ impl DdlController {
         // the context that contains all information needed for building the actors on the compute
         // nodes.
         let table_fragments = TableFragments::new(
-            (dummy_table_id as u32).into(),
+            (tmp_table_id as u32).into(),
             graph,
             &building_locations.actor_locations,
             stream_ctx,
@@ -1846,7 +1840,7 @@ impl DdlController {
             building_locations,
             existing_locations,
             streaming_job: stream_job.clone(),
-            dummy_id: dummy_table_id as _,
+            tmp_id: tmp_table_id as _,
         };
 
         Ok((ctx, table_fragments))
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index d15a73ecfa9c3..2d7ab47a7c784 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -185,7 +185,7 @@ pub struct ReplaceTableContext {
 
     pub streaming_job: StreamingJob,
 
-    pub dummy_id: u32,
+    pub tmp_id: u32,
 }
 
 /// `GlobalStreamManager` manages all the streams in the system.
@@ -354,11 +354,10 @@ impl GlobalStreamManager {
                 .prepare_streaming_job(&table_fragments, &streaming_job, true)
                 .await?;
 
-            let dummy_table_id = table_fragments.table_id();
-            let init_split_assignment =
-                self.source_manager.allocate_splits(&dummy_table_id).await?;
+            let tmp_table_id = table_fragments.table_id();
+            let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?;
 
-            replace_table_id = Some(dummy_table_id);
+            replace_table_id = Some(tmp_table_id);
 
             replace_table_command = Some(ReplaceTablePlan {
                 old_table_fragments: context.old_table_fragments,
@@ -367,7 +366,7 @@ impl GlobalStreamManager {
                 dispatchers: context.dispatchers,
                 init_split_assignment,
                 streaming_job,
-                dummy_id: dummy_table_id.table_id,
+                tmp_id: tmp_table_id.table_id,
             });
         }
 
@@ -447,8 +446,8 @@ impl GlobalStreamManager {
                 if create_type == CreateType::Foreground || err.is_cancelled() {
                     let mut table_ids: HashSet<TableId> =
                         HashSet::from_iter(std::iter::once(table_id));
-                    if let Some(dummy_table_id) = replace_table_id {
-                        table_ids.insert(dummy_table_id);
+                    if let Some(tmp_table_id) = replace_table_id {
+                        table_ids.insert(tmp_table_id);
                     }
                 }
 
@@ -465,13 +464,13 @@ impl GlobalStreamManager {
             old_table_fragments,
             merge_updates,
             dispatchers,
-            dummy_id,
+            tmp_id,
             streaming_job,
             ..
         }: ReplaceTableContext,
     ) -> MetaResult<()> {
-        let dummy_table_id = table_fragments.table_id();
-        let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?;
+        let tmp_table_id = table_fragments.table_id();
+        let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?;
 
         self.barrier_scheduler
             .run_config_change_command_with_pause(
@@ -482,7 +481,7 @@ impl GlobalStreamManager {
                     merge_updates,
                     dispatchers,
                     init_split_assignment,
-                    dummy_id,
+                    tmp_id,
                     streaming_job,
                 }),
             )
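
Reviewer context: the rename is mechanical across all four files, but the id being renamed has a lifecycle worth keeping in mind when reading the diff. `create_job_catalog_for_replace` allocates a temporary job id, the replacement fragments are built under it, and the flow then either promotes them to the real job id (`finish_replace_streaming_job`) or throws them away (`try_abort_replacing_streaming_job`). The sketch below is a minimal, self-contained model of that lifecycle, not the RisingWave meta-service API: the `Catalog` type, its `fragments_by_job` map, and all three method names are hypothetical stand-ins.

use std::collections::HashMap;

type ObjectId = u32;

/// Hypothetical stand-in for the meta-store catalog.
#[derive(Default)]
struct Catalog {
    /// job id -> fragments currently owned by that job.
    fragments_by_job: HashMap<ObjectId, Vec<String>>,
    next_id: ObjectId,
}

impl Catalog {
    /// Allocate a temporary job id under which the *new* fragments of a
    /// replacement are built (mirrors `create_job_catalog_for_replace`).
    fn create_tmp_job(&mut self) -> ObjectId {
        let tmp_id = self.next_id;
        self.next_id += 1;
        self.fragments_by_job.insert(tmp_id, Vec::new());
        tmp_id
    }

    /// On success, re-parent the fragments from the temporary id to the real
    /// job id and drop the temporary object (mirrors the
    /// `Fragment::update_many` + `Object::delete_by_id` steps in
    /// `finish_replace_streaming_job_inner`).
    fn finish_replace(&mut self, tmp_id: ObjectId, job_id: ObjectId) {
        if let Some(fragments) = self.fragments_by_job.remove(&tmp_id) {
            self.fragments_by_job.insert(job_id, fragments);
        }
    }

    /// On failure, delete everything under the temporary id (mirrors
    /// `try_abort_replacing_streaming_job`).
    fn abort_replace(&mut self, tmp_id: ObjectId) {
        self.fragments_by_job.remove(&tmp_id);
    }
}

fn main() {
    let mut catalog = Catalog::default();
    let job_id = 42; // the existing table's job id

    // Build the replacement fragments under a fresh temporary id.
    let tmp_id = catalog.create_tmp_job();
    catalog
        .fragments_by_job
        .get_mut(&tmp_id)
        .unwrap()
        .push("new table fragment".to_owned());

    // Happy path: promote the temporary fragments to the real job id.
    catalog.finish_replace(tmp_id, job_id);
    assert!(catalog.fragments_by_job.contains_key(&job_id));
    assert!(!catalog.fragments_by_job.contains_key(&tmp_id));

    // Failure path: a second attempt is aborted and leaves no trace.
    let tmp_id2 = catalog.create_tmp_job();
    catalog.abort_replace(tmp_id2);
    assert!(!catalog.fragments_by_job.contains_key(&tmp_id2));
}

Keeping the new fragments under a separate temporary object also explains why abort is cheap: an aborted replacement only has to delete one object, which is exactly what step "3. remove dummy object." does in `finish_replace_streaming_job_inner`.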