diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 6579bc2683037..43a120a19d50f 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -1,4 +1,5 @@ -- PG +DROP TABLE IF EXISTS shipments; CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, diff --git a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt new file mode 100644 index 0000000000000..6bea5dce2fe45 --- /dev/null +++ b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt @@ -0,0 +1,242 @@ +control substitution on + +# mysql env vars will be read from the `.risingwave/config/risedev-env` file + +system ok +mysql -e " + SET GLOBAL time_zone = '+00:00'; +" + +system ok +mysql -e " + DROP DATABASE IF EXISTS testdb1; + CREATE DATABASE testdb1; + USE testdb1; + CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512) + ); + ALTER TABLE products AUTO_INCREMENT = 101; + INSERT INTO products + VALUES (default,'scooter','Small 2-wheel scooter'), + (default,'car battery','12V car battery'), + (default,'12-pack drill','12-pack of drill bits with sizes ranging from #40 to #3'), + (default,'hammer','12oz carpenter s hammer'), + (default,'hammer','14oz carpenter s hammer'), + (default,'hammer','16oz carpenter s hammer'), + (default,'rocks','box of assorted rocks'), + (default,'jacket','water resistent black wind breaker'), + (default,'spare tire','24 inch spare tire'); + CREATE TABLE orders ( + order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + order_date DATETIME NOT NULL, + customer_name VARCHAR(255) NOT NULL, + price DECIMAL(10, 5) NOT NULL, + product_id INTEGER NOT NULL, + order_status BOOLEAN NOT NULL + ) AUTO_INCREMENT = 10001; + INSERT INTO orders + VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), + (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false), + (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false); +" + +statement ok +create source mysql_source with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST}', + port = '${MYSQL_TCP_PORT}', + username = 'root', + password = '${MYSQL_PWD}', + database.name = 'testdb1', + server.id = '5185' +); + +statement ok +create table my_products ( id INT, + name STRING, + description STRING, + PRIMARY KEY (id) +) from mysql_source table 'testdb1.products'; + +statement ok +create table my_orders ( + order_id int, + order_date timestamp, + customer_name string, + price decimal, + product_id int, + order_status smallint, + PRIMARY KEY (order_id) +) from mysql_source table 'testdb1.orders'; + +system ok +psql -c " + DROP TABLE IF EXISTS shipments1; + CREATE TABLE shipments1 ( + shipment_id SERIAL NOT NULL PRIMARY KEY, + order_id SERIAL NOT NULL, + origin VARCHAR(255) NOT NULL, + destination VARCHAR(255) NOT NULL, + is_arrived BOOLEAN NOT NULL + ); + ALTER SEQUENCE public.shipments1_shipment_id_seq RESTART WITH 1001; + INSERT INTO shipments1 + VALUES (default,10001,'Beijing','Shanghai',false), + (default,10002,'Hangzhou','Shanghai',false), + (default,10003,'Shanghai','Hangzhou',false); +" + +statement ok +create source pg_source with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + slot.name = 'cdc_alter_test' +); + +statement ok +create table pg_shipments ( 
+ shipment_id INTEGER, + order_id INTEGER, + origin STRING, + destination STRING, + is_arrived boolean, + PRIMARY KEY (shipment_id) +) from pg_source table 'public.shipments1'; + +# Create a mview join orders, products and shipments +statement ok +create materialized view enriched_orders as SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived + FROM my_orders AS o + LEFT JOIN my_products AS p ON o.product_id = p.id + LEFT JOIN pg_shipments AS s ON o.order_id = s.order_id; + + +sleep 3s + +query III +select order_id, product_id, shipment_id from enriched_orders order by order_id; +---- +10001 102 1001 +10002 105 1002 +10003 106 1003 + + +# alter mysql tables +system ok +mysql -e " + USE testdb1; + ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 0.0; + ALTER TABLE orders ADD COLUMN order_comment VARCHAR(255); +" + +# alter cdc tables +statement ok +ALTER TABLE my_products ADD COLUMN weight DECIMAL; + +statement ok +ALTER TABLE my_orders ADD COLUMN order_comment VARCHAR; + +# wait alter ddl +sleep 3s + +query ITTT +SELECT id,name,description,weight FROM my_products order by id limit 3 +---- +101 scooter Small 2-wheel scooter NULL +102 car battery 12V car battery NULL +103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL + + +# update mysql tables +system ok +mysql -e " + USE testdb1; + UPDATE products SET weight = 10.5 WHERE id = 101; + UPDATE products SET weight = 12.5 WHERE id = 102; + UPDATE orders SET order_comment = 'very good' WHERE order_id = 10001; +" + +sleep 3s + +query ITTT +SELECT id,name,description,weight FROM my_products order by id limit 3 +---- +101 scooter Small 2-wheel scooter 10.50 +102 car battery 12V car battery 12.50 +103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL + +query ITTT +SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM my_orders order by order_id limit 2 +---- +10001 2020-07-30 10:08:22 Jark 102 0 very good +10002 2020-07-30 10:11:09 Sally 105 0 NULL + + +# alter mysql tables +system ok +mysql -e " + USE testdb1; + ALTER TABLE products DROP COLUMN weight; +" + +# alter cdc table to drop column +statement ok +ALTER TABLE my_products DROP COLUMN weight; + +# wait alter ddl +sleep 3s + +query TTTT +describe my_products; +---- +id integer false NULL +name character varying false NULL +description character varying false NULL +primary key id NULL NULL +distribution key id NULL NULL +table description my_products NULL NULL + + +# alter pg table +system ok +psql -c " + ALTER TABLE shipments1 DROP COLUMN destination; +" + +statement error unable to drop the column due to being referenced by downstream materialized views or sinks +ALTER TABLE pg_shipments DROP COLUMN destination; + +# wait alter ddl +sleep 3s + +# query mv again +query III +select order_id, product_id, shipment_id from enriched_orders order by order_id; +---- +10001 102 1001 +10002 105 1002 +10003 106 1003 + +statement ok +drop materialized view enriched_orders; + +statement ok +drop table my_orders; + +statement ok +create table orders_test (*) from mysql_source table 'testdb1.orders'; + +statement error Not supported: alter a table with empty column definitions +ALTER TABLE orders_test ADD COLUMN order_comment VARCHAR; + +statement ok +drop source mysql_source cascade; + +statement ok +drop source pg_source cascade; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 46c2a5c22ff6d..7c6a078ce9ce4 100644 --- a/proto/ddl_service.proto +++ 
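Note: a condensed sketch of the user-facing flow the new slt test above exercises, using the source and table names defined in the test; connection options, data loading, and result checks are elided.

    -- Create a table on top of the shared mysql-cdc source defined above.
    CREATE TABLE my_products (id INT, name STRING, description STRING, PRIMARY KEY (id))
    FROM mysql_source TABLE 'testdb1.products';

    -- Apply the schema change on the upstream MySQL table first, then mirror it here.
    ALTER TABLE my_products ADD COLUMN weight DECIMAL;
    ALTER TABLE my_products DROP COLUMN weight;

    -- Tables created with the star syntax keep no column definitions,
    -- so altering them is rejected with the new "empty column definitions" error.
    CREATE TABLE orders_test (*) FROM mysql_source TABLE 'testdb1.orders';
    ALTER TABLE orders_test ADD COLUMN order_comment VARCHAR;  -- not supported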
b/proto/ddl_service.proto @@ -324,6 +324,7 @@ message ReplaceTablePlan { catalog.ColIndexMapping table_col_index_mapping = 3; // Source catalog of table's associated source catalog.Source source = 4; + TableJobType job_type = 5; } message ReplaceTablePlanRequest { diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index f740e9567e4c0..5f42d1e73e5bb 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -26,7 +26,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan, - PbTableJobType, ReplaceTablePlan, + PbTableJobType, ReplaceTablePlan, TableJobType, }; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -92,6 +92,7 @@ pub trait CatalogWriter: Send + Sync { table: PbTable, graph: StreamFragmentGraph, mapping: ColIndexMapping, + job_type: TableJobType, ) -> Result<()>; async fn alter_source_column(&self, source: PbSource) -> Result<()>; @@ -309,10 +310,11 @@ impl CatalogWriter for CatalogWriterImpl { table: PbTable, graph: StreamFragmentGraph, mapping: ColIndexMapping, + job_type: TableJobType, ) -> Result<()> { let version = self .meta_client - .replace_table(source, table, graph, mapping) + .replace_table(source, table, graph, mapping, job_type) .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 3d9c5d71b4576..0ddeb2d1e3d37 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -52,13 +52,14 @@ pub async fn replace_table_with_definition( on_conflict, with_version_column, wildcard_idx, + cdc_table_info, .. } = definition else { panic!("unexpected statement type: {:?}", definition); }; - let (graph, table, source) = generate_stream_graph_for_table( + let (graph, table, source, job_type) = generate_stream_graph_for_table( session, table_name, original_catalog, @@ -72,6 +73,7 @@ pub async fn replace_table_with_definition( append_only, on_conflict, with_version_column, + cdc_table_info, ) .await?; @@ -92,7 +94,7 @@ pub async fn replace_table_with_definition( let catalog_writer = session.catalog_writer()?; catalog_writer - .replace_table(source, table, graph, col_index_mapping) + .replace_table(source, table, graph, col_index_mapping, job_type) .await?; Ok(()) } @@ -145,6 +147,13 @@ pub async fn handle_alter_table_column( } } + if columns.is_empty() { + Err(ErrorCode::NotSupported( + "alter a table with empty column definitions".to_string(), + "Please recreate the table with column definitions.".to_string(), + ))? + } + match operation { AlterTableOperation::AddColumn { column_def: new_column, @@ -171,7 +180,7 @@ pub async fn handle_alter_table_column( ))? } - // Add the new column to the table definition. + // Add the new column to the table definition if it is not created by `create table (*)` syntax. 
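Note: a rough sketch (not part of the diff) of how the new job type is threaded from the ALTER TABLE handler down to the meta node; names follow the surrounding code.

    // ALTER TABLE handler: re-plan the table and learn its job type.
    //   generate_stream_graph_for_table(...) -> (graph, table, source, job_type)
    // Frontend catalog writer: pass it along with the replace plan.
    catalog_writer
        .replace_table(source, table, graph, col_index_mapping, job_type)
        .await?;
    // MetaClient::replace_table then sets ReplaceTablePlan.job_type, and the meta
    // DDL service reads it back with `change.get_job_type().unwrap_or_default()`,
    // so replacing a table no longer hard-codes TableJobType::General.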
columns.push(new_column); } @@ -210,7 +219,7 @@ pub async fn handle_alter_table_column( } _ => unreachable!(), - } + }; replace_table_with_definition( &session, diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index dd3f435885c28..1dcb4167f8c19 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -32,7 +32,7 @@ use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; use risingwave_pb::catalog::{PbSource, Table}; -use risingwave_pb::ddl_service::ReplaceTablePlan; +use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode}; @@ -466,6 +466,7 @@ pub async fn handle_create_sink( table: Some(table), fragment_graph: Some(graph), table_col_index_mapping: None, + job_type: TableJobType::General as _, }); } @@ -634,7 +635,7 @@ pub(crate) async fn reparse_table_for_sink( panic!("unexpected statement type: {:?}", definition); }; - let (graph, table, source) = generate_stream_graph_for_table( + let (graph, table, source, _) = generate_stream_graph_for_table( session, table_name, table_catalog, @@ -648,6 +649,7 @@ pub(crate) async fn reparse_table_for_sink( append_only, on_conflict, with_version_column, + None, ) .await?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 16d0bb8e88d7c..11d0d0ebd08dd 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -724,6 +724,9 @@ fn gen_table_plan_inner( Ok((materialize.into(), table)) } +/// Generate stream plan for cdc table based on shared source. 
+/// In replace workflow, the `table_id` is the id of the table to be replaced +/// in create table workflow, the `table_id` is a placeholder will be filled in the Meta #[allow(clippy::too_many_arguments)] pub(crate) fn gen_create_table_plan_for_cdc_table( handler_args: HandlerArgs, @@ -740,6 +743,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( resolved_table_name: String, database_id: DatabaseId, schema_id: SchemaId, + table_id: TableId, ) -> Result<(PlanRef, PbTable)> { let context: OptimizerContextRef = OptimizerContext::new(handler_args, explain_options).into(); let session = context.session_ctx().clone(); @@ -778,8 +782,8 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( .collect(); let cdc_table_desc = CdcTableDesc { - table_id: TableId::placeholder(), // will be filled in meta node - source_id: source.id.into(), // id of cdc source streaming job + table_id, + source_id: source.id.into(), // id of cdc source streaming job external_table_name: external_table_name.clone(), pk: table_pk, columns: columns.iter().map(|c| c.column_desc.clone()).collect(), @@ -994,6 +998,7 @@ pub(super) async fn handle_create_table_plan( resolved_table_name, database_id, schema_id, + TableId::placeholder(), )?; ((plan, None, table), TableJobType::SharedCdcSource) @@ -1218,23 +1223,24 @@ pub async fn generate_stream_graph_for_table( source_schema: Option, handler_args: HandlerArgs, col_id_gen: ColumnIdGenerator, - columns: Vec, + column_defs: Vec, wildcard_idx: Option, constraints: Vec, source_watermarks: Vec, append_only: bool, on_conflict: Option, with_version_column: Option, -) -> Result<(StreamFragmentGraph, Table, Option)> { + cdc_table_info: Option, +) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; - let (plan, source, table) = match source_schema { - Some(source_schema) => { + let ((plan, source, table), job_type) = match (source_schema, cdc_table_info.as_ref()) { + (Some(source_schema), None) => ( gen_create_table_plan_with_source( handler_args, ExplainOptions::default(), table_name, - columns, + column_defs, wildcard_idx, constraints, source_schema, @@ -1245,14 +1251,15 @@ pub async fn generate_stream_graph_for_table( with_version_column, vec![], ) - .await? 
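Note: a sketch of the two call sites of the new `table_id` argument described in the doc comment above; all other arguments are elided.

    // CREATE TABLE path: the table id is not known yet, the meta node fills it in.
    gen_create_table_plan_for_cdc_table(/* ... */ TableId::placeholder())?;

    // ALTER TABLE (replace) path: reuse the id of the table being replaced so the
    // regenerated CdcTableDesc still points at the same table.
    gen_create_table_plan_for_cdc_table(/* ... */ original_catalog.id())?;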
- } - None => { + .await?, + TableJobType::General, + ), + (None, None) => { let context = OptimizerContext::from_handler_args(handler_args); let (plan, table) = gen_create_table_plan( context, table_name, - columns, + column_defs, constraints, col_id_gen, source_watermarks, @@ -1260,7 +1267,53 @@ pub async fn generate_stream_graph_for_table( on_conflict, with_version_column, )?; - (plan, None, table) + ((plan, None, table), TableJobType::General) + } + (None, Some(cdc_table)) => { + let session = &handler_args.session; + let (source, resolved_table_name, database_id, schema_id) = + get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?; + + let connect_properties = derive_connect_properties( + &source.with_properties, + cdc_table.external_table_name.clone(), + )?; + + let (columns, pk_names) = derive_schema_for_cdc_table( + &column_defs, + &constraints, + connect_properties.clone(), + false, + ) + .await?; + + let (plan, table) = gen_create_table_plan_for_cdc_table( + handler_args, + ExplainOptions::default(), + source, + cdc_table.external_table_name.clone(), + columns, + pk_names, + connect_properties, + col_id_gen, + on_conflict, + with_version_column, + vec![], // empty include options + resolved_table_name, + database_id, + schema_id, + original_catalog.id(), + )?; + + ((plan, None, table), TableJobType::SharedCdcSource) + } + (Some(_), Some(_)) => { + return Err(ErrorCode::NotSupported( + "Data format and encoding format doesn't apply to table created from a CDC source" + .into(), + "Remove the FORMAT and ENCODE specification".into(), + ) + .into()) } }; @@ -1290,7 +1343,35 @@ pub async fn generate_stream_graph_for_table( ..table }; - Ok((graph, table, source)) + Ok((graph, table, source, job_type)) +} + +fn get_source_and_resolved_table_name( + session: &Arc, + cdc_table: CdcTableInfo, + table_name: ObjectName, +) -> Result<(Arc, String, DatabaseId, SchemaId)> { + let db_name = session.database(); + let (schema_name, resolved_table_name) = + Binder::resolve_schema_qualified_name(db_name, table_name)?; + let (database_id, schema_id) = + session.get_database_and_schema_id_for_create(schema_name.clone())?; + + let (source_schema, source_name) = + Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?; + + let source = { + let catalog_reader = session.env().catalog_reader().read_guard(); + let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (source, _) = catalog_reader.get_source_by_name( + db_name, + SchemaPath::Name(schema_name.as_str()), + source_name.as_str(), + )?; + source.clone() + }; + + Ok((source, resolved_table_name, database_id, schema_id)) } #[cfg(test)] diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index 1b491be81f5ef..a209a16c88ae3 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -13,7 +13,7 @@ // limitations under the License. 
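Note: the dispatch implemented by the new match in `generate_stream_graph_for_table`, summarized as a sketch.

    match (source_schema, cdc_table_info.as_ref()) {
        (Some(_), None) => { /* table with connector source  => TableJobType::General */ }
        (None, None)    => { /* plain table                  => TableJobType::General */ }
        (None, Some(_)) => { /* table on shared CDC source   => TableJobType::SharedCdcSource */ }
        (Some(_), Some(_)) => { /* rejected: FORMAT/ENCODE does not apply to CDC tables */ }
    }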
use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_pb::ddl_service::ReplaceTablePlan; +use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType}; use risingwave_sqlparser::ast::ObjectName; use super::RwPgResponse; @@ -89,6 +89,7 @@ pub async fn handle_drop_sink( table: Some(table), fragment_graph: Some(graph), table_col_index_mapping: None, + job_type: TableJobType::General as _, }); } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index f2d768cc076f4..7596a5544b391 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -308,7 +308,7 @@ fn build_fragment( // memorize upstream source id for later use state .dependent_table_ids - .insert(TableId::new(node.upstream_source_id)); + .insert(node.upstream_source_id.into()); current_fragment .upstream_table_ids .push(node.upstream_source_id); diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 3067e9a97b291..67bf2794f51b0 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -42,7 +42,7 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_set_schema_request, create_connection_request, DdlProgress, PbTableJobType, - ReplaceTablePlan, + ReplaceTablePlan, TableJobType, }; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ @@ -300,6 +300,7 @@ impl CatalogWriter for MockCatalogWriter { mut table: PbTable, _graph: StreamFragmentGraph, _mapping: ColIndexMapping, + _job_type: TableJobType, ) -> Result<()> { table.stream_job_status = PbStreamJobStatus::Created as _; self.catalog.write().update_table(&table); diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 2e4ba23e02d8f..2b2b1bcfeb2b3 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -82,6 +82,7 @@ impl DdlServiceImpl { } fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo { + let job_type = change.get_job_type().unwrap_or_default(); let mut source = change.source; let mut fragment_graph = change.fragment_graph.unwrap(); let mut table = change.table.unwrap(); @@ -89,19 +90,14 @@ impl DdlServiceImpl { table.optional_associated_source_id { source.as_mut().unwrap().id = source_id; - fill_table_stream_graph_info( - &mut source, - &mut table, - TableJobType::General, - &mut fragment_graph, - ); + fill_table_stream_graph_info(&mut source, &mut table, job_type, &mut fragment_graph); } let table_col_index_mapping = change .table_col_index_mapping .as_ref() .map(ColIndexMapping::from_protobuf); - let stream_job = StreamingJob::Table(source, table, TableJobType::General); + let stream_job = StreamingJob::Table(source, table, job_type); ReplaceTableInfo { streaming_job: stream_job, diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index cb7173943dcd5..b45787a6c3e9c 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -38,7 +38,7 @@ use risingwave_pb::catalog::{ Comment, Connection, CreateType, Database, Function, Index, PbSource, PbStreamJobStatus, Schema, Secret, Sink, Source, StreamJobStatus, Subscription, Table, View, }; -use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request}; +use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request, TableJobType}; use 
risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::user::grant_privilege::{Action, ActionWithGrantOption, Object}; use risingwave_pb::user::update_user_request::UpdateField; @@ -3552,7 +3552,7 @@ impl CatalogManager { /// This is used for `ALTER TABLE ADD/DROP COLUMN`. pub async fn start_replace_table_procedure(&self, stream_job: &StreamingJob) -> MetaResult<()> { - let StreamingJob::Table(source, table, ..) = stream_job else { + let StreamingJob::Table(source, table, job_type) = stream_job else { unreachable!("unexpected job: {stream_job:?}") }; let core = &mut *self.core.lock().await; @@ -3560,7 +3560,10 @@ impl CatalogManager { database_core.ensure_database_id(table.database_id)?; database_core.ensure_schema_id(table.schema_id)?; - assert!(table.dependent_relations.is_empty()); + // general table streaming job should not have dependent relations + if matches!(job_type, TableJobType::General) { + assert!(table.dependent_relations.is_empty()); + } let key = (table.database_id, table.schema_id, table.name.clone()); let original_table = database_core @@ -3715,8 +3718,6 @@ impl CatalogManager { let database_core = &mut core.database; let key = (table.database_id, table.schema_id, table.name.clone()); - assert!(table.dependent_relations.is_empty()); - assert!( database_core.tables.contains_key(&table.id) && database_core.has_in_progress_creation(&key), diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 6eda99a91d58d..6bcc59fee5538 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -71,9 +71,9 @@ use tracing::Instrument; use crate::barrier::BarrierManagerRef; use crate::error::MetaErrorInner; use crate::manager::{ - CatalogManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId, IdCategory, - IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, MetadataManagerV1, - NotificationVersion, RelationIdEnum, SchemaId, SecretId, SinkId, SourceId, + CatalogManagerRef, ConnectionId, DatabaseId, DdlType, FragmentManagerRef, FunctionId, + IdCategory, IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, + MetadataManagerV1, NotificationVersion, RelationIdEnum, SchemaId, SecretId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, StreamingJobDiscriminants, SubscriptionId, TableId, UserId, ViewId, IGNORED_NOTIFICATION_VERSION, }; @@ -1921,6 +1921,14 @@ impl DdlController { .mview_fragment() .expect("mview fragment not found"); + let ddl_type = DdlType::from(stream_job); + let DdlType::Table(table_job_type) = &ddl_type else { + bail!( + "only support replacing table streaming job, ddl_type: {:?}", + ddl_type + ) + }; + // Map the column indices in the dispatchers with the given mapping. let downstream_fragments = self.metadata_manager.get_downstream_chain_fragments(id).await? 
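Note: why the dependent-relations assertion in `start_replace_table_procedure` is now conditional (sketch): the fragmenter records the shared CDC source in `dependent_table_ids`, so only general table jobs are expected to have no upstream dependencies.

    if matches!(job_type, TableJobType::General) {
        // only plain tables are guaranteed to depend on nothing upstream
        assert!(table.dependent_relations.is_empty());
    }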
.into_iter() @@ -1938,12 +1946,33 @@ impl DdlController { ) })?; - let complete_graph = CompleteStreamFragmentGraph::with_downstreams( - fragment_graph, - original_table_fragment.fragment_id, - downstream_fragments, - stream_job.into(), - )?; + // build complete graph based on the table job type + let complete_graph = match table_job_type { + TableJobType::General => CompleteStreamFragmentGraph::with_downstreams( + fragment_graph, + original_table_fragment.fragment_id, + downstream_fragments, + ddl_type, + )?, + + TableJobType::SharedCdcSource => { + // get the upstream fragment which should be the cdc source + let upstream_root_fragments = self + .metadata_manager + .get_upstream_root_fragments(fragment_graph.dependent_table_ids()) + .await?; + CompleteStreamFragmentGraph::with_upstreams_and_downstreams( + fragment_graph, + upstream_root_fragments, + original_table_fragment.fragment_id, + downstream_fragments, + ddl_type, + )? + } + TableJobType::Unspecified => { + unreachable!() + } + }; // 2. Build the actor graph. let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; @@ -1963,7 +1992,11 @@ impl DdlController { } = actor_graph_builder .generate_graph(&self.env, stream_job, expr_context) .await?; - assert!(dispatchers.is_empty()); + + // general table job type does not have upstream job, so the dispatchers should be empty + if matches!(table_job_type, TableJobType::General) { + assert!(dispatchers.is_empty()); + } // 3. Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index e347dd0287f36..47bf446f1555d 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -606,6 +606,27 @@ impl CompleteStreamFragmentGraph { ) } + /// For replacing an existing table based on shared cdc source + pub fn with_upstreams_and_downstreams( + graph: StreamFragmentGraph, + upstream_root_fragments: HashMap, + original_table_fragment_id: FragmentId, + downstream_fragments: Vec<(DispatchStrategy, Fragment)>, + ddl_type: DdlType, + ) -> MetaResult { + Self::build_helper( + graph, + Some(FragmentGraphUpstreamContext { + upstream_root_fragments, + }), + Some(FragmentGraphDownstreamContext { + original_table_fragment_id, + downstream_fragments, + }), + ddl_type, + ) + } + /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments. 
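Note: a sketch of the replace path for a CDC-backed table, which now wires in both ends of the graph: the upstream shared source fragment and the existing downstream fragments of the table being replaced.

    let complete_graph = CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
        fragment_graph,
        upstream_root_fragments,             // root fragment(s) of the shared cdc source job
        original_table_fragment.fragment_id, // the table fragment being replaced
        downstream_fragments,                // mviews/sinks that consume the table
        ddl_type,
    )?;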
fn build_helper( mut graph: StreamFragmentGraph, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 9b41158399237..cd295abc9ba39 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -527,6 +527,7 @@ impl MetaClient { table: PbTable, graph: StreamFragmentGraph, table_col_index_mapping: ColIndexMapping, + job_type: PbTableJobType, ) -> Result { let request = ReplaceTablePlanRequest { plan: Some(ReplaceTablePlan { @@ -534,6 +535,7 @@ impl MetaClient { table: Some(table), fragment_graph: Some(graph), table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()), + job_type: job_type as _, }), }; let resp = self.inner.replace_table_plan(request).await?; diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 4e564130accbb..59686f4bb8fdd 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -42,6 +42,7 @@ use crate::executor::backfill::utils::{ use crate::executor::backfill::CdcScanOptions; use crate::executor::monitor::CdcBackfillMetrics; use crate::executor::prelude::*; +use crate::executor::UpdateMutation; use crate::task::CreateMviewProgress; /// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each. @@ -135,7 +136,7 @@ impl CdcBackfillExecutor { let pk_indices = self.external_table.pk_indices().to_vec(); let pk_order = self.external_table.pk_order_types().to_vec(); - let upstream_table_id = self.external_table.table_id().table_id; + let table_id = self.external_table.table_id().table_id; let upstream_table_name = self.external_table.qualified_table_name(); let schema_table_name = self.external_table.schema_table_name().clone(); let external_database_name = self.external_table.database_name().to_owned(); @@ -157,7 +158,7 @@ impl CdcBackfillExecutor { // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; - let mut paused = first_barrier.is_pause_on_startup(); + let mut is_snapshot_paused = first_barrier.is_pause_on_startup(); // Check whether this parallelism has been assigned splits, // if not, we should bypass the backfill directly. @@ -192,11 +193,12 @@ impl CdcBackfillExecutor { let mut consumed_binlog_offset: Option = None; tracing::info!( - upstream_table_id, + table_id, upstream_table_name, initial_binlog_offset = ?last_binlog_offset, ?current_pk_pos, is_finished = state.is_finished, + is_snapshot_paused, snapshot_row_count = total_snapshot_row_count, rate_limit = self.rate_limit_rps, disable_backfill = self.options.disable_backfill, @@ -234,6 +236,27 @@ impl CdcBackfillExecutor { for msg in upstream.by_ref() { match msg? 
{ Message::Barrier(barrier) => { + match barrier.mutation.as_deref() { + Some(crate::executor::Mutation::Pause) => { + is_snapshot_paused = true; + tracing::info!( + table_id, + upstream_table_name, + "snapshot is paused by barrier" + ); + } + Some(crate::executor::Mutation::Resume) => { + is_snapshot_paused = false; + tracing::info!( + table_id, + upstream_table_name, + "snapshot is resumed by barrier" + ); + } + _ => { + // ignore other mutations + } + } // commit state just to bump the epoch of state table state_impl.commit_state(barrier.epoch).await?; yield Message::Barrier(barrier); @@ -248,10 +271,11 @@ impl CdcBackfillExecutor { } } - tracing::info!(upstream_table_id, + tracing::info!(table_id, upstream_table_name, initial_binlog_offset = ?last_binlog_offset, ?current_pk_pos, + is_snapshot_paused, "start cdc backfill loop"); // the buffer will be drained when a barrier comes @@ -274,9 +298,9 @@ impl CdcBackfillExecutor { .snapshot_read_full_table(read_args, self.options.snapshot_batch_size) .map(Either::Right)); - let (right_snapshot, valve) = pausable(right_snapshot); - if paused { - valve.pause(); + let (right_snapshot, snapshot_valve) = pausable(right_snapshot); + if is_snapshot_paused { + snapshot_valve.pause(); } // Prefer to select upstream, so we can stop snapshot stream when barrier comes. @@ -306,12 +330,12 @@ impl CdcBackfillExecutor { use crate::executor::Mutation; match mutation { Mutation::Pause => { - paused = true; - valve.pause(); + is_snapshot_paused = true; + snapshot_valve.pause(); } Mutation::Resume => { - paused = false; - valve.resume(); + is_snapshot_paused = false; + snapshot_valve.resume(); } Mutation::Throttle(some) => { if let Some(new_rate_limit) = @@ -323,6 +347,21 @@ impl CdcBackfillExecutor { continue 'backfill_loop; } } + Mutation::Update(UpdateMutation { + dropped_actors, + .. + }) => { + if dropped_actors.contains(&self.actor_ctx.id) { + // the actor has been dropped, exit the backfill loop + tracing::info!( + table_id, + upstream_table_name, + "CdcBackfill has been dropped due to config change" + ); + yield Message::Barrier(barrier); + break 'backfill_loop; + } + } _ => (), } } @@ -339,7 +378,7 @@ impl CdcBackfillExecutor { // staging the barrier pending_barrier = Some(barrier); tracing::debug!( - upstream_table_id, + table_id, ?current_pk_pos, ?snapshot_read_row_cnt, "Prepare to start a new snapshot" @@ -406,7 +445,7 @@ impl CdcBackfillExecutor { match msg? { None => { tracing::info!( - upstream_table_id, + table_id, ?last_binlog_offset, ?current_pk_pos, "snapshot read stream ends" @@ -457,14 +496,20 @@ impl CdcBackfillExecutor { // Otherwise, the result set of the new snapshot stream may become empty. // It maybe a cancellation bug of the mysql driver. let (_, mut snapshot_stream) = backfill_stream.into_inner(); - if let Some(msg) = snapshot_stream.next().await { + + if !is_snapshot_paused + && let Some(msg) = snapshot_stream + .next() + .instrument_await("consume_snapshot_stream_once") + .await + { let Either::Right(msg) = msg else { bail!("BUG: snapshot_read contains upstream messages"); }; match msg? 
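Note: sketch of the new config-change handling in the backfill loop; when an Update mutation drops this actor, the barrier is forwarded before the loop exits.

    Mutation::Update(UpdateMutation { dropped_actors, .. }) => {
        if dropped_actors.contains(&self.actor_ctx.id) {
            // this actor is being removed (e.g. by a table replace), stop backfilling
            yield Message::Barrier(barrier);
            break 'backfill_loop;
        }
    }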
{ None => { tracing::info!( - upstream_table_id, + table_id, ?last_binlog_offset, ?current_pk_pos, "snapshot read stream ends in the force emit branch" @@ -501,7 +546,7 @@ impl CdcBackfillExecutor { snapshot_read_row_cnt += row_count as usize; tracing::debug!( - upstream_table_id, + table_id, ?current_pk_pos, ?snapshot_read_row_cnt, "force emit a snapshot chunk" @@ -567,7 +612,7 @@ impl CdcBackfillExecutor { } else if self.options.disable_backfill { // If backfill is disabled, we just mark the backfill as finished tracing::info!( - upstream_table_id, + table_id, upstream_table_name, "CdcBackfill has been disabled" ); @@ -585,7 +630,7 @@ impl CdcBackfillExecutor { drop(upstream_table_reader); tracing::info!( - upstream_table_id, + table_id, upstream_table_name, "CdcBackfill has already finished and will forward messages directly to the downstream" );
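Note: a summary sketch of the snapshot pause semantics after this change; `is_snapshot_paused` is driven by the startup barrier and by Pause/Resume mutations, and it gates both the pausable snapshot stream and the pre-barrier force-emit read.

    if is_snapshot_paused {
        snapshot_valve.pause(); // stop pulling snapshot rows
    }
    // ...
    if !is_snapshot_paused
        && let Some(msg) = snapshot_stream.next().await
    {
        // only a running snapshot may force-emit one more chunk before the barrier
    }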