
feat(cdc): support disable cdc backfill and only consumes from latest changelog (#14718)

StrikeW authored and committed Feb 1, 2024
1 parent c5b9a83 commit f9c0731
Showing 16 changed files with 135 additions and 55 deletions.
4 changes: 4 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -74,3 +74,7 @@ SELECT * FROM rw.products_test order by id limit 3
102 RW 12V car battery
103 RW 12-pack of drill bits with sizes ranging from #40 to #3

query TTTT
select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id;
----
2022-12-01 15:08:22 Sam 110 0
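
This check expects exactly one row: the order ingested from the changelog after the no-backfill table was created; historical rows are deliberately absent (see the companion test in cdc.share_stream.slt below).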
23 changes: 23 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -105,8 +105,31 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t
system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');"

system ok
mysql --protocol=tcp -u root mytest -e "FLUSH LOGS"

# Should not contain records inserted before the table is created (e.g. 'Bob' inserted above)
statement ok
create table orders_no_backfill (
order_id int,
order_date timestamp,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) with (
snapshot = 'false'
) from mysql_mytest table 'mytest.orders';

sleep 5s

# table without backfill should not contain historical records
query I
select count(*) from orders_no_backfill
----
0

# check ingestion results
query I
SELECT * from products_test_cnt
1 change: 0 additions & 1 deletion e2e_test/source/cdc/mysql_init_data.sql
@@ -1,4 +1,3 @@
-- USE `my@db`;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
5 changes: 4 additions & 1 deletion proto/stream_plan.proto
@@ -553,14 +553,17 @@ message StreamCdcScanNode {
// Strips the primary key columns if they're unnecessary.
repeated uint32 output_indices = 3;

/// The state table used by Backfill operator for persisting internal state
// The state table used by CdcBackfill operator for persisting internal state
catalog.Table state_table = 4;

// The external table that will be backfilled for CDC.
plan_common.ExternalTableDesc cdc_table_desc = 5;

// The rate limit for the stream cdc scan node.
optional uint32 rate_limit = 6;

// Whether to skip the backfill and only consume from the upstream.
bool disable_backfill = 7;
}

// BatchPlanNode is used for mv on mv snapshot read.
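
Note: disable_backfill is a proto3 scalar, so it defaults to false when absent; stream plans serialized before this change therefore deserialize with backfill still enabled.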
1 change: 1 addition & 0 deletions src/compute/tests/cdc_tests.rs
@@ -239,6 +239,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
Arc::new(StreamingMetrics::unused()),
state_table,
4, // 4 rows in a snapshot chunk
false, // disable_backfill
);

// Create a `MaterializeExecutor` to write the changes to storage.
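
The bare `false` above is the new disable_backfill argument threaded into the CDC backfill executor. A minimal, self-contained Rust sketch of the control flow the flag selects (hypothetical helper names, not the real CdcBackfillExecutor API):

fn backfill_snapshot() {
    // Snapshot phase: read historical rows from the external table.
    println!("snapshot phase");
}

fn consume_changelog() {
    // Changelog phase: forward upstream CDC events. With backfill disabled,
    // consumption starts from the latest offset rather than a recorded one.
    println!("changelog phase");
}

fn run_cdc_scan(disable_backfill: bool) {
    if !disable_backfill {
        backfill_snapshot();
    }
    consume_changelog();
}

fn main() {
    run_cdc_scan(true); // corresponds to snapshot = 'false' in the WITH clause
}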
14 changes: 6 additions & 8 deletions src/connector/src/source/cdc/external/mock_external_table.rs
@@ -21,7 +21,8 @@ use risingwave_common::types::ScalarImpl;

use crate::error::ConnectorError;
use crate::source::cdc::external::{
CdcOffset, ConnectorResult, ExternalTableReader, MySqlOffset, SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset,
SchemaTableName,
};

#[derive(Debug)]
@@ -38,6 +39,10 @@ impl MockExternalTableReader {
}
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default())))
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(&self) {
let snap_idx = self
@@ -102,13 +107,6 @@ impl ExternalTableReader for MockExternalTableReader {
}
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
// same as mysql offset
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
_table_name: SchemaTableName,
40 changes: 24 additions & 16 deletions src/connector/src/source/cdc/external/mod.rs
@@ -208,13 +208,13 @@ impl MySqlOffset {
}
}

pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;

pub trait ExternalTableReader {
fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String;

async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

fn parse_cdc_offset(&self, dbz_offset: &str) -> ConnectorResult<CdcOffset>;

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -276,12 +276,6 @@ impl ExternalTableReader for MySqlExternalTableReader {
}))
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -328,6 +322,14 @@ impl MySqlExternalTableReader {
})
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |offset| {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
})
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
@@ -502,14 +504,6 @@ impl ExternalTableReader for ExternalTableReaderImpl {
}
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
match self {
ExternalTableReaderImpl::MySql(mysql) => mysql.parse_cdc_offset(offset),
ExternalTableReaderImpl::Postgres(postgres) => postgres.parse_cdc_offset(offset),
ExternalTableReaderImpl::Mock(mock) => mock.parse_cdc_offset(offset),
}
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
Expand All @@ -521,6 +515,16 @@ impl ExternalTableReader for ExternalTableReaderImpl {
}

impl ExternalTableReaderImpl {
pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
match self {
ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
ExternalTableReaderImpl::Postgres(_) => {
PostgresExternalTableReader::get_cdc_offset_parser()
}
ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
}
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
@@ -623,6 +627,10 @@ mod tests {
let offset = reader.current_cdc_offset().await.unwrap();
println!("BinlogOffset: {:?}", offset);

let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
let parser = MySqlExternalTableReader::get_cdc_offset_parser();
println!("parsed offset: {:?}", parser(off0_str).unwrap());

let table_name = SchemaTableName {
schema_name: "mytest".to_string(),
table_name: "t1".to_string(),
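
The change in this file replaces the per-instance trait method parse_cdc_offset with the free-standing CdcOffsetParseFunc boxed closure, so a parser can be obtained and stored without holding a table reader (or its connection) and moved across threads via the Send bound. A stripped-down sketch of the same pattern with stand-in types (the real closure returns ConnectorResult<CdcOffset>):

type OffsetParseFunc = Box<dyn Fn(&str) -> Result<u64, String> + Send>;

fn get_offset_parser() -> OffsetParseFunc {
    // An associated function returns the closure; no reader instance is needed.
    Box::new(|raw| raw.trim().parse::<u64>().map_err(|e| e.to_string()))
}

fn main() {
    let parse = get_offset_parser();
    assert_eq!(parse(" 105622 ").unwrap(), 105622);
}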
18 changes: 10 additions & 8 deletions src/connector/src/source/cdc/external/postgres.rs
@@ -30,8 +30,8 @@ use tokio_postgres::NoTls;
use crate::error::ConnectorError;
use crate::parser::postgres_row_to_owned_row;
use crate::source::cdc::external::{
CdcOffset, ConnectorResult, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig,
ExternalTableReader, SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
@@ -105,12 +105,6 @@ impl ExternalTableReader for PostgresExternalTableReader {
Ok(CdcOffset::Postgres(pg_offset))
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -165,6 +159,14 @@ impl PostgresExternalTableReader {
})
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |offset| {
Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
offset,
)?))
})
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/mod.rs
@@ -35,6 +35,8 @@ pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// Users can set snapshot='false' to disable CDC backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";

pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
12 changes: 12 additions & 0 deletions src/frontend/src/handler/create_table.rs
@@ -14,6 +14,7 @@

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
@@ -33,6 +34,7 @@ use risingwave_connector::source;
use risingwave_connector::source::cdc::external::{
DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::TableJobType;
@@ -818,10 +820,20 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(

tracing::debug!(?cdc_table_desc, "create cdc table");

// disable backfill if 'snapshot=false'
let disable_backfill = match context.with_options().get(CDC_BACKFILL_ENABLE_KEY) {
None => false,
Some(v) => {
!(bool::from_str(v)
.map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?)
}
};

let logical_scan = LogicalCdcScan::create(
external_table_name,
Rc::new(cdc_table_desc),
context.clone(),
disable_backfill,
);

let scan_node: PlanRef = logical_scan.into();
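
Because the option value goes through bool::from_str, only the exact strings "true" and "false" are accepted. A small sketch of the resulting semantics (hypothetical helper mirroring the match above):

use std::str::FromStr;

fn disable_backfill(snapshot_opt: Option<&str>) -> Result<bool, String> {
    match snapshot_opt {
        // Option absent: backfill stays enabled.
        None => Ok(false),
        Some(v) => bool::from_str(v)
            .map(|enabled| !enabled)
            .map_err(|_| format!("Invalid value for snapshot: {v}")),
    }
}

fn main() {
    assert_eq!(disable_backfill(None), Ok(false));
    assert_eq!(disable_backfill(Some("false")), Ok(true)); // snapshot = 'false'
    assert!(disable_backfill(Some("off")).is_err()); // bool::from_str is strict
}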
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
@@ -39,6 +39,8 @@ pub struct CdcScan {
#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,

pub disable_backfill: bool,
}

impl CdcScan {
@@ -102,12 +104,14 @@ impl CdcScan {
output_col_idx: Vec<usize>, // the column index in the table
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
) -> Self {
Self {
table_name,
output_col_idx,
cdc_table_desc,
ctx,
disable_backfill,
}
}

3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
@@ -60,12 +60,14 @@ impl LogicalCdcScan {
table_name: String, // explain-only
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
) -> Self {
generic::CdcScan::new(
table_name,
(0..cdc_table_desc.columns.len()).collect(),
cdc_table_desc,
ctx,
disable_backfill,
)
.into()
}
@@ -94,6 +96,7 @@ impl LogicalCdcScan {
output_col_idx,
self.core.cdc_table_desc.clone(),
self.base.ctx().clone(),
self.core.disable_backfill,
)
.into()
}
src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -252,6 +252,7 @@ impl StreamCdcTableScan {
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
disable_backfill: self.core.disable_backfill,
});

// plan: merge -> filter -> exchange(simple) -> stream_scan