feat(cdc): introduce with option to configure cdc snapshot (#16426)
StrikeW authored Apr 28, 2024
1 parent 9996e75 commit b213243
Showing 15 changed files with 153 additions and 52 deletions.
@@ -19,3 +19,4 @@ time.precision.mode=connect
# handle conflicts in the mview operator, thus we don't need to obey the above
# instructions. So we decrease the wait time here to reclaim jvm thread faster.
debezium.embedded.shutdown.pause.before.interrupt.ms=1
offset.flush.interval.ms=60000
13 changes: 13 additions & 0 deletions proto/stream_plan.proto
@@ -561,6 +561,16 @@ message StreamScanNode {
catalog.Table arrangement_table = 10;
}

// Config options for CDC backfill
message StreamCdcScanOptions {
// Whether to skip the backfill and only consume from the upstream.
bool disable_backfill = 1;

// Number of barriers to wait between starting two snapshot reads during backfill.
uint32 snapshot_barrier_interval = 2;

// Maximum number of rows fetched in one snapshot read.
uint32 snapshot_batch_size = 3;
}

message StreamCdcScanNode {
uint32 table_id = 1;

@@ -581,7 +591,10 @@ message StreamCdcScanNode {
optional uint32 rate_limit = 6;

// Whether to skip the backfill and only consume from the upstream.
// Kept for backward compatibility; new stream plans use `options.disable_backfill` instead.
bool disable_backfill = 7;

StreamCdcScanOptions options = 8;
}
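
For backward compatibility the scan node now carries both the legacy `disable_backfill` flag and the new `options` message. A minimal sketch (not part of this commit) of how a plan consumer might resolve the effective options, assuming prost-generated Rust types where the nested message field is exposed as an `Option`; the fallback values mirror the frontend defaults introduced later in this diff:

```rust
use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions};

// Sketch only: prefer the new `options` message when present, otherwise fall
// back to the deprecated boolean plus the frontend's default values.
fn resolve_scan_options(node: &StreamCdcScanNode) -> StreamCdcScanOptions {
    match &node.options {
        Some(options) => options.clone(),
        None => StreamCdcScanOptions {
            disable_backfill: node.disable_backfill,
            snapshot_barrier_interval: 1, // default: snapshot read every barrier
            snapshot_batch_size: 1000,    // default: up to 1000 rows per read
        },
    }
}
```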

// BatchPlanNode is used for mv on mv snapshot read.
11 changes: 7 additions & 4 deletions src/compute/tests/cdc_tests.rs
@@ -47,8 +47,8 @@ use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::test_utils::MockSource;
use risingwave_stream::executor::{
expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedMessageStream,
CdcBackfillExecutor, Execute, Executor as StreamExecutor, ExecutorInfo, ExternalStorageTable,
MaterializeExecutor, Message, Mutation, StreamExecutorError,
CdcBackfillExecutor, CdcScanOptions, Execute, Executor as StreamExecutor, ExecutorInfo,
ExternalStorageTable, MaterializeExecutor, Message, Mutation, StreamExecutorError,
};

// mock upstream binlog offset starting from "1.binlog, pos=0"
@@ -230,8 +230,11 @@ async fn test_cdc_backfill() -> StreamResult<()> {
state_table,
Some(4), // limit a snapshot chunk to have <= 4 rows by rate limit
false,
1,
4,
Some(CdcScanOptions {
disable_backfill: false,
snapshot_interval: 1,
snapshot_batch_size: 4,
}),
)
.boxed(),
);
1 change: 0 additions & 1 deletion src/connector/src/source/cdc/external/mod.rs
@@ -649,7 +649,6 @@ mod tests {
let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
let parser = MySqlExternalTableReader::get_cdc_offset_parser();
println!("parsed offset: {:?}", parser(off0_str).unwrap());

let table_name = SchemaTableName {
schema_name: "mytest".to_string(),
table_name: "t1".to_string(),
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/mod.rs
@@ -40,6 +40,8 @@ pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// User can set snapshot='false' to disable cdc backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
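
The two new keys (`snapshot.interval`, `snapshot.batch_size`) sit next to the existing `snapshot` flag and are supplied by the user in the `WITH` clause of `CREATE TABLE`, e.g. `snapshot.interval = '2'` and `snapshot.batch_size = '5000'`. A rough sketch of the intended parsing behaviour, using a plain `BTreeMap<String, String>` as a stand-in for the frontend's `WithOptions` (the real logic is `CdcScanOptions::from_with_options` later in this diff):

```rust
use std::collections::BTreeMap;
use std::str::FromStr;

/// Sketch only: parse the snapshot-related WITH options, falling back to the
/// frontend defaults (backfill enabled, interval 1, batch size 1000).
fn parse_snapshot_options(with: &BTreeMap<String, String>) -> Result<(bool, u32, u32), String> {
    // `snapshot = 'false'` disables the backfill entirely.
    let disable_backfill = match with.get("snapshot") {
        None => false,
        Some(v) => !bool::from_str(v).map_err(|_| format!("Invalid value for snapshot: {v}"))?,
    };
    // `snapshot.interval`: barriers to wait between snapshot reads.
    let interval = match with.get("snapshot.interval") {
        None => 1,
        Some(v) => u32::from_str(v).map_err(|_| format!("Invalid value for snapshot.interval: {v}"))?,
    };
    // `snapshot.batch_size`: row limit of each snapshot read.
    let batch_size = match with.get("snapshot.batch_size") {
        None => 1000,
        Some(v) => u32::from_str(v).map_err(|_| format!("Invalid value for snapshot.batch_size: {v}"))?,
    };
    Ok((disable_backfill, interval, batch_size))
}
```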

15 changes: 3 additions & 12 deletions src/frontend/src/handler/create_table.rs
@@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
@@ -32,7 +31,6 @@ use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector::source::cdc::external::{
DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
use risingwave_connector::{source, WithPropertiesExt};
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
@@ -62,7 +60,7 @@ use crate::handler::create_source::{
check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
@@ -850,20 +848,13 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(

tracing::debug!(?cdc_table_desc, "create cdc table");

// disable backfill if 'snapshot=false'
let disable_backfill = match context.with_options().get(CDC_BACKFILL_ENABLE_KEY) {
None => false,
Some(v) => {
!(bool::from_str(v)
.map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?)
}
};
let options = CdcScanOptions::from_with_options(context.with_options())?;

let logical_scan = LogicalCdcScan::create(
external_table_name,
Rc::new(cdc_table_desc),
context.clone(),
disable_backfill,
options,
);

let scan_node: PlanRef = logical_scan.into();
65 changes: 63 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
@@ -13,19 +13,28 @@
// limitations under the License.

use std::rc::Rc;
use std::str::FromStr;

use anyhow::anyhow;
use educe::Educe;
use fixedbitset::FixedBitSet;
use pretty_xmlish::Pretty;
use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_connector::source::cdc::{
CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY,
CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
};
use risingwave_pb::stream_plan::StreamCdcScanOptions;

use super::GenericPlanNode;
use crate::catalog::ColumnId;
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::WithOptions;

/// [`CdcScan`] reads rows of a table from an external upstream database
#[derive(Debug, Clone, Educe)]
@@ -40,7 +49,59 @@ pub struct CdcScan {
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,

pub options: CdcScanOptions,
}

#[derive(Debug, Clone, Hash, PartialEq)]
pub struct CdcScanOptions {
pub disable_backfill: bool,
pub snapshot_barrier_interval: u32,
pub snapshot_batch_size: u32,
}

impl Default for CdcScanOptions {
fn default() -> Self {
Self {
disable_backfill: false,
snapshot_barrier_interval: 1,
snapshot_batch_size: 1000,
}
}
}

impl CdcScanOptions {
pub fn from_with_options(with_options: &WithOptions) -> Result<Self> {
// Unspecified options will use their default values.
let mut scan_options = Self::default();

// disable backfill if 'snapshot=false'
if let Some(snapshot) = with_options.get(CDC_BACKFILL_ENABLE_KEY) {
scan_options.disable_backfill = !(bool::from_str(snapshot)
.map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?);
};

if let Some(snapshot_interval) = with_options.get(CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY) {
scan_options.snapshot_barrier_interval = u32::from_str(snapshot_interval)
.map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY))?;
};

if let Some(snapshot_batch_size) = with_options.get(CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY) {
scan_options.snapshot_batch_size =
u32::from_str(snapshot_batch_size).map_err(|_| {
anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY)
})?;
};

Ok(scan_options)
}

pub fn to_proto(&self) -> StreamCdcScanOptions {
StreamCdcScanOptions {
disable_backfill: self.disable_backfill,
snapshot_barrier_interval: self.snapshot_barrier_interval,
snapshot_batch_size: self.snapshot_batch_size,
}
}
}
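
A small test-style sketch (not in the commit) that exercises the defaults and the `to_proto` conversion above; it assumes it lives in the same module as `CdcScanOptions`:

```rust
#[cfg(test)]
mod cdc_scan_options_sketch {
    use super::*;

    #[test]
    fn defaults_match_proto() {
        let opts = CdcScanOptions::default();
        assert!(!opts.disable_backfill);
        assert_eq!(opts.snapshot_barrier_interval, 1);
        assert_eq!(opts.snapshot_batch_size, 1000);

        // The proto message carries the same three fields.
        let pb = opts.to_proto();
        assert!(!pb.disable_backfill);
        assert_eq!(pb.snapshot_barrier_interval, 1);
        assert_eq!(pb.snapshot_batch_size, 1000);
    }
}
```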

impl CdcScan {
@@ -104,14 +165,14 @@ impl CdcScan {
output_col_idx: Vec<usize>, // the column index in the table
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
options: CdcScanOptions,
) -> Self {
Self {
table_name,
output_col_idx,
cdc_table_desc,
ctx,
disable_backfill,
options,
}
}

7 changes: 4 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
@@ -28,6 +28,7 @@ use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::CdcScanOptions;
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamCdcTableScan,
ToStreamContext,
@@ -60,14 +61,14 @@ impl LogicalCdcScan {
table_name: String, // explain-only
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
options: CdcScanOptions,
) -> Self {
generic::CdcScan::new(
table_name,
(0..cdc_table_desc.columns.len()).collect(),
cdc_table_desc,
ctx,
disable_backfill,
options,
)
.into()
}
@@ -96,7 +97,7 @@ impl LogicalCdcScan {
output_col_idx,
self.core.cdc_table_desc.clone(),
self.base.ctx().clone(),
self.core.disable_backfill,
self.core.options.clone(),
)
.into()
}
@@ -247,6 +247,7 @@ impl StreamCdcTableScan {
"stream cdc table scan output indices"
);

let options = self.core.options.to_proto();
let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode {
table_id: upstream_source_id,
upstream_column_ids,
Expand All @@ -255,7 +256,8 @@ impl StreamCdcTableScan {
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
disable_backfill: self.core.disable_backfill,
disable_backfill: options.disable_backfill,
options: Some(options),
});

// plan: merge -> filter -> exchange(simple) -> stream_scan
36 changes: 18 additions & 18 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -39,6 +39,7 @@ use crate::executor::backfill::cdc::upstream_table::snapshot::{
use crate::executor::backfill::utils::{
get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
};
use crate::executor::backfill::CdcScanOptions;
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;

@@ -70,12 +71,7 @@ pub struct CdcBackfillExecutor<S: StateStore> {
/// Rate limit in rows/s.
rate_limit_rps: Option<u32>,

disable_backfill: bool,

// TODO: make these options configurable
snapshot_interval: u32,

snapshot_read_limit: u32,
options: CdcScanOptions,
}

impl<S: StateStore> CdcBackfillExecutor<S> {
@@ -89,9 +85,8 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
metrics: Arc<StreamingMetrics>,
state_table: StateTable<S>,
rate_limit_rps: Option<u32>,
disable_backfill: bool,
snapshot_interval: u32,
snapshot_read_limit: u32,
disable_backfill: bool, // backward compatibility
scan_options: Option<CdcScanOptions>,
) -> Self {
let pk_in_output_indices = external_table.pk_in_output_indices().clone().unwrap();
let upstream_table_id = external_table.table_id().table_id;
@@ -101,6 +96,11 @@
pk_in_output_indices.len() + METADATA_STATE_LEN,
);

let options = scan_options.unwrap_or(CdcScanOptions {
disable_backfill,
..Default::default()
});

Self {
actor_ctx,
external_table,
@@ -110,9 +110,7 @@
progress,
metrics,
rate_limit_rps,
disable_backfill,
snapshot_interval,
snapshot_read_limit,
options,
}
}

@@ -176,7 +174,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let state = state_impl.restore_state().await?;
current_pk_pos = state.current_pk_pos.clone();

let to_backfill = !self.disable_backfill && !state.is_finished;
let to_backfill = !self.options.disable_backfill && !state.is_finished;

// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);
@@ -200,10 +198,12 @@
initial_binlog_offset = ?last_binlog_offset,
?current_pk_pos,
is_finished = state.is_finished,
disable_backfill = self.disable_backfill,
snapshot_row_count = total_snapshot_row_count,
rate_limit = self.rate_limit_rps,
"start cdc backfill"
disable_backfill = self.options.disable_backfill,
snapshot_interval = self.options.snapshot_interval,
snapshot_batch_size = self.options.snapshot_batch_size,
"start cdc backfill",
);

// CDC Backfill Algorithm:
@@ -269,7 +269,7 @@
);

let right_snapshot = pin!(upstream_table_reader
.snapshot_read_full_table(read_args, self.snapshot_read_limit)
.snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
.map(Either::Right));

let (right_snapshot, valve) = pausable(right_snapshot);
@@ -298,7 +298,7 @@
// increase the barrier count and check whether need to start a new snapshot
barrier_count += 1;
let can_start_new_snapshot =
barrier_count == self.snapshot_interval;
barrier_count == self.options.snapshot_interval;

if let Some(mutation) = barrier.mutation.as_deref() {
use crate::executor::Mutation;
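
The `snapshot_interval` option gates how often a new snapshot read may start: the executor counts barriers while consuming upstream changes and only switches back to snapshot reading once the count reaches the configured interval, and each snapshot read is then capped at `snapshot_batch_size` rows. A stripped-down sketch of that gating (hypothetical helper, not the executor's actual state machine):

```rust
/// Sketch only: counts barriers and signals when a new snapshot read
/// (of at most `snapshot_batch_size` rows) should be started.
struct SnapshotGate {
    snapshot_interval: u32,
    snapshot_batch_size: u32,
    barrier_count: u32,
}

impl SnapshotGate {
    /// Call once per barrier; returns the row limit for a new snapshot read
    /// when the interval is reached, `None` otherwise.
    fn on_barrier(&mut self) -> Option<u32> {
        self.barrier_count += 1;
        if self.barrier_count == self.snapshot_interval {
            self.barrier_count = 0; // reset for the next round
            Some(self.snapshot_batch_size)
        } else {
            None
        }
    }
}
```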
@@ -567,7 +567,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
state_impl.commit_state(pending_barrier.epoch).await?;
yield Message::Barrier(pending_barrier);
}
} else if self.disable_backfill {
} else if self.options.disable_backfill {
// If backfill is disabled, we just mark the backfill as finished
tracing::info!(
upstream_table_id,