feat(cdc): support disabling cdc backfill and only consume from the latest changelog #14718

Merged · 15 commits · Feb 1, 2024
4 changes: 4 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -74,3 +74,7 @@ SELECT * FROM rw.products_test order by id limit 3
102 RW 12V car battery
103 RW 12-pack of drill bits with sizes ranging from #40 to #3

query TTTT
select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id;
----
2022-12-01 15:08:22 Sam 110 0
23 changes: 23 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -105,8 +105,31 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t
system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');"

system ok
mysql --protocol=tcp -u root mytest -e "FLUSH LOGS"

# Should not contain records inserted before the table is created (e.g. 'Bob' inserted above)
statement ok
create table orders_no_backfill (
order_id int,
order_date timestamp,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) with (
snapshot = 'false'
) from mysql_mytest table 'mytest.orders';

Contributor:

Hmm, I see that we insert 'Sam' as a record, but where? Do you think it's possible for the cdc tests to be inlined?

Seems a little confusing because we create the table here (in cdc.share_stream.slt), but we check the contents in a separate file (e2e_test/source/cdc/cdc.check_new_rows.slt).

I'm not sure how they are synced up.

Contributor Author:

> Hmm, I see that we insert 'Sam' as a record, but where?

In e2e-source-test.sh, which is the "main" entry point of the e2e tests.

And cdc.check_new_rows.slt is run after the cluster restart, to verify that the cdc source can resume after recovery. I feel it would also be odd to kill and restart the cluster inside the .slt itself.

sleep 5s

# table without backfill should not contain historical records
query I
select count(*) from orders_no_backfill
----
0

# check ingestion results
query I
SELECT * from products_test_cnt
1 change: 0 additions & 1 deletion e2e_test/source/cdc/mysql_init_data.sql
@@ -1,4 +1,3 @@
-- USE `my@db`;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
5 changes: 4 additions & 1 deletion proto/stream_plan.proto
@@ -560,14 +560,17 @@ message StreamCdcScanNode {
// Strips the primary key columns if they're unnecessary.
repeated uint32 output_indices = 3;

/// The state table used by Backfill operator for persisting internal state
// The state table used by CdcBackfill operator for persisting internal state
catalog.Table state_table = 4;

// The external table that will be backfilled for CDC.
plan_common.ExternalTableDesc cdc_table_desc = 5;

// The rate limit for the stream cdc scan node.
optional uint32 rate_limit = 6;

// Whether to skip the backfill and only consume from upstream.
bool disable_backfill = 7;
}

// BatchPlanNode is used for mv on mv snapshot read.
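This new field is the heart of the feature: when disable_backfill is set, the CDC scan skips the snapshot phase and consumes only the upstream changelog. A minimal Rust sketch of that branch follows; the names are hypothetical, since the executor-side change itself is not shown in this diff.

#[derive(Debug, PartialEq)]
enum ConsumeMode {
    // Read a snapshot of the external table, then catch up from the changelog.
    SnapshotBackfill,
    // Skip the snapshot entirely; consume the upstream changelog only.
    UpstreamOnly,
}

// Hypothetical mapping from the new proto flag to the consumption mode.
fn consume_mode(disable_backfill: bool) -> ConsumeMode {
    if disable_backfill {
        ConsumeMode::UpstreamOnly
    } else {
        ConsumeMode::SnapshotBackfill
    }
}

fn main() {
    // `snapshot = 'false'` in the WITH clause maps to disable_backfill = true.
    assert_eq!(consume_mode(true), ConsumeMode::UpstreamOnly);
    assert_eq!(consume_mode(false), ConsumeMode::SnapshotBackfill);
}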
1 change: 1 addition & 0 deletions src/compute/tests/cdc_tests.rs
@@ -239,6 +239,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
Arc::new(StreamingMetrics::unused()),
state_table,
4, // 4 rows in a snapshot chunk
false,
);

// Create a `MaterializeExecutor` to write the changes to storage.
14 changes: 6 additions & 8 deletions src/connector/src/source/cdc/external/mock_external_table.rs
@@ -21,7 +21,8 @@ use risingwave_common::types::ScalarImpl;

use crate::error::ConnectorError;
use crate::source::cdc::external::{
CdcOffset, ConnectorResult, ExternalTableReader, MySqlOffset, SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset,
SchemaTableName,
};

#[derive(Debug)]
@@ -38,6 +39,10 @@ impl MockExternalTableReader {
}
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default())))
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(&self) {
let snap_idx = self
@@ -102,13 +107,6 @@ impl ExternalTableReader for MockExternalTableReader {
}
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
// same as mysql offset
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
_table_name: SchemaTableName,
40 changes: 24 additions & 16 deletions src/connector/src/source/cdc/external/mod.rs
@@ -208,13 +208,13 @@ impl MySqlOffset {
}
}

pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;

pub trait ExternalTableReader {
fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String;

async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

fn parse_cdc_offset(&self, dbz_offset: &str) -> ConnectorResult<CdcOffset>;

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -276,12 +276,6 @@ impl ExternalTableReader for MySqlExternalTableReader {
}))
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -328,6 +322,14 @@ impl MySqlExternalTableReader {
})
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |offset| {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
})
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
@@ -502,14 +504,6 @@ impl ExternalTableReader for ExternalTableReaderImpl {
}
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
match self {
ExternalTableReaderImpl::MySql(mysql) => mysql.parse_cdc_offset(offset),
ExternalTableReaderImpl::Postgres(postgres) => postgres.parse_cdc_offset(offset),
ExternalTableReaderImpl::Mock(mock) => mock.parse_cdc_offset(offset),
}
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -521,6 +515,16 @@
}

impl ExternalTableReaderImpl {
pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
match self {
ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
ExternalTableReaderImpl::Postgres(_) => {
PostgresExternalTableReader::get_cdc_offset_parser()
}
ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
}
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
@@ -623,6 +627,10 @@ mod tests {
let offset = reader.current_cdc_offset().await.unwrap();
println!("BinlogOffset: {:?}", offset);

let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
let parser = MySqlExternalTableReader::get_cdc_offset_parser();
println!("parsed offset: {:?}", parser(off0_str).unwrap());

let table_name = SchemaTableName {
schema_name: "mytest".to_string(),
table_name: "t1".to_string(),
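The refactor above replaces the per-reader parse_cdc_offset(&self, ...) trait method with a standalone CdcOffsetParseFunc factory: the parser becomes a boxed closure that can be created once and moved into the backfill executor without borrowing the table reader. Below is a self-contained sketch of the pattern with simplified stand-in types; the real parser decodes a Debezium JSON offset, as exercised in the test above.

// Stand-ins for the connector crate's MySqlOffset / ConnectorResult.
#[derive(Debug, Default, PartialEq)]
struct MySqlOffset {
    filename: String,
    position: u64,
}

type ConnectorResult<T> = Result<T, String>;
type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<MySqlOffset> + Send>;

fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
    Box::new(move |offset: &str| {
        // The real implementation parses a Debezium JSON offset; this stub
        // accepts a "file:pos" pair to keep the example dependency-free.
        let (file, pos) = offset.split_once(':').ok_or("malformed offset")?;
        Ok(MySqlOffset {
            filename: file.to_owned(),
            position: pos.parse().map_err(|_| "bad position".to_owned())?,
        })
    })
}

fn main() {
    // The parser is a plain value: it can be built up front and handed to
    // an executor, with no &self borrow tying it to the reader's lifetime.
    let parser = get_cdc_offset_parser();
    let off = parser("binlog.000001:105622").unwrap();
    assert_eq!(off.position, 105622);
    println!("parsed offset: {:?}", off);
}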
18 changes: 10 additions & 8 deletions src/connector/src/source/cdc/external/postgres.rs
@@ -30,8 +30,8 @@ use tokio_postgres::NoTls;
use crate::error::ConnectorError;
use crate::parser::postgres_row_to_owned_row;
use crate::source::cdc::external::{
CdcOffset, ConnectorResult, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig,
ExternalTableReader, SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
@@ -105,12 +105,6 @@ impl ExternalTableReader for PostgresExternalTableReader {
Ok(CdcOffset::Postgres(pg_offset))
}

fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
offset,
)?))
}

fn snapshot_read(
&self,
table_name: SchemaTableName,
@@ -165,6 +159,14 @@ impl PostgresExternalTableReader {
})
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |offset| {
Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
offset,
)?))
})
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/mod.rs
@@ -35,6 +35,8 @@ pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// The user can set snapshot='false' to disable cdc backfill.
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";

pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
12 changes: 12 additions & 0 deletions src/frontend/src/handler/create_table.rs
@@ -14,6 +14,7 @@

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
@@ -33,6 +34,7 @@ use risingwave_connector::source;
use risingwave_connector::source::cdc::external::{
DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::TableJobType;
@@ -807,10 +809,20 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(

tracing::debug!(?cdc_table_desc, "create cdc table");

// disable backfill if snapshot='false'
let disable_backfill = match context.with_options().get(CDC_BACKFILL_ENABLE_KEY) {
None => false,
Some(v) => {
!(bool::from_str(v)
.map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?)
}
};

let logical_scan = LogicalCdcScan::create(
external_table_name,
Rc::new(cdc_table_desc),
context.clone(),
disable_backfill,
);

let scan_node: PlanRef = logical_scan.into();
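To summarize the option handling above: a missing snapshot option keeps backfill on, snapshot = 'false' disables it, and any value that does not parse as a bool is rejected at plan time. Here is a runnable sketch of that decision logic under simplified types; the real code reads the key from context.with_options().

use std::collections::BTreeMap;
use std::str::FromStr;

// Same key as defined in src/connector/src/source/cdc/mod.rs.
const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";

// Sketch of the frontend mapping from WITH options to the planner flag.
fn disable_backfill(with_options: &BTreeMap<String, String>) -> Result<bool, String> {
    match with_options.get(CDC_BACKFILL_ENABLE_KEY) {
        None => Ok(false), // option absent: backfill stays enabled
        Some(v) => bool::from_str(v)
            .map(|snapshot_enabled| !snapshot_enabled)
            .map_err(|_| format!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY)),
    }
}

fn main() {
    let mut opts = BTreeMap::new();
    assert_eq!(disable_backfill(&opts), Ok(false)); // default: backfill runs

    opts.insert("snapshot".into(), "false".into());
    assert_eq!(disable_backfill(&opts), Ok(true)); // backfill skipped

    opts.insert("snapshot".into(), "maybe".into());
    assert!(disable_backfill(&opts).is_err()); // rejected at plan time
}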
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
@@ -39,6 +39,8 @@ pub struct CdcScan {
#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,

pub disable_backfill: bool,
}

impl CdcScan {
@@ -102,12 +104,14 @@ impl CdcScan {
output_col_idx: Vec<usize>, // the column index in the table
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
) -> Self {
Self {
table_name,
output_col_idx,
cdc_table_desc,
ctx,
disable_backfill,
}
}

3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
@@ -60,12 +60,14 @@ impl LogicalCdcScan {
table_name: String, // explain-only
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
) -> Self {
generic::CdcScan::new(
table_name,
(0..cdc_table_desc.columns.len()).collect(),
cdc_table_desc,
ctx,
disable_backfill,
)
.into()
}
@@ -94,6 +96,7 @@ impl LogicalCdcScan {
output_col_idx,
self.core.cdc_table_desc.clone(),
self.base.ctx().clone(),
self.core.disable_backfill,
)
.into()
}
@@ -256,6 +256,7 @@ impl StreamCdcTableScan {
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
disable_backfill: self.core.disable_backfill,
});

// plan: merge -> filter -> exchange(simple) -> stream_scan