Skip to content

Commit

Permalink
refactor(meta): move finish_streaming_job into catalog manager (#17414
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kwannoel authored Jun 24, 2024
1 parent 613fb51 commit cdbd982
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 80 deletions.
61 changes: 61 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,67 @@ impl CatalogManager {
Ok(())
}

/// `finish_stream_job` finishes a stream job and clean some states.
pub async fn finish_stream_job(
&self,
mut stream_job: StreamingJob,
internal_tables: Vec<Table>,
) -> MetaResult<u64> {
// 1. finish procedure.
let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec();

// Update the corresponding 'created_at' field.
stream_job.mark_created();

let version = match stream_job {
StreamingJob::MaterializedView(table) => {
creating_internal_table_ids.push(table.id);
self.finish_create_table_procedure(internal_tables, table)
.await?
}
StreamingJob::Sink(sink, target_table) => {
let sink_id = sink.id;

let mut version = self
.finish_create_sink_procedure(internal_tables, sink)
.await?;

if let Some((table, source)) = target_table {
version = self
.finish_replace_table_procedure(&source, &table, None, Some(sink_id), None)
.await?;
}

version
}
StreamingJob::Table(source, table, ..) => {
creating_internal_table_ids.push(table.id);
if let Some(source) = source {
self.finish_create_table_procedure_with_source(source, table, internal_tables)
.await?
} else {
self.finish_create_table_procedure(internal_tables, table)
.await?
}
}
StreamingJob::Index(index, table) => {
creating_internal_table_ids.push(table.id);
self.finish_create_index_procedure(internal_tables, index, table)
.await?
}
StreamingJob::Source(source) => {
self.finish_create_source_procedure(source, internal_tables)
.await?
}
};

// 2. unmark creating tables.
self.unmark_creating_tables(&creating_internal_table_ids, false)
.await;

Ok(version)
}

/// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`.
pub async fn finish_create_table_procedure(
&self,
Expand Down
83 changes: 3 additions & 80 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1317,8 +1317,9 @@ impl DdlController {
};

tracing::debug!(id = job_id, "finishing stream job");
let version = self
.finish_stream_job(mgr, stream_job, internal_tables)
let version = mgr
.catalog_manager
.finish_stream_job(stream_job, internal_tables)
.await?;
tracing::debug!(id = job_id, "finished stream job");

Expand Down Expand Up @@ -1757,84 +1758,6 @@ impl DdlController {
Ok(())
}

/// `finish_stream_job` finishes a stream job and clean some states.
async fn finish_stream_job(
&self,
mgr: &MetadataManagerV1,
mut stream_job: StreamingJob,
internal_tables: Vec<Table>,
) -> MetaResult<u64> {
// 1. finish procedure.
let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec();

// Update the corresponding 'created_at' field.
stream_job.mark_created();

let version = match stream_job {
StreamingJob::MaterializedView(table) => {
creating_internal_table_ids.push(table.id);
mgr.catalog_manager
.finish_create_table_procedure(internal_tables, table)
.await?
}
StreamingJob::Sink(sink, target_table) => {
let sink_id = sink.id;

let mut version = mgr
.catalog_manager
.finish_create_sink_procedure(internal_tables, sink)
.await?;

if let Some((table, source)) = target_table {
let streaming_job =
StreamingJob::Table(source, table, TableJobType::Unspecified);

version = self
.finish_replace_table(
mgr.catalog_manager.clone(),
&streaming_job,
None,
Some(sink_id),
None,
)
.await?;
}

version
}
StreamingJob::Table(source, table, ..) => {
creating_internal_table_ids.push(table.id);
if let Some(source) = source {
mgr.catalog_manager
.finish_create_table_procedure_with_source(source, table, internal_tables)
.await?
} else {
mgr.catalog_manager
.finish_create_table_procedure(internal_tables, table)
.await?
}
}
StreamingJob::Index(index, table) => {
creating_internal_table_ids.push(table.id);
mgr.catalog_manager
.finish_create_index_procedure(internal_tables, index, table)
.await?
}
StreamingJob::Source(source) => {
mgr.catalog_manager
.finish_create_source_procedure(source, internal_tables)
.await?
}
};

// 2. unmark creating tables.
mgr.catalog_manager
.unmark_creating_tables(&creating_internal_table_ids, false)
.await;

Ok(version)
}

async fn drop_table_inner(
&self,
source_id: Option<SourceId>,
Expand Down

0 comments on commit cdbd982

Please sign in to comment.