diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 1b4501865f86..6ddeae73de8b 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -25,7 +25,9 @@ use futures::stream::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor, ScanRange}; -use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, StreamChunk, Utf8ArrayBuilder}; +use risingwave_common::array::{ + Array, ArrayBuilder, DataChunk, DataChunkTestExt, Op, StreamChunk, Utf8ArrayBuilder, +}; use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId}; use risingwave_common::types::{Datum, JsonbVal}; use risingwave_common::util::epoch::{test_epoch, EpochExt}; @@ -186,6 +188,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { let actor_id = 0x1a; + // create state table let state_schema = Schema::new(vec![ Field::with_name(DataType::Varchar, "split_id"), Field::with_name(DataType::Int64, "id"), // pk @@ -227,6 +230,8 @@ async fn test_cdc_backfill() -> StreamResult<()> { state_table, Some(4), // limit a snapshot chunk to have <= 4 rows by rate limit false, + 1, + 4, ) .boxed(), ); @@ -246,9 +251,10 @@ async fn test_cdc_backfill() -> StreamResult<()> { .boxed() .execute(); + // construct upstream chunks let chunk1_payload = vec![ r#"{ "payload": { "before": null, "after": { "id": 1, "price": 10.01}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, - r#"{ "payload": { "before": null, "after": { "id": 2, "price": 2.02}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, + r#"{ "payload": { "before": null, "after": { "id": 2, "price": 22.22}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 3, "price": 3.03}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 4, "price": 4.04}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 5, "price": 5.05}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, @@ -256,6 +262,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { ]; let chunk2_payload = vec![ + r#"{ "payload": { "before": null, "after": { "id": 1, "price": 11.11}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 6, "price": 10.08}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 199, "price": 40.5}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 978, "price": 72.6}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, @@ -311,11 +318,22 @@ async fn test_cdc_backfill() -> StreamResult<()> { // ingest data and barrier let interval = Duration::from_millis(10); + + // send a dummy barrier to trigger the backfill, since + // cdc backfill will wait for a barrier before start + curr_epoch.inc_epoch(); + tx.push_barrier(curr_epoch, false); + + // first chunk tx.push_chunk(stream_chunk1); + tokio::time::sleep(interval).await; + + // barrier to trigger emit buffered events curr_epoch.inc_epoch(); tx.push_barrier(curr_epoch, false); + // second chunk tx.push_chunk(stream_chunk2); tokio::time::sleep(interval).await; @@ -352,9 +370,25 @@ async fn test_cdc_backfill() -> StreamResult<()> { None, None, )); + + // check result let mut stream = scan.execute(); while let Some(message) = stream.next().await { - println!("[scan] chunk: {:#?}", message.unwrap()); + let chunk = message.expect("scan a chunk"); + let expect = DataChunk::from_pretty( + "I F + 1 11.11 + 2 22.22 + 3 3.03 + 4 4.04 + 5 5.05 + 6 10.08 + 8 1.0008 + 134 41.7 + 199 40.5 + 978 72.6", + ); + assert_eq!(expect, chunk); } mview_handle.await.unwrap()?; diff --git a/src/connector/src/source/cdc/external/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs index 4a652e1c1628..1cdc55a41b64 100644 --- a/src/connector/src/source/cdc/external/mock_external_table.rs +++ b/src/connector/src/source/cdc/external/mock_external_table.rs @@ -39,9 +39,15 @@ impl MockExternalTableReader { } pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { - Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default()))) + Box::new(move |offset| { + Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( + offset, + )?)) + }) } + /// The snapshot will emit to downstream all in once, because it is too small. + /// After that we will emit the buffered upstream chunks all in one. #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner(&self) { let snap_idx = self @@ -49,18 +55,18 @@ impl MockExternalTableReader { .fetch_add(1, std::sync::atomic::Ordering::Relaxed); println!("snapshot read: idx {}", snap_idx); - let snap0 = vec![OwnedRow::new(vec![ - Some(ScalarImpl::Int64(1)), - Some(ScalarImpl::Float64(1.0001.into())), - ])]; - let snap1 = vec![ + let snap0 = vec![ OwnedRow::new(vec![ Some(ScalarImpl::Int64(1)), - Some(ScalarImpl::Float64(10.01.into())), + Some(ScalarImpl::Float64(1.0001.into())), + ]), + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(1)), + Some(ScalarImpl::Float64(11.00.into())), ]), OwnedRow::new(vec![ Some(ScalarImpl::Int64(2)), - Some(ScalarImpl::Float64(2.02.into())), + Some(ScalarImpl::Float64(22.00.into())), ]), OwnedRow::new(vec![ Some(ScalarImpl::Int64(5)), @@ -76,7 +82,7 @@ impl MockExternalTableReader { ]), ]; - let snapshots = [snap0, snap1]; + let snapshots = vec![snap0]; if snap_idx >= snapshots.len() { return Ok(()); } diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 22a8d06adee6..8b32fe63320a 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -59,8 +59,10 @@ pub struct CdcBackfillExecutor { output_indices: Vec, /// State table of the `CdcBackfill` executor - state_table: StateTable, + state_impl: CdcBackfillState, + // TODO: introduce a CdcBackfillProgress to report finish to Meta + // This object is just a stub right now progress: Option, metrics: Arc, @@ -69,6 +71,11 @@ pub struct CdcBackfillExecutor { rate_limit_rps: Option, disable_backfill: bool, + + // TODO: make these options configurable + snapshot_interval: u32, + + snapshot_read_limit: u32, } impl CdcBackfillExecutor { @@ -83,20 +90,56 @@ impl CdcBackfillExecutor { state_table: StateTable, rate_limit_rps: Option, disable_backfill: bool, + snapshot_interval: u32, + snapshot_read_limit: u32, ) -> Self { + let pk_in_output_indices = external_table.pk_in_output_indices().clone().unwrap(); + let upstream_table_id = external_table.table_id().table_id; + let state_impl = CdcBackfillState::new( + upstream_table_id, + state_table, + pk_in_output_indices.len() + METADATA_STATE_LEN, + ); + Self { actor_ctx, external_table, upstream, output_indices, - state_table, + state_impl, progress, metrics, rate_limit_rps, disable_backfill, + snapshot_interval, + snapshot_read_limit, } } + fn report_metrics( + metrics: &Arc, + upstream_table_id: u32, + actor_id: u32, + snapshot_processed_row_count: u64, + upstream_processed_row_count: u64, + ) { + metrics + .cdc_backfill_snapshot_read_row_count + .with_label_values(&[ + upstream_table_id.to_string().as_str(), + actor_id.to_string().as_str(), + ]) + .inc_by(snapshot_processed_row_count); + + metrics + .cdc_backfill_upstream_output_row_count + .with_label_values(&[ + upstream_table_id.to_string().as_str(), + actor_id.to_string().as_str(), + ]) + .inc_by(upstream_processed_row_count); + } + #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { // The primary key columns, in the output columns of the upstream_table scan. @@ -121,11 +164,7 @@ impl CdcBackfillExecutor { // Check whether this parallelism has been assigned splits, // if not, we should bypass the backfill directly. - let mut state_impl = CdcBackfillState::new( - upstream_table_id, - self.state_table, - pk_in_output_indices.len() + METADATA_STATE_LEN, - ); + let mut state_impl = self.state_impl; let mut upstream = transform_upstream(upstream, &upstream_table_schema) .boxed() @@ -144,11 +183,10 @@ impl CdcBackfillExecutor { // Keep track of rows from the snapshot. let mut total_snapshot_row_count = state.row_count as u64; - let mut snapshot_read_epoch; let mut last_binlog_offset: Option = state .last_cdc_offset - .map_or(upstream_table_reader.current_binlog_offset().await?, Some); + .map_or(upstream_table_reader.current_cdc_offset().await?, Some); let offset_parse_func = upstream_table_reader .inner() @@ -217,9 +255,6 @@ impl CdcBackfillExecutor { ?current_pk_pos, "start cdc backfill loop"); - // TODO: make the limit configurable - let snapshot_read_limit: usize = 1000; - // the buffer will be drained when a barrier comes let mut upstream_chunk_buffer: Vec = vec![]; @@ -227,10 +262,14 @@ impl CdcBackfillExecutor { let left_upstream = upstream.by_ref().map(Either::Left); let mut snapshot_read_row_cnt: usize = 0; - let args = - SnapshotReadArgs::new_for_cdc(current_pk_pos.clone(), self.rate_limit_rps); + let read_args = SnapshotReadArgs::new( + current_pk_pos.clone(), + self.rate_limit_rps, + pk_in_output_indices.clone(), + ); + let right_snapshot = pin!(upstream_table_reader - .snapshot_read(args, snapshot_read_limit as u32) + .snapshot_read_full_table(read_args, self.snapshot_read_limit) .map(Either::Right)); let (right_snapshot, valve) = pausable(right_snapshot); @@ -246,6 +285,8 @@ impl CdcBackfillExecutor { let mut cur_barrier_snapshot_processed_rows: u64 = 0; let mut cur_barrier_upstream_processed_rows: u64 = 0; + let mut barrier_count: u32 = 0; + let mut pending_barrier = None; #[for_await] for either in &mut backfill_stream { @@ -254,6 +295,11 @@ impl CdcBackfillExecutor { Either::Left(msg) => { match msg? { Message::Barrier(barrier) => { + // increase the barrier count and check whether need to start a new snapshot + barrier_count += 1; + let can_start_new_snapshot = + barrier_count == self.snapshot_interval; + if let Some(mutation) = barrier.mutation.as_deref() { use crate::executor::Mutation; match mutation { @@ -279,157 +325,43 @@ impl CdcBackfillExecutor { } } - // ensure the snapshot stream is consumed once - // otherwise, reconstruct a snapshot stream with same pk offset may return an - // empty stream (don't know the cause) - if snapshot_read_row_cnt == 0 { - pin_mut!(backfill_stream); - let (_, mut snapshot_stream) = - backfill_stream.get_pin_mut(); - if let Some(msg) = snapshot_stream.next().await { - let Either::Right(msg) = msg else { - bail!( - "BUG: snapshot_read contains upstream messages" - ); - }; - match msg? { - None => { - tracing::info!( - upstream_table_id, - ?last_binlog_offset, - ?current_pk_pos, - "snapshot read stream ends" - ); - // End of the snapshot read stream. - // Consume the buffered upstream chunk without filtering by `binlog_low`. - for chunk in upstream_chunk_buffer.drain(..) { - yield Message::Chunk(mapping_chunk( - chunk, - &self.output_indices, - )); - } - - // mark backfill has finished - state_impl - .mutate_state( - current_pk_pos, - last_binlog_offset.clone(), - total_snapshot_row_count, - true, - ) - .await?; - - // commit state because we have received a barrier message - state_impl.commit_state(barrier.epoch).await?; - yield Message::Barrier(barrier); - // end of backfill loop, since backfill has finished - break 'backfill_loop; - } - Some(chunk) => { - // Raise the current pk position. - // Because snapshot streams are ordered by pk, so we can - // use the last row to update `current_pk_pos`. - current_pk_pos = Some(get_new_pos( - &chunk, - &pk_in_output_indices, - )); - - let chunk_cardinality = - chunk.cardinality() as u64; - cur_barrier_snapshot_processed_rows += - chunk_cardinality; - total_snapshot_row_count += chunk_cardinality; - snapshot_read_row_cnt += - chunk_cardinality as usize; - - tracing::debug!( - upstream_table_id, - ?current_pk_pos, - ?snapshot_read_row_cnt, - "force emit a snapshot chunk" - ); - yield Message::Chunk(mapping_chunk( - chunk, - &self.output_indices, - )); - } - } - } - } + Self::report_metrics( + &self.metrics, + upstream_table_id, + self.actor_ctx.id, + cur_barrier_snapshot_processed_rows, + cur_barrier_upstream_processed_rows, + ); - // If it is a barrier, switch snapshot and consume buffered - // upstream chunk. - // If no current_pos, means we did not process any snapshot yet. - // In that case we can just ignore the upstream buffer chunk. - if let Some(current_pos) = ¤t_pk_pos { - for chunk in upstream_chunk_buffer.drain(..) { - cur_barrier_upstream_processed_rows += - chunk.cardinality() as u64; - - // record the consumed binlog offset that will be - // persisted later - consumed_binlog_offset = get_cdc_chunk_last_offset( - &offset_parse_func, - &chunk, - )?; - yield Message::Chunk(mapping_chunk( - mark_cdc_chunk( - &offset_parse_func, - chunk, - current_pos, - &pk_in_output_indices, - &pk_order, - last_binlog_offset.clone(), - )?, - &self.output_indices, - )); - } - } + // when processing a barrier, check whether can start a new snapshot + // if the number of barriers reaches the snapshot interval + if can_start_new_snapshot { + // staging the barrier + pending_barrier = Some(barrier); + tracing::debug!( + upstream_table_id, + ?current_pk_pos, + ?snapshot_read_row_cnt, + "Prepare to start a new snapshot" + ); + // Break the loop for consuming snapshot and prepare to start a new snapshot + break; + } else { + // update and persist current backfill progress + state_impl + .mutate_state( + current_pk_pos.clone(), + last_binlog_offset.clone(), + total_snapshot_row_count, + false, + ) + .await?; - self.metrics - .cdc_backfill_snapshot_read_row_count - .with_label_values(&[ - upstream_table_id.to_string().as_str(), - self.actor_ctx.id.to_string().as_str(), - ]) - .inc_by(cur_barrier_snapshot_processed_rows); - - self.metrics - .cdc_backfill_upstream_output_row_count - .with_label_values(&[ - upstream_table_id.to_string().as_str(), - self.actor_ctx.id.to_string().as_str(), - ]) - .inc_by(cur_barrier_upstream_processed_rows); - - // Update last seen binlog offset - if consumed_binlog_offset.is_some() { - last_binlog_offset.clone_from(&consumed_binlog_offset); - } + state_impl.commit_state(barrier.epoch).await?; - // update and persist backfill state - state_impl - .mutate_state( - current_pk_pos.clone(), - last_binlog_offset.clone(), - total_snapshot_row_count, - false, - ) - .await?; - state_impl.commit_state(barrier.epoch).await?; - - snapshot_read_epoch = barrier.epoch.prev; - if let Some(progress) = self.progress.as_mut() { - progress.update( - barrier.epoch.curr, - snapshot_read_epoch, - total_snapshot_row_count, - ); + // emit barrier and continue consume the backfill stream + yield Message::Barrier(barrier); } - - yield Message::Barrier(barrier); - // Break the for loop and start a new snapshot read stream. - break; } Message::Chunk(chunk) => { // skip empty upstream chunk @@ -441,7 +373,6 @@ impl CdcBackfillExecutor { get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?; tracing::trace!( - target: "events::stream::cdc_backfill", "recv changelog chunk: chunk_offset {:?}, capactiy {}", chunk_binlog_offset, chunk.capacity() @@ -455,7 +386,6 @@ impl CdcBackfillExecutor { && chunk_offset < *last_binlog_offset { tracing::trace!( - target: "events::stream::cdc_backfill", "skip changelog chunk: chunk_offset {:?}, capacity {}", chunk_offset, chunk.capacity() @@ -475,42 +405,25 @@ impl CdcBackfillExecutor { Either::Right(msg) => { match msg? { None => { - if snapshot_read_row_cnt < snapshot_read_limit { - tracing::info!( - upstream_table_id, - ?last_binlog_offset, - ?current_pk_pos, - "snapshot read stream ends" - ); - // If the snapshot read stream ends with less than `limit` rows, - // it means all historical data has been loaded. - // We should not mark the chunk anymore, - // otherwise, we will ignore some rows in the buffer. - // Here we choose to never mark the chunk. - // Consume with the renaming stream buffer chunk without mark. - for chunk in upstream_chunk_buffer.drain(..) { - yield Message::Chunk(mapping_chunk( - chunk, - &self.output_indices, - )); - } - - state_impl - .mutate_state( - current_pk_pos.clone(), - last_binlog_offset.clone(), - total_snapshot_row_count, - true, - ) - .await?; - - // exit backfill - break 'backfill_loop; - } else { - // break the for loop to reconstruct a new snapshot with pk offset - // to ensure we load all historical data - break; + tracing::info!( + upstream_table_id, + ?last_binlog_offset, + ?current_pk_pos, + "snapshot read stream ends" + ); + // If the snapshot read stream ends, it means all historical + // data has been loaded. + // We should not mark the chunk anymore, + // otherwise, we will ignore some rows in the buffer. + for chunk in upstream_chunk_buffer.drain(..) { + yield Message::Chunk(mapping_chunk( + chunk, + &self.output_indices, + )); } + + // backfill has finished, exit the backfill loop and persist the state when we recv a barrier + break 'backfill_loop; } Some(chunk) => { // Raise the current position. @@ -527,8 +440,6 @@ impl CdcBackfillExecutor { let chunk_cardinality = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += chunk_cardinality; total_snapshot_row_count += chunk_cardinality; - // count the number of rows in the snapshot - snapshot_read_row_cnt += chunk_cardinality as usize; yield Message::Chunk(mapping_chunk( chunk, &self.output_indices, @@ -538,6 +449,123 @@ impl CdcBackfillExecutor { } } } + + assert!(pending_barrier.is_some(), "pending_barrier must exist"); + let pending_barrier = pending_barrier.unwrap(); + + // Here we have to ensure the snapshot stream is consumed at least once, + // since the barrier event can kick in anytime. + // Otherwise, the result set of the new snapshot stream may become empty. + // It maybe a cancellation bug of the mysql driver. + let (_, mut snapshot_stream) = backfill_stream.into_inner(); + if let Some(msg) = snapshot_stream.next().await { + let Either::Right(msg) = msg else { + bail!("BUG: snapshot_read contains upstream messages"); + }; + match msg? { + None => { + tracing::info!( + upstream_table_id, + ?last_binlog_offset, + ?current_pk_pos, + "snapshot read stream ends in the force emit branch" + ); + // End of the snapshot read stream. + // Consume the buffered upstream chunk without filtering by `binlog_low`. + for chunk in upstream_chunk_buffer.drain(..) { + yield Message::Chunk(mapping_chunk(chunk, &self.output_indices)); + } + + // mark backfill has finished + state_impl + .mutate_state( + current_pk_pos.clone(), + last_binlog_offset.clone(), + total_snapshot_row_count, + true, + ) + .await?; + + // commit state because we have received a barrier message + state_impl.commit_state(pending_barrier.epoch).await?; + yield Message::Barrier(pending_barrier); + // end of backfill loop, since backfill has finished + break 'backfill_loop; + } + Some(chunk) => { + // Raise the current pk position. + current_pk_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); + + let row_count = chunk.cardinality() as u64; + cur_barrier_snapshot_processed_rows += row_count; + total_snapshot_row_count += row_count; + snapshot_read_row_cnt += row_count as usize; + + tracing::debug!( + upstream_table_id, + ?current_pk_pos, + ?snapshot_read_row_cnt, + "force emit a snapshot chunk" + ); + yield Message::Chunk(mapping_chunk(chunk, &self.output_indices)); + } + } + } + + // If the number of barriers reaches the snapshot interval, + // consume the buffered upstream chunks. + if let Some(current_pos) = ¤t_pk_pos { + for chunk in upstream_chunk_buffer.drain(..) { + cur_barrier_upstream_processed_rows += chunk.cardinality() as u64; + + // record the consumed binlog offset that will be + // persisted later + consumed_binlog_offset = + get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?; + + yield Message::Chunk(mapping_chunk( + mark_cdc_chunk( + &offset_parse_func, + chunk, + current_pos, + &pk_in_output_indices, + &pk_order, + last_binlog_offset.clone(), + )?, + &self.output_indices, + )); + } + } else { + // If no current_pos, means we did not process any snapshot yet. + // we can just ignore the upstream buffer chunk in that case. + upstream_chunk_buffer.clear(); + } + + // Update last seen binlog offset + if consumed_binlog_offset.is_some() { + last_binlog_offset.clone_from(&consumed_binlog_offset); + } + + Self::report_metrics( + &self.metrics, + upstream_table_id, + self.actor_ctx.id, + cur_barrier_snapshot_processed_rows, + cur_barrier_upstream_processed_rows, + ); + + // update and persist current backfill progress + state_impl + .mutate_state( + current_pk_pos.clone(), + last_binlog_offset.clone(), + total_snapshot_row_count, + false, + ) + .await?; + + state_impl.commit_state(pending_barrier.epoch).await?; + yield Message::Barrier(pending_barrier); } } else if self.disable_backfill { // If backfill is disabled, we just mark the backfill as finished @@ -548,7 +576,7 @@ impl CdcBackfillExecutor { ); state_impl .mutate_state( - current_pk_pos, + current_pk_pos.clone(), last_binlog_offset.clone(), total_snapshot_row_count, true, @@ -571,7 +599,16 @@ impl CdcBackfillExecutor { if let Some(msg) = mapping_message(msg, &self.output_indices) { // If not finished then we need to update state, otherwise no need. if let Message::Barrier(barrier) = &msg { - // persist the backfill state + // finalized the backfill state + // TODO: unify `mutate_state` and `commit_state` into one method + state_impl + .mutate_state( + current_pk_pos.clone(), + last_binlog_offset.clone(), + total_snapshot_row_count, + true, + ) + .await?; state_impl.commit_state(barrier.epoch).await?; // mark progress as finished diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index bee582d4142e..48a4b569ec09 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -27,36 +27,42 @@ use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReader use super::external::ExternalStorageTable; use crate::common::rate_limit::limited_chunk_size; -use crate::executor::backfill::utils::iter_chunks; +use crate::executor::backfill::utils::{get_new_pos, iter_chunks}; use crate::executor::{StreamExecutorError, StreamExecutorResult, INVALID_EPOCH}; pub trait UpstreamTableRead { - fn snapshot_read( + fn snapshot_read_full_table( &self, args: SnapshotReadArgs, - limit: u32, + batch_size: u32, ) -> impl Stream>> + Send + '_; - fn current_binlog_offset( + fn current_cdc_offset( &self, ) -> impl Future>> + Send + '_; } -#[derive(Debug, Default)] +#[derive(Debug, Clone, Default)] pub struct SnapshotReadArgs { pub epoch: u64, pub current_pos: Option, pub ordered: bool, pub rate_limit_rps: Option, + pub pk_in_output_indices: Vec, } impl SnapshotReadArgs { - pub fn new_for_cdc(current_pos: Option, rate_limit_rps: Option) -> Self { + pub fn new( + current_pos: Option, + rate_limit_rps: Option, + pk_in_output_indices: Vec, + ) -> Self { Self { epoch: INVALID_EPOCH, current_pos, ordered: false, rate_limit_rps, + pk_in_output_indices, } } } @@ -80,7 +86,7 @@ impl UpstreamTableReader { impl UpstreamTableRead for UpstreamTableReader { #[try_stream(ok = Option, error = StreamExecutorError)] - async fn snapshot_read(&self, args: SnapshotReadArgs, read_limit: u32) { + async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) { let primary_keys = self .inner .pk_indices() @@ -91,27 +97,7 @@ impl UpstreamTableRead for UpstreamTableReader { }) .collect_vec(); - tracing::debug!( - "snapshot_read primary keys: {:?}, current_pos: {:?}", - primary_keys, - args.current_pos - ); - - let row_stream = self.inner.table_reader().snapshot_read( - self.inner.schema_table_name(), - args.current_pos, - primary_keys, - read_limit, - ); - - pin_mut!(row_stream); - - let mut builder = DataChunkBuilder::new( - self.inner.schema().data_types(), - limited_chunk_size(args.rate_limit_rps), - ); - let chunk_stream = iter_chunks(row_stream, &mut builder); - + // prepare rate limiter if args.rate_limit_rps == Some(0) { // If limit is 0, we should not read any data from the upstream table. // Keep waiting util the stream is rebuilt. @@ -119,7 +105,6 @@ impl UpstreamTableRead for UpstreamTableReader { future.await; unreachable!(); } - let limiter = args.rate_limit_rps.map(|limit| { tracing::info!(rate_limit = limit, "rate limit applied"); RateLimiter::direct_with_clock( @@ -128,40 +113,147 @@ impl UpstreamTableRead for UpstreamTableReader { ) }); - #[for_await] - for chunk in chunk_stream { - let chunk = chunk?; - let chunk_size = chunk.capacity(); + let mut read_args = args; + // loop to read all data from the table + loop { + tracing::debug!( + "snapshot_read primary keys: {:?}, current_pos: {:?}", + primary_keys, + read_args.current_pos + ); + + let mut read_count: usize = 0; + let row_stream = self.inner.table_reader().snapshot_read( + self.inner.schema_table_name(), + read_args.current_pos.clone(), + primary_keys.clone(), + batch_size, + ); - if args.rate_limit_rps.is_none() || chunk_size == 0 { - // no limit, or empty chunk - yield Some(chunk); - continue; + pin_mut!(row_stream); + let mut builder = DataChunkBuilder::new( + self.inner.schema().data_types(), + limited_chunk_size(read_args.rate_limit_rps), + ); + let chunk_stream = iter_chunks(row_stream, &mut builder); + let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default(); + + #[for_await] + for chunk in chunk_stream { + let chunk = chunk?; + let chunk_size = chunk.capacity(); + read_count += chunk.cardinality(); + current_pk_pos = get_new_pos(&chunk, &read_args.pk_in_output_indices); + + if read_args.rate_limit_rps.is_none() || chunk_size == 0 { + // no limit, or empty chunk + yield Some(chunk); + continue; + } else { + // Apply rate limit, see `risingwave_stream::executor::source::apply_rate_limit` for more. + // May be should be refactored to a common function later. + let limiter = limiter.as_ref().unwrap(); + let limit = read_args.rate_limit_rps.unwrap() as usize; + + // Because we produce chunks with limited-sized data chunk builder and all rows + // are `Insert`s, the chunk size should never exceed the limit. + assert!(chunk_size <= limit); + + // `InsufficientCapacity` should never happen because we have check the cardinality + limiter + .until_n_ready(NonZeroU32::new(chunk_size as u32).unwrap()) + .await + .unwrap(); + yield Some(chunk); + } } - // Apply rate limit, see `risingwave_stream::executor::source::apply_rate_limit` for more. - // May be should be refactored to a common function later. - let limiter = limiter.as_ref().unwrap(); - let limit = args.rate_limit_rps.unwrap() as usize; - - // Because we produce chunks with limited-sized data chunk builder and all rows - // are `Insert`s, the chunk size should never exceed the limit. - assert!(chunk_size <= limit); - - // `InsufficientCapacity` should never happen because we have check the cardinality - limiter - .until_n_ready(NonZeroU32::new(chunk_size as u32).unwrap()) - .await - .unwrap(); - yield Some(chunk); + // check read_count if the snapshot batch is finished + if read_count < batch_size as _ { + tracing::debug!("finished loading of full table snapshot"); + yield None; + unreachable!() + } else { + // update PK position and continue to read the table + read_args.current_pos = Some(current_pk_pos); + } } - - yield None; } - async fn current_binlog_offset(&self) -> StreamExecutorResult> { + async fn current_cdc_offset(&self) -> StreamExecutorResult> { let binlog = self.inner.table_reader().current_cdc_offset(); let binlog = binlog.await?; Ok(Some(binlog)) } } + +#[cfg(test)] +mod tests { + + use futures::pin_mut; + use futures_async_stream::for_await; + use maplit::{convert_args, hashmap}; + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::util::chunk_coalesce::DataChunkBuilder; + use risingwave_connector::source::cdc::external::{ + ExternalTableReader, MySqlExternalTableReader, SchemaTableName, + }; + + use crate::executor::backfill::utils::{get_new_pos, iter_chunks}; + + #[ignore] + #[tokio::test] + async fn test_mysql_table_reader() { + let columns = vec![ + ColumnDesc::named("o_orderkey", ColumnId::new(1), DataType::Int64), + ColumnDesc::named("o_custkey", ColumnId::new(2), DataType::Int64), + ColumnDesc::named("o_orderstatus", ColumnId::new(3), DataType::Varchar), + ]; + let rw_schema = Schema { + fields: columns.iter().map(Field::from).collect(), + }; + let props = convert_args!(hashmap!( + "hostname" => "localhost", + "port" => "8306", + "username" => "root", + "password" => "123456", + "database.name" => "mydb", + "table.name" => "orders_rw")); + + let reader = MySqlExternalTableReader::new(props, rw_schema.clone()) + .await + .unwrap(); + + let mut cnt: usize = 0; + let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))])); + loop { + let row_stream = reader.snapshot_read( + SchemaTableName { + schema_name: "mydb".to_string(), + table_name: "orders_rw".to_string(), + }, + start_pk.clone(), + vec!["o_orderkey".to_string()], + 1000, + ); + let mut builder = DataChunkBuilder::new(rw_schema.clone().data_types(), 256); + let chunk_stream = iter_chunks(row_stream, &mut builder); + let pk_indices = vec![0]; + pin_mut!(chunk_stream); + #[for_await] + for chunk in chunk_stream { + let chunk = chunk.expect("data"); + start_pk = Some(get_new_pos(&chunk, &pk_indices)); + cnt += chunk.capacity(); + // println!("chunk: {:#?}", chunk); + println!("cnt: {}", cnt); + } + if cnt >= 1499900 { + println!("bye!"); + break; + } + } + } +} diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 3f1c3daa9e1d..ddd51d95a7ee 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -98,6 +98,8 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { state_table, node.rate_limit, disable_backfill, + 1, + 1000, ); Ok((params.info, exec).into()) }