From c5b9a8393cdf392d055ab2262be3f05a084df302 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 30 Jan 2024 10:45:43 +0800 Subject: [PATCH 1/2] fix(optimizer): fix temporal join shuffle (#14848) (#14849) Co-authored-by: Dylan --- .../tests/testdata/output/temporal_join.yaml | 13 +++++++------ .../src/optimizer/plan_node/logical_join.rs | 14 +++++++------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index f49a82be2dd78..ea844cda185b1 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -107,12 +107,13 @@ StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.k, stream._row_id) } └─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } - ├─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } - │ ├─StreamExchange { dist: HashShard(stream.k) } - │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } - │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } - │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) } - │ └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) } + ├─StreamExchange { dist: HashShard(stream.k) } + │ └─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } + │ ├─StreamExchange { dist: HashShard(stream.k) } + │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } + │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) } + │ └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version2.k) } └─StreamTableScan { table: version2, columns: [version2.k, version2.x2], pk: [version2.k], dist: UpstreamHashShard(version2.k) } - name: multi-way temporal join with different keys diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 5365eb3642b79..a76fc953ee929 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1049,9 +1049,8 @@ impl LogicalJoin { let lookup_prefix_len = reorder_idx.len(); let predicate = predicate.reorder(&reorder_idx); - let left = if dist_key_in_order_key_pos.is_empty() { - self.left() - .to_stream_with_dist_required(&RequiredDist::single(), ctx)? 
+ let required_dist = if dist_key_in_order_key_pos.is_empty() { + RequiredDist::single() } else { let left_eq_indexes = predicate.left_eq_indexes(); let left_dist_key = dist_key_in_order_key_pos @@ -1059,12 +1058,13 @@ impl LogicalJoin { .map(|pos| left_eq_indexes[*pos]) .collect_vec(); - self.left().to_stream_with_dist_required( - &RequiredDist::shard_by_key(self.left().schema().len(), &left_dist_key), - ctx, - )? + RequiredDist::hash_shard(&left_dist_key) }; + let left = self.left().to_stream(ctx)?; + // Enforce a shuffle for the temporal join LHS to let the scheduler be able to schedule the join fragment together with the RHS with a `no_shuffle` exchange. + let left = required_dist.enforce(left, &Order::any()); + if !left.append_only() { return Err(RwError::from(ErrorCode::NotSupported( "Temporal join requires an append-only left input".into(), From 85691043fa0b0b85d350ca465f70d307e51461b7 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 1 Feb 2024 06:11:07 +0000 Subject: [PATCH 2/2] feat(cdc): support disable cdc backfill and only consumes from latest changelog (#14718) (#14911) Co-authored-by: StrikeW --- e2e_test/source/cdc/cdc.check_new_rows.slt | 4 ++ e2e_test/source/cdc/cdc.share_stream.slt | 23 ++++++++++ e2e_test/source/cdc/mysql_init_data.sql | 1 - proto/stream_plan.proto | 5 ++- src/compute/tests/cdc_tests.rs | 1 + .../cdc/external/mock_external_table.rs | 14 +++---- src/connector/src/source/cdc/external/mod.rs | 40 +++++++++++------- .../src/source/cdc/external/postgres.rs | 18 ++++---- src/connector/src/source/cdc/mod.rs | 2 + src/frontend/src/handler/create_table.rs | 12 ++++++ .../optimizer/plan_node/generic/cdc_scan.rs | 4 ++ .../optimizer/plan_node/logical_cdc_scan.rs | 3 ++ .../plan_node/stream_cdc_table_scan.rs | 1 + .../src/executor/backfill/cdc/cdc_backfill.rs | 42 ++++++++++++++----- src/stream/src/executor/backfill/utils.rs | 18 ++++---- src/stream/src/from_proto/stream_cdc_scan.rs | 2 + 16 files changed, 135 insertions(+), 55 deletions(-) diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt index af62d591d6feb..5a97a21ba973a 100644 --- a/e2e_test/source/cdc/cdc.check_new_rows.slt +++ b/e2e_test/source/cdc/cdc.check_new_rows.slt @@ -74,3 +74,7 @@ SELECT * FROM rw.products_test order by id limit 3 102 RW 12V car battery 103 RW 12-pack of drill bits with sizes ranging from #40 to #3 +query TTTT +select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id; +---- +2022-12-01 15:08:22 Sam 110 0 \ No newline at end of file diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 9e4c1bac6a139..d4b50ed4db6d6 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -105,8 +105,31 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t system ok mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');" +system ok +mysql --protocol=tcp -u root mytest -e "FLUSH LOGS" + +# Should not contain records inserted before the table is created (e.g. 
'Bob' inserted above)
+statement ok
+create table orders_no_backfill (
+   order_id int,
+   order_date timestamp,
+   customer_name string,
+   price decimal,
+   product_id int,
+   order_status smallint,
+   PRIMARY KEY (order_id)
+) with (
+ snapshot = 'false'
+) from mysql_mytest table 'mytest.orders';
+
 sleep 5s
 
+# table without backfill should not contain historical records
+query I
+select count(*) from orders_no_backfill
+----
+0
+
 # check ingestion results
 query I
 SELECT * from products_test_cnt
diff --git a/e2e_test/source/cdc/mysql_init_data.sql b/e2e_test/source/cdc/mysql_init_data.sql
index e954b74aaf500..d01165088eed1 100644
--- a/e2e_test/source/cdc/mysql_init_data.sql
+++ b/e2e_test/source/cdc/mysql_init_data.sql
@@ -1,4 +1,3 @@
--- USE `my@db`;
 INSERT INTO products
 VALUES (default,"scooter","Small 2-wheel scooter"),
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index f3bc311bf1b45..070a2655fcf23 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -553,7 +553,7 @@ message StreamCdcScanNode {
   // Strips the primary key columns if they're unnecessary.
   repeated uint32 output_indices = 3;
 
-  /// The state table used by Backfill operator for persisting internal state
+  // The state table used by CdcBackfill operator for persisting internal state
   catalog.Table state_table = 4;
 
   // The external table that will be backfilled for CDC.
@@ -561,6 +561,9 @@
 
   // The rate limit for the stream cdc scan node.
   optional uint32 rate_limit = 6;
+
+  // Whether to skip the backfill and only consume from upstream.
+  bool disable_backfill = 7;
 }
 
 // BatchPlanNode is used for mv on mv snapshot read.
diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs
index a2bab33a6c92f..0e21611f9afa2 100644
--- a/src/compute/tests/cdc_tests.rs
+++ b/src/compute/tests/cdc_tests.rs
@@ -239,6 +239,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
         Arc::new(StreamingMetrics::unused()),
         state_table,
         4, // 4 rows in a snapshot chunk
+        false,
     );
 
     // Create a `MaterializeExecutor` to write the changes to storage.
diff --git a/src/connector/src/source/cdc/external/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs index cf3eda6d92104..39f1da1fe5ff7 100644 --- a/src/connector/src/source/cdc/external/mock_external_table.rs +++ b/src/connector/src/source/cdc/external/mock_external_table.rs @@ -21,7 +21,8 @@ use risingwave_common::types::ScalarImpl; use crate::error::ConnectorError; use crate::source::cdc::external::{ - CdcOffset, ConnectorResult, ExternalTableReader, MySqlOffset, SchemaTableName, + CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset, + SchemaTableName, }; #[derive(Debug)] @@ -38,6 +39,10 @@ impl MockExternalTableReader { } } + pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { + Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default()))) + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner(&self) { let snap_idx = self @@ -102,13 +107,6 @@ impl ExternalTableReader for MockExternalTableReader { } } - fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { - // same as mysql offset - Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( - offset, - )?)) - } - fn snapshot_read( &self, _table_name: SchemaTableName, diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index d520522440840..1d0c0e3974404 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -208,13 +208,13 @@ impl MySqlOffset { } } +pub type CdcOffsetParseFunc = Box ConnectorResult + Send>; + pub trait ExternalTableReader { fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String; async fn current_cdc_offset(&self) -> ConnectorResult; - fn parse_cdc_offset(&self, dbz_offset: &str) -> ConnectorResult; - fn snapshot_read( &self, table_name: SchemaTableName, @@ -276,12 +276,6 @@ impl ExternalTableReader for MySqlExternalTableReader { })) } - fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { - Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( - offset, - )?)) - } - fn snapshot_read( &self, table_name: SchemaTableName, @@ -328,6 +322,14 @@ impl MySqlExternalTableReader { }) } + pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { + Box::new(move |offset| { + Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( + offset, + )?)) + }) + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -502,14 +504,6 @@ impl ExternalTableReader for ExternalTableReaderImpl { } } - fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { - match self { - ExternalTableReaderImpl::MySql(mysql) => mysql.parse_cdc_offset(offset), - ExternalTableReaderImpl::Postgres(postgres) => postgres.parse_cdc_offset(offset), - ExternalTableReaderImpl::Mock(mock) => mock.parse_cdc_offset(offset), - } - } - fn snapshot_read( &self, table_name: SchemaTableName, @@ -521,6 +515,16 @@ impl ExternalTableReader for ExternalTableReaderImpl { } impl ExternalTableReaderImpl { + pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc { + match self { + ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(), + ExternalTableReaderImpl::Postgres(_) => { + PostgresExternalTableReader::get_cdc_offset_parser() + } + ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(), + } + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -623,6 +627,10 @@ 
mod tests { let offset = reader.current_cdc_offset().await.unwrap(); println!("BinlogOffset: {:?}", offset); + let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#; + let parser = MySqlExternalTableReader::get_cdc_offset_parser(); + println!("parsed offset: {:?}", parser(off0_str).unwrap()); + let table_name = SchemaTableName { schema_name: "mytest".to_string(), table_name: "t1".to_string(), diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index f940e34157532..036e62abfe129 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -30,8 +30,8 @@ use tokio_postgres::NoTls; use crate::error::ConnectorError; use crate::parser::postgres_row_to_owned_row; use crate::source::cdc::external::{ - CdcOffset, ConnectorResult, DebeziumOffset, ExternalTableConfig, ExternalTableReader, - SchemaTableName, + CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig, + ExternalTableReader, SchemaTableName, }; #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] @@ -105,12 +105,6 @@ impl ExternalTableReader for PostgresExternalTableReader { Ok(CdcOffset::Postgres(pg_offset)) } - fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { - Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset( - offset, - )?)) - } - fn snapshot_read( &self, table_name: SchemaTableName, @@ -165,6 +159,14 @@ impl PostgresExternalTableReader { }) } + pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { + Box::new(move |offset| { + Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset( + offset, + )?)) + }) + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index b3a2bc6554c60..ae9490bca3c56 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -35,6 +35,8 @@ pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc"; pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode"; pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill"; pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable"; +// User can set snapshot='false' to disable cdc backfill +pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot"; pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME; pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 8a4981c2d3c29..6cc8ed520fc25 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -14,6 +14,7 @@ use std::collections::{BTreeMap, HashMap}; use std::rc::Rc; +use std::str::FromStr; use std::sync::Arc; use anyhow::anyhow; @@ -33,6 +34,7 @@ use risingwave_connector::source; use risingwave_connector::source::cdc::external::{ DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, }; +use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; @@ -818,10 +820,20 @@ pub(crate) fn gen_create_table_plan_for_cdc_source( tracing::debug!(?cdc_table_desc, "create 
cdc table"); + // disable backfill if 'snapshot=false' + let disable_backfill = match context.with_options().get(CDC_BACKFILL_ENABLE_KEY) { + None => false, + Some(v) => { + !(bool::from_str(v) + .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?) + } + }; + let logical_scan = LogicalCdcScan::create( external_table_name, Rc::new(cdc_table_desc), context.clone(), + disable_backfill, ); let scan_node: PlanRef = logical_scan.into(); diff --git a/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs b/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs index 22d37bba5e556..f742238de8912 100644 --- a/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs @@ -39,6 +39,8 @@ pub struct CdcScan { #[educe(PartialEq(ignore))] #[educe(Hash(ignore))] pub ctx: OptimizerContextRef, + + pub disable_backfill: bool, } impl CdcScan { @@ -102,12 +104,14 @@ impl CdcScan { output_col_idx: Vec, // the column index in the table cdc_table_desc: Rc, ctx: OptimizerContextRef, + disable_backfill: bool, ) -> Self { Self { table_name, output_col_idx, cdc_table_desc, ctx, + disable_backfill, } } diff --git a/src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs b/src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs index a6787d43b8e2d..a65d297b0681a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs @@ -60,12 +60,14 @@ impl LogicalCdcScan { table_name: String, // explain-only cdc_table_desc: Rc, ctx: OptimizerContextRef, + disable_backfill: bool, ) -> Self { generic::CdcScan::new( table_name, (0..cdc_table_desc.columns.len()).collect(), cdc_table_desc, ctx, + disable_backfill, ) .into() } @@ -94,6 +96,7 @@ impl LogicalCdcScan { output_col_idx, self.core.cdc_table_desc.clone(), self.base.ctx().clone(), + self.core.disable_backfill, ) .into() } diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 54a11f1cf1b53..794132443e7cd 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -252,6 +252,7 @@ impl StreamCdcTableScan { state_table: Some(catalog), cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()), rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + disable_backfill: self.core.disable_backfill, }); // plan: merge -> filter -> exchange(simple) -> stream_scan diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 4c365fa78ec8c..8e8a3cd67b95f 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -74,6 +74,8 @@ pub struct CdcBackfillExecutor { metrics: Arc, chunk_size: usize, + + disable_backfill: bool, } impl CdcBackfillExecutor { @@ -88,6 +90,7 @@ impl CdcBackfillExecutor { metrics: Arc, state_table: StateTable, chunk_size: usize, + disable_backfill: bool, ) -> Self { Self { actor_ctx, @@ -99,6 +102,7 @@ impl CdcBackfillExecutor { progress, metrics, chunk_size, + disable_backfill, } } @@ -140,7 +144,7 @@ impl CdcBackfillExecutor { let state = state_impl.restore_state().await?; current_pk_pos = state.current_pk_pos.clone(); - let to_backfill = !state.is_finished; + let to_backfill = !self.disable_backfill && !state.is_finished; // The first barrier message should be propagated. 
yield Message::Barrier(first_barrier); @@ -153,6 +157,10 @@ impl CdcBackfillExecutor { .last_cdc_offset .map_or(upstream_table_reader.current_binlog_offset().await?, Some); + let offset_parse_func = upstream_table_reader + .inner() + .table_reader() + .get_cdc_offset_parser(); let mut consumed_binlog_offset: Option = None; tracing::info!( @@ -161,6 +169,7 @@ impl CdcBackfillExecutor { initial_binlog_offset = ?last_binlog_offset, ?current_pk_pos, is_finished = state.is_finished, + disable_backfill = self.disable_backfill, snapshot_row_count = total_snapshot_row_count, chunk_size = self.chunk_size, "start cdc backfill" @@ -201,10 +210,7 @@ impl CdcBackfillExecutor { break; } Message::Chunk(ref chunk) => { - last_binlog_offset = get_cdc_chunk_last_offset( - upstream_table_reader.inner().table_reader(), - chunk, - )?; + last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?; } Message::Watermark(_) => { // Ignore watermark @@ -250,12 +256,12 @@ impl CdcBackfillExecutor { // record the consumed binlog offset that will be // persisted later consumed_binlog_offset = get_cdc_chunk_last_offset( - upstream_table_reader.inner().table_reader(), + &offset_parse_func, &chunk, )?; yield Message::Chunk(mapping_chunk( mark_cdc_chunk( - upstream_table_reader.inner().table_reader(), + &offset_parse_func, chunk, current_pos, &pk_in_output_indices, @@ -318,10 +324,8 @@ impl CdcBackfillExecutor { continue; } - let chunk_binlog_offset = get_cdc_chunk_last_offset( - upstream_table_reader.inner().table_reader(), - &chunk, - )?; + let chunk_binlog_offset = + get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?; tracing::trace!( target: "events::stream::cdc_backfill", @@ -411,6 +415,21 @@ impl CdcBackfillExecutor { } } } + } else if self.disable_backfill { + // If backfill is disabled, we just mark the backfill as finished + tracing::info!( + upstream_table_id, + upstream_table_name, + "CdcBackfill has been disabled" + ); + state_impl + .mutate_state( + current_pk_pos, + last_binlog_offset.clone(), + total_snapshot_row_count, + true, + ) + .await?; } // drop reader to release db connection @@ -418,6 +437,7 @@ impl CdcBackfillExecutor { tracing::info!( upstream_table_id, + upstream_table_name, "CdcBackfill has already finished and will forward messages directly to the downstream" ); diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 20038c86a1b7d..ac35e47f66588 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -34,9 +34,7 @@ use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType}; use risingwave_common::util::value_encoding::BasicSerde; use risingwave_connector::error::ConnectorError; -use risingwave_connector::source::cdc::external::{ - CdcOffset, ExternalTableReader, ExternalTableReaderImpl, -}; +use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc}; use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow}; use risingwave_storage::StateStore; @@ -242,7 +240,7 @@ pub(crate) fn mark_chunk( } pub(crate) fn mark_cdc_chunk( - table_reader: &ExternalTableReaderImpl, + offset_parse_func: &CdcOffsetParseFunc, chunk: StreamChunk, current_pos: &OwnedRow, pk_in_output_indices: PkIndicesRef<'_>, @@ -251,7 +249,7 @@ pub(crate) fn mark_cdc_chunk( ) -> StreamExecutorResult { let chunk = chunk.compact(); mark_cdc_chunk_inner( - table_reader, + offset_parse_func, chunk, current_pos, 
last_cdc_offset, @@ -332,7 +330,7 @@ fn mark_chunk_inner( } fn mark_cdc_chunk_inner( - table_reader: &ExternalTableReaderImpl, + offset_parse_func: &CdcOffsetParseFunc, chunk: StreamChunk, current_pos: &OwnedRow, last_cdc_offset: Option, @@ -346,7 +344,7 @@ fn mark_cdc_chunk_inner( let offset_col_idx = data.dimension() - 1; for v in data.rows().map(|row| { let offset_datum = row.datum_at(offset_col_idx).unwrap(); - let event_offset = table_reader.parse_cdc_offset(offset_datum.into_utf8())?; + let event_offset = (*offset_parse_func)(offset_datum.into_utf8())?; let visible = { // filter changelog events with binlog range let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset { @@ -532,13 +530,13 @@ pub(crate) fn get_new_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) - } pub(crate) fn get_cdc_chunk_last_offset( - table_reader: &ExternalTableReaderImpl, + offset_parse_func: &CdcOffsetParseFunc, chunk: &StreamChunk, ) -> StreamExecutorResult> { let row = chunk.rows().last().unwrap().1; let offset_col = row.iter().last().unwrap(); - let output = offset_col - .map(|scalar| Ok::<_, ConnectorError>(table_reader.parse_cdc_offset(scalar.into_utf8()))?); + let output = + offset_col.map(|scalar| Ok::<_, ConnectorError>((*offset_parse_func)(scalar.into_utf8()))?); output.transpose().map_err(|e| e.into()) } diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 017ca1498f624..c325cc31eb026 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -45,6 +45,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { .collect_vec(); let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?; + let disable_backfill = node.disable_backfill; let table_schema: Schema = table_desc.columns.iter().map(Into::into).collect(); assert_eq!(output_indices, (0..table_schema.len()).collect_vec()); @@ -105,6 +106,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { params.executor_stats, state_table, backfill_chunk_size, + disable_backfill, ) .boxed();