feat(cdc): persist the backfill state for table-on-source (#13276)
StrikeW authored Nov 14, 2023
1 parent 09d312c commit 3b7036c
Showing 20 changed files with 463 additions and 274 deletions.
10 changes: 9 additions & 1 deletion ci/scripts/e2e-source-test.sh
@@ -71,10 +71,18 @@ export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt'


-# kill cluster and the connector node
+# kill cluster
cargo make kill
echo "cluster killed "

+# insert into mytest database (cdc.share_stream.slt)
+mysql --protocol=tcp -u root mytest -e "INSERT INTO products
+       VALUES (default,'RisingWave','Next generation Streaming Database'),
+              (default,'Materialize','The Streaming Database You Already Know How to Use');
+       UPDATE products SET name = 'RW' WHERE id <= 103;
+       INSERT INTO orders VALUES (default, '2022-12-01 15:08:22', 'Sam', 1000.52, 110, false);"


# insert new rows
mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc_insert.sql
psql < ./e2e_test/source/cdc/postgres_cdc_insert.sql
18 changes: 18 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -47,3 +47,21 @@ select v1, v2, v3 from mytable order by v1;
2 2 yes
3 3 no
4 4 no

+# shared cdc source
+query I
+SELECT * from products_test_cnt
+----
+11
+
+query I
+SELECT * from orders_test_cnt
+----
+4
+
+query ITT
+SELECT * FROM products_test order by id limit 3
+----
+101 RW Small 2-wheel scooter
+102 RW 12V car battery
+103 RW 12-pack of drill bits with sizes ranging from #40 to #3
9 changes: 4 additions & 5 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -7,6 +7,10 @@ mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_create.sql

+# generate data to mysql
+system ok
+mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql

# enable cdc backfill in ci
statement ok
set cdc_backfill='true';
@@ -47,11 +51,6 @@ create materialized view products_test_cnt as select count(*) as cnt from produc
statement ok
create materialized view orders_test_cnt as select count(*) as cnt from orders_test;


-# generate data to mysql
-system ok
-mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql

sleep 5s

# check ingestion results
@@ -62,8 +62,8 @@ public static ConnectorServiceProto.ValidateSourceResponse validateResponse(Stri
.build();
}

-public static void ensurePropNotNull(Map<String, String> props, String name) {
-    if (!props.containsKey(name)) {
+public static void ensurePropNotBlank(Map<String, String> props, String name) {
+    if (StringUtils.isBlank(props.get(name))) {
throw ValidatorUtils.invalidArgument(
String.format("'%s' not found, please check the WITH properties", name));
}
@@ -73,39 +73,39 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re
throws Exception {
var props = request.getPropertiesMap();

-ensurePropNotNull(props, DbzConnectorConfig.HOST);
-ensurePropNotNull(props, DbzConnectorConfig.PORT);
-ensurePropNotNull(props, DbzConnectorConfig.DB_NAME);
-ensurePropNotNull(props, DbzConnectorConfig.USER);
-ensurePropNotNull(props, DbzConnectorConfig.PASSWORD);
+ensurePropNotBlank(props, DbzConnectorConfig.HOST);
+ensurePropNotBlank(props, DbzConnectorConfig.PORT);
+ensurePropNotBlank(props, DbzConnectorConfig.DB_NAME);
+ensurePropNotBlank(props, DbzConnectorConfig.USER);
+ensurePropNotBlank(props, DbzConnectorConfig.PASSWORD);

// ensure table name is passed by user in single mode
if (Utils.getCdcSourceMode(props) == CdcSourceMode.SINGLE_MODE) {
-ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME);
+ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
}

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
case POSTGRES:
-ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME);
-ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME);
-ensurePropNotNull(props, DbzConnectorConfig.PG_SLOT_NAME);
-ensurePropNotNull(props, DbzConnectorConfig.PG_PUB_NAME);
-ensurePropNotNull(props, DbzConnectorConfig.PG_PUB_CREATE);
+ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
+ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
+ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME);
+ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME);
+ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE);
try (var validator = new PostgresValidator(props, tableSchema)) {
validator.validateAll();
}
break;

case CITUS:
-ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME);
-ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME);
+ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
+ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var coordinatorValidator = new CitusValidator(props, tableSchema)) {
coordinatorValidator.validateDistributedTable();
coordinatorValidator.validateTable();
}

-ensurePropNotNull(props, DbzConnectorConfig.DB_SERVERS);
+ensurePropNotBlank(props, DbzConnectorConfig.DB_SERVERS);
var workerServers =
StringUtils.split(props.get(DbzConnectorConfig.DB_SERVERS), ',');
// props extracted from grpc request, clone it to modify
@@ -126,6 +126,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

break;
case MYSQL:
+ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID);
try (var validator = new MySqlValidator(props, tableSchema)) {
validator.validateAll();
}
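Note on the rename: ensurePropNotNull only checked key presence, so a property that was present but empty slipped through; StringUtils.isBlank also rejects empty and whitespace-only values. A minimal sketch of the stricter check, transliterated to Rust purely for illustration (the function name and error text mirror the Java above, not actual RisingWave code):

use std::collections::HashMap;

// Reject missing, empty, and whitespace-only property values,
// mirroring StringUtils.isBlank(props.get(name)).
fn ensure_prop_not_blank(props: &HashMap<String, String>, name: &str) -> Result<(), String> {
    match props.get(name) {
        Some(v) if !v.trim().is_empty() => Ok(()),
        _ => Err(format!("'{name}' not found, please check the WITH properties")),
    }
}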
@@ -63,6 +63,7 @@ public class DbzConnectorConfig {

private static final String DBZ_PROPERTY_PREFIX = "debezium.";

+private static final String SNAPSHOT_MODE_KEY = "debezium.snapshot.mode";
private static final String SNAPSHOT_MODE_BACKFILL = "rw_cdc_backfill";

private static Map<String, String> extractDebeziumProperties(
@@ -103,31 +104,48 @@ public DbzConnectorConfig(
String startOffset,
Map<String, String> userProps,
boolean snapshotDone) {

-StringSubstitutor substitutor = new StringSubstitutor(userProps);
-var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor);
+var isCdcBackfill =
+        null != userProps.get(SNAPSHOT_MODE_KEY)
+                && userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL);

LOG.info(
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}",
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}",
source,
sourceId,
startOffset,
-snapshotDone);
+snapshotDone,
+isCdcBackfill);

+StringSubstitutor substitutor = new StringSubstitutor(userProps);
+var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor);
if (source == SourceTypeE.MYSQL) {
var mysqlProps = initiateDbConfig(MYSQL_CONFIG_FILE, substitutor);
-// if snapshot phase is finished and offset is specified, we will continue binlog
-// reading from the given offset
-if (snapshotDone && null != startOffset && !startOffset.isBlank()) {
-    // 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is
-    // specified. It only snapshots the schemas, not the data, and continue binlog
-    // reading from the specified offset
-    mysqlProps.setProperty("snapshot.mode", "schema_only_recovery");
-    mysqlProps.setProperty(
-            ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
-} else if (mysqlProps.getProperty("snapshot.mode").equals(SNAPSHOT_MODE_BACKFILL)) {
-    // only snapshot table schemas which is not required by the source parser
-    mysqlProps.setProperty("snapshot.mode", "schema_only");
+if (isCdcBackfill) {
+    // disable snapshot locking at all
+    mysqlProps.setProperty("snapshot.locking.mode", "none");
+
+    // If cdc backfill enabled, the source only emit incremental changes, so we must
+    // rewind to the given offset and continue binlog reading from there
+    if (null != startOffset && !startOffset.isBlank()) {
+        mysqlProps.setProperty("snapshot.mode", "schema_only_recovery");
+        mysqlProps.setProperty(
+                ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
+    } else {
+        // read upstream table schemas and emit incremental changes only
+        mysqlProps.setProperty("snapshot.mode", "schema_only");
+    }
+} else {
+    // if snapshot phase is finished and offset is specified, we will continue binlog
+    // reading from the given offset
+    if (snapshotDone && null != startOffset && !startOffset.isBlank()) {
+        // 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is
+        // specified. It only snapshots the schemas, not the data, and continue binlog
+        // reading from the specified offset
+        mysqlProps.setProperty("snapshot.mode", "schema_only_recovery");
+        mysqlProps.setProperty(
+                ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
+    }
+}

dbzProps.putAll(mysqlProps);
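Net effect of the new branching: with CDC backfill enabled, Debezium never snapshots data (the backfill executor owns the snapshot), so snapshot.mode becomes schema_only_recovery when a persisted offset exists to resume from, and schema_only on a fresh start; without backfill, the pre-existing behavior is unchanged. A hedged Rust sketch of that selection, illustrative only, where the non-backfill fallback is whatever the mysql.properties template specifies (assumed here to be "initial"):

// Illustrative only: mirrors the Java branching above, not actual RisingWave code.
fn snapshot_mode(is_cdc_backfill: bool, snapshot_done: bool, start_offset: Option<&str>) -> &'static str {
    let has_offset = start_offset.map_or(false, |s| !s.trim().is_empty());
    if is_cdc_backfill {
        // backfill emits the snapshot itself; Debezium reads only schemas plus binlog
        if has_offset { "schema_only_recovery" } else { "schema_only" }
    } else if snapshot_done && has_offset {
        // resume binlog reading after a completed Debezium snapshot
        "schema_only_recovery"
    } else {
        // fall back to the template default from mysql.properties (assumed "initial")
        "initial"
    }
}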
2 changes: 2 additions & 0 deletions proto/plan_common.proto
@@ -84,6 +84,8 @@ message ExternalTableDesc {
string table_name = 4;
repeated uint32 stream_key = 5;
map<string, string> connect_properties = 6;
+// upstream cdc source job id
+uint32 source_id = 7;
}

enum JoinType {
6 changes: 5 additions & 1 deletion src/common/src/catalog/external_table.rs
@@ -23,9 +23,12 @@ use crate::util::sort_util::ColumnOrder;
/// Compute node will use this information to connect to the external database and scan the table.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct CdcTableDesc {
-/// Id of the upstream source in sharing cdc mode
+/// Id of the table in RW
pub table_id: TableId,

+/// Id of the upstream source in sharing cdc mode
+pub source_id: TableId,

/// The full name of the table in external database, e.g. `database_name.table.name` in MySQL
/// and `schema_name.table_name` in the Postgres.
pub external_table_name: String,
@@ -58,6 +61,7 @@ impl CdcTableDesc {
pub fn to_protobuf(&self) -> ExternalTableDesc {
ExternalTableDesc {
table_id: self.table_id.into(),
+source_id: self.source_id.into(),
columns: self.columns.iter().map(Into::into).collect(),
pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
table_name: self.external_table_name.clone(),
3 changes: 2 additions & 1 deletion src/compute/tests/cdc_tests.rs
@@ -216,7 +216,8 @@ async fn test_cdc_backfill() -> StreamResult<()> {
vec![0, 1, 2],
None,
Arc::new(StreamingMetrics::unused()),
-source_state_handler,
+None,
+Some(source_state_handler),
false,
4, // 4 rows in a snapshot chunk
);
14 changes: 6 additions & 8 deletions src/connector/src/parser/mod.rs
@@ -543,14 +543,12 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream

for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
-if parser.parser_format() == ParserFormat::Debezium {
-    tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
-    // empty payload means a heartbeat in cdc source
-    // heartbeat message offset should not overwrite data messages offset
-    split_offset_mapping
-        .entry(msg.split_id)
-        .or_insert(msg.offset.clone());
-}
+tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
+// assumes an empty message as a heartbeat
+// heartbeat message offset should not overwrite data messages offset
+split_offset_mapping
+    .entry(msg.split_id)
+    .or_insert(msg.offset.clone());

continue;
}
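The parser now treats any message with neither key nor payload as a heartbeat, not only under ParserFormat::Debezium. The or_insert is what keeps a heartbeat from clobbering an offset already recorded by a real data message for the same split; a small self-contained illustration of that map semantics (split ids and offsets are made-up values):

use std::collections::HashMap;

fn main() {
    let mut split_offset_mapping: HashMap<String, String> = HashMap::new();

    // a data message already recorded offset "100" for split "0"
    split_offset_mapping.insert("0".to_string(), "100".to_string());

    // a heartbeat with offset "105" arrives: or_insert keeps the existing entry
    split_offset_mapping.entry("0".to_string()).or_insert("105".to_string());
    assert_eq!(split_offset_mapping["0"], "100");

    // for a split with no prior data message, the heartbeat offset is recorded
    split_offset_mapping.entry("1".to_string()).or_insert("7".to_string());
    assert_eq!(split_offset_mapping["1"], "7");
}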
6 changes: 3 additions & 3 deletions src/connector/src/source/external.rs
@@ -121,7 +121,7 @@ impl SchemaTableName {
}
}

-#[derive(Debug, Clone, Default, PartialEq, PartialOrd)]
+#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct MySqlOffset {
pub filename: String,
pub position: u64,
@@ -133,14 +133,14 @@ impl MySqlOffset {
}
}

-#[derive(Debug, Clone, Default, PartialEq, PartialOrd)]
+#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct PostgresOffset {
pub txid: u64,
pub lsn: u64,
pub tx_usec: u64,
}

-#[derive(Debug, Clone, PartialEq, PartialOrd)]
+#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum CdcOffset {
MySql(MySqlOffset),
Postgres(PostgresOffset),
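Deriving Serialize/Deserialize on the offset types is what makes the backfill state persistable: the executor can encode the last consumed CdcOffset into its state table and restore it on recovery. A minimal round-trip sketch using serde_json (the filename and position are made-up values, and the actual state-table encoding in the backfill executor may differ):

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct MySqlOffset {
    pub filename: String,
    pub position: u64,
}

fn main() -> serde_json::Result<()> {
    // persist the last seen binlog position as JSON ...
    let offset = MySqlOffset { filename: "mysql-bin.000003".to_string(), position: 4502 };
    let persisted = serde_json::to_string(&offset)?;

    // ... and restore it on recovery to resume binlog reading from there
    let restored: MySqlOffset = serde_json::from_str(&persisted)?;
    assert_eq!(offset, restored);
    Ok(())
}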
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_table.rs
@@ -562,6 +562,7 @@ pub(crate) async fn gen_create_table_plan_with_source(

let cdc_table_desc = CdcTableDesc {
table_id: TableId::placeholder(),
+source_id: TableId::placeholder(),
external_table_name: "".to_string(),
pk: table_pk,
columns: columns.iter().map(|c| c.column_desc.clone()).collect(),
@@ -847,7 +848,8 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
derive_connect_properties(source.as_ref(), external_table_name.clone())?;

let cdc_table_desc = CdcTableDesc {
-table_id: source.id.into(), // source can be considered as an external table
+table_id: TableId::placeholder(), // will be filled in meta node
+source_id: source.id.into(), // id of cdc source streaming job
external_table_name: external_table_name.clone(),
pk: table_pk,
columns: columns.iter().map(|c| c.column_desc.clone()).collect(),
Expand Down