diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index f869f3cd6c967..c642305286804 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1009,7 +1009,7 @@ impl CatalogController { Ok(version) } - /// TODO: make it general for other streaming jobs. + /// TODO(alter-source): make it general for other streaming jobs. /// Currently only for replacing table. pub async fn finish_replace_streaming_job_inner( tmp_id: ObjectId, @@ -1065,7 +1065,11 @@ impl CatalogController { table.incoming_sinks = Set(incoming_sinks.into()); table.update(txn).await?; } - // TODO: support other streaming jobs + StreamingJob::Source(source) => { + // Update the source catalog with the new one. + let source = source::ActiveModel::from(source); + source.update(txn).await?; + } _ => unreachable!( "invalid streaming job type: {:?}", streaming_job.job_type_str() @@ -1223,8 +1227,21 @@ impl CatalogController { )), }) } - _ => unreachable!("invalid streaming job type: {:?}", job_type), + StreamingJobType::Source => { + let (source, source_obj) = Source::find_by_id(original_job_id) + .find_also_related(Object) + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Source( + ObjectModel(source, source_obj.unwrap()).into(), + )), + }) + } + _ => unreachable!("invalid streaming job type for replace: {:?}", job_type), } + // TODO(alter-source) what is this if let Some(table_col_index_mapping) = table_col_index_mapping { let expr_rewriter = ReplaceTableExprRewriter { table_col_index_mapping, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 6830c97f1e82e..9527cffa5c773 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1299,6 +1299,7 @@ impl DdlController { &self, mut streaming_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, + // TODO(alter-source): what is this table_col_index_mapping: Option, ) -> MetaResult { match &mut streaming_job { @@ -1695,9 +1696,8 @@ impl DdlController { tmp_job_id: TableId, ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragments)> { match &stream_job { - StreamingJob::Table(..) => {} - StreamingJob::Source(..) - | StreamingJob::MaterializedView(..) + StreamingJob::Table(..) | StreamingJob::Source(..) => {} + StreamingJob::MaterializedView(..) | StreamingJob::Sink(..) | StreamingJob::Index(..) => { bail_not_implemented!("schema change for {}", stream_job.job_type_str()) @@ -1742,8 +1742,8 @@ impl DdlController { } // build complete graph based on the table job type - let complete_graph = match job_type { - StreamingJobType::Table(TableJobType::General) => { + let complete_graph = match &job_type { + StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => { CompleteStreamFragmentGraph::with_downstreams( fragment_graph, original_mview_fragment.fragment_id, @@ -1791,8 +1791,11 @@ impl DdlController { merge_updates, } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?; - // general table job type does not have upstream job, so the dispatchers should be empty - if matches!(job_type, StreamingJobType::Table(TableJobType::General)) { + // general table & source does not have upstream job, so the dispatchers should be empty + if matches!( + job_type, + StreamingJobType::Source | StreamingJobType::Table(TableJobType::General) + ) { assert!(dispatchers.is_empty()); } @@ -1810,6 +1813,7 @@ impl DdlController { // Note: no need to set `vnode_count` as it's already set by the frontend. // See `get_replace_table_plan`. + // TODO(alter-source): check what does this mean let ctx = ReplaceStreamJobContext { old_fragments,