From 3b7036ca19638206d94b986163170462a65a9fb0 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 14 Nov 2023 15:20:49 +0800 Subject: [PATCH] feat(cdc): persist the backfill state for table-on-source (#13276) --- ci/scripts/e2e-source-test.sh | 10 +- e2e_test/source/cdc/cdc.check_new_rows.slt | 18 ++ e2e_test/source/cdc/cdc.share_stream.slt | 9 +- .../source/SourceValidateHandler.java | 33 +-- .../source/common/DbzConnectorConfig.java | 50 +++-- proto/plan_common.proto | 2 + src/common/src/catalog/external_table.rs | 6 +- src/compute/tests/cdc_tests.rs | 3 +- src/connector/src/parser/mod.rs | 14 +- src/connector/src/source/external.rs | 6 +- src/frontend/src/handler/create_table.rs | 4 +- .../plan_node/stream_cdc_table_scan.rs | 93 +++----- src/meta/service/src/ddl_service.rs | 119 ++++++----- src/meta/src/manager/catalog/mod.rs | 5 +- .../src/executor/backfill/cdc/cdc_backfill.rs | 139 +++++++----- src/stream/src/executor/backfill/cdc/mod.rs | 2 +- src/stream/src/executor/backfill/cdc/state.rs | 202 +++++++++++++++--- src/stream/src/executor/dispatch.rs | 2 +- .../src/from_proto/source/trad_source.rs | 3 +- src/stream/src/from_proto/stream_scan.rs | 17 +- 20 files changed, 463 insertions(+), 274 deletions(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 2f9c29d76b957..81cb5988926cf 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -71,10 +71,18 @@ export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt' -# kill cluster and the connector node +# kill cluster cargo make kill echo "cluster killed " +# insert into mytest database (cdc.share_stream.slt) +mysql --protocol=tcp -u root mytest -e "INSERT INTO products + VALUES (default,'RisingWave','Next generation Streaming Database'), + (default,'Materialize','The Streaming Database You Already Know How to Use'); + UPDATE products SET name = 'RW' WHERE id <= 103; + INSERT INTO orders VALUES (default, '2022-12-01 15:08:22', 'Sam', 1000.52, 110, false);" + + # insert new rows mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc_insert.sql psql < ./e2e_test/source/cdc/postgres_cdc_insert.sql diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt index 2acf7813feebf..58362225660ea 100644 --- a/e2e_test/source/cdc/cdc.check_new_rows.slt +++ b/e2e_test/source/cdc/cdc.check_new_rows.slt @@ -47,3 +47,21 @@ select v1, v2, v3 from mytable order by v1; 2 2 yes 3 3 no 4 4 no + +# shared cdc source +query I +SELECT * from products_test_cnt +---- +11 + +query I +SELECT * from orders_test_cnt +---- +4 + +query ITT +SELECT * FROM products_test order by id limit 3 +---- +101 RW Small 2-wheel scooter +102 RW 12V car battery +103 RW 12-pack of drill bits with sizes ranging from #40 to #3 diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 459cde90580bc..5a6342a8cf35c 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -7,6 +7,10 @@ mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE system ok mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_create.sql +# generate data to mysql +system ok +mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql + # enable cdc backfill in ci statement ok set cdc_backfill='true'; @@ -47,11 +51,6 @@ create materialized view products_test_cnt as select 
count(*) as cnt from produc statement ok create materialized view orders_test_cnt as select count(*) as cnt from orders_test; - -# generate data to mysql -system ok -mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql - sleep 5s # check ingestion results diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index 72b361e04bc0c..4c4cc092fa16d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -62,8 +62,8 @@ public static ConnectorServiceProto.ValidateSourceResponse validateResponse(Stri .build(); } - public static void ensurePropNotNull(Map props, String name) { - if (!props.containsKey(name)) { + public static void ensurePropNotBlank(Map props, String name) { + if (StringUtils.isBlank(props.get(name))) { throw ValidatorUtils.invalidArgument( String.format("'%s' not found, please check the WITH properties", name)); } @@ -73,39 +73,39 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re throws Exception { var props = request.getPropertiesMap(); - ensurePropNotNull(props, DbzConnectorConfig.HOST); - ensurePropNotNull(props, DbzConnectorConfig.PORT); - ensurePropNotNull(props, DbzConnectorConfig.DB_NAME); - ensurePropNotNull(props, DbzConnectorConfig.USER); - ensurePropNotNull(props, DbzConnectorConfig.PASSWORD); + ensurePropNotBlank(props, DbzConnectorConfig.HOST); + ensurePropNotBlank(props, DbzConnectorConfig.PORT); + ensurePropNotBlank(props, DbzConnectorConfig.DB_NAME); + ensurePropNotBlank(props, DbzConnectorConfig.USER); + ensurePropNotBlank(props, DbzConnectorConfig.PASSWORD); // ensure table name is passed by user in single mode if (Utils.getCdcSourceMode(props) == CdcSourceMode.SINGLE_MODE) { - ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME); + ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME); } TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema()); switch (request.getSourceType()) { case POSTGRES: - ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME); - ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME); - ensurePropNotNull(props, DbzConnectorConfig.PG_SLOT_NAME); - ensurePropNotNull(props, DbzConnectorConfig.PG_PUB_NAME); - ensurePropNotNull(props, DbzConnectorConfig.PG_PUB_CREATE); + ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME); + ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME); + ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME); + ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME); + ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE); try (var validator = new PostgresValidator(props, tableSchema)) { validator.validateAll(); } break; case CITUS: - ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME); - ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME); + ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME); + ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME); try (var coordinatorValidator = new CitusValidator(props, tableSchema)) { coordinatorValidator.validateDistributedTable(); coordinatorValidator.validateTable(); } - ensurePropNotNull(props, DbzConnectorConfig.DB_SERVERS); + 
ensurePropNotBlank(props, DbzConnectorConfig.DB_SERVERS); var workerServers = StringUtils.split(props.get(DbzConnectorConfig.DB_SERVERS), ','); // props extracted from grpc request, clone it to modify @@ -126,6 +126,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re break; case MYSQL: + ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID); try (var validator = new MySqlValidator(props, tableSchema)) { validator.validateAll(); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index a77da9b6d2ed0..1cf07e14eca4a 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -63,6 +63,7 @@ public class DbzConnectorConfig { private static final String DBZ_PROPERTY_PREFIX = "debezium."; + private static final String SNAPSHOT_MODE_KEY = "debezium.snapshot.mode"; private static final String SNAPSHOT_MODE_BACKFILL = "rw_cdc_backfill"; private static Map extractDebeziumProperties( @@ -103,31 +104,48 @@ public DbzConnectorConfig( String startOffset, Map userProps, boolean snapshotDone) { + + StringSubstitutor substitutor = new StringSubstitutor(userProps); + var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor); + var isCdcBackfill = + null != userProps.get(SNAPSHOT_MODE_KEY) + && userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL); + LOG.info( - "DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}", + "DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}", source, sourceId, startOffset, - snapshotDone); + snapshotDone, + isCdcBackfill); - StringSubstitutor substitutor = new StringSubstitutor(userProps); - var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor); if (source == SourceTypeE.MYSQL) { var mysqlProps = initiateDbConfig(MYSQL_CONFIG_FILE, substitutor); - // if snapshot phase is finished and offset is specified, we will continue binlog - // reading from the given offset - if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - // 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is - // specified. 
It only snapshots the schemas, not the data, and continue binlog - // reading from the specified offset - mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); - mysqlProps.setProperty( - ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); - } else if (mysqlProps.getProperty("snapshot.mode").equals(SNAPSHOT_MODE_BACKFILL)) { - // only snapshot table schemas which is not required by the source parser - mysqlProps.setProperty("snapshot.mode", "schema_only"); + if (isCdcBackfill) { // disable snapshot locking at all mysqlProps.setProperty("snapshot.locking.mode", "none"); + + // If cdc backfill enabled, the source only emit incremental changes, so we must + // rewind to the given offset and continue binlog reading from there + if (null != startOffset && !startOffset.isBlank()) { + mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); + mysqlProps.setProperty( + ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); + } else { + // read upstream table schemas and emit incremental changes only + mysqlProps.setProperty("snapshot.mode", "schema_only"); + } + } else { + // if snapshot phase is finished and offset is specified, we will continue binlog + // reading from the given offset + if (snapshotDone && null != startOffset && !startOffset.isBlank()) { + // 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is + // specified. It only snapshots the schemas, not the data, and continue binlog + // reading from the specified offset + mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); + mysqlProps.setProperty( + ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); + } } dbzProps.putAll(mysqlProps); diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 9f43d9755edfe..4f1eccfab8666 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -84,6 +84,8 @@ message ExternalTableDesc { string table_name = 4; repeated uint32 stream_key = 5; map connect_properties = 6; + // upstream cdc source job id + uint32 source_id = 7; } enum JoinType { diff --git a/src/common/src/catalog/external_table.rs b/src/common/src/catalog/external_table.rs index 28d43a7e97a0a..b006bbd50d362 100644 --- a/src/common/src/catalog/external_table.rs +++ b/src/common/src/catalog/external_table.rs @@ -23,9 +23,12 @@ use crate::util::sort_util::ColumnOrder; /// Compute node will use this information to connect to the external database and scan the table. #[derive(Debug, Clone, Default, PartialEq, Eq, Hash)] pub struct CdcTableDesc { - /// Id of the upstream source in sharing cdc mode + /// Id of the table in RW pub table_id: TableId, + /// Id of the upstream source in sharing cdc mode + pub source_id: TableId, + /// The full name of the table in external database, e.g. `database_name.table.name` in MySQL /// and `schema_name.table_name` in the Postgres. 
pub external_table_name: String, @@ -58,6 +61,7 @@ impl CdcTableDesc { pub fn to_protobuf(&self) -> ExternalTableDesc { ExternalTableDesc { table_id: self.table_id.into(), + source_id: self.source_id.into(), columns: self.columns.iter().map(Into::into).collect(), pk: self.pk.iter().map(|v| v.to_protobuf()).collect(), table_name: self.external_table_name.clone(), diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index a6e515a1ea571..b692b27973736 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -216,7 +216,8 @@ async fn test_cdc_backfill() -> StreamResult<()> { vec![0, 1, 2], None, Arc::new(StreamingMetrics::unused()), - source_state_handler, + None, + Some(source_state_handler), false, 4, // 4 rows in a snapshot chunk ); diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index cdb3ef7f51620..53e843b5cfdbb 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -543,14 +543,12 @@ async fn into_chunk_stream(mut parser: P, data_stream for (i, msg) in batch.into_iter().enumerate() { if msg.key.is_none() && msg.payload.is_none() { - if parser.parser_format() == ParserFormat::Debezium { - tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message"); - // empty payload means a heartbeat in cdc source - // heartbeat message offset should not overwrite data messages offset - split_offset_mapping - .entry(msg.split_id) - .or_insert(msg.offset.clone()); - } + tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message"); + // assumes an empty message as a heartbeat + // heartbeat message offset should not overwrite data messages offset + split_offset_mapping + .entry(msg.split_id) + .or_insert(msg.offset.clone()); continue; } diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs index c70f41b32b581..2fa17a0add927 100644 --- a/src/connector/src/source/external.rs +++ b/src/connector/src/source/external.rs @@ -121,7 +121,7 @@ impl SchemaTableName { } } -#[derive(Debug, Clone, Default, PartialEq, PartialOrd)] +#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)] pub struct MySqlOffset { pub filename: String, pub position: u64, @@ -133,14 +133,14 @@ impl MySqlOffset { } } -#[derive(Debug, Clone, Default, PartialEq, PartialOrd)] +#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)] pub struct PostgresOffset { pub txid: u64, pub lsn: u64, pub tx_usec: u64, } -#[derive(Debug, Clone, PartialEq, PartialOrd)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)] pub enum CdcOffset { MySql(MySqlOffset), Postgres(PostgresOffset), diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 5654bc5ed4ed0..b81bc00394bb4 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -562,6 +562,7 @@ pub(crate) async fn gen_create_table_plan_with_source( let cdc_table_desc = CdcTableDesc { table_id: TableId::placeholder(), + source_id: TableId::placeholder(), external_table_name: "".to_string(), pk: table_pk, columns: columns.iter().map(|c| c.column_desc.clone()).collect(), @@ -847,7 +848,8 @@ pub(crate) fn gen_create_table_plan_for_cdc_source( derive_connect_properties(source.as_ref(), external_table_name.clone())?; let cdc_table_desc = CdcTableDesc { - table_id: source.id.into(), // source can be considered as an external table + table_id: TableId::placeholder(), // will be filled in 
meta node + source_id: source.id.into(), // id of cdc source streaming job external_table_name: external_table_name.clone(), pk: table_pk, columns: columns.iter().map(|c| c.column_desc.clone()).collect(), diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 7b68e9d80c93e..4731227bc54c7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -15,7 +15,6 @@ use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnCatalog, Field}; -use risingwave_common::hash::VirtualNode; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; @@ -73,37 +72,9 @@ impl StreamCdcTableScan { StreamScanType::CdcBackfill } - /// Build catalog for backfill state - /// - /// Schema: | vnode | pk ... | `backfill_finished` | `row_count` | - /// - /// key: | vnode | - /// value: | pk ... | `backfill_finished` | `row_count` | - /// - /// When we update the backfill progress, - /// we update it for all vnodes. - /// - /// `pk` refers to the upstream pk which we use to track the backfill progress. - /// - /// `vnode` is the corresponding vnode of the upstream's distribution key. - /// It should also match the vnode of the backfill executor. - /// - /// `backfill_finished` is a boolean which just indicates if backfill is done. - /// - /// `row_count` is a count of rows which indicates the # of rows per executor. - /// We used to track this in memory. - /// But for backfill persistence we have to also persist it. - /// - /// FIXME(kwannoel): - /// - Across all vnodes, the values are the same. - /// - e.g. | vnode | pk ... | `backfill_finished` | `row_count` | - /// | 1002 | Int64(1) | t | 10 | - /// | 1003 | Int64(1) | t | 10 | - /// | 1003 | Int64(1) | t | 10 | - /// Eventually we should track progress per vnode, to support scaling with both mview and - /// the corresponding `no_shuffle_backfill`. - /// However this is not high priority, since we are working on supporting arrangement backfill, - /// which already has this capability. + /// Build catalog for cdc backfill state + /// Right now we only persist whether the backfill is finished and the corresponding cdc offset + /// schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` | pub fn build_backfill_state_catalog( &self, state: &mut BuildFragmentGraphState, @@ -112,9 +83,9 @@ impl StreamCdcTableScan { let mut catalog_builder = TableCatalogBuilder::new(properties); let upstream_schema = &self.core.get_table_columns(); - // We use vnode as primary key in state table. - // If `Distribution::Single`, vnode will just be `VirtualNode::default()`. - catalog_builder.add_column(&Field::with_name(VirtualNode::RW_TYPE, "vnode")); + // Use `split_id` as primary key in state table. + // Currently we only support single split for cdc backfill. 
+ catalog_builder.add_column(&Field::with_name(DataType::Varchar, "split_id")); catalog_builder.add_order_column(0, OrderType::ascending()); // pk columns @@ -123,27 +94,17 @@ impl StreamCdcTableScan { catalog_builder.add_column(&Field::from(col)); } - // `backfill_finished` column - catalog_builder.add_column(&Field::with_name( - DataType::Boolean, - format!("{}_backfill_finished", self.table_name()), - )); + catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished")); - // `row_count` column - catalog_builder.add_column(&Field::with_name( - DataType::Int64, - format!("{}_row_count", self.table_name()), - )); + // `row_count` column, the number of rows read from snapshot + catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count")); - // Reuse the state store pk (vnode) as the vnode as well. - catalog_builder.set_vnode_col_idx(0); - catalog_builder.set_dist_key_in_pk(vec![0]); - - let num_of_columns = catalog_builder.columns().len(); - catalog_builder.set_value_indices((1..num_of_columns).collect_vec()); + // The offset is only for observability, not for recovery right now + catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset")); + // leave dist key empty, since the cdc backfill executor is singleton catalog_builder - .build(vec![0], 1) + .build(vec![], 1) .with_id(state.gen_table_id_wrapped()) } } @@ -250,20 +211,20 @@ impl StreamCdcTableScan { .build_backfill_state_catalog(state) .to_internal_table_prost(); - let node_body = - // don't need batch plan for cdc source - PbNodeBody::StreamScan(StreamScanNode { - table_id: self.core.cdc_table_desc.table_id.table_id, - stream_scan_type: self.stream_scan_type as i32, - // The column indices need to be forwarded to the downstream - output_indices, - upstream_column_ids, - // The table desc used by backfill executor - state_table: Some(catalog), - rate_limit: None, - cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()), - ..Default::default() - }); + // We need to pass the id of upstream source job here + let upstream_source_id = self.core.cdc_table_desc.source_id.table_id; + let node_body = PbNodeBody::StreamScan(StreamScanNode { + table_id: upstream_source_id, + stream_scan_type: self.stream_scan_type as i32, + // The column indices need to be forwarded to the downstream + output_indices, + upstream_column_ids, + // The table desc used by backfill executor + state_table: Some(catalog), + rate_limit: None, + cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()), + ..Default::default() + }); PbStreamNode { fields: self.schema().to_prost(), diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index c63f81da472c3..abc9e690ceeb9 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use anyhow::anyhow; use rand::Rng; +use risingwave_common::catalog::TableId; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_fragment; use risingwave_connector::sink::catalog::SinkId; @@ -457,25 +458,23 @@ impl DdlService for DdlServiceImpl { let mut mview = request.materialized_view.unwrap(); let mut fragment_graph = request.fragment_graph.unwrap(); let table_id = self.gen_unique_id::<{ IdCategory::Table }>().await?; + // If we're creating a table with connector, we should additionally fill its ID first. - if let Some(source) = &mut source { + let source_id = if source.is_some() { // Generate source id. 
- let source_id = self.gen_unique_id::<{ IdCategory::Table }>().await?; // TODO: Use source category - fill_table_source(source, source_id, &mut mview, table_id, &mut fragment_graph); - - // Modify properties for cdc sources if needed - if let Some(connector) = source.properties.get(UPSTREAM_SOURCE_KEY) { - if matches!( - CdcSourceType::from(connector.as_str()), - CdcSourceType::Mysql - ) { - fill_cdc_mysql_server_id(&mut fragment_graph); - } - } - } + self.gen_unique_id::<{ IdCategory::Table }>().await? // TODO: Use source category + } else { + TableId::placeholder().into() + }; - let mut stream_job = StreamingJob::Table(source, mview, job_type); + fill_table_stream_graph_info( + source.as_mut().map(|source| (source, source_id)), + (&mut mview, table_id), + job_type, + &mut fragment_graph, + ); + let mut stream_job = StreamingJob::Table(source, mview, job_type); stream_job.set_id(table_id); let version = self @@ -577,7 +576,12 @@ impl DdlService for DdlServiceImpl { { let source = source.as_mut().unwrap(); let table_id = table.id; - fill_table_source(source, source_id, &mut table, table_id, &mut fragment_graph); + fill_table_stream_graph_info( + Some((source, source_id)), + (&mut table, table_id), + TableJobType::General, + &mut fragment_graph, + ); } let table_col_index_mapping = ColIndexMapping::from_protobuf(&req.table_col_index_mapping.unwrap()); @@ -853,51 +857,60 @@ impl DdlServiceImpl { } } -fn fill_table_source( - source: &mut PbSource, - source_id: u32, - table: &mut PbTable, - table_id: u32, +/// Fill in necessary information for table stream graph. +fn fill_table_stream_graph_info( + mut source_info: Option<(&mut PbSource, u32)>, + table_info: (&mut PbTable, u32), + table_job_type: TableJobType, fragment_graph: &mut PbStreamFragmentGraph, ) { - // If we're creating a table with connector, we should additionally fill its ID first. - source.id = source_id; - - let mut source_count = 0; + let (table, table_id) = table_info; for fragment in fragment_graph.fragments.values_mut() { visit_fragment(fragment, |node_body| { if let NodeBody::Source(source_node) = node_body { - // TODO: Refactor using source id. - source_node.source_inner.as_mut().unwrap().source_id = source_id; - source_count += 1; - } - }); - } - assert_eq!( - source_count, 1, - "require exactly 1 external stream source when creating table with a connector" - ); + // If we're creating a table with connector, we should additionally fill its ID first. + if let Some(&mut (ref mut source, source_id)) = source_info.as_mut() { + source.id = source_id; + let mut source_count = 0; + + source_node.source_inner.as_mut().unwrap().source_id = source_id; + source_count += 1; + + // Generate a random server id for mysql cdc source if needed + // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication + // group (that is, different from any other server id being used by any master or slave) + if let Some(connector) = source.properties.get(UPSTREAM_SOURCE_KEY) + && matches!(CdcSourceType::from(connector.as_str()),CdcSourceType::Mysql) { + let props = &mut source_node.source_inner.as_mut().unwrap().properties; + let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX); + props + .entry("server.id".to_string()) + .or_insert(rand_server_id.to_string()); + + // make these two `Source` consistent + props.clone_into(&mut source.properties); + } - // Fill in the correct table id for source. 
- source.optional_associated_table_id = - Some(OptionalAssociatedTableId::AssociatedTableId(table_id)); + assert_eq!( + source_count, 1, + "require exactly 1 external stream source when creating table with a connector" + ); - // Fill in the correct source id for mview. - table.optional_associated_source_id = - Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); -} + // Fill in the correct table id for source. + source.optional_associated_table_id = + Some(OptionalAssociatedTableId::AssociatedTableId(table_id)); -// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication -// group (that is, different from any other server id being used by any master or slave) -fn fill_cdc_mysql_server_id(fragment_graph: &mut PbStreamFragmentGraph) { - for fragment in fragment_graph.fragments.values_mut() { - visit_fragment(fragment, |node_body| { - if let NodeBody::Source(source_node) = node_body { - let props = &mut source_node.source_inner.as_mut().unwrap().properties; - let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX); - props - .entry("server.id".to_string()) - .or_insert(rand_server_id.to_string()); + // Fill in the correct source id for mview. + table.optional_associated_source_id = + Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); + } + } + + // fill table id for cdc backfill + if let NodeBody::StreamScan(node) = node_body && table_job_type == TableJobType::SharedCdcSource { + if let Some(table) = node.cdc_table_desc.as_mut() { + table.table_id = table_id; + } } }); } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 04bc743ae5348..9254702073e99 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1945,7 +1945,7 @@ impl CatalogManager { pub async fn finish_create_source_procedure( &self, mut source: Source, - internal_tables: Vec, + mut internal_tables: Vec
, ) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; @@ -1961,7 +1961,8 @@ impl CatalogManager { source.created_at_epoch = Some(Epoch::now().0); sources.insert(source.id, source.clone()); - for table in &internal_tables { + for table in &mut internal_tables { + table.stream_job_status = PbStreamJobStatus::Created.into(); tables.insert(table.id, table.clone()); } commit_meta!(self, sources, tables)?; diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index b756d48944a03..451577e98a60b 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -34,7 +34,10 @@ use risingwave_connector::source::external::CdcOffset; use risingwave_connector::source::{SourceColumnDesc, SourceContext, SplitMetaData}; use risingwave_storage::StateStore; -use crate::executor::backfill::cdc::state::{CdcBackfillStateImpl, SingleTableState}; +use crate::common::table::state_table::StateTable; +use crate::executor::backfill::cdc::state::{ + CdcBackfillStateImpl, MultiBackfillState, SingleBackfillState, +}; use crate::executor::backfill::upstream_table::external::ExternalStorageTable; use crate::executor::backfill::upstream_table::snapshot::{ SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader, @@ -48,14 +51,17 @@ use crate::executor::{ ExecutorInfo, Message, Mutation, PkIndicesRef, SourceStateTableHandler, StreamExecutorError, StreamExecutorResult, }; -use crate::task::{ActorId, CreateMviewProgress}; +use crate::task::CreateMviewProgress; + +/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each. +const METADATA_STATE_LEN: usize = 4; pub struct CdcBackfillExecutor { actor_ctx: ActorContextRef, info: ExecutorInfo, - /// Upstream external table - upstream_table: ExternalStorageTable, + /// The external table to be backfilled + external_table: ExternalStorageTable, /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset` upstream: BoxedExecutor, @@ -64,13 +70,14 @@ pub struct CdcBackfillExecutor { /// User may select a subset of columns from the upstream table. output_indices: Vec, - actor_id: ActorId, - /// State table of the Source executor - source_state_handler: SourceStateTableHandler, + source_state_handler: Option>, shared_cdc_source: bool, + /// State table of the CdcBackfill executor + state_table: Option>, + progress: Option, metrics: Arc, @@ -83,24 +90,25 @@ impl CdcBackfillExecutor { pub fn new( actor_ctx: ActorContextRef, info: ExecutorInfo, - upstream_table: ExternalStorageTable, + external_table: ExternalStorageTable, upstream: BoxedExecutor, output_indices: Vec, progress: Option, metrics: Arc, - source_state_handler: SourceStateTableHandler, + state_table: Option>, + source_state_handler: Option>, shared_cdc_source: bool, chunk_size: usize, ) -> Self { Self { actor_ctx, info, - upstream_table, + external_table, upstream, output_indices, - actor_id: 0, source_state_handler, shared_cdc_source, + state_table, progress, metrics, chunk_size, @@ -110,13 +118,13 @@ impl CdcBackfillExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { // The primary key columns, in the output columns of the upstream_table scan. 
- let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap(); - let pk_order = self.upstream_table.pk_order_types().to_vec(); + let pk_in_output_indices = self.external_table.pk_in_output_indices().unwrap(); + let pk_order = self.external_table.pk_order_types().to_vec(); let shared_cdc_source = self.shared_cdc_source; - let upstream_table_id = self.upstream_table.table_id().table_id; - let upstream_table_schema = self.upstream_table.schema().clone(); - let upstream_table_reader = UpstreamTableReader::new(self.upstream_table); + let upstream_table_id = self.external_table.table_id().table_id; + let upstream_table_schema = self.external_table.schema().clone(); + let upstream_table_reader = UpstreamTableReader::new(self.external_table); let mut upstream = self.upstream.execute(); @@ -133,11 +141,15 @@ impl CdcBackfillExecutor { // Check whether this parallelism has been assigned splits, // if not, we should bypass the backfill directly. let mut state_impl = if shared_cdc_source { - CdcBackfillStateImpl::Undefined + assert!(self.state_table.is_some(), "expect state table for shared cdc source"); + CdcBackfillStateImpl::MultiTable(MultiBackfillState::new(upstream_table_id, self.state_table.unwrap(), pk_in_output_indices.len() + METADATA_STATE_LEN)) } else if let Some(mutation) = first_barrier.mutation.as_ref() && let Mutation::Add{splits, ..} = mutation.as_ref() { tracing::info!(?mutation, ?shared_cdc_source, "got first barrier"); + + assert!(self.source_state_handler.is_some(), "expect source state handler"); + // We can assume for cdc table, the parallism of the fragment must be 1 match splits.get(&self.actor_ctx.id) { None => { @@ -164,7 +176,7 @@ impl CdcBackfillExecutor { upstream_table_id )) })?; - CdcBackfillStateImpl::SingleTable(SingleTableState::new(self.source_state_handler, upstream_table_id, split.id(), split.clone())) + CdcBackfillStateImpl::SingleTable(SingleBackfillState::new(self.source_state_handler.unwrap(), upstream_table_id, split.id(), split.clone())) } } } @@ -180,21 +192,16 @@ impl CdcBackfillExecutor { upstream.peekable() }; - tracing::debug!(?upstream_table_id, ?self.actor_ctx.id, ?shared_cdc_source, "start cdc backfill"); + tracing::debug!(?upstream_table_id, ?shared_cdc_source, "start cdc backfill"); state_impl.init_epoch(first_barrier.epoch); - // start from the beginning - // TODO(siyuan): restore backfill offset from state store - let backfill_offset = None; - - current_pk_pos = backfill_offset; - - // restore backfill done flag from state store - let is_finished = state_impl.check_finished().await?; + // restore backfill state + let state = state_impl.restore_state().await?; + current_pk_pos = state.current_pk_pos.clone(); // If the snapshot is empty, we don't need to backfill. let is_snapshot_empty: bool = { - if is_finished { + if state.is_finished { // It is finished, so just assign a value to avoid accessing storage table again. false } else { @@ -209,19 +216,18 @@ impl CdcBackfillExecutor { // | t | t/f | f | // | f | t | f | // | f | f | t | - let to_backfill = !is_finished && !is_snapshot_empty; + let to_backfill = !state.is_finished && !is_snapshot_empty; // The first barrier message should be propagated. yield Message::Barrier(first_barrier); // Keep track of rows from the snapshot. 
- #[allow(unused_variables)] - let mut total_snapshot_processed_rows: u64 = 0; + let mut total_snapshot_row_count: u64 = 0; let mut snapshot_read_epoch; - // Read the current binlog offset as a low watermark - let mut last_binlog_offset: Option = - upstream_table_reader.current_binlog_offset().await?; + let mut last_binlog_offset: Option = state + .last_cdc_offset + .map_or(upstream_table_reader.current_binlog_offset().await?, Some); let mut consumed_binlog_offset: Option = None; @@ -312,7 +318,7 @@ impl CdcBackfillExecutor { .backfill_snapshot_read_row_count .with_label_values(&[ upstream_table_id.to_string().as_str(), - self.actor_id.to_string().as_str(), + self.actor_ctx.id.to_string().as_str(), ]) .inc_by(cur_barrier_snapshot_processed_rows); @@ -320,7 +326,7 @@ impl CdcBackfillExecutor { .backfill_upstream_output_row_count .with_label_values(&[ upstream_table_id.to_string().as_str(), - self.actor_id.to_string().as_str(), + self.actor_ctx.id.to_string().as_str(), ]) .inc_by(cur_barrier_upstream_processed_rows); @@ -329,14 +335,23 @@ impl CdcBackfillExecutor { last_binlog_offset = consumed_binlog_offset.clone(); } - // seal current epoch even though there is no data + // update and persist backfill state + state_impl + .mutate_state( + current_pk_pos.clone(), + last_binlog_offset.clone(), + total_snapshot_row_count, + false, + ) + .await?; state_impl.commit_state(barrier.epoch).await?; + snapshot_read_epoch = barrier.epoch.prev; if let Some(progress) = self.progress.as_mut() { progress.update( barrier.epoch.curr, snapshot_read_epoch, - total_snapshot_processed_rows, + total_snapshot_row_count, ); } @@ -356,7 +371,8 @@ impl CdcBackfillExecutor { )?; tracing::trace!( - "recv changelog chunk: bin offset {:?}, capactiy {}", + target: "events::stream::cdc_backfill", + "recv changelog chunk: chunk_offset {:?}, capactiy {}", chunk_binlog_offset, chunk.capacity() ); @@ -364,16 +380,15 @@ impl CdcBackfillExecutor { // Since we don't need changelog before the // `last_binlog_offset`, skip the chunk that *only* contains // events before `last_binlog_offset`. - if let Some(last_binlog_offset) = &last_binlog_offset { - if let Some(chunk_binlog_offset) = chunk_binlog_offset { - if chunk_binlog_offset < *last_binlog_offset { - tracing::trace!( - "skip changelog chunk: offset {:?}, capacity {}", - chunk_binlog_offset, - chunk.capacity() - ); - continue; - } + if let Some(last_binlog_offset) = last_binlog_offset.as_ref() { + if let Some(chunk_offset) = chunk_binlog_offset && chunk_offset < *last_binlog_offset { + tracing::trace!( + target: "events::stream::cdc_backfill", + "skip changelog chunk: chunk_offset {:?}, capacity {}", + chunk_offset, + chunk.capacity() + ); + continue; } } // Buffer the upstream chunk. @@ -400,16 +415,20 @@ impl CdcBackfillExecutor { // in the buffer. Here we choose to never mark the chunk. // Consume with the renaming stream buffer chunk without mark. for chunk in upstream_chunk_buffer.drain(..) 
{ - let chunk_cardinality = chunk.cardinality() as u64; - cur_barrier_snapshot_processed_rows += chunk_cardinality; - total_snapshot_processed_rows += chunk_cardinality; yield Message::Chunk(mapping_chunk( chunk, &self.output_indices, )); } - state_impl.mutate_state(last_binlog_offset.clone()).await?; + state_impl + .mutate_state( + current_pk_pos, + last_binlog_offset.clone(), + total_snapshot_row_count, + true, + ) + .await?; break 'backfill_loop; } Some(chunk) => { @@ -425,7 +444,7 @@ impl CdcBackfillExecutor { ); let chunk_cardinality = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += chunk_cardinality; - total_snapshot_processed_rows += chunk_cardinality; + total_snapshot_row_count += chunk_cardinality; yield Message::Chunk(mapping_chunk( chunk, &self.output_indices, @@ -443,14 +462,20 @@ impl CdcBackfillExecutor { "upstream snapshot is empty, mark backfill is done and persist current binlog offset"); // The snapshot is empty, just set backfill to finished - state_impl.mutate_state(last_binlog_offset).await?; + state_impl + .mutate_state( + current_pk_pos, + last_binlog_offset, + total_snapshot_row_count, + true, + ) + .await?; } // drop reader to release db connection drop(upstream_table_reader); tracing::info!( - actor = self.actor_id, "CdcBackfill has already finished and forward messages directly to the downstream" ); @@ -465,7 +490,7 @@ impl CdcBackfillExecutor { // mark progress as finished if let Some(progress) = self.progress.as_mut() { - progress.finish(barrier.epoch.curr, total_snapshot_processed_rows); + progress.finish(barrier.epoch.curr, total_snapshot_row_count); } yield msg; // break after the state have been saved diff --git a/src/stream/src/executor/backfill/cdc/mod.rs b/src/stream/src/executor/backfill/cdc/mod.rs index 5061211f28c6a..eae920fbab7e1 100644 --- a/src/stream/src/executor/backfill/cdc/mod.rs +++ b/src/stream/src/executor/backfill/cdc/mod.rs @@ -15,4 +15,4 @@ pub mod cdc_backfill; mod state; -pub use state::BACKFILL_STATE_KEY_SUFFIX; +pub use state::{CdcStateRecord, BACKFILL_STATE_KEY_SUFFIX}; diff --git a/src/stream/src/executor/backfill/cdc/state.rs b/src/stream/src/executor/backfill/cdc/state.rs index 2d32bbd1d4353..d211924c39ec8 100644 --- a/src/stream/src/executor/backfill/cdc/state.rs +++ b/src/stream/src/executor/backfill/cdc/state.rs @@ -12,61 +12,188 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::anyhow; use maplit::hashmap; -use risingwave_common::row::Row; -use risingwave_common::types::{JsonbVal, ScalarRefImpl}; +use risingwave_common::row; +use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::types::{Datum, JsonbVal, ScalarImpl, ScalarRefImpl}; use risingwave_common::util::epoch::EpochPair; use risingwave_connector::source::external::{CdcOffset, DebeziumOffset, DebeziumSourceOffset}; use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; use risingwave_storage::StateStore; use serde_json::Value; +use crate::common::table::state_table::StateTable; use crate::executor::{SourceStateTableHandler, StreamExecutorResult}; +/// Depending on how the table is created, we have two scenarios for CDC Backfill: +/// 1. `CREATE TABLE xx WITH ("connector"= 'mysql-cdc', "database.name"='mydb', "table.name"='t1')` +/// In this case, the cdc backfill executor will wraps the source executor, and maintain its state +/// (a finish flag) in the source state table. +/// +/// +/// 2. 
`CREATE TABLE xx FROM source TABLE 'mydb.t1'` +/// In this case, we can have multiple Table jobs sharing a single cdc Source job. +/// The cdc backfill executor will be an instance of the `StreamScan` operator and has its own state table +/// schema: `table_id | backfill_finished | row_count | cdc_offset` pub enum CdcBackfillStateImpl { - Undefined, - SingleTable(SingleTableState), + SingleTable(SingleBackfillState), + MultiTable(MultiBackfillState), +} + +#[derive(Debug, Default)] +pub struct CdcStateRecord { + pub current_pk_pos: Option, + pub is_finished: bool, + /// The last cdc offset that has been consumed by the cdc backfill executor + pub last_cdc_offset: Option, + pub row_count: i64, } impl CdcBackfillStateImpl { pub fn init_epoch(&mut self, epoch: EpochPair) { match self { - CdcBackfillStateImpl::Undefined => {} CdcBackfillStateImpl::SingleTable(state) => state.init_epoch(epoch), + CdcBackfillStateImpl::MultiTable(state) => state.init_epoch(epoch), } } - pub async fn check_finished(&self) -> StreamExecutorResult { + /// Restore the state of the corresponding split + pub async fn restore_state(&mut self) -> StreamExecutorResult { match self { - CdcBackfillStateImpl::Undefined => Ok(false), - CdcBackfillStateImpl::SingleTable(state) => state.check_finished().await, + CdcBackfillStateImpl::SingleTable(state) => state.restore_state().await, + CdcBackfillStateImpl::MultiTable(state) => state.restore_state().await, } } + /// Modify the state of the corresponding split (currently only supports single split) pub async fn mutate_state( &mut self, - last_binlog_offset: Option, + current_pk_pos: Option, + last_cdc_offset: Option, + row_count: u64, + is_finished: bool, ) -> StreamExecutorResult<()> { + let record = CdcStateRecord { + current_pk_pos, + last_cdc_offset, + row_count: row_count as _, + is_finished, + }; match self { - CdcBackfillStateImpl::Undefined => Ok(()), - CdcBackfillStateImpl::SingleTable(state) => { - state.mutate_state(last_binlog_offset).await - } + CdcBackfillStateImpl::SingleTable(state) => state.mutate_state(&record).await, + CdcBackfillStateImpl::MultiTable(state) => state.mutate_state(&record).await, } } + /// Persist the state to storage pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> { match self { - CdcBackfillStateImpl::Undefined => Ok(()), CdcBackfillStateImpl::SingleTable(state) => state.commit_state(new_epoch).await, + CdcBackfillStateImpl::MultiTable(state) => state.commit_state(new_epoch).await, } } } pub const BACKFILL_STATE_KEY_SUFFIX: &str = "_backfill"; -/// The state manager for single cdc table -pub struct SingleTableState { +pub struct MultiBackfillState { + /// Id of the backfilling table, will be the key of the state + split_id: String, + state_table: StateTable, + + cached_state: Vec, +} + +impl MultiBackfillState { + pub fn new(table_id: u32, state_table: StateTable, state_len: usize) -> Self { + Self { + split_id: table_id.to_string(), + state_table, + cached_state: vec![None; state_len], + } + } + + pub fn init_epoch(&mut self, epoch: EpochPair) { + self.state_table.init_epoch(epoch) + } + + /// Restore the backfill state from storage + pub async fn restore_state(&mut self) -> StreamExecutorResult { + let key = Some(self.split_id.clone()); + match self + .state_table + .get_row(row::once(key.map(ScalarImpl::from))) + .await? 
+ { + Some(row) => { + self.cached_state = row.into_inner().into_vec(); + let state = self.cached_state.as_slice(); + let state_len = state.len(); + // schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` | + let cdc_offset = match state[state_len - 1] { + Some(ScalarImpl::Jsonb(ref jsonb)) => { + serde_json::from_value(jsonb.clone().take()).unwrap() + } + _ => return Err(anyhow!("invalid backfill state: cdc_offset").into()), + }; + let row_count = match state[state_len - 2] { + Some(ScalarImpl::Int64(val)) => val, + _ => return Err(anyhow!("invalid backfill state: row_count").into()), + }; + let is_finished = match state[state_len - 3] { + Some(ScalarImpl::Bool(val)) => val, + _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()), + }; + + let current_pk_pos = state[1..state_len - 3].to_vec(); + Ok(CdcStateRecord { + current_pk_pos: Some(OwnedRow::new(current_pk_pos)), + is_finished, + last_cdc_offset: cdc_offset, + row_count, + }) + } + None => Ok(CdcStateRecord::default()), + } + } + + pub async fn mutate_state(&mut self, record: &CdcStateRecord) -> StreamExecutorResult<()> { + let Some(current_pk_pos) = &record.current_pk_pos else { + return Ok(()); + }; + + // schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` | + let state = self.cached_state.as_mut_slice(); + let split_id = Some(ScalarImpl::from(self.split_id.clone())); + state[0] = split_id.clone(); + state[1..=current_pk_pos.len()].clone_from_slice(current_pk_pos.as_inner()); + state[current_pk_pos.len() + 1] = Some(record.is_finished.into()); + state[current_pk_pos.len() + 2] = Some(record.row_count.into()); + state[current_pk_pos.len() + 3] = record.last_cdc_offset.clone().map(|cdc_offset| { + let json = serde_json::to_value(cdc_offset).unwrap(); + ScalarImpl::Jsonb(JsonbVal::from(json)) + }); + + match self.state_table.get_row(row::once(split_id)).await? { + Some(prev_row) => { + self.state_table + .update(prev_row, self.cached_state.as_slice()); + } + None => { + self.state_table.insert(self.cached_state.as_slice()); + } + } + Ok(()) + } + + pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> { + self.state_table.commit(new_epoch).await + } +} + +/// The state manager for backfilling a single table +pub struct SingleBackfillState { /// Stores the backfill done flag source_state_handler: SourceStateTableHandler, cdc_table_id: u32, @@ -74,9 +201,7 @@ pub struct SingleTableState { cdc_split: SplitImpl, } -impl SingleTableState {} - -impl SingleTableState { +impl SingleBackfillState { pub fn new( source_state_handler: SourceStateTableHandler, cdc_table_id: u32, @@ -95,37 +220,46 @@ impl SingleTableState { self.source_state_handler.init_epoch(epoch) } - pub async fn check_finished(&self) -> StreamExecutorResult { + pub async fn restore_state(&self) -> StreamExecutorResult { let mut key = self.split_id.to_string(); key.push_str(BACKFILL_STATE_KEY_SUFFIX); - match self.source_state_handler.get(key.into()).await? { + let is_finished = match self.source_state_handler.get(key.into()).await? 
{ Some(row) => match row.datum_at(1) { - Some(ScalarRefImpl::Jsonb(jsonb_ref)) => Ok(jsonb_ref.as_bool()?), + Some(ScalarRefImpl::Jsonb(jsonb_ref)) => jsonb_ref.as_bool()?, _ => unreachable!("invalid backfill persistent state"), }, - None => Ok(false), - } + None => false, + }; + Ok(CdcStateRecord { + is_finished, + ..Default::default() + }) } - /// When snapshot read stream ends, we should persist two states: - /// 1) a backfill finish flag to denote the backfill has done - /// 2) a consumed binlog offset to denote the last binlog offset - /// which will be committed to the state store upon next barrier. - pub async fn mutate_state( - &mut self, - last_binlog_offset: Option, - ) -> StreamExecutorResult<()> { + pub async fn mutate_state(&mut self, state_item: &CdcStateRecord) -> StreamExecutorResult<()> { + // skip if unfinished for single backfill + if !state_item.is_finished { + return Ok(()); + } + + // When single backfill is finished, we should persist two states: + // 1) a finish flag to denote the backfill has done + // 2) a consumed binlog offset to denote the last binlog offset + // which will be committed to the state store upon next barrier. let mut key = self.split_id.to_string(); key.push_str(BACKFILL_STATE_KEY_SUFFIX); // write backfill finished flag self.source_state_handler - .set(key.into(), JsonbVal::from(Value::Bool(true))) + .set( + key.into(), + JsonbVal::from(Value::Bool(state_item.is_finished)), + ) .await?; if let SplitImpl::MysqlCdc(split) = &mut self.cdc_split && let Some(state) = split.mysql_split.as_mut() { let start_offset = - last_binlog_offset.as_ref().map(|cdc_offset| { + state_item.last_cdc_offset.as_ref().map(|cdc_offset| { let source_offset = if let CdcOffset::MySql(o) = cdc_offset { diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index b6de1ac0daa4f..21b8ce35ed9ef 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -896,7 +896,7 @@ impl Dispatcher for CdcTableNameDispatcher { for vis_map in &mut vis_maps { let should_emit = if let Some(row) = row && let Some(full_table_name) = self.downstream_table_name.as_ref() { let table_name_datum = row.datum_at(self.table_name_col_index).unwrap(); - tracing::trace!(target: "events::stream::dispatch::hash::cdc", "keys: {:?}, table: {}", self.table_name_col_index, full_table_name); + tracing::trace!(target: "events::stream::dispatch::cdc", "keys: {:?}, table: {}", self.table_name_col_index, full_table_name); // dispatch based on downstream table name table_name_datum == ScalarRefImpl::Utf8(full_table_name) } else { diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 995a83b8f38fd..e78f3cd79c88e 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -183,7 +183,8 @@ impl ExecutorBuilder for SourceExecutorBuilder { (0..table_schema.len()).collect_vec(), None, params.executor_stats, - source_state_handler, + None, + Some(source_state_handler), false, source_ctrl_opts.chunk_size, ); diff --git a/src/stream/src/from_proto/stream_scan.rs b/src/stream/src/from_proto/stream_scan.rs index 0cb9b64bafbf5..5b6136fda9ea1 100644 --- a/src/stream/src/from_proto/stream_scan.rs +++ b/src/stream/src/from_proto/stream_scan.rs @@ -29,7 +29,7 @@ use crate::common::table::state_table::StateTable; use crate::executor::external::ExternalStorageTable; use crate::executor::{ BackfillExecutor, CdcBackfillExecutor, ChainExecutor, 
FlowControlExecutor, - RearrangedChainExecutor, SourceStateTableHandler, + RearrangedChainExecutor, }; pub struct StreamScanExecutorBuilder; @@ -102,11 +102,13 @@ impl ExecutorBuilder for StreamScanExecutorBuilder { output_indices.clone(), ); - let source_state_handler = SourceStateTableHandler::from_table_catalog( - node.get_state_table().as_ref().unwrap(), - state_store.clone(), - ) - .await; + let vnodes = params.vnode_bitmap.map(Arc::new); + // cdc backfill should be singleton, so vnodes must be None. + assert_eq!(None, vnodes); + let state_table = + StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes) + .await; + CdcBackfillExecutor::new( params.actor_context.clone(), params.info, @@ -115,7 +117,8 @@ impl ExecutorBuilder for StreamScanExecutorBuilder { output_indices, Some(progress), params.executor_stats, - source_state_handler, + Some(state_table), + None, true, params.env.config().developer.chunk_size, )
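
The heart of this patch is the new persisted backfill state for the shared-source ("table-on-source") path: instead of keying progress by vnode, the CdcBackfill executor now owns a singleton state table keyed by `split_id`, with the layout `split_id | pk... | backfill_finished | row_count | cdc_offset`, and restores that row on recovery so it can resume both the snapshot position and binlog consumption. The standalone Rust sketch below (std only, not taken from the patch) illustrates that row layout and the tail-first decoding used by `MultiBackfillState::restore_state`; `Scalar` stands in for RisingWave's `ScalarImpl`, and the cdc offset is simplified to a JSON string.

// Sketch of the persisted cdc backfill state row.
// Column order mirrors the patch: | split_id | pk... | backfill_finished | row_count | cdc_offset |
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Utf8(String),
    Int64(i64),
    Bool(bool),
    Jsonb(String), // serialized CdcOffset, kept as a JSON string here
}

#[derive(Debug, PartialEq)]
struct CdcStateRecord {
    current_pk_pos: Vec<Option<Scalar>>, // upstream pk values reached by the snapshot read
    is_finished: bool,
    row_count: i64,
    last_cdc_offset: Option<String>,
}

// Encode a record into a state-table row, prefixed by the split_id key.
fn to_state_row(split_id: &str, r: &CdcStateRecord) -> Vec<Option<Scalar>> {
    let mut row = Vec::with_capacity(r.current_pk_pos.len() + 4);
    row.push(Some(Scalar::Utf8(split_id.to_string())));
    row.extend(r.current_pk_pos.iter().cloned());
    row.push(Some(Scalar::Bool(r.is_finished)));
    row.push(Some(Scalar::Int64(r.row_count)));
    row.push(r.last_cdc_offset.clone().map(Scalar::Jsonb));
    row
}

// Reverse of `to_state_row`: the three metadata columns are read from the tail of the row,
// and everything between the key and the tail is the pk position.
fn from_state_row(row: &[Option<Scalar>]) -> Option<CdcStateRecord> {
    let n = row.len();
    if n < 4 {
        return None;
    }
    let last_cdc_offset = match &row[n - 1] {
        Some(Scalar::Jsonb(s)) => Some(s.clone()),
        None => None,
        _ => return None,
    };
    let row_count = match &row[n - 2] {
        Some(Scalar::Int64(v)) => *v,
        _ => return None,
    };
    let is_finished = match &row[n - 3] {
        Some(Scalar::Bool(v)) => *v,
        _ => return None,
    };
    Some(CdcStateRecord {
        current_pk_pos: row[1..n - 3].to_vec(),
        is_finished,
        row_count,
        last_cdc_offset,
    })
}

fn main() {
    let record = CdcStateRecord {
        current_pk_pos: vec![Some(Scalar::Int64(103))],
        is_finished: false,
        row_count: 3,
        last_cdc_offset: Some(r#"{"mysql":{"filename":"binlog.000001","position":4}}"#.into()),
    };
    let row = to_state_row("1002", &record);
    assert_eq!(from_state_row(&row), Some(record));
    println!("round-trip ok: {:?}", row);
}

Because the metadata columns are decoded from the end of the row, the same logic works for any number of upstream primary-key columns, which is why the executor only needs to know the total state length (pk columns plus the four metadata columns).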
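
On the connector side, the MySQL Debezium configuration now branches on whether cdc backfill is enabled (the connector properties carry `debezium.snapshot.mode = 'rw_cdc_backfill'`): in backfill mode Debezium never snapshots data and never takes snapshot locks; it only recovers schemas and, if a binlog offset was persisted, rewinds to it. A minimal Rust sketch of that branching follows (the production code is the Java in `DbzConnectorConfig`; the `offset.state.value` key below is a placeholder for `ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE`, whose actual value is not shown in the patch).

use std::collections::HashMap;

// Placeholder key name for the saved binlog offset handed to the offset backing store.
const OFFSET_STATE_VALUE: &str = "offset.state.value";

// Returns only the MySQL properties this decision overrides; everything else still comes
// from the mysql.properties template loaded by the connector node.
fn mysql_snapshot_props(
    is_cdc_backfill: bool,       // properties carry debezium.snapshot.mode = 'rw_cdc_backfill'
    snapshot_done: bool,         // legacy path: Debezium snapshot already completed
    start_offset: Option<&str>,  // persisted binlog offset, if any
) -> HashMap<String, String> {
    let mut props = HashMap::new();
    if is_cdc_backfill {
        // Backfill is done by the CdcBackfill executor, so Debezium must not lock tables
        // or snapshot data; it only needs schemas plus incremental changes.
        props.insert("snapshot.locking.mode".into(), "none".into());
        match start_offset {
            // Recovery: rewind and replay the binlog from the persisted offset.
            Some(offset) => {
                props.insert("snapshot.mode".into(), "schema_only_recovery".into());
                props.insert(OFFSET_STATE_VALUE.into(), offset.into());
            }
            // First run: read schemas only, then stream changes from the current position.
            None => {
                props.insert("snapshot.mode".into(), "schema_only".into());
            }
        }
    } else if snapshot_done {
        // Legacy (non-shared) path: resume binlog reading after the Debezium snapshot.
        if let Some(offset) = start_offset {
            props.insert("snapshot.mode".into(), "schema_only_recovery".into());
            props.insert(OFFSET_STATE_VALUE.into(), offset.into());
        }
    }
    props
}

fn main() {
    let p = mysql_snapshot_props(true, false, Some(r#"{"file":"binlog.000001","pos":4}"#));
    assert_eq!(p.get("snapshot.mode").map(String::as_str), Some("schema_only_recovery"));
    println!("{:?}", p);
}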