Skip to content

Commit

Permalink
Refactor Update in command.rs; Update replace_table in ddl_cont_v2
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Feb 18, 2024
1 parent 055e45c commit 2936f9f
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 10 deletions.
6 changes: 2 additions & 4 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,15 +648,13 @@ impl CommandContext {
.flat_map(build_actor_connector_splits)
.collect();

let mutation = Mutation::Update(UpdateMutation {
Some(Mutation::Update(UpdateMutation {
actor_new_dispatchers,
merge_update: merge_updates.to_owned(),
dropped_actors,
actor_splits,
..Default::default()
});

Some(mutation)
}))
}

/// Returns the paused reason after executing the current command.
Expand Down
16 changes: 12 additions & 4 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,15 @@ impl DdlController {
let stream_job_id = streaming_job.id();
match streaming_job.create_type() {
CreateType::Unspecified | CreateType::Foreground => {
let replace_table_job_info = ctx.replace_table_job_info.clone();
let replace_table_job_info = ctx.replace_table_job_info.as_ref().map(
|(streaming_job, ctx, table_fragments)| {
(
streaming_job.clone(),
ctx.merge_updates.clone(),
table_fragments.table_id(),
)
},
);

self.stream_manager
.create_streaming_job(table_fragments, ctx)
Expand All @@ -208,13 +216,13 @@ impl DdlController {
.finish_streaming_job(stream_job_id as _)
.await?;

if let Some((streaming_job, ctx, table_fragments)) = replace_table_job_info {
if let Some((streaming_job, merge_updates, table_id)) = replace_table_job_info {
version = mgr
.catalog_controller
.finish_replace_streaming_job(
table_fragments.table_id().table_id as _,
table_id.table_id as _,
streaming_job,
ctx.merge_updates.clone(),
merge_updates,
None,
Some(stream_job_id),
None,
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ impl Scheduler {

/// [`Locations`] represents the parallel unit and worker locations of the actors.
#[cfg_attr(test, derive(Default))]
#[derive(Clone)]
pub struct Locations {
/// actor location map.
pub actor_locations: BTreeMap<ActorId, ParallelUnit>,
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ impl CreatingStreamingJobInfo {

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

#[derive(Clone)]
/// [`ReplaceTableContext`] carries one-time infos for replacing the plan of an existing table.
///
/// Note: for better readability, keep this struct complete and immutable once created.
Expand Down

0 comments on commit 2936f9f

Please sign in to comment.