Skip to content

Commit

Permalink
update more
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 22, 2024
1 parent b3a88dd commit 27fd6a2
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
12 changes: 6 additions & 6 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ impl Command {
}));

if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan {
old_fragments: old_table_fragments,
old_fragments,
new_fragments: _,
merge_updates,
dispatchers,
Expand All @@ -699,7 +699,7 @@ impl Command {
{
// TODO: support in v2.
let update = Self::generate_update_mutation_for_replace_table(
old_table_fragments,
old_fragments,
merge_updates,
dispatchers,
init_split_assignment,
Expand Down Expand Up @@ -732,13 +732,13 @@ impl Command {
}

Command::ReplaceTable(ReplaceTablePlan {
old_fragments: old_table_fragments,
old_fragments,
merge_updates,
dispatchers,
init_split_assignment,
..
}) => Self::generate_update_mutation_for_replace_table(
old_table_fragments,
old_fragments,
merge_updates,
dispatchers,
init_split_assignment,
Expand Down Expand Up @@ -951,12 +951,12 @@ impl Command {
}

fn generate_update_mutation_for_replace_table(
old_table_fragments: &StreamJobFragments,
old_fragments: &StreamJobFragments,
merge_updates: &[MergeUpdate],
dispatchers: &HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: &SplitAssignment,
) -> Option<Mutation> {
let dropped_actors = old_table_fragments.actor_ids();
let dropped_actors = old_fragments.actor_ids();

let actor_new_dispatchers = dispatchers
.iter()
Expand Down
12 changes: 6 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1717,11 +1717,11 @@ impl DdlController {
let id = stream_job.id();
let expr_context = stream_ctx.to_expr_context();

let old_table_fragments = self
let old_fragments = self
.metadata_manager
.get_job_fragments_by_id(&id.into())
.await?;
let old_internal_table_ids = old_table_fragments.internal_table_ids();
let old_internal_table_ids = old_fragments.internal_table_ids();
let old_internal_tables = self
.metadata_manager
.get_table_catalog_by_ids(old_internal_table_ids)
Expand All @@ -1731,7 +1731,7 @@ impl DdlController {

// 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
// graph that contains all information needed for building the actor graph.
let original_table_fragment = old_table_fragments
let original_table_fragment = old_fragments
.mview_fragment()
.expect("mview fragment not found");

Expand Down Expand Up @@ -1826,15 +1826,15 @@ impl DdlController {
graph,
&building_locations.actor_locations,
stream_ctx,
old_table_fragments.assigned_parallelism,
old_table_fragments.max_parallelism,
old_fragments.assigned_parallelism,
old_fragments.max_parallelism,
);

// Note: no need to set `vnode_count` as it's already set by the frontend.
// See `get_replace_table_plan`.

let ctx = ReplaceTableContext {
old_fragments: old_table_fragments,
old_fragments,
merge_updates,
dispatchers,
building_locations,
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceTableContext {
/// The old table fragments to be replaced.
/// The old job fragments to be replaced.
pub old_fragments: StreamJobFragments,

/// The updates to be applied to the downstream chain actors. Used for schema change.
Expand Down Expand Up @@ -461,7 +461,7 @@ impl GlobalStreamManager {
&self,
stream_job_fragments: StreamJobFragments,
ReplaceTableContext {
old_fragments: old_table_fragments,
old_fragments,
merge_updates,
dispatchers,
tmp_id,
Expand All @@ -476,7 +476,7 @@ impl GlobalStreamManager {
.run_config_change_command_with_pause(
streaming_job.database_id().into(),
Command::ReplaceTable(ReplaceTablePlan {
old_fragments: old_table_fragments,
old_fragments,
new_fragments: stream_job_fragments,
merge_updates,
dispatchers,
Expand Down

0 comments on commit 27fd6a2

Please sign in to comment.