diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties
index 4d85e1b98c78..ee5c8003e658 100644
--- a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties
+++ b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties
@@ -19,3 +19,4 @@ time.precision.mode=connect
 # handle conflicts in the mview operator, thus we don't need to obey the above
 # instructions. So we decrease the wait time here to reclaim jvm thread faster.
 debezium.embedded.shutdown.pause.before.interrupt.ms=1
+offset.flush.interval.ms=60000
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 89c052137897..67438904ace7 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -561,6 +561,16 @@ message StreamScanNode {
   catalog.Table arrangement_table = 10;
 }
 
+// Config options for CDC backfill
+message StreamCdcScanOptions {
+  // Whether to skip the backfill and only consume from upstream.
+  bool disable_backfill = 1;
+
+  // Barrier interval to start a new snapshot read.
+  uint32 snapshot_barrier_interval = 2;
+
+  // Batch size for a snapshot read query.
+  uint32 snapshot_batch_size = 3;
+}
+
 message StreamCdcScanNode {
   uint32 table_id = 1;
@@ -581,7 +591,10 @@ message StreamCdcScanNode {
   optional uint32 rate_limit = 6;
 
   // Whether skip the backfill and only consume from upstream.
+  // Kept for backward compatibility; new stream plans should use `options.disable_backfill`.
   bool disable_backfill = 7;
+
+  StreamCdcScanOptions options = 8;
 }
 
 // BatchPlanNode is used for mv on mv snapshot read.
diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs
index 6ddeae73de8b..d29f9d39c8bb 100644
--- a/src/compute/tests/cdc_tests.rs
+++ b/src/compute/tests/cdc_tests.rs
@@ -47,8 +47,8 @@ use risingwave_stream::executor::monitor::StreamingMetrics;
 use risingwave_stream::executor::test_utils::MockSource;
 use risingwave_stream::executor::{
     expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedMessageStream,
-    CdcBackfillExecutor, Execute, Executor as StreamExecutor, ExecutorInfo, ExternalStorageTable,
-    MaterializeExecutor, Message, Mutation, StreamExecutorError,
+    CdcBackfillExecutor, CdcScanOptions, Execute, Executor as StreamExecutor, ExecutorInfo,
+    ExternalStorageTable, MaterializeExecutor, Message, Mutation, StreamExecutorError,
 };
 
 // mock upstream binlog offset starting from "1.binlog, pos=0"
@@ -230,8 +230,11 @@ async fn test_cdc_backfill() -> StreamResult<()> {
             state_table,
             Some(4), // limit a snapshot chunk to have <= 4 rows by rate limit
             false,
-            1,
-            4,
+            Some(CdcScanOptions {
+                disable_backfill: false,
+                snapshot_interval: 1,
+                snapshot_batch_size: 4,
+            }),
         )
         .boxed(),
     );
diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs
index bd2b623731c7..ed197975b766 100644
--- a/src/connector/src/source/cdc/external/mod.rs
+++ b/src/connector/src/source/cdc/external/mod.rs
@@ -649,7 +649,6 @@ mod tests {
         let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
         let parser = MySqlExternalTableReader::get_cdc_offset_parser();
         println!("parsed offset: {:?}", parser(off0_str).unwrap());
-
         let table_name = SchemaTableName {
             schema_name: "mytest".to_string(),
             table_name: "t1".to_string(),
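Note on the proto change above: the legacy field 7 (`disable_backfill`) stays populated alongside the new `options` message, so a plan written by an older frontend (with no `options`) can still be resolved against defaults. A minimal sketch of that resolution rule, mirroring the executor-side fallback later in this patch (the struct is a simplified stand-in for the prost-generated type):

    #[derive(Debug, Clone, PartialEq)]
    struct ScanOptions {
        disable_backfill: bool,
        snapshot_barrier_interval: u32,
        snapshot_batch_size: u32,
    }

    impl Default for ScanOptions {
        fn default() -> Self {
            Self {
                disable_backfill: false,
                snapshot_barrier_interval: 1,
                snapshot_batch_size: 1000,
            }
        }
    }

    // If the plan carries the new message, it wins; otherwise fall back to
    // the legacy boolean combined with the defaults above.
    fn resolve(legacy_disable_backfill: bool, options: Option<ScanOptions>) -> ScanOptions {
        options.unwrap_or(ScanOptions {
            disable_backfill: legacy_disable_backfill,
            ..Default::default()
        })
    }

    fn main() {
        // Old plan: only field 7 is set.
        assert!(resolve(true, None).disable_backfill);
        // New plan: the options message takes precedence.
        let opts = ScanOptions { snapshot_barrier_interval: 4, ..Default::default() };
        assert_eq!(resolve(false, Some(opts)).snapshot_barrier_interval, 4);
    }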
diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs
index decbcffebc36..44c52e4910af 100644
--- a/src/connector/src/source/cdc/mod.rs
+++ b/src/connector/src/source/cdc/mod.rs
@@ -40,6 +40,8 @@ pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
 pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
 // User can set snapshot='false' to disable cdc backfill
 pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
+pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
+pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
 // We enable transaction for shared cdc source by default
 pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 705a8956fff3..7f8ce183c438 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -14,7 +14,6 @@
 use std::collections::{BTreeMap, HashMap};
 use std::rc::Rc;
-use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::anyhow;
@@ -32,7 +31,6 @@ use risingwave_common::util::value_encoding::DatumToProtoExt;
 use risingwave_connector::source::cdc::external::{
     DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
 };
-use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
 use risingwave_connector::{source, WithPropertiesExt};
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
@@ -62,7 +60,7 @@ use crate::handler::create_source::{
     check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
 };
 use crate::handler::HandlerArgs;
-use crate::optimizer::plan_node::generic::SourceNodeKind;
+use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
 use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
 use crate::optimizer::property::{Order, RequiredDist};
 use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
@@ -850,20 +848,13 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
 
     tracing::debug!(?cdc_table_desc, "create cdc table");
 
-    // disable backfill if 'snapshot=false'
-    let disable_backfill = match context.with_options().get(CDC_BACKFILL_ENABLE_KEY) {
-        None => false,
-        Some(v) => {
-            !(bool::from_str(v)
-                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?)
-        }
-    };
+    let options = CdcScanOptions::from_with_options(context.with_options())?;
 
     let logical_scan = LogicalCdcScan::create(
         external_table_name,
         Rc::new(cdc_table_desc),
         context.clone(),
-        disable_backfill,
+        options,
     );
 
     let scan_node: PlanRef = logical_scan.into();
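The two new keys extend the existing `snapshot` option, so all three are read from the WITH clause of the CDC table definition. A standalone sketch of the expected parsing behavior (`parse_options` and the plain `HashMap` are hypothetical stand-ins; the real logic is `CdcScanOptions::from_with_options`, defined in the next file):

    use std::collections::HashMap;
    use std::str::FromStr;

    fn parse_options(with_options: &HashMap<String, String>) -> Result<(bool, u32, u32), String> {
        // Defaults mirror `CdcScanOptions::default()`: backfill enabled,
        // snapshot every barrier, 1000 rows per snapshot query.
        let mut disable_backfill = false;
        let mut snapshot_barrier_interval = 1u32;
        let mut snapshot_batch_size = 1000u32;

        if let Some(v) = with_options.get("snapshot") {
            disable_backfill = !bool::from_str(v).map_err(|_| "Invalid value for snapshot")?;
        }
        if let Some(v) = with_options.get("snapshot.interval") {
            snapshot_barrier_interval =
                u32::from_str(v).map_err(|_| "Invalid value for snapshot.interval")?;
        }
        if let Some(v) = with_options.get("snapshot.batch_size") {
            snapshot_batch_size =
                u32::from_str(v).map_err(|_| "Invalid value for snapshot.batch_size")?;
        }
        Ok((disable_backfill, snapshot_barrier_interval, snapshot_batch_size))
    }

    fn main() {
        let mut opts = HashMap::new();
        opts.insert("snapshot.interval".to_string(), "4".to_string());
        // Unspecified keys fall back to defaults.
        assert_eq!(parse_options(&opts), Ok((false, 4, 1000)));

        // Malformed values surface as errors instead of being silently ignored.
        opts.insert("snapshot".to_string(), "maybe".to_string());
        assert!(parse_options(&opts).is_err());
    }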
diff --git a/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs b/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
index f742238de891..2d7d708291e4 100644
--- a/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
@@ -13,19 +13,28 @@
 // limitations under the License.
 
 use std::rc::Rc;
+use std::str::FromStr;
 
+use anyhow::anyhow;
 use educe::Educe;
 use fixedbitset::FixedBitSet;
 use pretty_xmlish::Pretty;
 use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::sort_util::ColumnOrder;
+use risingwave_connector::source::cdc::{
+    CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY,
+    CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
+};
+use risingwave_pb::stream_plan::StreamCdcScanOptions;
 
 use super::GenericPlanNode;
 use crate::catalog::ColumnId;
+use crate::error::Result;
 use crate::expr::{ExprRewriter, ExprVisitor};
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::property::FunctionalDependencySet;
+use crate::WithOptions;
 
 /// [`CdcScan`] reads rows of a table from an external upstream database
 #[derive(Debug, Clone, Educe)]
@@ -40,7 +49,59 @@ pub struct CdcScan {
     #[educe(Hash(ignore))]
     pub ctx: OptimizerContextRef,
 
+    pub options: CdcScanOptions,
+}
+
+#[derive(Debug, Clone, Hash, PartialEq)]
+pub struct CdcScanOptions {
     pub disable_backfill: bool,
+    pub snapshot_barrier_interval: u32,
+    pub snapshot_batch_size: u32,
+}
+
+impl Default for CdcScanOptions {
+    fn default() -> Self {
+        Self {
+            disable_backfill: false,
+            snapshot_barrier_interval: 1,
+            snapshot_batch_size: 1000,
+        }
+    }
+}
+
+impl CdcScanOptions {
+    pub fn from_with_options(with_options: &WithOptions) -> Result<Self> {
+        // Unspecified options fall back to their default values.
+        let mut scan_options = Self::default();
+
+        // Disable backfill if 'snapshot=false'.
+        if let Some(snapshot) = with_options.get(CDC_BACKFILL_ENABLE_KEY) {
+            scan_options.disable_backfill = !(bool::from_str(snapshot)
+                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?);
+        };
+
+        if let Some(snapshot_interval) = with_options.get(CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY) {
+            scan_options.snapshot_barrier_interval = u32::from_str(snapshot_interval)
+                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY))?;
+        };
+
+        if let Some(snapshot_batch_size) = with_options.get(CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY) {
+            scan_options.snapshot_batch_size =
+                u32::from_str(snapshot_batch_size).map_err(|_| {
+                    anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY)
+                })?;
+        };
+
+        Ok(scan_options)
+    }
+
+    pub fn to_proto(&self) -> StreamCdcScanOptions {
+        StreamCdcScanOptions {
+            disable_backfill: self.disable_backfill,
+            snapshot_barrier_interval: self.snapshot_barrier_interval,
+            snapshot_batch_size: self.snapshot_batch_size,
+        }
+    }
 }
 
 impl CdcScan {
@@ -104,14 +165,14 @@ impl CdcScan {
         output_col_idx: Vec<usize>, // the column index in the table
         cdc_table_desc: Rc<CdcTableDesc>,
         ctx: OptimizerContextRef,
-        disable_backfill: bool,
+        options: CdcScanOptions,
     ) -> Self {
         Self {
             table_name,
             output_col_idx,
             cdc_table_desc,
             ctx,
-            disable_backfill,
+            options,
         }
     }
diff --git a/src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs b/src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
index 43ce91bd0dc2..5e9d6467656f 100644
--- a/src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
@@ -28,6 +28,7 @@ use crate::error::Result;
 use crate::expr::{ExprRewriter, ExprVisitor};
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
+use crate::optimizer::plan_node::generic::CdcScanOptions;
 use crate::optimizer::plan_node::{
     ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamCdcTableScan,
     ToStreamContext,
@@ -60,14 +61,14 @@ impl LogicalCdcScan {
         table_name: String, // explain-only
         cdc_table_desc: Rc<CdcTableDesc>,
         ctx: OptimizerContextRef,
-        disable_backfill: bool,
+        options: CdcScanOptions,
     ) -> Self {
         generic::CdcScan::new(
             table_name,
             (0..cdc_table_desc.columns.len()).collect(),
             cdc_table_desc,
             ctx,
-            disable_backfill,
+            options,
         )
         .into()
     }
@@ -96,7 +97,7 @@ impl LogicalCdcScan {
             output_col_idx,
             self.core.cdc_table_desc.clone(),
             self.base.ctx().clone(),
-            self.core.disable_backfill,
+            self.core.options.clone(),
         )
         .into()
     }
diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
index 9df1a94879b2..9fe334717145 100644
--- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -247,6 +247,7 @@ impl StreamCdcTableScan {
             "stream cdc table scan output indices"
         );
 
+        let options = self.core.options.to_proto();
         let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode {
             table_id: upstream_source_id,
             upstream_column_ids,
@@ -255,7 +256,8 @@ impl StreamCdcTableScan {
             state_table: Some(catalog),
             cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
             rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
-            disable_backfill: self.core.disable_backfill,
+            disable_backfill: options.disable_backfill,
+            options: Some(options),
         });
 
         // plan: merge -> filter -> exchange(simple) -> stream_scan
diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
index 8b32fe63320a..5da34fa7f154 100644
--- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -39,6 +39,7 @@ use crate::executor::backfill::cdc::upstream_table::snapshot::{
 use crate::executor::backfill::utils::{
     get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
 };
+use crate::executor::backfill::CdcScanOptions;
 use crate::executor::prelude::*;
 use crate::task::CreateMviewProgress;
 
@@ -70,12 +71,7 @@ pub struct CdcBackfillExecutor<S: StateStore> {
     /// Rate limit in rows/s.
     rate_limit_rps: Option<u32>,
 
-    disable_backfill: bool,
-
-    // TODO: make these options configurable
-    snapshot_interval: u32,
-
-    snapshot_read_limit: u32,
+    options: CdcScanOptions,
 }
 
 impl<S: StateStore> CdcBackfillExecutor<S> {
@@ -89,9 +85,8 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
         metrics: Arc<StreamingMetrics>,
         state_table: StateTable<S>,
         rate_limit_rps: Option<u32>,
-        disable_backfill: bool,
-        snapshot_interval: u32,
-        snapshot_read_limit: u32,
+        disable_backfill: bool, // kept for backward compatibility
+        scan_options: Option<CdcScanOptions>,
     ) -> Self {
         let pk_in_output_indices = external_table.pk_in_output_indices().clone().unwrap();
         let upstream_table_id = external_table.table_id().table_id;
@@ -101,6 +96,11 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
             pk_in_output_indices.len() + METADATA_STATE_LEN,
         );
 
+        let options = scan_options.unwrap_or(CdcScanOptions {
+            disable_backfill,
+            ..Default::default()
+        });
+
         Self {
             actor_ctx,
             external_table,
@@ -110,9 +110,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
             progress,
             metrics,
             rate_limit_rps,
-            disable_backfill,
-            snapshot_interval,
-            snapshot_read_limit,
+            options,
         }
     }
@@ -176,7 +174,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
         let state = state_impl.restore_state().await?;
         current_pk_pos = state.current_pk_pos.clone();
 
-        let to_backfill = !self.disable_backfill && !state.is_finished;
+        let to_backfill = !self.options.disable_backfill && !state.is_finished;
 
         // The first barrier message should be propagated.
         yield Message::Barrier(first_barrier);
@@ -200,10 +198,12 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
             initial_binlog_offset = ?last_binlog_offset,
             ?current_pk_pos,
             is_finished = state.is_finished,
-            disable_backfill = self.disable_backfill,
             snapshot_row_count = total_snapshot_row_count,
             rate_limit = self.rate_limit_rps,
-            "start cdc backfill"
+            disable_backfill = self.options.disable_backfill,
+            snapshot_interval = self.options.snapshot_interval,
+            snapshot_batch_size = self.options.snapshot_batch_size,
+            "start cdc backfill",
         );
 
         // CDC Backfill Algorithm:
@@ -269,7 +269,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
         );
 
         let right_snapshot = pin!(upstream_table_reader
-            .snapshot_read_full_table(read_args, self.snapshot_read_limit)
+            .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
             .map(Either::Right));
 
         let (right_snapshot, valve) = pausable(right_snapshot);
@@ -298,7 +298,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
                     // increase the barrier count and check whether need to start a new snapshot
                     barrier_count += 1;
                     let can_start_new_snapshot =
-                        barrier_count == self.snapshot_interval;
+                        barrier_count == self.options.snapshot_interval;
 
                     if let Some(mutation) = barrier.mutation.as_deref() {
                         use crate::executor::Mutation;
@@ -567,7 +567,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
                     state_impl.commit_state(pending_barrier.epoch).await?;
                     yield Message::Barrier(pending_barrier);
                 }
-            } else if self.disable_backfill {
+            } else if self.options.disable_backfill {
                 // If backfill is disabled, we just mark the backfill as finished
                 tracing::info!(
                     upstream_table_id,
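On the pacing logic above: backfill alternates between consuming upstream changes and reading snapshot batches, and a new snapshot read starts only once `snapshot_interval` barriers have passed since the last one. A minimal sketch of that rule, with a plain counter loop standing in for the executor's barrier stream (the real code also handles mutations, pause/resume, and state commits):

    fn main() {
        let snapshot_interval: u32 = 4; // e.g. WITH (snapshot.interval = '4')
        let mut barrier_count: u32 = 0;

        for barrier_id in 1..=12u32 {
            barrier_count += 1;
            if barrier_count == snapshot_interval {
                barrier_count = 0;
                println!("barrier {barrier_id}: start a new snapshot read");
            } else {
                println!("barrier {barrier_id}: consume upstream only");
            }
        }
    }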
diff --git a/src/stream/src/executor/backfill/cdc/mod.rs b/src/stream/src/executor/backfill/cdc/mod.rs
index e35facc6a543..e780e8a4d83b 100644
--- a/src/stream/src/executor/backfill/cdc/mod.rs
+++ b/src/stream/src/executor/backfill/cdc/mod.rs
@@ -17,4 +17,35 @@ mod state;
 mod upstream_table;
 
 pub use cdc_backfill::CdcBackfillExecutor;
+use risingwave_pb::stream_plan::StreamCdcScanOptions;
 pub use upstream_table::external::ExternalStorageTable;
+
+#[derive(Debug, Clone)]
+pub struct CdcScanOptions {
+    /// Whether to disable backfill
+    pub disable_backfill: bool,
+    /// Barrier interval to start a new snapshot read
+    pub snapshot_interval: u32,
+    /// Batch size for a snapshot read query
+    pub snapshot_batch_size: u32,
+}
+
+impl Default for CdcScanOptions {
+    fn default() -> Self {
+        Self {
+            disable_backfill: false,
+            snapshot_interval: 1,
+            snapshot_batch_size: 1000,
+        }
+    }
+}
+
+impl CdcScanOptions {
+    pub fn from_proto(proto: StreamCdcScanOptions) -> Self {
+        Self {
+            disable_backfill: proto.disable_backfill,
+            snapshot_interval: proto.snapshot_barrier_interval,
+            snapshot_batch_size: proto.snapshot_batch_size,
+        }
+    }
+}
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
index 48a4b569ec09..d81e9e46f3bb 100644
--- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
+++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -28,7 +28,7 @@ use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReader
 use super::external::ExternalStorageTable;
 use crate::common::rate_limit::limited_chunk_size;
 use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
-use crate::executor::{StreamExecutorError, StreamExecutorResult, INVALID_EPOCH};
+use crate::executor::{StreamExecutorError, StreamExecutorResult};
 
 pub trait UpstreamTableRead {
     fn snapshot_read_full_table(
@@ -44,9 +44,7 @@ pub trait UpstreamTableRead {
 
 #[derive(Debug, Clone, Default)]
 pub struct SnapshotReadArgs {
-    pub epoch: u64,
     pub current_pos: Option<OwnedRow>,
-    pub ordered: bool,
     pub rate_limit_rps: Option<u32>,
     pub pk_in_output_indices: Vec<usize>,
 }
@@ -58,9 +56,7 @@ impl SnapshotReadArgs {
         pk_in_output_indices: Vec<usize>,
     ) -> Self {
         Self {
-            epoch: INVALID_EPOCH,
            current_pos,
-            ordered: false,
            rate_limit_rps,
            pk_in_output_indices,
        }
    }
diff --git a/src/stream/src/executor/backfill/mod.rs b/src/stream/src/executor/backfill/mod.rs
index 99ad412548a8..48ef5ac7fb6c 100644
--- a/src/stream/src/executor/backfill/mod.rs
+++ b/src/stream/src/executor/backfill/mod.rs
@@ -16,3 +16,5 @@ pub mod arrangement_backfill;
 pub mod cdc;
 pub mod no_shuffle_backfill;
 pub mod utils;
+
+pub use cdc::CdcScanOptions;
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index 62740e6344cf..28f40f7b3863 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -109,7 +109,7 @@ mod utils;
 pub use actor::{Actor, ActorContext, ActorContextRef};
 use anyhow::Context;
 pub use backfill::arrangement_backfill::*;
-pub use backfill::cdc::{CdcBackfillExecutor, ExternalStorageTable};
+pub use backfill::cdc::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
 pub use backfill::no_shuffle_backfill::*;
 pub use barrier_recv::BarrierRecvExecutor;
 pub use batch_query::BatchQueryExecutor;
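On `snapshot_batch_size`: each snapshot query fetches at most that many rows, and the next query resumes from the last primary key seen, which is why `SnapshotReadArgs` keeps `current_pos` while the unused `epoch` and `ordered` fields are dropped. A toy sketch of the resume-by-pk loop (`read_batch` and the sorted `Vec` standing in for the external table are illustrative only):

    fn read_batch(table: &[i64], after: Option<i64>, batch_size: usize) -> Vec<i64> {
        table
            .iter()
            .copied()
            .filter(|pk| after.map_or(true, |pos| *pk > pos))
            .take(batch_size)
            .collect()
    }

    fn main() {
        let table: Vec<i64> = (1..=10).collect();
        let mut current_pos: Option<i64> = None;
        loop {
            let batch = read_batch(&table, current_pos, 4); // batch_size = 4
            if batch.is_empty() {
                break; // backfill finished
            }
            current_pos = batch.last().copied();
            println!("snapshot batch: {batch:?}");
        }
    }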
diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs
index ddd51d95a7ee..4223a3ecbe01 100644
--- a/src/stream/src/from_proto/stream_cdc_scan.rs
+++ b/src/stream/src/from_proto/stream_cdc_scan.rs
@@ -23,7 +23,7 @@ use risingwave_pb::stream_plan::StreamCdcScanNode;
 
 use super::*;
 use crate::common::table::state_table::StateTable;
-use crate::executor::{CdcBackfillExecutor, Executor, ExternalStorageTable};
+use crate::executor::{CdcBackfillExecutor, CdcScanOptions, Executor, ExternalStorageTable};
 
 pub struct StreamCdcScanExecutorBuilder;
 
@@ -44,7 +44,6 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
             .collect_vec();
 
         let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
-        let disable_backfill = node.disable_backfill;
 
         let table_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
         assert_eq!(output_indices, (0..table_schema.len()).collect_vec());
@@ -88,6 +87,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
         let state_table =
             StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await;
 
+        let scan_options = node.options.clone().map(CdcScanOptions::from_proto);
         let exec = CdcBackfillExecutor::new(
             params.actor_context.clone(),
             external_table,
@@ -97,9 +97,8 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
             params.executor_stats,
             state_table,
             node.rate_limit,
-            disable_backfill,
-            1,
-            1000,
+            node.disable_backfill,
+            scan_options,
         );
         Ok((params.info, exec).into())
     }