From 55aa35f3c5db691023f5ef755e968d0e7e43781b Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 3 Dec 2024 13:26:44 +0800
Subject: [PATCH] feat(meta): support replace source

---
 proto/ddl_service.proto                     | 44 +++++++++-----
 proto/frontend_service.proto                |  2 +-
 src/frontend/src/catalog/catalog_service.rs | 10 +--
 src/frontend/src/handler/create_sink.rs     | 14 +++--
 src/frontend/src/handler/drop_sink.rs       | 14 +++--
 src/frontend/src/rpc/mod.rs                 | 16 +++--
 src/frontend/src/test_utils.rs              |  6 +-
 src/meta/service/src/ddl_service.rs         | 67 ++++++++++++---------
 src/meta/src/controller/streaming_job.rs    | 30 ++++++---
 src/meta/src/manager/streaming_job.rs       |  4 +-
 src/meta/src/rpc/ddl_controller.rs          | 39 ++++++------
 src/rpc_client/src/meta_client.rs           | 21 ++++---
 12 files changed, 164 insertions(+), 103 deletions(-)

diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index 3b94b4d9f2bd9..32e94fb06b46b 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -85,7 +85,7 @@ message CreateSinkRequest {
   catalog.Sink sink = 1;
   stream_plan.StreamFragmentGraph fragment_graph = 2;
   // It is used to provide a replace plan for the downstream table in `create sink into table` requests.
-  optional ReplaceTablePlan affected_table_change = 3;
+  optional ReplaceJobPlan affected_table_change = 3;
   // The list of object IDs that this sink depends on.
   repeated uint32 dependencies = 4;
 }
@@ -98,7 +98,7 @@ message CreateSinkResponse {
 message DropSinkRequest {
   uint32 sink_id = 1;
   bool cascade = 2;
-  optional ReplaceTablePlan affected_table_change = 3;
+  optional ReplaceJobPlan affected_table_change = 3;
 }
 
 message DropSinkResponse {
@@ -353,26 +353,42 @@ message DropIndexResponse {
   WaitVersion version = 2;
 }
 
-message ReplaceTablePlan {
-  // The new table catalog, with the correct (old) table ID and a new version.
-  // If the new version does not match the subsequent version in the meta service's
-  // catalog, this request will be rejected.
-  catalog.Table table = 1;
+message ReplaceJobPlan {
+  reserved 1;
   // The new materialization plan, where all schema are updated.
   stream_plan.StreamFragmentGraph fragment_graph = 2;
   // The mapping from the old columns to the new columns of the table.
   // If no column modifications occur (such as for sinking into table), this will be None.
   catalog.ColIndexMapping table_col_index_mapping = 3;
-  // Source catalog of table's associated source
-  catalog.Source source = 4;
-  TableJobType job_type = 5;
+
+  reserved 4;
+  reserved 5;
+
+  message ReplaceTable {
+    // The new table catalog, with the correct (old) table ID and a new version.
+    // If the new version does not match the subsequent version in the meta service's
+    // catalog, this request will be rejected.
+    catalog.Table table = 1;
+    // Source catalog of table's associated source
+    catalog.Source source = 2;
+    TableJobType job_type = 3;
+  }
+
+  message ReplaceSource {
+    catalog.Source source = 1;
+  }
+
+  oneof replace_job {
+    ReplaceTable replace_table = 6;
+    ReplaceSource replace_source = 7;
+  }
 }
 
-message ReplaceTablePlanRequest {
-  ReplaceTablePlan plan = 1;
+message ReplaceJobPlanRequest {
+  ReplaceJobPlan plan = 1;
 }
 
-message ReplaceTablePlanResponse {
+message ReplaceJobPlanResponse {
   common.Status status = 1;
   // The new global catalog version.
   WaitVersion version = 2;
@@ -543,7 +559,7 @@ service DdlService {
   rpc DropIndex(DropIndexRequest) returns (DropIndexResponse);
   rpc CreateFunction(CreateFunctionRequest) returns (CreateFunctionResponse);
   rpc DropFunction(DropFunctionRequest) returns (DropFunctionResponse);
-  rpc ReplaceTablePlan(ReplaceTablePlanRequest) returns (ReplaceTablePlanResponse);
+  rpc ReplaceJobPlan(ReplaceJobPlanRequest) returns (ReplaceJobPlanResponse);
   rpc GetTable(GetTableRequest) returns (GetTableResponse);
   rpc GetDdlProgress(GetDdlProgressRequest) returns (GetDdlProgressResponse);
   rpc CreateConnection(CreateConnectionRequest) returns (CreateConnectionResponse);
diff --git a/proto/frontend_service.proto b/proto/frontend_service.proto
index 7cdac815fde66..f97e91eb6a32b 100644
--- a/proto/frontend_service.proto
+++ b/proto/frontend_service.proto
@@ -15,7 +15,7 @@ message GetTableReplacePlanRequest {
 }
 
 message GetTableReplacePlanResponse {
-  ddl_service.ReplaceTablePlan replace_plan = 1;
+  ddl_service.ReplaceJobPlan replace_plan = 1;
 }
 
 service FrontendService {
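For orientation: on the Rust side, prost places the nested messages and the oneof of `ReplaceJobPlan` in a generated `replace_job_plan` module, which is why the frontend code below spells out paths like `replace_job_plan::ReplaceJob::ReplaceTable`. A rough sketch of the generated shape (derives, prost attributes and exact module paths are omitted; this is an illustration, not the actual generated file):

    pub struct ReplaceJobPlan {
        pub fragment_graph: Option<StreamFragmentGraph>,
        pub table_col_index_mapping: Option<ColIndexMapping>,
        // `oneof replace_job` becomes an optional Rust enum field.
        pub replace_job: Option<replace_job_plan::ReplaceJob>,
    }

    pub mod replace_job_plan {
        pub struct ReplaceTable {
            pub table: Option<Table>,
            pub source: Option<Source>,
            pub job_type: i32, // `TableJobType` as a raw enum value
        }

        pub struct ReplaceSource {
            pub source: Option<Source>,
        }

        pub enum ReplaceJob {
            ReplaceTable(ReplaceTable),
            ReplaceSource(ReplaceSource),
        }
    }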
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index f7dcb919a6ad7..21002f0121b0b 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -27,7 +27,7 @@ use risingwave_pb::catalog::{
 };
 use risingwave_pb::ddl_service::{
     alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
-    create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType,
+    create_connection_request, PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType,
     WaitVersion,
 };
 use risingwave_pb::meta::PbTableParallelism;
@@ -116,7 +116,7 @@ pub trait CatalogWriter: Send + Sync {
         &self,
         sink: PbSink,
         graph: StreamFragmentGraph,
-        affected_table_change: Option<PbReplaceTablePlan>,
+        affected_table_change: Option<PbReplaceJobPlan>,
         dependencies: HashSet<ObjectId>,
     ) -> Result<()>;
 
@@ -161,7 +161,7 @@ pub trait CatalogWriter: Send + Sync {
         &self,
         sink_id: u32,
         cascade: bool,
-        affected_table_change: Option<ReplaceTablePlan>,
+        affected_table_change: Option<ReplaceJobPlan>,
     ) -> Result<()>;
 
     async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
@@ -329,7 +329,7 @@ impl CatalogWriter for CatalogWriterImpl {
         &self,
         sink: PbSink,
         graph: StreamFragmentGraph,
-        affected_table_change: Option<PbReplaceTablePlan>,
+        affected_table_change: Option<PbReplaceJobPlan>,
         dependencies: HashSet<ObjectId>,
     ) -> Result<()> {
         let version = self
@@ -425,7 +425,7 @@ impl CatalogWriter for CatalogWriterImpl {
         &self,
         sink_id: u32,
         cascade: bool,
-        affected_table_change: Option<ReplaceTablePlan>,
+        affected_table_change: Option<ReplaceJobPlan>,
     ) -> Result<()> {
         let version = self
             .meta_client
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 03e58ab4de3c3..4b96d31478c05 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -37,7 +37,7 @@ use risingwave_connector::sink::{
 use risingwave_connector::WithPropertiesExt;
 use risingwave_pb::catalog::connection_params::PbConnectionType;
 use risingwave_pb::catalog::{PbSink, PbSource, Table};
-use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
+use risingwave_pb::ddl_service::{replace_job_plan, ReplaceJobPlan, TableJobType};
 use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
 use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
 use risingwave_pb::telemetry::TelemetryDatabaseObject;
@@ -521,12 +521,16 @@ pub async fn handle_create_sink(
         // for new creating sink, we don't have a unique identity because the sink id is not generated yet.
         hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;
 
-        target_table_replace_plan = Some(ReplaceTablePlan {
-            source,
-            table: Some(table),
+        target_table_replace_plan = Some(ReplaceJobPlan {
+            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
+                replace_job_plan::ReplaceTable {
+                    table: Some(table),
+                    source,
+                    job_type: TableJobType::General as _,
+                },
+            )),
             fragment_graph: Some(graph),
             table_col_index_mapping: None,
-            job_type: TableJobType::General as _,
         });
     }
diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs
index 8e0d015d0f632..ab98894330b69 100644
--- a/src/frontend/src/handler/drop_sink.rs
+++ b/src/frontend/src/handler/drop_sink.rs
@@ -15,7 +15,7 @@
 use std::collections::HashSet;
 
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
+use risingwave_pb::ddl_service::{replace_job_plan, ReplaceJobPlan, TableJobType};
 use risingwave_sqlparser::ast::ObjectName;
 
 use super::RwPgResponse;
@@ -94,12 +94,16 @@ pub async fn handle_drop_sink(
             )?;
         }
 
-        affected_table_change = Some(ReplaceTablePlan {
-            source,
-            table: Some(table),
+        affected_table_change = Some(ReplaceJobPlan {
+            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
+                replace_job_plan::ReplaceTable {
+                    table: Some(table),
+                    source,
+                    job_type: TableJobType::General as _,
+                },
+            )),
             fragment_graph: Some(graph),
             table_col_index_mapping: None,
-            job_type: TableJobType::General as _,
         });
     }
diff --git a/src/frontend/src/rpc/mod.rs b/src/frontend/src/rpc/mod.rs
index 2f5207ca6e928..69db7e5ae00f5 100644
--- a/src/frontend/src/rpc/mod.rs
+++ b/src/frontend/src/rpc/mod.rs
@@ -14,7 +14,7 @@
 use itertools::Itertools;
 use pgwire::pg_server::{BoxedError, SessionManager};
-use risingwave_pb::ddl_service::{ReplaceTablePlan, TableSchemaChange};
+use risingwave_pb::ddl_service::{replace_job_plan, ReplaceJobPlan, TableSchemaChange};
 use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
 use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse};
 use risingwave_rpc_client::error::ToTonicStatus;
@@ -81,7 +81,7 @@ async fn get_new_table_plan(
     table_name: String,
     database_id: u32,
     owner: u32,
-) -> Result<ReplaceTablePlan> {
+) -> Result<ReplaceJobPlan> {
     tracing::info!("get_new_table_plan for table {}", table_name);
 
     let session_mgr = SESSION_MANAGER
@@ -109,11 +109,15 @@ async fn get_new_table_plan(
     )
     .await?;
 
-    Ok(ReplaceTablePlan {
-        table: Some(table),
+    Ok(ReplaceJobPlan {
+        replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
+            replace_job_plan::ReplaceTable {
+                table: Some(table),
+                source: None, // none for cdc table
+                job_type: job_type as _,
+            },
+        )),
         fragment_graph: Some(graph),
         table_col_index_mapping: Some(col_index_mapping.to_protobuf()),
-        source: None, // none for cdc table
-        job_type: job_type as _,
     })
 }
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 77a9bc23d5a53..24e4744450241 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -46,7 +46,7 @@ use risingwave_pb::common::WorkerNode;
 use risingwave_pb::ddl_service::alter_owner_request::Object;
 use risingwave_pb::ddl_service::{
     alter_name_request, alter_set_schema_request, alter_swap_rename_request,
-    create_connection_request, DdlProgress, PbTableJobType, ReplaceTablePlan, TableJobType,
+    create_connection_request, DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType,
 };
 use risingwave_pb::hummock::write_limits::WriteLimit;
 use risingwave_pb::hummock::{
@@ -343,7 +343,7 @@ impl CatalogWriter for MockCatalogWriter {
         &self,
         sink: PbSink,
         graph: StreamFragmentGraph,
-        _affected_table_change: Option<ReplaceTablePlan>,
+        _affected_table_change: Option<ReplaceJobPlan>,
         _dependencies: HashSet<ObjectId>,
     ) -> Result<()> {
         self.create_sink_inner(sink, graph)
@@ -485,7 +485,7 @@ impl CatalogWriter for MockCatalogWriter {
         &self,
         sink_id: u32,
         cascade: bool,
-        _target_table_change: Option<ReplaceTablePlan>,
+        _target_table_change: Option<ReplaceJobPlan>,
     ) -> Result<()> {
         if cascade {
             return Err(ErrorCode::NotSupported(
diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 166bb142bd7f6..11ade22502255 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use rand::seq::SliceRandom;
 use rand::thread_rng;
+use replace_job_plan::{ReplaceSource, ReplaceTable};
 use risingwave_common::catalog::ColumnCatalog;
 use risingwave_common::types::DataType;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -85,25 +86,31 @@ impl DdlServiceImpl {
     }
 
     fn extract_replace_table_info(
-        ReplaceTablePlan {
-            table,
+        ReplaceJobPlan {
             fragment_graph,
             table_col_index_mapping,
-            source,
-            job_type,
-        }: ReplaceTablePlan,
+            replace_job,
+        }: ReplaceJobPlan,
     ) -> ReplaceStreamJobInfo {
-        let table = table.unwrap();
         let col_index_mapping = table_col_index_mapping
             .as_ref()
             .map(ColIndexMapping::from_protobuf);
 
         ReplaceStreamJobInfo {
-            streaming_job: StreamingJob::Table(
-                source,
-                table,
-                TableJobType::try_from(job_type).unwrap(),
-            ),
+            streaming_job: match replace_job.unwrap() {
+                replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
+                    table,
+                    source,
+                    job_type,
+                }) => StreamingJob::Table(
+                    source,
+                    table.unwrap(),
+                    TableJobType::try_from(job_type).unwrap(),
+                ),
+                replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
+                    StreamingJob::Source(source.unwrap())
+                }
+            },
             fragment_graph: fragment_graph.unwrap(),
             col_index_mapping,
         }
@@ -303,7 +310,11 @@ impl DdlService for DdlServiceImpl {
         let sink = req.get_sink()?.clone();
         let fragment_graph = req.get_fragment_graph()?.clone();
 
-        let affected_table_change = req.get_affected_table_change().cloned().ok();
+        let affected_table_change = req
+            .get_affected_table_change()
+            .cloned()
+            .ok()
+            .map(Self::extract_replace_table_info);
         let dependencies = req
             .get_dependencies()
             .iter()
@@ -313,8 +324,11 @@ impl DdlService for DdlServiceImpl {
         let stream_job = match &affected_table_change {
             None => StreamingJob::Sink(sink, None),
             Some(change) => {
-                let table = change.table.clone().unwrap();
-                let source = change.source.clone();
+                let (source, table, _) = change
+                    .streaming_job
+                    .clone()
+                    .try_as_table()
+                    .expect("must be replace table");
                 StreamingJob::Sink(sink, Some((table, source)))
             }
         };
@@ -323,7 +337,7 @@ impl DdlService for DdlServiceImpl {
             stream_job,
             fragment_graph,
             CreateType::Foreground,
-            affected_table_change.map(Self::extract_replace_table_info),
+            affected_table_change,
             dependencies,
         );
 
@@ -646,20 +660,20 @@ impl DdlService for DdlServiceImpl {
         Ok(Response::new(RisectlListStateTablesResponse { tables }))
     }
 
-    async fn replace_table_plan(
+    async fn replace_job_plan(
         &self,
-        request: Request<ReplaceTablePlanRequest>,
-    ) -> Result<Response<ReplaceTablePlanResponse>, Status> {
+        request: Request<ReplaceJobPlanRequest>,
+    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
         let req = request.into_inner().get_plan().cloned()?;
 
         let version = self
             .ddl_controller
-            .run_command(DdlCommand::ReplaceTable(Self::extract_replace_table_info(
-                req,
-            )))
+            .run_command(DdlCommand::ReplaceStreamJob(
+                Self::extract_replace_table_info(req),
+            ))
             .await?;
 
-        Ok(Response::new(ReplaceTablePlanResponse {
+        Ok(Response::new(ReplaceJobPlanResponse {
             status: None,
             version,
         }))
@@ -973,7 +987,7 @@ impl DdlService for DdlServiceImpl {
                     .auto_schema_change_latency
                     .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                     .start_timer();
-                // send a request to the frontend to get the ReplaceTablePlan
+                // send a request to the frontend to get the ReplaceJobPlan
                 // will retry with exponential backoff if the request fails
                 let resp = client
                     .get_table_replace_plan(GetTableReplacePlanRequest {
@@ -988,7 +1002,8 @@ impl DdlService for DdlServiceImpl {
                     Ok(resp) => {
                         let resp = resp.into_inner();
                         if let Some(plan) = resp.replace_plan {
-                            plan.table.as_ref().inspect(|t| {
+                            let plan = Self::extract_replace_table_info(plan);
+                            plan.streaming_job.table().inspect(|t| {
                                 tracing::info!(
                                     target: "auto_schema_change",
                                     table_id = t.id,
@@ -999,9 +1014,7 @@ impl DdlService for DdlServiceImpl {
                             // start the schema change procedure
                             let replace_res = self
                                 .ddl_controller
-                                .run_command(DdlCommand::ReplaceTable(
-                                    Self::extract_replace_table_info(plan),
-                                ))
+                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                 .await;
 
                             match replace_res {
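The `try_as_table()` call in the sink path above comes from strum's `EnumTryAs` derive, which this patch adds to `StreamingJob` in src/meta/src/manager/streaming_job.rs further down. A minimal, self-contained sketch of the pattern with simplified field types (the real enum carries catalog protobufs):

    use strum::EnumTryAs;

    #[derive(Debug, Clone, EnumTryAs)]
    enum StreamingJob {
        // (source, table, job_type): simplified stand-ins for the real types
        Table(Option<String>, String, i32),
        Source(String),
    }

    fn main() {
        let job = StreamingJob::Table(None, "t".to_owned(), 0);
        // The derive generates `try_as_table()`, returning the variant's fields
        // as a tuple wrapped in `Option`, and `None` for any other variant.
        let (source, table, _job_type) = job.try_as_table().expect("must be replace table");
        assert!(source.is_none());
        assert_eq!(table, "t");
        assert!(StreamingJob::Source("s".to_owned()).try_as_table().is_none());
    }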
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index f869f3cd6c967..360776a5a5723 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -974,7 +974,7 @@ impl CatalogController {
         tmp_id: ObjectId,
         streaming_job: StreamingJob,
         merge_updates: Vec<PbMergeUpdate>,
-        table_col_index_mapping: Option<ColIndexMapping>,
+        col_index_mapping: Option<ColIndexMapping>,
         sink_into_table_context: SinkIntoTableContext,
     ) -> MetaResult<NotificationVersion> {
         let inner = self.inner.write().await;
@@ -983,7 +983,7 @@ impl CatalogController {
         let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner(
             tmp_id,
             merge_updates,
-            table_col_index_mapping,
+            col_index_mapping,
             sink_into_table_context,
             &txn,
             streaming_job,
@@ -1009,12 +1009,10 @@ impl CatalogController {
         Ok(version)
     }
 
-    /// TODO: make it general for other streaming jobs.
-    /// Currently only for replacing table.
     pub async fn finish_replace_streaming_job_inner(
         tmp_id: ObjectId,
         merge_updates: Vec<PbMergeUpdate>,
-        table_col_index_mapping: Option<ColIndexMapping>,
+        col_index_mapping: Option<ColIndexMapping>,
         SinkIntoTableContext {
             creating_sink_id,
             dropping_sink_id,
@@ -1065,7 +1063,11 @@ impl CatalogController {
                 table.incoming_sinks = Set(incoming_sinks.into());
                 table.update(txn).await?;
             }
-            // TODO: support other streaming jobs
+            StreamingJob::Source(source) => {
+                // Update the source catalog with the new one.
+                let source = source::ActiveModel::from(source);
+                source.update(txn).await?;
+            }
             _ => unreachable!(
                 "invalid streaming job type: {:?}",
                 streaming_job.job_type_str()
@@ -1223,9 +1225,21 @@ impl CatalogController {
                     )),
                 })
             }
-            _ => unreachable!("invalid streaming job type: {:?}", job_type),
+            StreamingJobType::Source => {
+                let (source, source_obj) = Source::find_by_id(original_job_id)
+                    .find_also_related(Object)
+                    .one(txn)
+                    .await?
+                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
+                relations.push(PbRelation {
+                    relation_info: Some(PbRelationInfo::Source(
+                        ObjectModel(source, source_obj.unwrap()).into(),
+                    )),
+                })
+            }
+            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
         }
 
-        if let Some(table_col_index_mapping) = table_col_index_mapping {
+        if let Some(table_col_index_mapping) = col_index_mapping {
             let expr_rewriter = ReplaceTableExprRewriter {
                 table_col_index_mapping,
             };
diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs
index b91c3f5e9bd01..727a526896e8e 100644
--- a/src/meta/src/manager/streaming_job.rs
+++ b/src/meta/src/manager/streaming_job.rs
@@ -24,7 +24,7 @@ use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
 use risingwave_pb::ddl_service::TableJobType;
 use sea_orm::entity::prelude::*;
 use sea_orm::{DatabaseTransaction, QuerySelect};
-use strum::EnumIs;
+use strum::{EnumIs, EnumTryAs};
 
 use super::{
     get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
@@ -35,7 +35,7 @@ use crate::{MetaError, MetaResult};
 
 // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
 // Sink.
-#[derive(Debug, Clone, EnumIs)]
+#[derive(Debug, Clone, EnumIs, EnumTryAs)]
 pub enum StreamingJob {
     MaterializedView(Table),
     Sink(Sink, Option<(Table, Option<PbSource>)>),
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 6830c97f1e82e..033b0e135ccc6 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -145,7 +145,7 @@ pub enum DdlCommand {
     DropStreamingJob(StreamingJobId, DropMode, Option<ReplaceStreamJobInfo>),
     AlterName(alter_name_request::Object, String),
     AlterSwapRename(alter_swap_rename_request::Object),
-    ReplaceTable(ReplaceStreamJobInfo),
+    ReplaceStreamJob(ReplaceStreamJobInfo),
     AlterNonSharedSource(Source),
     AlterObjectOwner(Object, UserId),
     AlterSetSchema(alter_set_schema_request::Object, SchemaId),
@@ -185,7 +185,7 @@ impl DdlCommand {
             | DdlCommand::AlterSwapRename(_) => true,
             DdlCommand::CreateStreamingJob(_, _, _, _, _)
             | DdlCommand::CreateNonSharedSource(_)
-            | DdlCommand::ReplaceTable(_)
+            | DdlCommand::ReplaceStreamJob(_)
             | DdlCommand::AlterNonSharedSource(_)
             | DdlCommand::CreateSubscription(_) => false,
         }
@@ -330,7 +330,7 @@ impl DdlController {
                     ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
                         .await
                 }
-                DdlCommand::ReplaceTable(ReplaceStreamJobInfo {
+                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                     streaming_job,
                     fragment_graph,
                     col_index_mapping,
@@ -354,7 +354,9 @@ impl DdlController {
                 DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                 DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                 DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
-                DdlCommand::AlterNonSharedSource(source) => ctrl.alter_source(source).await,
+                DdlCommand::AlterNonSharedSource(source) => {
+                    ctrl.alter_non_shared_source(source).await
+                }
                 DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                 DdlCommand::CreateSubscription(subscription) => {
                     ctrl.create_subscription(subscription).await
@@ -464,7 +466,7 @@ impl DdlController {
 
     /// This replaces the source in the catalog.
     /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
-    async fn alter_source(&self, source: Source) -> MetaResult<NotificationVersion> {
+    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
         self.metadata_manager
             .catalog_controller
             .alter_non_shared_source(source)
@@ -1299,7 +1301,7 @@ impl DdlController {
         &self,
         mut streaming_job: StreamingJob,
         fragment_graph: StreamFragmentGraphProto,
-        table_col_index_mapping: Option<ColIndexMapping>,
+        col_index_mapping: Option<ColIndexMapping>,
     ) -> MetaResult<NotificationVersion> {
         match &mut streaming_job {
             StreamingJob::Table(..) | StreamingJob::Source(..) => {}
@@ -1352,7 +1354,7 @@ impl DdlController {
                 ctx,
                 &streaming_job,
                 fragment_graph,
-                table_col_index_mapping.clone(),
+                col_index_mapping.as_ref(),
                 tmp_id as _,
             )
             .await?;
@@ -1412,7 +1414,7 @@ impl DdlController {
                 tmp_id,
                 streaming_job,
                 merge_updates,
-                table_col_index_mapping,
+                col_index_mapping,
                 SinkIntoTableContext {
                     creating_sink_id: None,
                     dropping_sink_id: None,
@@ -1690,14 +1692,12 @@ impl DdlController {
         stream_ctx: StreamContext,
         stream_job: &StreamingJob,
         mut fragment_graph: StreamFragmentGraph,
-        // TODO(alter-source): check what does this mean
-        table_col_index_mapping: Option<ColIndexMapping>,
+        col_index_mapping: Option<&ColIndexMapping>,
         tmp_job_id: TableId,
     ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragments)> {
         match &stream_job {
-            StreamingJob::Table(..) => {}
-            StreamingJob::Source(..)
-            | StreamingJob::MaterializedView(..)
+            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
+            StreamingJob::MaterializedView(..)
             | StreamingJob::Sink(..)
             | StreamingJob::Index(..) => {
                 bail_not_implemented!("schema change for {}", stream_job.job_type_str())
@@ -1730,7 +1730,7 @@ impl DdlController {
         // Map the column indices in the dispatchers with the given mapping.
         let (mut downstream_fragments, downstream_actor_location) =
             self.metadata_manager.get_downstream_fragments(id).await?;
-        if let Some(mapping) = &table_col_index_mapping {
+        if let Some(mapping) = &col_index_mapping {
             for (d, _f) in &mut downstream_fragments {
                 *d = mapping.rewrite_dispatch_strategy(d).ok_or_else(|| {
                     // The `rewrite` only fails if some column is dropped.
@@ -1742,8 +1742,8 @@ impl DdlController {
         }
 
         // build complete graph based on the table job type
-        let complete_graph = match job_type {
-            StreamingJobType::Table(TableJobType::General) => {
+        let complete_graph = match &job_type {
+            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                 CompleteStreamFragmentGraph::with_downstreams(
                     fragment_graph,
                     original_mview_fragment.fragment_id,
@@ -1791,8 +1791,11 @@ impl DdlController {
             merge_updates,
         } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
 
-        // general table job type does not have upstream job, so the dispatchers should be empty
-        if matches!(job_type, StreamingJobType::Table(TableJobType::General)) {
+        // general table & source jobs do not have an upstream job, so the dispatchers should be empty
+        if matches!(
+            job_type,
+            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
+        ) {
             assert!(dispatchers.is_empty());
         }
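Putting the meta-side pieces together, a source replacement is driven through the same command path as a table replacement. A sketch, assuming a prepared `new_source` catalog and a re-planned `fragment_graph` (variable names here are illustrative, not from this commit):

    let info = ReplaceStreamJobInfo {
        streaming_job: StreamingJob::Source(new_source),
        fragment_graph,
        // Replacing a source never remaps table columns.
        col_index_mapping: None,
    };
    let version = ddl_controller
        .run_command(DdlCommand::ReplaceStreamJob(info))
        .await?;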
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index edf83355651aa..f8e45f69cd982 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -27,6 +27,7 @@ use either::Either;
 use futures::stream::BoxStream;
 use list_rate_limits_response::RateLimitInfo;
 use lru::LruCache;
+use replace_job_plan::{ReplaceJob, ReplaceTable};
 use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
 use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
 use risingwave_common::hash::WorkerSlotMapping;
@@ -443,7 +444,7 @@ impl MetaClient {
         &self,
         sink: PbSink,
         graph: StreamFragmentGraph,
-        affected_table_change: Option<PbReplaceTablePlan>,
+        affected_table_change: Option<PbReplaceJobPlan>,
         dependencies: HashSet<ObjectId>,
     ) -> Result<WaitVersion> {
         let request = CreateSinkRequest {
@@ -621,16 +622,18 @@ impl MetaClient {
         table_col_index_mapping: ColIndexMapping,
         job_type: PbTableJobType,
     ) -> Result<WaitVersion> {
-        let request = ReplaceTablePlanRequest {
-            plan: Some(ReplaceTablePlan {
-                source,
-                table: Some(table),
+        let request = ReplaceJobPlanRequest {
+            plan: Some(ReplaceJobPlan {
                 fragment_graph: Some(graph),
                 table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()),
-                job_type: job_type as _,
+                replace_job: Some(ReplaceJob::ReplaceTable(ReplaceTable {
+                    source,
+                    table: Some(table),
+                    job_type: job_type as _,
+                })),
             }),
         };
-        let resp = self.inner.replace_table_plan(request).await?;
+        let resp = self.inner.replace_job_plan(request).await?;
         // TODO: handle error in `resp.status` here
         Ok(resp
             .version
@@ -710,7 +713,7 @@ impl MetaClient {
         &self,
         sink_id: u32,
         cascade: bool,
-        affected_table_change: Option<PbReplaceTablePlan>,
+        affected_table_change: Option<PbReplaceJobPlan>,
     ) -> Result<WaitVersion> {
         let request = DropSinkRequest {
             sink_id,
@@ -2113,7 +2116,7 @@ macro_rules! for_all_meta_rpc {
             ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
             ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
             ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
-            ,{ ddl_client, replace_table_plan, ReplaceTablePlanRequest, ReplaceTablePlanResponse }
+            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
             ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
             ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
             ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
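Note that the client side only re-wires `replace_table` in this commit; no source-replacement entry point is added to `MetaClient` here. For illustration only, a hypothetical helper mirroring `replace_table` above could look as follows (`replace_source` and its error handling are assumptions, not code from this patch):

    pub async fn replace_source(
        &self,
        source: PbSource,
        graph: StreamFragmentGraph,
    ) -> Result<WaitVersion> {
        use replace_job_plan::ReplaceSource;
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                // Replacing a source never remaps table columns.
                table_col_index_mapping: None,
                replace_job: Some(ReplaceJob::ReplaceSource(ReplaceSource {
                    source: Some(source),
                })),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            // error construction assumed, mirroring other helpers in this file
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }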