refactor: rename dummy table id to tmp table id (#19509)
xxchan authored Nov 21, 2024
1 parent f003859 commit 075c90d
Showing 4 changed files with 35 additions and 42 deletions.
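
What the rename captures: during a table replacement (ALTER TABLE, or sink-into-table), `create_job_catalog_for_replace` allocates a temporary catalog object whose ID owns the new fragments while the replacement is in flight; on success the fragments are re-parented to the real job and the temporary object is deleted, and on failure the temporary object is aborted. `tmp_id` names that object more accurately than `dummy_id`. Below is a minimal sketch of that lifecycle, pieced together from the calls visible in this diff; the signatures and bodies are simplified stand-ins, not the real meta-service API.

    type ObjectId = i32;

    struct CatalogController;

    impl CatalogController {
        // Allocate a temporary catalog object to own the replacement's fragments.
        fn create_job_catalog_for_replace(&self) -> ObjectId {
            1024 // placeholder ID; the real one comes from the catalog
        }

        // Success path: fragments under `tmp_id` are re-parented to the real
        // job and the temporary object is deleted.
        fn finish_replace_streaming_job(&self, tmp_id: ObjectId) {
            println!("finish: dropping tmp object {tmp_id}");
        }

        // Failure path: the temporary object and everything under it is dropped.
        fn try_abort_replacing_streaming_job(&self, tmp_id: ObjectId) {
            println!("abort: dropping tmp object {tmp_id}");
        }
    }

    fn main() {
        let catalog = CatalogController;
        let tmp_id = catalog.create_job_catalog_for_replace();
        let replacement_succeeded = true; // outcome of building the new fragments
        if replacement_succeeded {
            catalog.finish_replace_streaming_job(tmp_id);
        } else {
            catalog.try_abort_replacing_streaming_job(tmp_id);
        }
    }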
2 changes: 1 addition & 1 deletion src/meta/src/barrier/command.rs
@@ -101,7 +101,7 @@ pub struct ReplaceTablePlan {
/// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
pub streaming_job: StreamingJob,
/// The temporary dummy table fragments id of new table fragment
- pub dummy_id: u32,
+ pub tmp_id: u32,
}

impl ReplaceTablePlan {
16 changes: 8 additions & 8 deletions src/meta/src/controller/streaming_job.rs
@@ -910,13 +910,13 @@ impl CatalogController {
Some(ReplaceTablePlan {
streaming_job,
merge_updates,
- dummy_id,
+ tmp_id,
..
}) => {
let incoming_sink_id = job_id;

let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner(
- dummy_id as ObjectId,
+ tmp_id as ObjectId,
merge_updates,
None,
Some(incoming_sink_id as _),
@@ -965,7 +965,7 @@ impl CatalogController {

pub async fn finish_replace_streaming_job(
&self,
- dummy_id: ObjectId,
+ tmp_id: ObjectId,
streaming_job: StreamingJob,
merge_updates: Vec<PbMergeUpdate>,
table_col_index_mapping: Option<ColIndexMapping>,
@@ -977,7 +977,7 @@
let txn = inner.db.begin().await?;

let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner(
- dummy_id,
+ tmp_id,
merge_updates,
table_col_index_mapping,
creating_sink_id,
@@ -1008,7 +1008,7 @@
}

pub async fn finish_replace_streaming_job_inner(
- dummy_id: ObjectId,
+ tmp_id: ObjectId,
merge_updates: Vec<PbMergeUpdate>,
table_col_index_mapping: Option<ColIndexMapping>,
creating_sink_id: Option<SinkId>,
@@ -1066,7 +1066,7 @@
fragment::Column::FragmentId,
fragment::Column::StateTableIds,
])
- .filter(fragment::Column::JobId.eq(dummy_id))
+ .filter(fragment::Column::JobId.eq(tmp_id))
.into_tuple()
.all(txn)
.await?;
@@ -1091,7 +1091,7 @@
.await?;
Fragment::update_many()
.col_expr(fragment::Column::JobId, SimpleExpr::from(job_id))
- .filter(fragment::Column::JobId.eq(dummy_id))
+ .filter(fragment::Column::JobId.eq(tmp_id))
.exec(txn)
.await?;

@@ -1190,7 +1190,7 @@
}

// 3. remove dummy object.
- Object::delete_by_id(dummy_id).exec(txn).await?;
+ Object::delete_by_id(tmp_id).exec(txn).await?;

// 4. update catalogs and notify.
let mut relations = vec![];
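
In `finish_replace_streaming_job_inner` above, `tmp_id` drives a three-step handover: collect the fragments built under the temporary job, re-parent them to the real job ID, and delete the temporary catalog object. A database-free sketch of that order follows; the `HashMap` stands in for the `fragment` table, and the real code issues SeaORM queries inside one transaction.

    use std::collections::HashMap;

    // `fragments` maps fragment_id -> job_id, standing in for the `fragment`
    // table; `objects` stands in for the `object` table.
    fn finish_inner(fragments: &mut HashMap<u32, i32>, objects: &mut Vec<i32>, tmp_id: i32, job_id: i32) {
        // 1. Collect the fragments currently owned by the temporary job.
        let owned: Vec<u32> = fragments
            .iter()
            .filter(|(_, &j)| j == tmp_id)
            .map(|(&f, _)| f)
            .collect();
        // 2. Re-parent them to the real job (the UPDATE ... WHERE job_id = tmp_id step).
        for f in owned {
            fragments.insert(f, job_id);
        }
        // 3. Remove the temporary catalog object (Object::delete_by_id(tmp_id)).
        objects.retain(|&o| o != tmp_id);
    }

    fn main() {
        let mut fragments = HashMap::from([(1, 1024), (2, 1024), (3, 7)]);
        let mut objects = vec![7, 1024];
        finish_inner(&mut fragments, &mut objects, 1024, 7);
        assert!(fragments.values().all(|&j| j == 7));
        assert_eq!(objects, vec![7]);
    }
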
36 changes: 15 additions & 21 deletions src/meta/src/rpc/ddl_controller.rs
@@ -659,7 +659,7 @@ impl DdlController {
// Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.
pub(crate) async fn inject_replace_table_job_for_table_sink(
&self,
- dummy_id: u32,
+ tmp_id: u32,
mgr: &MetadataManager,
stream_ctx: StreamContext,
sink: Option<&Sink>,
Expand All @@ -669,13 +669,7 @@ impl DdlController {
fragment_graph: StreamFragmentGraph,
) -> MetaResult<(ReplaceTableContext, TableFragments)> {
let (mut replace_table_ctx, mut table_fragments) = self
- .build_replace_table(
-     stream_ctx,
-     streaming_job,
-     fragment_graph,
-     None,
-     dummy_id as _,
- )
+ .build_replace_table(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _)
.await?;

let mut union_fragment_id = None;
@@ -1192,7 +1186,7 @@ impl DdlController {
let table = streaming_job.table().unwrap();

tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
- let dummy_id = self
+ let tmp_id = self
.metadata_manager
.catalog_controller
.create_job_catalog_for_replace(
@@ -1206,7 +1200,7 @@

let (ctx, table_fragments) = self
.inject_replace_table_job_for_table_sink(
- dummy_id,
+ tmp_id,
&self.metadata_manager,
stream_ctx,
None,
@@ -1238,7 +1232,7 @@
.metadata_manager
.catalog_controller
.finish_replace_streaming_job(
- dummy_id as _,
+ tmp_id as _,
streaming_job,
merge_updates,
None,
@@ -1253,7 +1247,7 @@
tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
let _ = self.metadata_manager
.catalog_controller
- .try_abort_replacing_streaming_job(dummy_id as _)
+ .try_abort_replacing_streaming_job(tmp_id as _)
.await
.inspect_err(|err| {
tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
@@ -1340,7 +1334,7 @@ impl DdlController {
let StreamingJob::Table(_, table, ..) = &streaming_job else {
unreachable!("unexpected job: {streaming_job:?}")
};
- let dummy_id = self
+ let tmp_id = self
.metadata_manager
.catalog_controller
.create_job_catalog_for_replace(
@@ -1362,7 +1356,7 @@
&streaming_job,
fragment_graph,
table_col_index_mapping.clone(),
- dummy_id as _,
+ tmp_id as _,
)
.await?;

@@ -1437,7 +1431,7 @@
.metadata_manager
.catalog_controller
.finish_replace_streaming_job(
- dummy_id,
+ tmp_id,
streaming_job,
merge_updates,
table_col_index_mapping,
@@ -1452,7 +1446,7 @@
tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace table");
let _ = self.metadata_manager
.catalog_controller
- .try_abort_replacing_streaming_job(dummy_id)
+ .try_abort_replacing_streaming_job(tmp_id)
.await.inspect_err(|err| {
tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing table");
});
@@ -1651,7 +1645,7 @@ impl DdlController {
};

let table = streaming_job.table().unwrap();
- let dummy_id = self
+ let tmp_id = self
.metadata_manager
.catalog_controller
.create_job_catalog_for_replace(
@@ -1665,7 +1659,7 @@

let (context, table_fragments) = self
.inject_replace_table_job_for_table_sink(
- dummy_id,
+ tmp_id,
&self.metadata_manager,
stream_ctx,
Some(s),
@@ -1718,7 +1712,7 @@
stream_job: &StreamingJob,
mut fragment_graph: StreamFragmentGraph,
table_col_index_mapping: Option<ColIndexMapping>,
- dummy_table_id: TableId,
+ tmp_table_id: TableId,
) -> MetaResult<(ReplaceTableContext, TableFragments)> {
let id = stream_job.id();
let expr_context = stream_ctx.to_expr_context();
@@ -1828,7 +1822,7 @@
// the context that contains all information needed for building the actors on the compute
// nodes.
let table_fragments = TableFragments::new(
- (dummy_table_id as u32).into(),
+ (tmp_table_id as u32).into(),
graph,
&building_locations.actor_locations,
stream_ctx,
@@ -1846,7 +1840,7 @@
building_locations,
existing_locations,
streaming_job: stream_job.clone(),
- dummy_id: dummy_table_id as _,
+ tmp_id: tmp_table_id as _,
};

Ok((ctx, table_fragments))
23 changes: 11 additions & 12 deletions src/meta/src/stream/stream_manager.rs
@@ -185,7 +185,7 @@ pub struct ReplaceTableContext {

pub streaming_job: StreamingJob,

- pub dummy_id: u32,
+ pub tmp_id: u32,
}

/// `GlobalStreamManager` manages all the streams in the system.
Expand Down Expand Up @@ -354,11 +354,10 @@ impl GlobalStreamManager {
.prepare_streaming_job(&table_fragments, &streaming_job, true)
.await?;

- let dummy_table_id = table_fragments.table_id();
- let init_split_assignment =
-     self.source_manager.allocate_splits(&dummy_table_id).await?;
+ let tmp_table_id = table_fragments.table_id();
+ let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?;

- replace_table_id = Some(dummy_table_id);
+ replace_table_id = Some(tmp_table_id);

replace_table_command = Some(ReplaceTablePlan {
old_table_fragments: context.old_table_fragments,
@@ -367,7 +366,7 @@
dispatchers: context.dispatchers,
init_split_assignment,
streaming_job,
- dummy_id: dummy_table_id.table_id,
+ tmp_id: tmp_table_id.table_id,
});
}

@@ -447,8 +446,8 @@
if create_type == CreateType::Foreground || err.is_cancelled() {
let mut table_ids: HashSet<TableId> =
HashSet::from_iter(std::iter::once(table_id));
- if let Some(dummy_table_id) = replace_table_id {
-     table_ids.insert(dummy_table_id);
+ if let Some(tmp_table_id) = replace_table_id {
+     table_ids.insert(tmp_table_id);
}
}

@@ -465,13 +464,13 @@
old_table_fragments,
merge_updates,
dispatchers,
- dummy_id,
+ tmp_id,
streaming_job,
..
}: ReplaceTableContext,
) -> MetaResult<()> {
- let dummy_table_id = table_fragments.table_id();
- let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?;
+ let tmp_table_id = table_fragments.table_id();
+ let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?;

self.barrier_scheduler
.run_config_change_command_with_pause(
Expand All @@ -482,7 +481,7 @@ impl GlobalStreamManager {
merge_updates,
dispatchers,
init_split_assignment,
- dummy_id,
+ tmp_id,
streaming_job,
}),
)
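
On the stream-manager side, `tmp_id` is just the table ID of the temporary `TableFragments` built for the replacement, forwarded into the `ReplaceTablePlan` that the barrier scheduler executes. A small sketch of that hand-off follows; the types are trimmed to what this diff shows, and everything else is an illustrative assumption.

    #[derive(Clone, Copy)]
    struct TableId { table_id: u32 }

    struct TableFragments { id: TableId }

    impl TableFragments {
        fn table_id(&self) -> TableId { self.id }
    }

    // Trimmed-down stand-in for the real ReplaceTablePlan.
    struct ReplaceTablePlan { tmp_id: u32 }

    fn build_plan(table_fragments: &TableFragments) -> ReplaceTablePlan {
        // The temporary fragments are keyed by the temporary job's table ID ...
        let tmp_table_id = table_fragments.table_id();
        // ... which is what `tmp_id` carries in the plan sent through the barrier.
        ReplaceTablePlan { tmp_id: tmp_table_id.table_id }
    }

    fn main() {
        let fragments = TableFragments { id: TableId { table_id: 1024 } };
        let plan = build_plan(&fragments);
        assert_eq!(plan.tmp_id, 1024);
    }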
