feat(cdc): amortize snapshot read to multiple barriers #16349

Merged
merged 13 commits on Apr 26, 2024
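
In short, the backfill executor no longer has to drain the whole table snapshot between two consecutive barriers: snapshot reads are spread ("amortized") across barrier epochs, with upstream change events buffered in between. In the updated test below this surfaces as two new arguments (`1` and `4`) passed to the backfill executor, presumably the per-barrier snapshot interval and the snapshot read batch size (the parameter names are not visible in the hunks). A minimal sketch of the idea with invented names, not code from this PR:

```rust
/// Illustrative only: spread a snapshot scan over several barrier epochs.
/// `snapshot_interval` = read from the snapshot once every N barriers;
/// `batch_size` = maximum rows pulled per snapshot read.
fn drive_backfill(
    mut snapshot: impl Iterator<Item = i64>,
    barriers: usize,
    snapshot_interval: usize,
    batch_size: usize,
) -> Vec<Vec<i64>> {
    let mut emitted = Vec::new();
    for barrier_idx in 1..=barriers {
        // Upstream changes arriving between barriers would be buffered here
        // (elided); the snapshot is only touched every `snapshot_interval` barriers.
        if barrier_idx % snapshot_interval == 0 {
            let chunk: Vec<i64> = snapshot.by_ref().take(batch_size).collect();
            if !chunk.is_empty() {
                emitted.push(chunk);
            }
        }
    }
    emitted
}

fn main() {
    // With interval 1 and batch size 4, a 10-row snapshot is finished across
    // three barriers instead of in a single long pause between two barriers.
    let chunks = drive_backfill(1i64..=10, 4, 1, 4);
    assert_eq!(chunks, vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8], vec![9, 10]]);
}
```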
40 changes: 37 additions & 3 deletions src/compute/tests/cdc_tests.rs
@@ -25,7 +25,9 @@ use futures::stream::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor, ScanRange};
use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, StreamChunk, Utf8ArrayBuilder};
use risingwave_common::array::{
Array, ArrayBuilder, DataChunk, DataChunkTestExt, Op, StreamChunk, Utf8ArrayBuilder,
};
use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId};
use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
@@ -186,6 +188,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {

let actor_id = 0x1a;

// create state table
let state_schema = Schema::new(vec![
Field::with_name(DataType::Varchar, "split_id"),
Field::with_name(DataType::Int64, "id"), // pk
@@ -227,6 +230,8 @@ async fn test_cdc_backfill() -> StreamResult<()> {
state_table,
Some(4), // limit a snapshot chunk to have <= 4 rows by rate limit
false,
1,
4,
)
.boxed(),
);
@@ -246,16 +251,18 @@ async fn test_cdc_backfill() -> StreamResult<()> {
.boxed()
.execute();

// construct upstream chunks
let chunk1_payload = vec![
r#"{ "payload": { "before": null, "after": { "id": 1, "price": 10.01}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 2, "price": 2.02}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 2, "price": 22.22}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 3, "price": 3.03}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 4, "price": 4.04}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 5, "price": 5.05}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 6, "price": 6.06}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
];

let chunk2_payload = vec![
r#"{ "payload": { "before": null, "after": { "id": 1, "price": 11.11}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 6, "price": 10.08}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 199, "price": 40.5}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
r#"{ "payload": { "before": null, "after": { "id": 978, "price": 72.6}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#,
@@ -311,11 +318,22 @@ async fn test_cdc_backfill() -> StreamResult<()> {

// ingest data and barrier
let interval = Duration::from_millis(10);

// send a dummy barrier to trigger the backfill, since
// cdc backfill will wait for a barrier before starting
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, false);

// first chunk
tx.push_chunk(stream_chunk1);

tokio::time::sleep(interval).await;

// barrier to trigger emitting the buffered events
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, false);

// second chunk
tx.push_chunk(stream_chunk2);

tokio::time::sleep(interval).await;
@@ -352,9 +370,25 @@ async fn test_cdc_backfill() -> StreamResult<()> {
None,
None,
));

// check result
let mut stream = scan.execute();
while let Some(message) = stream.next().await {
println!("[scan] chunk: {:#?}", message.unwrap());
let chunk = message.expect("scan a chunk");
let expect = DataChunk::from_pretty(
"I F
1 11.11
2 22.22
3 3.03
4 4.04
5 5.05
6 10.08
8 1.0008
134 41.7
199 40.5
978 72.6",
);
assert_eq!(expect, chunk);
}

mview_handle.await.unwrap()?;
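
The expected-result check above uses `DataChunk::from_pretty` from the newly imported `DataChunkTestExt` helper: the header row of the text table declares the column types (`I` for a 64-bit integer column, `F` for a 64-bit float column), and every following line is one row. A minimal sketch of the same helper, kept to the two columns this test cares about:

```rust
use risingwave_common::array::{DataChunk, DataChunkTestExt};

#[test]
fn pretty_chunk_sketch() {
    // Two columns (int64 id, float64 price) and two rows; the layout mirrors
    // the expected chunk asserted against the scan result above.
    let chunk = DataChunk::from_pretty(
        "I F
         1 11.11
         2 22.22",
    );
    assert_eq!(chunk.cardinality(), 2);
}
```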
24 changes: 15 additions & 9 deletions src/connector/src/source/cdc/external/mock_external_table.rs
@@ -39,28 +39,34 @@ impl MockExternalTableReader {
}

pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default())))
Box::new(move |offset| {
Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
offset,
)?))
})
}
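
With this change, the mock's offset parser goes through the real Debezium-offset parsing path instead of returning a default `MySqlOffset`. A hedged usage sketch; the offset JSON below is invented and only mirrors the general shape of the MySQL Debezium offsets used elsewhere in the CDC tests:

```rust
#[test]
fn offset_parser_sketch() {
    // Illustrative only: the offset JSON and its field values are made up.
    let parse = MockExternalTableReader::get_cdc_offset_parser();
    let offset = parse(
        r#"{"sourcePartition": {"server": "RW_CDC_1002"},
            "sourceOffset": {"ts_sec": 1695277757, "file": "binlog.000008", "pos": 3344},
            "isHeartbeat": false}"#,
    )
    .expect("parse debezium offset");
    assert!(matches!(offset, CdcOffset::MySql(_)));
}
```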

/// The snapshot will be emitted to the downstream all at once, because it is small.
/// After that we will emit all the buffered upstream chunks at once.
#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(&self) {
let snap_idx = self
.snapshot_cnt
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
println!("snapshot read: idx {}", snap_idx);

let snap0 = vec![OwnedRow::new(vec![
Some(ScalarImpl::Int64(1)),
Some(ScalarImpl::Float64(1.0001.into())),
])];
let snap1 = vec![
let snap0 = vec![
OwnedRow::new(vec![
Some(ScalarImpl::Int64(1)),
Some(ScalarImpl::Float64(10.01.into())),
Some(ScalarImpl::Float64(1.0001.into())),
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(1)),
Some(ScalarImpl::Float64(11.00.into())),
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(2)),
Some(ScalarImpl::Float64(2.02.into())),
Some(ScalarImpl::Float64(22.00.into())),
]),
OwnedRow::new(vec![
Some(ScalarImpl::Int64(5)),
@@ -76,7 +82,7 @@ impl MockExternalTableReader {
]),
];

let snapshots = [snap0, snap1];
let snapshots = vec![snap0];
if snap_idx >= snapshots.len() {
return Ok(());
}