refactor(cdc): clean-up cdc backfill code for the create table xx with (..) syntax (risingwavelabs#13765)
StrikeW authored Dec 5, 2023
1 parent e0cfb82 commit 5454f6e
Showing 16 changed files with 157 additions and 515 deletions.
4 changes: 0 additions & 4 deletions e2e_test/source/cdc/cdc.load.slt
@@ -1,9 +1,5 @@
# CDC source basic test

-# enable cdc backfill in ci
-statement ok
-set cdc_backfill='true';
-
statement ok
create table products ( id INT,
name STRING,
4 changes: 0 additions & 4 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -11,10 +11,6 @@ mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_create.sql
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql

-# enable cdc backfill in ci
-statement ok
-set cdc_backfill='true';
-
# create a cdc source job, whose format is fixed to `FORMAT PLAIN ENCODE JSON`
statement ok
create source mysql_mytest with (
22 changes: 11 additions & 11 deletions integration_tests/mysql-cdc/create_source.sql
@@ -1,4 +1,13 @@
-set cdc_backfill='true';

+create source mysql_mydb with (
+    connector = 'mysql-cdc',
+    hostname = 'mysql',
+    port = '3306',
+    username = 'root',
+    password = '123456',
+    database.name = 'mydb',
+    server.id = '2'
+);
+
CREATE TABLE lineitem_rw (
L_ORDERKEY BIGINT,
@@ -18,13 +27,4 @@ CREATE TABLE lineitem_rw (
L_SHIPMODE VARCHAR,
L_COMMENT VARCHAR,
PRIMARY KEY(L_ORDERKEY, L_LINENUMBER)
-) WITH (
-    connector = 'mysql-cdc',
-    hostname = 'mysql',
-    port = '3306',
-    username = 'root',
-    password = '123456',
-    database.name = 'mydb',
-    table.name = 'lineitem',
-    server.id = '2'
-);
+) FROM mysql_mydb TABLE 'mydb.lineitem';
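Taken together, the two hunks above show the pattern this commit standardizes on: connection properties live on one shared CDC source, and each table binds to that source with a `FROM ... TABLE` clause instead of repeating the connector options. A condensed before/after sketch (the column list is abridged for illustration; the real file declares the full set of lineitem columns):

```sql
-- Before: per-table connector options, plus `set cdc_backfill='true';`
-- in the session beforehand.
-- After: one shared source, with tables created FROM it.
create source mysql_mydb with (
    connector = 'mysql-cdc',
    hostname = 'mysql',
    port = '3306',
    username = 'root',
    password = '123456',
    database.name = 'mydb',
    server.id = '2'
);

create table lineitem_rw (
    L_ORDERKEY BIGINT,
    L_LINENUMBER INT,
    L_COMMENT VARCHAR,
    primary key (L_ORDERKEY, L_LINENUMBER)
) from mysql_mydb table 'mydb.lineitem';
```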
4 changes: 0 additions & 4 deletions src/common/src/session_config/mod.rs
@@ -220,10 +220,6 @@ pub struct ConfigMap {
#[parameter(default = ConfigNonZeroU64::default())]
streaming_rate_limit: ConfigNonZeroU64,

-    /// Enable backfill for CDC table to allow lock-free and incremental snapshot
-    #[parameter(default = false)]
-    cdc_backfill: bool,
-
/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "recent_first_n" or "recent_last_n".
#[parameter(default = OverWindowCachePolicy::default(), rename = "rw_streaming_over_window_cache_policy")]
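With the `cdc_backfill` entry gone from `ConfigMap`, a `SET cdc_backfill = ...` statement would now fail as an unrecognized parameter, which is why the two `.slt` tests above drop their setup step. A minimal sketch of the flow after this change (the connection option values here are illustrative, not taken from the hidden parts of the diff):

```sql
-- Previously required in CI before any CDC backfill test:
-- set cdc_backfill='true';

-- The same DDL now behaves identically in every session:
create source mysql_mytest with (
    connector = 'mysql-cdc',
    hostname = 'mysql',
    port = '3306',
    username = 'root',
    password = '123456',
    database.name = 'mytest'
);
```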
132 changes: 90 additions & 42 deletions src/compute/tests/cdc_tests.rs
@@ -17,6 +17,7 @@

use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
+use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
@@ -25,10 +26,9 @@ use futures::stream::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor, ScanRange};
-use risingwave_common::array::{
-    Array, ArrayBuilder, StreamChunk, StreamChunkTestExt, Utf8ArrayBuilder,
-};
+use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, StreamChunk, Utf8ArrayBuilder};
use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId};
+use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::{CdcSplitBase, DebeziumCdcSplit, MySqlCdcSplit};
use risingwave_connector::source::external::{
@@ -38,15 +38,15 @@ use risingwave_connector::source::{MockExternalTableReader, SplitImpl};
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
+use risingwave_stream::common::table::state_table::StateTable;
use risingwave_stream::error::StreamResult;
use risingwave_stream::executor::external::ExternalStorageTable;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::test_utils::MockSource;
use risingwave_stream::executor::{
-    default_source_internal_table, expect_first_barrier, ActorContext, Barrier,
-    BoxedExecutor as StreamBoxedExecutor, BoxedMessageStream, CdcBackfillExecutor, Executor,
-    ExecutorInfo, MaterializeExecutor, Message, Mutation, PkIndices, PkIndicesRef,
-    SourceStateTableHandler, StreamExecutorError,
+    expect_first_barrier, ActorContext, Barrier, BoxedExecutor as StreamBoxedExecutor,
+    BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, MaterializeExecutor, Message,
+    Mutation, PkIndices, PkIndicesRef, StreamExecutorError,
};

// mock upstream binlog offset starting from "1.binlog, pos=0"
@@ -156,11 +156,10 @@ async fn test_cdc_backfill() -> StreamResult<()> {

let table_id = TableId::new(1002);
let schema = Schema::new(vec![
-        Field::unnamed(DataType::Int64), // primary key
-        Field::unnamed(DataType::Float64),
-        Field::unnamed(DataType::Varchar),
+        Field::unnamed(DataType::Jsonb),   // payload
+        Field::unnamed(DataType::Varchar), // _rw_offset
]);
-    let column_ids = vec![0.into(), 1.into(), 2.into()];
+    let column_ids = vec![0.into(), 1.into()];

let pk_indices = vec![0];

@@ -172,7 +171,6 @@ async fn test_cdc_backfill() -> StreamResult<()> {
MockOffsetGenExecutor::new(Box::new(source), schema.clone(), pk_indices.clone());

let binlog_file = String::from("1.binlog");

// mock binlog watermarks for backfill
// initial low watermark: 1.binlog, pos=2 and expected behaviors:
// - ignore events before (1.binlog, pos=2);
@@ -186,25 +184,48 @@ async fn test_cdc_backfill() -> StreamResult<()> {
];

let table_name = SchemaTableName::new("mock_table".to_string(), "public".to_string());
+    let table_schema = Schema::new(vec![
+        Field::with_name(DataType::Int64, "id"), // primary key
+        Field::with_name(DataType::Float64, "price"),
+    ]);
let external_table = ExternalStorageTable::new(
table_id,
table_name,
ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
-        schema.clone(),
+        table_schema.clone(),
vec![OrderType::ascending()],
pk_indices,
+        vec![0, 1],
);

-    let source_state_handler = SourceStateTableHandler::from_table_catalog(
-        &default_source_internal_table(0x2333),
-        MemoryStateStore::new(),
+    let actor_id = 0x1a;
+
+    let state_schema = Schema::new(vec![
+        Field::with_name(DataType::Varchar, "split_id"),
+        Field::with_name(DataType::Int64, "id"), // pk
+        Field::with_name(DataType::Boolean, "backfill_finished"),
+        Field::with_name(DataType::Int64, "row_count"),
+        Field::with_name(DataType::Jsonb, "cdc_offset"),
+    ]);
+
+    let column_descs = vec![
+        ColumnDesc::unnamed(ColumnId::from(0), state_schema[0].data_type.clone()),
+        ColumnDesc::unnamed(ColumnId::from(1), state_schema[1].data_type.clone()),
+        ColumnDesc::unnamed(ColumnId::from(2), state_schema[2].data_type.clone()),
+        ColumnDesc::unnamed(ColumnId::from(3), state_schema[3].data_type.clone()),
+        ColumnDesc::unnamed(ColumnId::from(4), state_schema[4].data_type.clone()),
+    ];
+
+    let state_table = StateTable::new_without_distribution(
+        memory_state_store.clone(),
+        TableId::from(0x42),
+        column_descs.clone(),
+        vec![OrderType::ascending()],
+        vec![0_usize],
    )
    .await;

-    let actor_id = 0x1a;
    let info = ExecutorInfo {
-        schema: schema.clone(),
+        schema: table_schema.clone(),
pk_indices: vec![0],
identity: "CdcBackfillExecutor".to_string(),
};
@@ -213,12 +234,10 @@ async fn test_cdc_backfill() -> StreamResult<()> {
info,
external_table,
Box::new(mock_offset_executor),
-        vec![0, 1, 2],
+        vec![0, 1],
None,
Arc::new(StreamingMetrics::unused()),
-        None,
-        Some(source_state_handler),
-        false,
+        state_table,
+        4, // 4 rows in a snapshot chunk
);

@@ -237,23 +256,38 @@ async fn test_cdc_backfill() -> StreamResult<()> {
.boxed()
.execute();

-    // push upstream chunks
-    let stream_chunk1 = StreamChunk::from_pretty(
-        " I F
-        + 1 10.01
-        + 2 2.02
-        + 3 3.03 // binlog pos=2
-        + 4 4.04
-        + 5 5.05 // binlog pos=4
-        + 6 6.06",
-    );
-    let stream_chunk2 = StreamChunk::from_pretty(
-        " I F
-        + 6 10.08
-        + 199 40.5
-        + 978 72.6
-        + 134 41.7",
-    );
+    let chunk1_payload = vec![
+        r#"{ "payload": { "before": null, "after": { "id": 1, "price": 10.01}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+        r#"{ "payload": { "before": null, "after": { "id": 2, "price": 2.02}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+        r#"{ "payload": { "before": null, "after": { "id": 3, "price": 3.03}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+        r#"{ "payload": { "before": null, "after": { "id": 4, "price": 4.04}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+        r#"{ "payload": { "before": null, "after": { "id": 5, "price": 5.05}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+        r#"{ "payload": { "before": null, "after": { "id": 6, "price": 6.06}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+    ];
+
+    let chunk2_payload = vec![
+        r#"{ "payload": { "before": null, "after": { "id": 6, "price": 10.08}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+        r#"{ "payload": { "before": null, "after": { "id": 199, "price": 40.5}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+        r#"{ "payload": { "before": null, "after": { "id": 978, "price": 72.6}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+        r#"{ "payload": { "before": null, "after": { "id": 134, "price": 41.7}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
+    ];
+
+    let chunk1_datums: Vec<Datum> = chunk1_payload
+        .into_iter()
+        .map(|s| Some(JsonbVal::from_str(s).unwrap().into()))
+        .collect_vec();
+
+    let chunk2_datums: Vec<Datum> = chunk2_payload
+        .into_iter()
+        .map(|s| Some(JsonbVal::from_str(s).unwrap().into()))
+        .collect_vec();
+
+    let chunk_schema = Schema::new(vec![
+        Field::unnamed(DataType::Jsonb), // payload
+    ]);
+
+    let stream_chunk1 = create_stream_chunk(chunk1_datums, &chunk_schema);
+    let stream_chunk2 = create_stream_chunk(chunk2_datums, &chunk_schema);

// The first barrier
let curr_epoch = 11;
@@ -311,8 +345,8 @@ async fn test_cdc_backfill() -> StreamResult<()> {

// scan the final result of the mv table
let column_descs = vec![
-        ColumnDesc::unnamed(ColumnId::from(0), schema[0].data_type.clone()),
-        ColumnDesc::unnamed(ColumnId::from(1), schema[1].data_type.clone()),
+        ColumnDesc::unnamed(ColumnId::from(0), table_schema[0].data_type.clone()),
+        ColumnDesc::unnamed(ColumnId::from(1), table_schema[1].data_type.clone()),
];
let value_indices = (0..column_descs.len()).collect_vec();
// Since we have not polled `Materialize`, we cannot scan anything from this table
@@ -345,6 +379,20 @@ async fn test_cdc_backfill() -> StreamResult<()> {
Ok(())
}

+fn create_stream_chunk(datums: Vec<Datum>, schema: &Schema) -> StreamChunk {
+    let mut builders = schema.create_array_builders(8);
+    for datum in &datums {
+        builders[0].append(datum);
+    }
+    let columns = builders
+        .into_iter()
+        .map(|builder| builder.finish().into())
+        .collect();
+
+    let ops = vec![Op::Insert; datums.len()];
+    StreamChunk::from_parts(ops, DataChunk::new(columns, datums.len()))
+}

async fn consume_message_stream(mut stream: BoxedMessageStream) -> StreamResult<()> {
while let Some(message) = stream.next().await {
let message = message?;
6 changes: 0 additions & 6 deletions src/connector/src/source/mock_external_table.rs
@@ -48,33 +48,27 @@ impl MockExternalTableReader {
let snap0 = vec![OwnedRow::new(vec![
Some(ScalarImpl::Int64(1)),
Some(ScalarImpl::Float64(1.0001.into())),
-        None,
])];
let snap1 = vec![
OwnedRow::new(vec![
Some(ScalarImpl::Int64(1)),
Some(ScalarImpl::Float64(10.01.into())),
-            None,
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(2)),
Some(ScalarImpl::Float64(2.02.into())),
-            None,
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(5)),
Some(ScalarImpl::Float64(1.0005.into())),
-            None,
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(6)),
Some(ScalarImpl::Float64(1.0006.into())),
-            None,
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(8)),
Some(ScalarImpl::Float64(1.0008.into())),
-            None,
]),
];

@@ -51,8 +51,6 @@
) from mysql_mydb table 'mydb.t1';
expected_outputs:
- explain_output
-  with_config_map:
-    CDC_BACKFILL: 'true'
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
@@ -71,5 +69,3 @@
) from mysql_mydb table 'mydb.t1';
expected_outputs:
- explain_output
-  with_config_map:
-    CDC_BACKFILL: 'true'
@@ -63,8 +63,6 @@
) from mysql_mydb table 'mydb.t1';
explain_output: |
LogicalCdcScan { table: mydb.t1, columns: [v1, v2] }
-  with_config_map:
-    CDC_BACKFILL: 'true'
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
@@ -89,5 +87,3 @@
└─StreamExchange { dist: HashShard(v1) }
└─StreamDml { columns: [v1, v2] }
└─StreamSource
-  with_config_map:
-    CDC_BACKFILL: 'true'
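Both planner-test fixtures drop their `with_config_map: CDC_BACKFILL: 'true'` stanzas because the planner now emits the CDC backfill plan unconditionally. A hedged reconstruction of what the `create_table_with_connector` case exercises; only `mysql_mydb`, `mydb.t1`, the `v1`/`v2` column names, and the recorded plan line come from the fixture, while the column types and primary key are assumptions:

```sql
-- Planning a table on top of the shared CDC source no longer depends
-- on session configuration:
explain create table t1 (v1 int, v2 varchar, primary key (v1))
from mysql_mydb table 'mydb.t1';
-- Recorded fixture output:
--   LogicalCdcScan { table: mydb.t1, columns: [v1, v2] }
```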
11 changes: 6 additions & 5 deletions src/frontend/src/handler/create_source.rs
@@ -39,6 +39,7 @@ use risingwave_connector::source::cdc::{
MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
+use risingwave_connector::source::external::CdcTableType;
use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
@@ -1101,8 +1102,11 @@ pub async fn handle_create_source(
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

// gated the feature with a session variable
-    let create_cdc_source_job =
-        is_cdc_connector(&with_properties) && session.config().cdc_backfill();
+    let create_cdc_source_job = if is_cdc_connector(&with_properties) {
+        CdcTableType::from_properties(&with_properties).can_backfill()
+    } else {
+        false
+    };

let (columns_from_resolve_source, source_info) = bind_columns_from_source(
&session,
@@ -1301,9 +1305,6 @@ pub mod tests {
"CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_string();
let frontend = LocalFrontend::new(Default::default()).await;
let session = frontend.session_ref();
-        session
-            .set_config("cdc_backfill", "true".to_string())
-            .unwrap();

frontend
.run_sql_with_session(session.clone(), sql)
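`handle_create_source` now asks the connector itself, via `CdcTableType::from_properties(&with_properties).can_backfill()`, whether a shared CDC source job should be created, so the unit test above simply drops its `set_config` call. The statement the test issues is unchanged:

```sql
-- Creates a shared CDC source job based purely on the connector type;
-- no `set cdc_backfill='true';` is needed first.
CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON;
```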