Skip to content

Commit

Permalink
feat(cdc): amortize snapshot read to multiple barriers (#16349)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Apr 26, 2024
1 parent c94445e commit 6329101
Show file tree
Hide file tree
Showing 5 changed files with 441 additions and 270 deletions.
40 changes: 37 additions & 3 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use futures::stream::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor, ScanRange};
use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, StreamChunk, Utf8ArrayBuilder};
use risingwave_common::array::{
Array, ArrayBuilder, DataChunk, DataChunkTestExt, Op, StreamChunk, Utf8ArrayBuilder,
};
use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId};
use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
Expand Down Expand Up @@ -186,6 +188,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {

let actor_id = 0x1a;

// create state table
let state_schema = Schema::new(vec![
Field::with_name(DataType::Varchar, "split_id"),
Field::with_name(DataType::Int64, "id"), // pk
Expand Down Expand Up @@ -227,6 +230,8 @@ async fn test_cdc_backfill() -> StreamResult<()> {
state_table,
Some(4), // limit a snapshot chunk to have <= 4 rows by rate limit
false,
1,
4,
)
.boxed(),
);
Expand All @@ -246,16 +251,18 @@ async fn test_cdc_backfill() -> StreamResult<()> {
.boxed()
.execute();

// construct upstream chunks
let chunk1_payload = vec![
r#"{ "payload": { "before": null, "after": { "id": 1, "price": 10.01}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 2, "price": 2.02}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 2, "price": 22.22}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 3, "price": 3.03}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 4, "price": 4.04}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 5, "price": 5.05}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 6, "price": 6.06}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
];

let chunk2_payload = vec![
r#"{ "payload": { "before": null, "after": { "id": 1, "price": 11.11}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 6, "price": 10.08}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 199, "price": 40.5}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 978, "price": 72.6}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
Expand Down Expand Up @@ -311,11 +318,22 @@ async fn test_cdc_backfill() -> StreamResult<()> {

// ingest data and barrier
let interval = Duration::from_millis(10);

// send a dummy barrier to trigger the backfill, since
// cdc backfill will wait for a barrier before start
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, false);

// first chunk
tx.push_chunk(stream_chunk1);

tokio::time::sleep(interval).await;

// barrier to trigger emit buffered events
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, false);

// second chunk
tx.push_chunk(stream_chunk2);

tokio::time::sleep(interval).await;
Expand Down Expand Up @@ -352,9 +370,25 @@ async fn test_cdc_backfill() -> StreamResult<()> {
None,
None,
));

// check result
let mut stream = scan.execute();
while let Some(message) = stream.next().await {
println!("[scan] chunk: {:#?}", message.unwrap());
let chunk = message.expect("scan a chunk");
let expect = DataChunk::from_pretty(
"I F
1 11.11
2 22.22
3 3.03
4 4.04
5 5.05
6 10.08
8 1.0008
134 41.7
199 40.5
978 72.6",
);
assert_eq!(expect, chunk);
}

mview_handle.await.unwrap()?;
Expand Down
24 changes: 15 additions & 9 deletions src/connector/src/source/cdc/external/mock_external_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,34 @@ impl MockExternalTableReader {
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default())))
Box::new(move |offset| {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
})
}

/// The snapshot will emit to downstream all in once, because it is too small.
/// After that we will emit the buffered upstream chunks all in one.
#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(&self) {
let snap_idx = self
.snapshot_cnt
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
println!("snapshot read: idx {}", snap_idx);

let snap0 = vec![OwnedRow::new(vec![
Some(ScalarImpl::Int64(1)),
Some(ScalarImpl::Float64(1.0001.into())),
])];
let snap1 = vec![
let snap0 = vec![
OwnedRow::new(vec![
Some(ScalarImpl::Int64(1)),
Some(ScalarImpl::Float64(10.01.into())),
Some(ScalarImpl::Float64(1.0001.into())),
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(1)),
Some(ScalarImpl::Float64(11.00.into())),
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(2)),
Some(ScalarImpl::Float64(2.02.into())),
Some(ScalarImpl::Float64(22.00.into())),
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(5)),
Expand All @@ -76,7 +82,7 @@ impl MockExternalTableReader {
]),
];

let snapshots = [snap0, snap1];
let snapshots = vec![snap0];
if snap_idx >= snapshots.len() {
return Ok(());
}
Expand Down
Loading

0 comments on commit 6329101

Please sign in to comment.