From cdbd982d55613636672fe2f5524d819a9e992571 Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Mon, 24 Jun 2024 19:00:42 +0800
Subject: [PATCH] refactor(meta): move `finish_streaming_job` into catalog
manager (#17414)
---
src/meta/src/manager/catalog/mod.rs | 61 +++++++++++++++++++++
src/meta/src/rpc/ddl_controller.rs | 83 ++---------------------------
2 files changed, 64 insertions(+), 80 deletions(-)
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 3aeab64bef128..9ba9333cee8e1 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -1073,6 +1073,67 @@ impl CatalogManager {
Ok(())
}
+ /// `finish_stream_job` finishes a stream job and clean some states.
+ pub async fn finish_stream_job(
+ &self,
+ mut stream_job: StreamingJob,
+ internal_tables: Vec<Table>,
+ ) -> MetaResult<NotificationVersion> {
+ // 1. finish procedure.
+ let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec();
+
+ // Update the corresponding 'created_at' field.
+ stream_job.mark_created();
+
+ let version = match stream_job {
+ StreamingJob::MaterializedView(table) => {
+ creating_internal_table_ids.push(table.id);
+ self.finish_create_table_procedure(internal_tables, table)
+ .await?
+ }
+ StreamingJob::Sink(sink, target_table) => {
+ let sink_id = sink.id;
+
+ let mut version = self
+ .finish_create_sink_procedure(internal_tables, sink)
+ .await?;
+
+ if let Some((table, source)) = target_table {
+ version = self
+ .finish_replace_table_procedure(&source, &table, None, Some(sink_id), None)
+ .await?;
+ }
+
+ version
+ }
+ StreamingJob::Table(source, table, ..) => {
+ creating_internal_table_ids.push(table.id);
+ if let Some(source) = source {
+ self.finish_create_table_procedure_with_source(source, table, internal_tables)
+ .await?
+ } else {
+ self.finish_create_table_procedure(internal_tables, table)
+ .await?
+ }
+ }
+ StreamingJob::Index(index, table) => {
+ creating_internal_table_ids.push(table.id);
+ self.finish_create_index_procedure(internal_tables, index, table)
+ .await?
+ }
+ StreamingJob::Source(source) => {
+ self.finish_create_source_procedure(source, internal_tables)
+ .await?
+ }
+ };
+
+ // 2. unmark creating tables.
+ self.unmark_creating_tables(&creating_internal_table_ids, false)
+ .await;
+
+ Ok(version)
+ }
+
/// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`.
pub async fn finish_create_table_procedure(
&self,
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index e1c6ebb0f32a8..cc5d7dd8971ad 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1317,8 +1317,9 @@ impl DdlController {
};
tracing::debug!(id = job_id, "finishing stream job");
- let version = self
- .finish_stream_job(mgr, stream_job, internal_tables)
+ let version = mgr
+ .catalog_manager
+ .finish_stream_job(stream_job, internal_tables)
.await?;
tracing::debug!(id = job_id, "finished stream job");
@@ -1757,84 +1758,6 @@ impl DdlController {
Ok(())
}
- /// `finish_stream_job` finishes a stream job and clean some states.
- async fn finish_stream_job(
- &self,
- mgr: &MetadataManagerV1,
- mut stream_job: StreamingJob,
- internal_tables: Vec<Table>,
- ) -> MetaResult<NotificationVersion> {
- // 1. finish procedure.
- let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec();
-
- // Update the corresponding 'created_at' field.
- stream_job.mark_created();
-
- let version = match stream_job {
- StreamingJob::MaterializedView(table) => {
- creating_internal_table_ids.push(table.id);
- mgr.catalog_manager
- .finish_create_table_procedure(internal_tables, table)
- .await?
- }
- StreamingJob::Sink(sink, target_table) => {
- let sink_id = sink.id;
-
- let mut version = mgr
- .catalog_manager
- .finish_create_sink_procedure(internal_tables, sink)
- .await?;
-
- if let Some((table, source)) = target_table {
- let streaming_job =
- StreamingJob::Table(source, table, TableJobType::Unspecified);
-
- version = self
- .finish_replace_table(
- mgr.catalog_manager.clone(),
- &streaming_job,
- None,
- Some(sink_id),
- None,
- )
- .await?;
- }
-
- version
- }
- StreamingJob::Table(source, table, ..) => {
- creating_internal_table_ids.push(table.id);
- if let Some(source) = source {
- mgr.catalog_manager
- .finish_create_table_procedure_with_source(source, table, internal_tables)
- .await?
- } else {
- mgr.catalog_manager
- .finish_create_table_procedure(internal_tables, table)
- .await?
- }
- }
- StreamingJob::Index(index, table) => {
- creating_internal_table_ids.push(table.id);
- mgr.catalog_manager
- .finish_create_index_procedure(internal_tables, index, table)
- .await?
- }
- StreamingJob::Source(source) => {
- mgr.catalog_manager
- .finish_create_source_procedure(source, internal_tables)
- .await?
- }
- };
-
- // 2. unmark creating tables.
- mgr.catalog_manager
- .unmark_creating_tables(&creating_internal_table_ids, false)
- .await;
-
- Ok(version)
- }
-
async fn drop_table_inner(
&self,
source_id: Option,