From 2936f9fd7c40984d84a556e7d8206d7703bbfdac Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sun, 18 Feb 2024 19:40:27 +0800 Subject: [PATCH] Refactor Update in command.rs; Update replace_table in ddl_cont_v2 --- src/meta/src/barrier/command.rs | 6 ++---- src/meta/src/rpc/ddl_controller_v2.rs | 16 ++++++++++++---- src/meta/src/stream/stream_graph/schedule.rs | 1 - src/meta/src/stream/stream_manager.rs | 1 - 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index fb96900c5e980..07765fe840c38 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -648,15 +648,13 @@ impl CommandContext { .flat_map(build_actor_connector_splits) .collect(); - let mutation = Mutation::Update(UpdateMutation { + Some(Mutation::Update(UpdateMutation { actor_new_dispatchers, merge_update: merge_updates.to_owned(), dropped_actors, actor_splits, ..Default::default() - }); - - Some(mutation) + })) } /// Returns the paused reason after executing the current command. diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 64b11196e6292..126d040997a6b 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -197,7 +197,15 @@ impl DdlController { let stream_job_id = streaming_job.id(); match streaming_job.create_type() { CreateType::Unspecified | CreateType::Foreground => { - let replace_table_job_info = ctx.replace_table_job_info.clone(); + let replace_table_job_info = ctx.replace_table_job_info.as_ref().map( + |(streaming_job, ctx, table_fragments)| { + ( + streaming_job.clone(), + ctx.merge_updates.clone(), + table_fragments.table_id(), + ) + }, + ); self.stream_manager .create_streaming_job(table_fragments, ctx) @@ -208,13 +216,13 @@ impl DdlController { .finish_streaming_job(stream_job_id as _) .await?; - if let Some((streaming_job, ctx, table_fragments)) = replace_table_job_info { + if let Some((streaming_job, merge_updates, table_id)) = replace_table_job_info { version = mgr .catalog_controller .finish_replace_streaming_job( - table_fragments.table_id().table_id as _, + table_id.table_id as _, streaming_job, - ctx.merge_updates.clone(), + merge_updates, None, Some(stream_job_id), None, diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 02a4c3847304a..ed2dac5be0e06 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -326,7 +326,6 @@ impl Scheduler { /// [`Locations`] represents the parallel unit and worker locations of the actors. #[cfg_attr(test, derive(Default))] -#[derive(Clone)] pub struct Locations { /// actor location map. pub actor_locations: BTreeMap, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 263b7f4fdbad4..27d517c5a4e39 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -155,7 +155,6 @@ impl CreatingStreamingJobInfo { type CreatingStreamingJobInfoRef = Arc; -#[derive(Clone)] /// [`ReplaceTableContext`] carries one-time infos for replacing the plan of an existing table. /// /// Note: for better readability, keep this struct complete and immutable once created.