From 64290ea5b39399251dae26ca5fd770c3a4e858ea Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 26 Nov 2024 14:04:00 +0800 Subject: [PATCH] minor fix Signed-off-by: xxchan --- src/meta/src/controller/streaming_job.rs | 39 +++++++++++++++--------- src/meta/src/manager/streaming_job.rs | 5 +++ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index b5c7633d475ea..3c8c793256db9 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -71,9 +71,9 @@ use crate::controller::utils::{ get_internal_tables_by_id, rebuild_fragment_mapping_from_actors, PartialObject, }; use crate::controller::ObjectModel; -use crate::manager::{NotificationVersion, StreamingJob}; +use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType}; use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; -use crate::stream::{self, SplitAssignment}; +use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; impl CatalogController { @@ -1015,10 +1015,11 @@ impl CatalogController { streaming_job: StreamingJob, ) -> MetaResult<(Vec, Vec)> { let original_job_id = streaming_job.id() as ObjectId; + let job_type = streaming_job.job_type(); match streaming_job { StreamingJob::Table(_source, table, _table_job_type) => { - // The source catalog should be remain unchanged + // The source catalog should remain unchanged let original_table_catalogs = Table::find_by_id(original_job_id) .select_only() @@ -1038,7 +1039,7 @@ impl CatalogController { .update(txn) .await?; } - + // Update the table catalog with the new one. (column catalog is also updated here) let mut table = table::ActiveModel::from(table); let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone(); if let Some(sink_id) = creating_sink_id { @@ -1052,10 +1053,13 @@ impl CatalogController { } table.incoming_sinks = Set(incoming_sinks.into()); - let table = table.update(txn).await?; + table.update(txn).await?; } // TODO: support other streaming jobs - _ => unreachable!("invalid streaming job type: {:?}", streaming_job.job_type()), + _ => unreachable!( + "invalid streaming job type: {:?}", + streaming_job.job_type_str() + ), } // 0. update internal tables @@ -1196,14 +1200,21 @@ impl CatalogController { // 4. update catalogs and notify. let mut relations = vec![]; - let table_obj = table - .find_related(Object) - .one(txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("object", table.table_id))?; - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Table(ObjectModel(table, table_obj).into())), - }); + match job_type { + StreamingJobType::Table => { + let (table, table_obj) = Table::find_by_id(original_job_id) + .find_also_related(Object) + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Table( + ObjectModel(table, table_obj.unwrap()).into(), + )), + }) + } + _ => unreachable!("invalid streaming job type: {:?}", job_type), + } if let Some(table_col_index_mapping) = table_col_index_mapping { let expr_rewriter = ReplaceTableExprRewriter { table_col_index_mapping, diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 34fd7d0057b4b..924cdb0124a9a 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -33,6 +33,7 @@ use crate::{MetaError, MetaResult}; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and // Sink. #[derive(Debug, Clone, EnumDiscriminants, EnumIs)] +#[strum_discriminants(name(StreamingJobType))] pub enum StreamingJob { MaterializedView(Table), Sink(Sink, Option<(Table, Option)>), @@ -260,6 +261,10 @@ impl StreamingJob { } } + pub fn job_type(&self) -> StreamingJobType { + self.into() + } + pub fn job_type_str(&self) -> &'static str { match self { StreamingJob::MaterializedView(_) => "materialized view",