From 37667316a66153bb7740978a409b7b068dd4be98 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 11 Apr 2024 13:54:00 +0800 Subject: [PATCH 01/11] wip: optimize cdc backfill --- .../src/executor/backfill/cdc/cdc_backfill.rs | 80 +++++++++++-------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index be53266868116..40133e58648b5 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -77,6 +77,8 @@ pub struct CdcBackfillExecutor { chunk_size: usize, disable_backfill: bool, + + snapshot_interval: u32, } impl CdcBackfillExecutor { @@ -102,6 +104,7 @@ impl CdcBackfillExecutor { metrics, chunk_size, disable_backfill, + snapshot_interval: 1, } } @@ -253,6 +256,7 @@ impl CdcBackfillExecutor { let mut cur_barrier_snapshot_processed_rows: u64 = 0; let mut cur_barrier_upstream_processed_rows: u64 = 0; + let mut barrier_count: u32 = 0; #[for_await] for either in &mut backfill_stream { @@ -261,6 +265,10 @@ impl CdcBackfillExecutor { Either::Left(msg) => { match msg? { Message::Barrier(barrier) => { + barrier_count += 1; + let can_start_new_snapshot = + (barrier_count == self.snapshot_interval); + if let Some(mutation) = barrier.mutation.as_deref() { use crate::executor::Mutation; match mutation { @@ -331,13 +339,11 @@ impl CdcBackfillExecutor { &pk_in_output_indices, )); - let chunk_cardinality = - chunk.cardinality() as u64; + let row_count = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += - chunk_cardinality; - total_snapshot_row_count += chunk_cardinality; - snapshot_read_row_cnt += - chunk_cardinality as usize; + row_count; + total_snapshot_row_count += row_count; + snapshot_read_row_cnt += row_count as usize; tracing::debug!( upstream_table_id, @@ -354,32 +360,36 @@ impl CdcBackfillExecutor { } } - // If it is a barrier, switch snapshot and consume buffered - // upstream chunk. - // If no current_pos, means we did not process any snapshot yet. - // In that case we can just ignore the upstream buffer chunk. - if let Some(current_pos) = ¤t_pk_pos { - for chunk in upstream_chunk_buffer.drain(..) { - cur_barrier_upstream_processed_rows += - chunk.cardinality() as u64; - - // record the consumed binlog offset that will be - // persisted later - consumed_binlog_offset = get_cdc_chunk_last_offset( - &offset_parse_func, - &chunk, - )?; - yield Message::Chunk(mapping_chunk( - mark_cdc_chunk( + if can_start_new_snapshot { + // If the number of barriers reaches the snapshot interval, + // consume the buffered upstream chunks. + if let Some(current_pos) = ¤t_pk_pos { + for chunk in upstream_chunk_buffer.drain(..) { + cur_barrier_upstream_processed_rows += + chunk.cardinality() as u64; + + // record the consumed binlog offset that will be + // persisted later + consumed_binlog_offset = get_cdc_chunk_last_offset( &offset_parse_func, - chunk, - current_pos, - &pk_in_output_indices, - &pk_order, - last_binlog_offset.clone(), - )?, - &self.output_indices, - )); + &chunk, + )?; + yield Message::Chunk(mapping_chunk( + mark_cdc_chunk( + &offset_parse_func, + chunk, + current_pos, + &pk_in_output_indices, + &pk_order, + last_binlog_offset.clone(), + )?, + &self.output_indices, + )); + } + } else { + // If no current_pos, means we did not process any snapshot yet. + // we can just ignore the upstream buffer chunk in that case. 
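// A minimal, self-contained sketch of the batching idea this patch introduces
// (hypothetical names, not the executor's real fields): barriers are counted,
// and only once `snapshot_interval` of them have been seen do we stop consuming
// the current stream and rebuild the snapshot read from the latest pk position.
struct SnapshotGate {
    snapshot_interval: u32,
    barrier_count: u32,
}

impl SnapshotGate {
    fn new(snapshot_interval: u32) -> Self {
        Self { snapshot_interval, barrier_count: 0 }
    }

    /// Returns true when this barrier should trigger a new snapshot read.
    fn on_barrier(&mut self) -> bool {
        self.barrier_count += 1;
        if self.barrier_count == self.snapshot_interval {
            self.barrier_count = 0;
            true
        } else {
            false
        }
    }
}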
+ upstream_chunk_buffer.clear(); } } @@ -423,10 +433,12 @@ impl CdcBackfillExecutor { total_snapshot_row_count, ); } - yield Message::Barrier(barrier); - // Break the for loop and start a new snapshot read stream. - break; + + if can_start_new_snapshot { + // Break the for loop and start a new snapshot read stream. + break; + } } Message::Chunk(chunk) => { // skip empty upstream chunk From b74f43c13bb121b16614aa04e9ee9c0e7e1a0af8 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 15 Apr 2024 18:19:47 +0800 Subject: [PATCH 02/11] WIP: encapsulate snapshot read full table in a function --- .../src/executor/backfill/cdc/cdc_backfill.rs | 338 ++++++++++-------- .../backfill/cdc/upstream_table/snapshot.rs | 47 ++- 2 files changed, 229 insertions(+), 156 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 40133e58648b5..ad2ff119cba22 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -46,7 +46,7 @@ use crate::executor::backfill::utils::{ }; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ - expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message, + expect_first_barrier, ActorContextRef, Barrier, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, StreamExecutorResult, }; use crate::task::CreateMviewProgress; @@ -68,7 +68,8 @@ pub struct CdcBackfillExecutor { output_indices: Vec, /// State table of the `CdcBackfill` executor - state_table: StateTable, + // state_table: StateTable, + state_impl: CdcBackfillState, progress: Option, @@ -94,12 +95,20 @@ impl CdcBackfillExecutor { chunk_size: usize, disable_backfill: bool, ) -> Self { + let pk_in_output_indices = external_table.pk_in_output_indices().clone().unwrap(); + let upstream_table_id = external_table.table_id().table_id; + let state_impl = CdcBackfillState::new( + upstream_table_id, + state_table, + pk_in_output_indices.len() + METADATA_STATE_LEN, + ); + Self { actor_ctx, external_table, upstream, output_indices, - state_table, + state_impl, progress, metrics, chunk_size, @@ -108,6 +117,30 @@ impl CdcBackfillExecutor { } } + fn report_metrics( + metrics: &Arc, + upstream_table_id: u32, + actor_id: u32, + snapshot_processed_row_count: u64, + upstream_processed_row_count: u64, + ) { + metrics + .cdc_backfill_snapshot_read_row_count + .with_label_values(&[ + upstream_table_id.to_string().as_str(), + actor_id.to_string().as_str(), + ]) + .inc_by(snapshot_processed_row_count); + + metrics + .cdc_backfill_upstream_output_row_count + .with_label_values(&[ + upstream_table_id.to_string().as_str(), + actor_id.to_string().as_str(), + ]) + .inc_by(upstream_processed_row_count); + } + #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { // The primary key columns, in the output columns of the upstream_table scan. @@ -132,11 +165,7 @@ impl CdcBackfillExecutor { // Check whether this parallelism has been assigned splits, // if not, we should bypass the backfill directly. - let mut state_impl = CdcBackfillState::new( - upstream_table_id, - self.state_table, - pk_in_output_indices.len() + METADATA_STATE_LEN, - ); + let mut state_impl = self.state_impl; let mut upstream = transform_upstream(upstream, &upstream_table_schema) .boxed() @@ -155,7 +184,7 @@ impl CdcBackfillExecutor { // Keep track of rows from the snapshot. 
let mut total_snapshot_row_count = state.row_count as u64; - let mut snapshot_read_epoch; + // let mut snapshot_read_epoch; let mut last_binlog_offset: Option = state .last_cdc_offset @@ -236,8 +265,13 @@ impl CdcBackfillExecutor { 'backfill_loop: loop { let left_upstream = upstream.by_ref().map(Either::Left); + let mut has_snapshot_read = false; let mut snapshot_read_row_cnt: usize = 0; - let args = SnapshotReadArgs::new_for_cdc(current_pk_pos.clone(), self.chunk_size); + let args = SnapshotReadArgs::new( + current_pk_pos.clone(), + self.chunk_size, + pk_in_output_indices.clone(), + ); let right_snapshot = pin!(upstream_table_reader .snapshot_read(args, snapshot_read_limit as u32) @@ -257,6 +291,7 @@ impl CdcBackfillExecutor { let mut cur_barrier_snapshot_processed_rows: u64 = 0; let mut cur_barrier_upstream_processed_rows: u64 = 0; let mut barrier_count: u32 = 0; + let mut pending_barrier = None; #[for_await] for either in &mut backfill_stream { @@ -267,7 +302,7 @@ impl CdcBackfillExecutor { Message::Barrier(barrier) => { barrier_count += 1; let can_start_new_snapshot = - (barrier_count == self.snapshot_interval); + barrier_count == self.snapshot_interval; if let Some(mutation) = barrier.mutation.as_deref() { use crate::executor::Mutation; @@ -284,137 +319,15 @@ impl CdcBackfillExecutor { } } - // ensure the snapshot stream is consumed once - // otherwise, reconstruct a snapshot stream with same pk offset may return an - // empty stream (don't know the cause) - if snapshot_read_row_cnt == 0 { - pin_mut!(backfill_stream); - let (_, mut snapshot_stream) = - backfill_stream.get_pin_mut(); - if let Some(msg) = snapshot_stream.next().await { - let Either::Right(msg) = msg else { - bail!( - "BUG: snapshot_read contains upstream messages" - ); - }; - match msg? { - None => { - tracing::info!( - upstream_table_id, - ?last_binlog_offset, - ?current_pk_pos, - "snapshot read stream ends" - ); - // End of the snapshot read stream. - // Consume the buffered upstream chunk without filtering by `binlog_low`. - for chunk in upstream_chunk_buffer.drain(..) { - yield Message::Chunk(mapping_chunk( - chunk, - &self.output_indices, - )); - } - - // mark backfill has finished - state_impl - .mutate_state( - current_pk_pos, - last_binlog_offset.clone(), - total_snapshot_row_count, - true, - ) - .await?; - - // commit state because we have received a barrier message - state_impl.commit_state(barrier.epoch).await?; - yield Message::Barrier(barrier); - // end of backfill loop, since backfill has finished - break 'backfill_loop; - } - Some(chunk) => { - // Raise the current pk position. - // Because snapshot streams are ordered by pk, so we can - // use the last row to update `current_pk_pos`. - current_pk_pos = Some(get_new_pos( - &chunk, - &pk_in_output_indices, - )); - - let row_count = chunk.cardinality() as u64; - cur_barrier_snapshot_processed_rows += - row_count; - total_snapshot_row_count += row_count; - snapshot_read_row_cnt += row_count as usize; - - tracing::debug!( - upstream_table_id, - ?current_pk_pos, - ?snapshot_read_row_cnt, - "force emit a snapshot chunk" - ); - yield Message::Chunk(mapping_chunk( - chunk, - &self.output_indices, - )); - } - } - } - } - - if can_start_new_snapshot { - // If the number of barriers reaches the snapshot interval, - // consume the buffered upstream chunks. - if let Some(current_pos) = ¤t_pk_pos { - for chunk in upstream_chunk_buffer.drain(..) 
{ - cur_barrier_upstream_processed_rows += - chunk.cardinality() as u64; - - // record the consumed binlog offset that will be - // persisted later - consumed_binlog_offset = get_cdc_chunk_last_offset( - &offset_parse_func, - &chunk, - )?; - yield Message::Chunk(mapping_chunk( - mark_cdc_chunk( - &offset_parse_func, - chunk, - current_pos, - &pk_in_output_indices, - &pk_order, - last_binlog_offset.clone(), - )?, - &self.output_indices, - )); - } - } else { - // If no current_pos, means we did not process any snapshot yet. - // we can just ignore the upstream buffer chunk in that case. - upstream_chunk_buffer.clear(); - } - } - - self.metrics - .cdc_backfill_snapshot_read_row_count - .with_label_values(&[ - upstream_table_id.to_string().as_str(), - self.actor_ctx.id.to_string().as_str(), - ]) - .inc_by(cur_barrier_snapshot_processed_rows); - - self.metrics - .cdc_backfill_upstream_output_row_count - .with_label_values(&[ - upstream_table_id.to_string().as_str(), - self.actor_ctx.id.to_string().as_str(), - ]) - .inc_by(cur_barrier_upstream_processed_rows); - - // Update last seen binlog offset - if consumed_binlog_offset.is_some() { - last_binlog_offset.clone_from(&consumed_binlog_offset); - } + Self::report_metrics( + &self.metrics, + upstream_table_id, + self.actor_ctx.id, + cur_barrier_snapshot_processed_rows, + cur_barrier_upstream_processed_rows, + ); - // update and persist backfill state + // update and persist current backfill progress state_impl .mutate_state( current_pk_pos.clone(), @@ -425,19 +338,25 @@ impl CdcBackfillExecutor { .await?; state_impl.commit_state(barrier.epoch).await?; - snapshot_read_epoch = barrier.epoch.prev; - if let Some(progress) = self.progress.as_mut() { - progress.update( - barrier.epoch.curr, - snapshot_read_epoch, - total_snapshot_row_count, - ); - } - yield Message::Barrier(barrier); - + // snapshot_read_epoch = barrier.epoch.prev; + // if let Some(progress) = self.progress.as_mut() { + // progress.update( + // barrier.epoch.curr, + // snapshot_read_epoch, + // total_snapshot_row_count, + // ); + // } + + // when processing a barrier, check whether can start a new snapshot + // if the number of barriers reaches the snapshot interval if can_start_new_snapshot { - // Break the for loop and start a new snapshot read stream. + pending_barrier = Some(barrier); + has_snapshot_read = snapshot_read_row_cnt > 0; + // Break the for loop and prepare to start a new snapshot break; + } else { + // emit barrier and continue consume the backfill stream + yield Message::Barrier(barrier); } } Message::Chunk(chunk) => { @@ -513,9 +432,10 @@ impl CdcBackfillExecutor { ) .await?; - // exit backfill + // backfill has finished break 'backfill_loop; } else { + // TODO: // break the for loop to reconstruct a new snapshot with pk offset // to ensure we load all historical data break; @@ -547,6 +467,118 @@ impl CdcBackfillExecutor { } } } + + // ensure the snapshot stream is consumed once in the snapshot interval + // otherwise, reconstruct a snapshot stream with same pk offset may return an + // empty stream (don't know the cause) + if !has_snapshot_read { + // pin_mut!(backfill_stream); + let (_, mut snapshot_stream) = backfill_stream.into_inner(); + if let Some(msg) = snapshot_stream.next().await { + let Either::Right(msg) = msg else { + bail!("BUG: snapshot_read contains upstream messages"); + }; + match msg? { + None => { + tracing::info!( + upstream_table_id, + ?last_binlog_offset, + ?current_pk_pos, + "snapshot read stream ends" + ); + // End of the snapshot read stream. 
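// The executor consumes one merged stream of upstream changelog messages and
// snapshot rows. A simplified sketch of that merge, assuming plain `futures`
// streams and the `either` crate rather than the real executor types; the
// upstream side is polled first so barriers are not starved by a long snapshot read.
fn merge_backfill_inputs(
    upstream: impl futures::Stream<Item = String> + Unpin,
    snapshot: impl futures::Stream<Item = String> + Unpin,
) -> impl futures::Stream<Item = either::Either<String, String>> {
    use futures::stream::{select_with_strategy, PollNext};
    use futures::StreamExt;

    select_with_strategy(
        upstream.map(either::Either::Left),
        snapshot.map(either::Either::Right),
        // always prefer the upstream (barrier-carrying) side when both are ready
        |_: &mut ()| PollNext::Left,
    )
}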
+ // Consume the buffered upstream chunk without filtering by `binlog_low`. + for chunk in upstream_chunk_buffer.drain(..) { + yield Message::Chunk(mapping_chunk( + chunk, + &self.output_indices, + )); + } + + // mark backfill has finished + state_impl + .mutate_state( + current_pk_pos, + last_binlog_offset.clone(), + total_snapshot_row_count, + true, + ) + .await?; + + // commit state because we have received a barrier message + let barrier = pending_barrier.expect("pending_barrier must exist"); + state_impl.commit_state(barrier.epoch).await?; + yield Message::Barrier(barrier); + // end of backfill loop, since backfill has finished + break 'backfill_loop; + } + Some(chunk) => { + // Raise the current pk position. + // Because snapshot streams are ordered by pk, so we can + // use the last row to update `current_pk_pos`. + current_pk_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); + + let row_count = chunk.cardinality() as u64; + cur_barrier_snapshot_processed_rows += row_count; + total_snapshot_row_count += row_count; + snapshot_read_row_cnt += row_count as usize; + + tracing::debug!( + upstream_table_id, + ?current_pk_pos, + ?snapshot_read_row_cnt, + "force emit a snapshot chunk" + ); + yield Message::Chunk(mapping_chunk(chunk, &self.output_indices)); + } + } + } + } + + // If the number of barriers reaches the snapshot interval, + // consume the buffered upstream chunks. + if let Some(current_pos) = ¤t_pk_pos { + for chunk in upstream_chunk_buffer.drain(..) { + cur_barrier_upstream_processed_rows += chunk.cardinality() as u64; + + // record the consumed binlog offset that will be + // persisted later + consumed_binlog_offset = + get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?; + + yield Message::Chunk(mapping_chunk( + mark_cdc_chunk( + &offset_parse_func, + chunk, + current_pos, + &pk_in_output_indices, + &pk_order, + last_binlog_offset.clone(), + )?, + &self.output_indices, + )); + } + } else { + // If no current_pos, means we did not process any snapshot yet. + // we can just ignore the upstream buffer chunk in that case. 
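// A toy illustration of why buffered upstream rows are reconciled against the
// current backfill position (this is not the real `mark_cdc_chunk` logic, which
// also compares binlog offsets): change events whose primary key lies beyond
// `current_pk_pos` can be dropped, because the still-running snapshot read will
// already observe their effect when it reaches those keys.
fn retain_already_backfilled(rows: Vec<(i64, f64)>, current_pk_pos: i64) -> Vec<(i64, f64)> {
    rows.into_iter()
        .filter(|(pk, _)| *pk <= current_pk_pos)
        .collect()
}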
+ upstream_chunk_buffer.clear(); + } + + // Update last seen binlog offset + if consumed_binlog_offset.is_some() { + last_binlog_offset.clone_from(&consumed_binlog_offset); + } + + Self::report_metrics( + &self.metrics, + upstream_table_id, + self.actor_ctx.id, + cur_barrier_snapshot_processed_rows, + cur_barrier_upstream_processed_rows, + ); + + let barrier = pending_barrier.expect("pending_barrier must exist"); + yield Message::Barrier(barrier); } } else if self.disable_backfill { // If backfill is disabled, we just mark the backfill as finished diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index 6835b7cd2c776..f6cf2021c49b8 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -23,7 +23,7 @@ use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReader}; use super::external::ExternalStorageTable; -use crate::executor::backfill::utils::iter_chunks; +use crate::executor::backfill::utils::{get_new_pos, iter_chunks}; use crate::executor::{StreamExecutorError, StreamExecutorResult, INVALID_EPOCH}; pub trait UpstreamTableRead { @@ -38,21 +38,27 @@ pub trait UpstreamTableRead { ) -> impl Future>> + Send + '_; } -#[derive(Debug, Default)] +#[derive(Debug, Clone, Default)] pub struct SnapshotReadArgs { pub epoch: u64, pub current_pos: Option, pub ordered: bool, pub chunk_size: usize, + pub pk_in_output_indices: Vec, } impl SnapshotReadArgs { - pub fn new_for_cdc(current_pos: Option, chunk_size: usize) -> Self { + pub fn new( + current_pos: Option, + chunk_size: usize, + pk_in_output_indices: Vec, + ) -> Self { Self { epoch: INVALID_EPOCH, current_pos, ordered: false, chunk_size, + pk_in_output_indices, } } } @@ -74,6 +80,41 @@ impl UpstreamTableReader { } } +impl UpstreamTableReader { + #[try_stream(ok = Option, error = StreamExecutorError)] + pub async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, limit: u32) { + let mut read_args = args; + + 'read_loop: loop { + let mut read_count: usize = 0; + let chunk_stream = self.snapshot_read(read_args.clone(), limit); + let mut current_pk_pos = read_args.current_pos.clone().unwrap_or(OwnedRow::default()); + #[for_await] + for chunk in chunk_stream { + let chunk = chunk?; + + match chunk { + Some(chunk) => { + read_count += chunk.cardinality(); + current_pk_pos = get_new_pos(&chunk, &read_args.pk_in_output_indices); + yield Some(chunk); + } + None => { + // reach the end of the table + if read_count < limit as _ { + break 'read_loop; + } else { + // update PK position + read_args.current_pos = Some(current_pk_pos); + break; + } + } + } + } + } + } +} + impl UpstreamTableRead for UpstreamTableReader { #[try_stream(ok = Option, error = StreamExecutorError)] async fn snapshot_read(&self, args: SnapshotReadArgs, limit: u32) { From 5c6e8cfc0f1d02d1f563c71686fc2e646861640e Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 15 Apr 2024 18:53:12 +0800 Subject: [PATCH 03/11] refactor to start snapshot in a fixed interval --- .../src/executor/backfill/cdc/cdc_backfill.rs | 102 ++++++++---------- .../backfill/cdc/upstream_table/snapshot.rs | 1 + 2 files changed, 43 insertions(+), 60 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index ad2ff119cba22..c0eeefa49d81c 100644 --- 
a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -46,7 +46,7 @@ use crate::executor::backfill::utils::{ }; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ - expect_first_barrier, ActorContextRef, Barrier, BoxedMessageStream, Execute, Executor, Message, + expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, StreamExecutorResult, }; use crate::task::CreateMviewProgress; @@ -68,7 +68,6 @@ pub struct CdcBackfillExecutor { output_indices: Vec, /// State table of the `CdcBackfill` executor - // state_table: StateTable, state_impl: CdcBackfillState, progress: Option, @@ -113,7 +112,8 @@ impl CdcBackfillExecutor { metrics, chunk_size, disable_backfill, - snapshot_interval: 1, + // TODO: make this option configuable in the WITH clause + snapshot_interval: 5, } } @@ -267,14 +267,14 @@ impl CdcBackfillExecutor { let mut has_snapshot_read = false; let mut snapshot_read_row_cnt: usize = 0; - let args = SnapshotReadArgs::new( + let read_args = SnapshotReadArgs::new( current_pk_pos.clone(), self.chunk_size, pk_in_output_indices.clone(), ); let right_snapshot = pin!(upstream_table_reader - .snapshot_read(args, snapshot_read_limit as u32) + .snapshot_read_full_table(read_args, snapshot_read_limit as u32) .map(Either::Right)); let (right_snapshot, valve) = pausable(right_snapshot); @@ -338,20 +338,10 @@ impl CdcBackfillExecutor { .await?; state_impl.commit_state(barrier.epoch).await?; - // snapshot_read_epoch = barrier.epoch.prev; - // if let Some(progress) = self.progress.as_mut() { - // progress.update( - // barrier.epoch.curr, - // snapshot_read_epoch, - // total_snapshot_row_count, - // ); - // } - // when processing a barrier, check whether can start a new snapshot // if the number of barriers reaches the snapshot interval if can_start_new_snapshot { pending_barrier = Some(barrier); - has_snapshot_read = snapshot_read_row_cnt > 0; // Break the for loop and prepare to start a new snapshot break; } else { @@ -403,45 +393,40 @@ impl CdcBackfillExecutor { Either::Right(msg) => { match msg? { None => { - if snapshot_read_row_cnt < snapshot_read_limit { - tracing::info!( - upstream_table_id, - ?last_binlog_offset, - ?current_pk_pos, - "snapshot read stream ends" - ); - // If the snapshot read stream ends with less than `limit` rows, - // it means all historical data has been loaded. - // We should not mark the chunk anymore, - // otherwise, we will ignore some rows in the buffer. - // Here we choose to never mark the chunk. - // Consume with the renaming stream buffer chunk without mark. - for chunk in upstream_chunk_buffer.drain(..) { - yield Message::Chunk(mapping_chunk( - chunk, - &self.output_indices, - )); - } - - state_impl - .mutate_state( - current_pk_pos.clone(), - last_binlog_offset.clone(), - total_snapshot_row_count, - true, - ) - .await?; - - // backfill has finished - break 'backfill_loop; - } else { - // TODO: - // break the for loop to reconstruct a new snapshot with pk offset - // to ensure we load all historical data - break; + tracing::info!( + upstream_table_id, + ?last_binlog_offset, + ?current_pk_pos, + "snapshot read stream ends" + ); + // If the snapshot read stream ends with less than `limit` rows, + // it means all historical data has been loaded. + // We should not mark the chunk anymore, + // otherwise, we will ignore some rows in the buffer. + // Here we choose to never mark the chunk. 
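// `snapshot_read_full_table` reads the table in bounded batches instead of one
// unbounded scan. A condensed sketch of that control flow, with a hypothetical
// `read_batch` callback standing in for the external table reader: each batch
// is ordered by pk and starts after the last pk seen, and a batch shorter than
// the limit means the end of the table has been reached.
fn read_full_table<F>(batch_limit: usize, mut read_batch: F) -> Vec<i64>
where
    F: FnMut(Option<i64>, usize) -> Vec<i64>,
{
    let mut all_rows = Vec::new();
    let mut current_pos: Option<i64> = None;
    loop {
        let batch = read_batch(current_pos, batch_limit);
        let short_batch = batch.len() < batch_limit;
        // remember the last pk of this batch as the start of the next one
        current_pos = batch.last().copied().or(current_pos);
        all_rows.extend(batch);
        if short_batch {
            // fewer rows than the limit: the table has been fully scanned
            break;
        }
    }
    all_rows
}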
+ // Consume with the renaming stream buffer chunk without mark. + for chunk in upstream_chunk_buffer.drain(..) { + yield Message::Chunk(mapping_chunk( + chunk, + &self.output_indices, + )); } + + state_impl + .mutate_state( + current_pk_pos.clone(), + last_binlog_offset.clone(), + total_snapshot_row_count, + true, + ) + .await?; + + // backfill has finished + break 'backfill_loop; } Some(chunk) => { + has_snapshot_read = true; + // Raise the current position. // As snapshot read streams are ordered by pk, so we can // just use the last row to update `current_pos`. @@ -456,8 +441,6 @@ impl CdcBackfillExecutor { let chunk_cardinality = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += chunk_cardinality; total_snapshot_row_count += chunk_cardinality; - // count the number of rows in the snapshot - snapshot_read_row_cnt += chunk_cardinality as usize; yield Message::Chunk(mapping_chunk( chunk, &self.output_indices, @@ -468,11 +451,12 @@ impl CdcBackfillExecutor { } } - // ensure the snapshot stream is consumed once in the snapshot interval - // otherwise, reconstruct a snapshot stream with same pk offset may return an + assert!(pending_barrier.is_some(), "pending_barrier must exist"); + // Before start a new snapshot, we should ensure the snapshot stream is + // consumed once in the period of snapshot interval. Because if we + // reconstruct a snapshot stream with same pk offset, it will return an // empty stream (don't know the cause) if !has_snapshot_read { - // pin_mut!(backfill_stream); let (_, mut snapshot_stream) = backfill_stream.into_inner(); if let Some(msg) = snapshot_stream.next().await { let Either::Right(msg) = msg else { @@ -506,7 +490,7 @@ impl CdcBackfillExecutor { .await?; // commit state because we have received a barrier message - let barrier = pending_barrier.expect("pending_barrier must exist"); + let barrier = pending_barrier.unwrap(); state_impl.commit_state(barrier.epoch).await?; yield Message::Barrier(barrier); // end of backfill loop, since backfill has finished @@ -514,8 +498,6 @@ impl CdcBackfillExecutor { } Some(chunk) => { // Raise the current pk position. - // Because snapshot streams are ordered by pk, so we can - // use the last row to update `current_pk_pos`. 
current_pk_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); let row_count = chunk.cardinality() as u64; @@ -577,7 +559,7 @@ impl CdcBackfillExecutor { cur_barrier_upstream_processed_rows, ); - let barrier = pending_barrier.expect("pending_barrier must exist"); + let barrier = pending_barrier.unwrap(); yield Message::Barrier(barrier); } } else if self.disable_backfill { diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index f6cf2021c49b8..6d42fa6a931e9 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -81,6 +81,7 @@ impl UpstreamTableReader { } impl UpstreamTableReader { + /// Continuously read the rows from the upstream table until reaching the end of the table #[try_stream(ok = Option, error = StreamExecutorError)] pub async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, limit: u32) { let mut read_args = args; From adff2d459ffdcc09dbef17666749101101510ad5 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 18 Apr 2024 11:20:15 +0800 Subject: [PATCH 04/11] WIP: test the upstream buffered events --- .../src/executor/backfill/cdc/cdc_backfill.rs | 14 +++++++++----- .../backfill/cdc/upstream_table/snapshot.rs | 6 ++++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index c0eeefa49d81c..2fd22320e1c70 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -342,6 +342,12 @@ impl CdcBackfillExecutor { // if the number of barriers reaches the snapshot interval if can_start_new_snapshot { pending_barrier = Some(barrier); + tracing::debug!( + upstream_table_id, + ?current_pk_pos, + ?snapshot_read_row_cnt, + "Prepare to start a new snapshot" + ); // Break the for loop and prepare to start a new snapshot break; } else { @@ -358,8 +364,7 @@ impl CdcBackfillExecutor { let chunk_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?; - tracing::trace!( - target: "events::stream::cdc_backfill", + tracing::debug!( "recv changelog chunk: chunk_offset {:?}, capactiy {}", chunk_binlog_offset, chunk.capacity() @@ -372,8 +377,7 @@ impl CdcBackfillExecutor { if let Some(chunk_offset) = chunk_binlog_offset && chunk_offset < *last_binlog_offset { - tracing::trace!( - target: "events::stream::cdc_backfill", + tracing::debug!( "skip changelog chunk: chunk_offset {:?}, capacity {}", chunk_offset, chunk.capacity() @@ -468,7 +472,7 @@ impl CdcBackfillExecutor { upstream_table_id, ?last_binlog_offset, ?current_pk_pos, - "snapshot read stream ends" + "snapshot read stream ends in the force emit branch" ); // End of the snapshot read stream. // Consume the buffered upstream chunk without filtering by `binlog_low`. 
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index 6d42fa6a931e9..1700b5e139109 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -86,7 +86,7 @@ impl UpstreamTableReader { pub async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, limit: u32) { let mut read_args = args; - 'read_loop: loop { + loop { let mut read_count: usize = 0; let chunk_stream = self.snapshot_read(read_args.clone(), limit); let mut current_pk_pos = read_args.current_pos.clone().unwrap_or(OwnedRow::default()); @@ -103,7 +103,9 @@ impl UpstreamTableReader { None => { // reach the end of the table if read_count < limit as _ { - break 'read_loop; + tracing::debug!("finished loading of table snapshot"); + yield None; + unreachable!("snapshot stream is ended, should not reach here"); } else { // update PK position read_args.current_pos = Some(current_pk_pos); From 65df1385edb7b682bce38f9cb2e0d33f16196805 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sat, 20 Apr 2024 16:03:24 +0800 Subject: [PATCH 05/11] refine ut --- src/compute/tests/cdc_tests.rs | 43 +++++++++++++++++-- .../cdc/external/mock_external_table.rs | 24 +++++++---- .../src/executor/backfill/cdc/cdc_backfill.rs | 25 +++++++---- .../backfill/cdc/upstream_table/snapshot.rs | 24 +---------- src/stream/src/from_proto/stream_cdc_scan.rs | 2 + 5 files changed, 74 insertions(+), 44 deletions(-) diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 760c76ca6deba..ca0c60f319746 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -25,7 +25,10 @@ use futures::stream::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor, ScanRange}; -use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, StreamChunk, Utf8ArrayBuilder}; +use risingwave_common::array::{ + Array, ArrayBuilder, DataChunk, DataChunkTestExt, Op, StreamChunk, Utf8ArrayBuilder, +}; +use risingwave_common::bail; use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId}; use risingwave_common::types::{Datum, JsonbVal}; use risingwave_common::util::epoch::{test_epoch, EpochExt}; @@ -129,7 +132,7 @@ impl Execute for MockOffsetGenExecutor { } #[tokio::test] -async fn test_cdc_backfill() -> StreamResult<()> { +async fn test_cdc_backfill_basic() -> StreamResult<()> { use risingwave_common::types::DataType; let memory_state_store = MemoryStateStore::new(); @@ -186,6 +189,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { let actor_id = 0x1a; + // create state table let state_schema = Schema::new(vec![ Field::with_name(DataType::Varchar, "split_id"), Field::with_name(DataType::Int64, "id"), // pk @@ -227,6 +231,8 @@ async fn test_cdc_backfill() -> StreamResult<()> { state_table, 4, // 4 rows in a snapshot chunk false, + 1, + 1, ) .boxed(), ); @@ -246,9 +252,10 @@ async fn test_cdc_backfill() -> StreamResult<()> { .boxed() .execute(); + // construct upstream chunks let chunk1_payload = vec![ r#"{ "payload": { "before": null, "after": { "id": 1, "price": 10.01}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, - r#"{ "payload": { "before": null, "after": { "id": 2, "price": 2.02}, "source": { "version": 
"1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, + r#"{ "payload": { "before": null, "after": { "id": 2, "price": 22.22}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 3, "price": 3.03}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 4, "price": 4.04}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 5, "price": 5.05}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, @@ -256,6 +263,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { ]; let chunk2_payload = vec![ + r#"{ "payload": { "before": null, "after": { "id": 1, "price": 11.11}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 6, "price": 10.08}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 199, "price": 40.5}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, r#"{ "payload": { "before": null, "after": { "id": 978, "price": 72.6}, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002"}, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#, @@ -311,11 +319,22 @@ async fn test_cdc_backfill() -> StreamResult<()> { // ingest data and barrier let interval = Duration::from_millis(10); + + // send a dummy barrier to trigger the backfill, since + // cdc backfill will wait for a barrier before start + curr_epoch.inc_epoch(); + tx.push_barrier(curr_epoch, false); + + // first chunk tx.push_chunk(stream_chunk1); + tokio::time::sleep(interval).await; + + // barrier to trigger emit buffered events curr_epoch.inc_epoch(); tx.push_barrier(curr_epoch, false); + // second chunk tx.push_chunk(stream_chunk2); tokio::time::sleep(interval).await; @@ -352,9 +371,25 @@ async fn test_cdc_backfill() -> StreamResult<()> { None, None, )); + + // check result let mut stream = scan.execute(); while let Some(message) = stream.next().await { - println!("[scan] chunk: {:#?}", message.unwrap()); + let chunk = message.expect("scan a chunk"); + let expect = DataChunk::from_pretty( + "I F + 1 11.11 + 2 22.22 + 3 3.03 + 4 4.04 + 5 5.05 + 6 10.08 + 8 1.0008 + 134 41.7 + 199 40.5 + 978 72.6", + ); + assert_eq!(expect, chunk); } mview_handle.await.unwrap()?; diff --git a/src/connector/src/source/cdc/external/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs index 4a652e1c1628c..1cdc55a41b64d 100644 --- a/src/connector/src/source/cdc/external/mock_external_table.rs +++ b/src/connector/src/source/cdc/external/mock_external_table.rs @@ -39,9 +39,15 @@ impl MockExternalTableReader { } pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { - Box::new(move |_| Ok(CdcOffset::MySql(MySqlOffset::default()))) + 
Box::new(move |offset| { + Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( + offset, + )?)) + }) } + /// The snapshot will emit to downstream all in once, because it is too small. + /// After that we will emit the buffered upstream chunks all in one. #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner(&self) { let snap_idx = self @@ -49,18 +55,18 @@ impl MockExternalTableReader { .fetch_add(1, std::sync::atomic::Ordering::Relaxed); println!("snapshot read: idx {}", snap_idx); - let snap0 = vec![OwnedRow::new(vec![ - Some(ScalarImpl::Int64(1)), - Some(ScalarImpl::Float64(1.0001.into())), - ])]; - let snap1 = vec![ + let snap0 = vec![ OwnedRow::new(vec![ Some(ScalarImpl::Int64(1)), - Some(ScalarImpl::Float64(10.01.into())), + Some(ScalarImpl::Float64(1.0001.into())), + ]), + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(1)), + Some(ScalarImpl::Float64(11.00.into())), ]), OwnedRow::new(vec![ Some(ScalarImpl::Int64(2)), - Some(ScalarImpl::Float64(2.02.into())), + Some(ScalarImpl::Float64(22.00.into())), ]), OwnedRow::new(vec![ Some(ScalarImpl::Int64(5)), @@ -76,7 +82,7 @@ impl MockExternalTableReader { ]), ]; - let snapshots = [snap0, snap1]; + let snapshots = vec![snap0]; if snap_idx >= snapshots.len() { return Ok(()); } diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index ff014b0d54956..37c8f1b1d0351 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -79,6 +79,8 @@ pub struct CdcBackfillExecutor { disable_backfill: bool, snapshot_interval: u32, + + snapshot_read_limit: u32, } impl CdcBackfillExecutor { @@ -93,6 +95,8 @@ impl CdcBackfillExecutor { state_table: StateTable, chunk_size: usize, disable_backfill: bool, + snapshot_interval: u32, + snapshot_read_limit: u32, ) -> Self { let pk_in_output_indices = external_table.pk_in_output_indices().clone().unwrap(); let upstream_table_id = external_table.table_id().table_id; @@ -113,7 +117,8 @@ impl CdcBackfillExecutor { chunk_size, disable_backfill, // TODO: make this option configuable in the WITH clause - snapshot_interval: 5, + snapshot_interval, + snapshot_read_limit, } } @@ -184,11 +189,10 @@ impl CdcBackfillExecutor { // Keep track of rows from the snapshot. 
let mut total_snapshot_row_count = state.row_count as u64; - // let mut snapshot_read_epoch; let mut last_binlog_offset: Option = state .last_cdc_offset - .map_or(upstream_table_reader.current_binlog_offset().await?, Some); + .map_or(upstream_table_reader.current_cdc_offset().await?, Some); let offset_parse_func = upstream_table_reader .inner() @@ -251,6 +255,11 @@ impl CdcBackfillExecutor { } } + println!( + "start cdc backfill loop, initial_binlog_offset {:?}", + last_binlog_offset + ); + tracing::info!(upstream_table_id, upstream_table_name, initial_binlog_offset = ?last_binlog_offset, @@ -258,7 +267,7 @@ impl CdcBackfillExecutor { "start cdc backfill loop"); // TODO: make the limit configurable - let snapshot_read_limit: usize = 1000; + // let snapshot_read_limit: usize = 1000; // the buffer will be drained when a barrier comes let mut upstream_chunk_buffer: Vec = vec![]; @@ -274,7 +283,7 @@ impl CdcBackfillExecutor { ); let right_snapshot = pin!(upstream_table_reader - .snapshot_read_full_table(read_args, snapshot_read_limit as u32) + .snapshot_read_full_table(read_args, self.snapshot_read_limit as u32) .map(Either::Right)); let (right_snapshot, valve) = pausable(right_snapshot); @@ -414,12 +423,10 @@ impl CdcBackfillExecutor { ?current_pk_pos, "snapshot read stream ends" ); - // If the snapshot read stream ends with less than `limit` rows, - // it means all historical data has been loaded. + // If the snapshot read stream ends, it means all historical + // data has been loaded. // We should not mark the chunk anymore, // otherwise, we will ignore some rows in the buffer. - // Here we choose to never mark the chunk. - // Consume with the renaming stream buffer chunk without mark. for chunk in upstream_chunk_buffer.drain(..) { yield Message::Chunk(mapping_chunk( chunk, diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index 8057d9ce960e8..16a91470c2e14 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -13,12 +13,9 @@ // limitations under the License. use std::future::Future; -use std::num::NonZeroU32; use futures::{pin_mut, Stream}; use futures_async_stream::try_stream; -use governor::clock::MonotonicClock; -use governor::{Quota, RateLimiter}; use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::row::OwnedRow; @@ -36,7 +33,7 @@ pub trait UpstreamTableRead { limit: u32, ) -> impl Stream>> + Send + '_; - fn current_binlog_offset( + fn current_cdc_offset( &self, ) -> impl Future>> + Send + '_; } @@ -152,32 +149,15 @@ impl UpstreamTableRead for UpstreamTableReader { let mut builder = DataChunkBuilder::new(self.inner.schema().data_types(), args.chunk_size); let chunk_stream = iter_chunks(row_stream, &mut builder); - if args.chunk_size == 0 { - // If limit is 0, we should not read any data from the upstream table. - // Keep waiting util the stream is rebuilt. 
- let future = futures::future::pending::<()>(); - future.await; - } - let limiter = { - let quota = Quota::per_second(NonZeroU32::new(args.chunk_size as u32).unwrap()); - let clock = MonotonicClock; - RateLimiter::direct_with_clock(quota, &clock) - }; #[for_await] for chunk in chunk_stream { let chunk = chunk?; - if chunk.cardinality() != 0 { - limiter - .until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap()) - .await - .unwrap(); - } yield Some(chunk); } yield None; } - async fn current_binlog_offset(&self) -> StreamExecutorResult> { + async fn current_cdc_offset(&self) -> StreamExecutorResult> { let binlog = self.inner.table_reader().current_cdc_offset(); let binlog = binlog.await?; Ok(Some(binlog)) diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 2f035fc4f270a..d7f8f1fd94394 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -105,6 +105,8 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { state_table, backfill_chunk_size, disable_backfill, + 1, + 1000, ); Ok((params.info, exec).into()) } From 3faa90cd4cf575df78778720fa320c30eda752d2 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 21 Apr 2024 14:43:35 +0800 Subject: [PATCH 06/11] clean code --- src/compute/tests/cdc_tests.rs | 2 +- .../src/executor/backfill/cdc/cdc_backfill.rs | 17 ++++++----------- .../backfill/cdc/upstream_table/snapshot.rs | 2 +- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index ca0c60f319746..2904606b7fa63 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -232,7 +232,7 @@ async fn test_cdc_backfill_basic() -> StreamResult<()> { 4, // 4 rows in a snapshot chunk false, 1, - 1, + 4, ) .boxed(), ); diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 37c8f1b1d0351..d39bb5eeb79e9 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -70,6 +70,8 @@ pub struct CdcBackfillExecutor { /// State table of the `CdcBackfill` executor state_impl: CdcBackfillState, + // TODO: introduce a CdcBackfillProgress to report finish to Meta + // This object is just a stub right now progress: Option, metrics: Arc, @@ -78,6 +80,7 @@ pub struct CdcBackfillExecutor { disable_backfill: bool, + // TODO: make these options configurable snapshot_interval: u32, snapshot_read_limit: u32, @@ -116,7 +119,6 @@ impl CdcBackfillExecutor { metrics, chunk_size, disable_backfill, - // TODO: make this option configuable in the WITH clause snapshot_interval, snapshot_read_limit, } @@ -255,20 +257,12 @@ impl CdcBackfillExecutor { } } - println!( - "start cdc backfill loop, initial_binlog_offset {:?}", - last_binlog_offset - ); - tracing::info!(upstream_table_id, upstream_table_name, initial_binlog_offset = ?last_binlog_offset, ?current_pk_pos, "start cdc backfill loop"); - // TODO: make the limit configurable - // let snapshot_read_limit: usize = 1000; - // the buffer will be drained when a barrier comes let mut upstream_chunk_buffer: Vec = vec![]; 'backfill_loop: loop { @@ -283,7 +277,7 @@ impl CdcBackfillExecutor { ); let right_snapshot = pin!(upstream_table_reader - .snapshot_read_full_table(read_args, self.snapshot_read_limit as u32) + .snapshot_read_full_table(read_args, self.snapshot_read_limit) .map(Either::Right)); let (right_snapshot, valve) = 
pausable(right_snapshot); @@ -309,6 +303,7 @@ impl CdcBackfillExecutor { Either::Left(msg) => { match msg? { Message::Barrier(barrier) => { + // increase the barrier count and check whether need to start a new snapshot barrier_count += 1; let can_start_new_snapshot = barrier_count == self.snapshot_interval; @@ -368,7 +363,7 @@ impl CdcBackfillExecutor { ?snapshot_read_row_cnt, "Prepare to start a new snapshot" ); - // Break the for loop and prepare to start a new snapshot + // Break the loop for consuming snapshot and prepare to start a new snapshot break; } else { // emit barrier and continue consume the backfill stream diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index 16a91470c2e14..e7f020b128646 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -89,7 +89,7 @@ impl UpstreamTableReader { loop { let mut read_count: usize = 0; let chunk_stream = self.snapshot_read(read_args.clone(), limit); - let mut current_pk_pos = read_args.current_pos.clone().unwrap_or(OwnedRow::default()); + let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default(); #[for_await] for chunk in chunk_stream { let chunk = chunk?; From 2931d9c50739ce484a05fd7a1c86dd5f294f49e6 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 22 Apr 2024 13:01:26 +0800 Subject: [PATCH 07/11] minor --- src/compute/tests/cdc_tests.rs | 3 +-- src/stream/src/executor/backfill/cdc/cdc_backfill.rs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 2904606b7fa63..c5e295ca1eba8 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -28,7 +28,6 @@ use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor, use risingwave_common::array::{ Array, ArrayBuilder, DataChunk, DataChunkTestExt, Op, StreamChunk, Utf8ArrayBuilder, }; -use risingwave_common::bail; use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId}; use risingwave_common::types::{Datum, JsonbVal}; use risingwave_common::util::epoch::{test_epoch, EpochExt}; @@ -132,7 +131,7 @@ impl Execute for MockOffsetGenExecutor { } #[tokio::test] -async fn test_cdc_backfill_basic() -> StreamResult<()> { +async fn test_cdc_backfill() -> StreamResult<()> { use risingwave_common::types::DataType; let memory_state_store = MemoryStateStore::new(); diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index d39bb5eeb79e9..429d4efa30021 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -379,7 +379,7 @@ impl CdcBackfillExecutor { let chunk_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?; - tracing::debug!( + tracing::trace!( "recv changelog chunk: chunk_offset {:?}, capactiy {}", chunk_binlog_offset, chunk.capacity() @@ -392,7 +392,7 @@ impl CdcBackfillExecutor { if let Some(chunk_offset) = chunk_binlog_offset && chunk_offset < *last_binlog_offset { - tracing::debug!( + tracing::trace!( "skip changelog chunk: chunk_offset {:?}, capacity {}", chunk_offset, chunk.capacity() From b587a8b6ef8f6d9150fdb877efffccb5e24da132 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 25 Apr 2024 13:46:39 +0800 Subject: [PATCH 08/11] fix mysql incomplete scan problem --- 
src/connector/src/source/cdc/external/mod.rs | 6 +- .../src/executor/backfill/cdc/cdc_backfill.rs | 116 ++++++------ .../backfill/cdc/upstream_table/snapshot.rs | 171 +++++++++++------- 3 files changed, 167 insertions(+), 126 deletions(-) diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index bd2b623731c7c..6be1f86098a6c 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -576,10 +576,12 @@ impl ExternalTableReaderImpl { mod tests { use futures::pin_mut; - use futures_async_stream::for_await; + use futures_async_stream::{for_await, try_stream}; use maplit::{convert_args, hashmap}; use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; - use risingwave_common::types::DataType; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use crate::source::cdc::external::{ CdcOffset, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, SchemaTableName, diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 429d4efa30021..e4cfa2d76bb13 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -268,7 +268,6 @@ impl CdcBackfillExecutor { 'backfill_loop: loop { let left_upstream = upstream.by_ref().map(Either::Left); - let mut has_snapshot_read = false; let mut snapshot_read_row_cnt: usize = 0; let read_args = SnapshotReadArgs::new( current_pk_pos.clone(), @@ -442,8 +441,6 @@ impl CdcBackfillExecutor { break 'backfill_loop; } Some(chunk) => { - has_snapshot_read = true; - // Raise the current position. // As snapshot read streams are ordered by pk, so we can // just use the last row to update `current_pos`. @@ -469,67 +466,62 @@ impl CdcBackfillExecutor { } assert!(pending_barrier.is_some(), "pending_barrier must exist"); - // Before start a new snapshot, we should ensure the snapshot stream is - // consumed once in the period of snapshot interval. Because if we - // reconstruct a snapshot stream with same pk offset, it will return an - // empty stream (don't know the cause) - if !has_snapshot_read { - let (_, mut snapshot_stream) = backfill_stream.into_inner(); - if let Some(msg) = snapshot_stream.next().await { - let Either::Right(msg) = msg else { - bail!("BUG: snapshot_read contains upstream messages"); - }; - match msg? { - None => { - tracing::info!( - upstream_table_id, - ?last_binlog_offset, - ?current_pk_pos, - "snapshot read stream ends in the force emit branch" - ); - // End of the snapshot read stream. - // Consume the buffered upstream chunk without filtering by `binlog_low`. - for chunk in upstream_chunk_buffer.drain(..) { - yield Message::Chunk(mapping_chunk( - chunk, - &self.output_indices, - )); - } - - // mark backfill has finished - state_impl - .mutate_state( - current_pk_pos, - last_binlog_offset.clone(), - total_snapshot_row_count, - true, - ) - .await?; - - // commit state because we have received a barrier message - let barrier = pending_barrier.unwrap(); - state_impl.commit_state(barrier.epoch).await?; - yield Message::Barrier(barrier); - // end of backfill loop, since backfill has finished - break 'backfill_loop; - } - Some(chunk) => { - // Raise the current pk position. 
- current_pk_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); - - let row_count = chunk.cardinality() as u64; - cur_barrier_snapshot_processed_rows += row_count; - total_snapshot_row_count += row_count; - snapshot_read_row_cnt += row_count as usize; - - tracing::debug!( - upstream_table_id, - ?current_pk_pos, - ?snapshot_read_row_cnt, - "force emit a snapshot chunk" - ); + // Here we have to ensure the snapshot stream is consumed at least once, + // since the barrier event can kick in anytime. + // Otherwise, the result set of the new snapshot stream may become empty. + // It maybe a cancellation bug of the mysql driver. + let (_, mut snapshot_stream) = backfill_stream.into_inner(); + if let Some(msg) = snapshot_stream.next().await { + let Either::Right(msg) = msg else { + bail!("BUG: snapshot_read contains upstream messages"); + }; + match msg? { + None => { + tracing::info!( + upstream_table_id, + ?last_binlog_offset, + ?current_pk_pos, + "snapshot read stream ends in the force emit branch" + ); + // End of the snapshot read stream. + // Consume the buffered upstream chunk without filtering by `binlog_low`. + for chunk in upstream_chunk_buffer.drain(..) { yield Message::Chunk(mapping_chunk(chunk, &self.output_indices)); } + + // mark backfill has finished + state_impl + .mutate_state( + current_pk_pos, + last_binlog_offset.clone(), + total_snapshot_row_count, + true, + ) + .await?; + + // commit state because we have received a barrier message + let barrier = pending_barrier.unwrap(); + state_impl.commit_state(barrier.epoch).await?; + yield Message::Barrier(barrier); + // end of backfill loop, since backfill has finished + break 'backfill_loop; + } + Some(chunk) => { + // Raise the current pk position. + current_pk_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); + + let row_count = chunk.cardinality() as u64; + cur_barrier_snapshot_processed_rows += row_count; + total_snapshot_row_count += row_count; + snapshot_read_row_cnt += row_count as usize; + + tracing::debug!( + upstream_table_id, + ?current_pk_pos, + ?snapshot_read_row_cnt, + "force emit a snapshot chunk" + ); + yield Message::Chunk(mapping_chunk(chunk, &self.output_indices)); } } } diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index e7f020b128646..e2dc093596c25 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -27,10 +27,10 @@ use crate::executor::backfill::utils::{get_new_pos, iter_chunks}; use crate::executor::{StreamExecutorError, StreamExecutorResult, INVALID_EPOCH}; pub trait UpstreamTableRead { - fn snapshot_read( + fn snapshot_read_full_table( &self, args: SnapshotReadArgs, - limit: u32, + batch_size: u32, ) -> impl Stream>> + Send + '_; fn current_cdc_offset( @@ -80,47 +80,9 @@ impl UpstreamTableReader { } } -impl UpstreamTableReader { - /// Continuously read the rows from the upstream table until reaching the end of the table - #[try_stream(ok = Option, error = StreamExecutorError)] - pub async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, limit: u32) { - let mut read_args = args; - - loop { - let mut read_count: usize = 0; - let chunk_stream = self.snapshot_read(read_args.clone(), limit); - let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default(); - #[for_await] - for chunk in chunk_stream { - let chunk = chunk?; - - match chunk { - Some(chunk) => { - read_count += 
chunk.cardinality(); - current_pk_pos = get_new_pos(&chunk, &read_args.pk_in_output_indices); - yield Some(chunk); - } - None => { - // reach the end of the table - if read_count < limit as _ { - tracing::debug!("finished loading of table snapshot"); - yield None; - unreachable!("snapshot stream is ended, should not reach here"); - } else { - // update PK position - read_args.current_pos = Some(current_pk_pos); - break; - } - } - } - } - } - } -} - impl UpstreamTableRead for UpstreamTableReader { #[try_stream(ok = Option, error = StreamExecutorError)] - async fn snapshot_read(&self, args: SnapshotReadArgs, limit: u32) { + async fn snapshot_read_full_table(&self, read_args: SnapshotReadArgs, batch_size: u32) { let primary_keys = self .inner .pk_indices() @@ -131,30 +93,45 @@ impl UpstreamTableRead for UpstreamTableReader { }) .collect_vec(); - tracing::debug!( - "snapshot_read primary keys: {:?}, current_pos: {:?}", - primary_keys, - args.current_pos - ); - - let row_stream = self.inner.table_reader().snapshot_read( - self.inner.schema_table_name(), - args.current_pos, - primary_keys, - limit, - ); - - pin_mut!(row_stream); + let mut read_args = read_args; + loop { + tracing::debug!( + "snapshot_read primary keys: {:?}, current_pos: {:?}", + primary_keys, + read_args.current_pos + ); + let mut read_count: usize = 0; + let row_stream = self.inner.table_reader().snapshot_read( + self.inner.schema_table_name(), + read_args.current_pos.clone(), + primary_keys.clone(), + batch_size, + ); + + pin_mut!(row_stream); + let mut builder = + DataChunkBuilder::new(self.inner.schema().data_types(), read_args.chunk_size); + let chunk_stream = iter_chunks(row_stream, &mut builder); + let mut current_pk_pos = read_args.current_pos.clone().unwrap_or_default(); - let mut builder = DataChunkBuilder::new(self.inner.schema().data_types(), args.chunk_size); - let chunk_stream = iter_chunks(row_stream, &mut builder); + #[for_await] + for chunk in chunk_stream { + let chunk = chunk?; + read_count += chunk.cardinality(); + current_pk_pos = get_new_pos(&chunk, &read_args.pk_in_output_indices); + yield Some(chunk); + } - #[for_await] - for chunk in chunk_stream { - let chunk = chunk?; - yield Some(chunk); + // check read_count if the snapshot batch is finished + if read_count < batch_size as _ { + tracing::debug!("finished loading of full table snapshot"); + yield None; + unreachable!() + } else { + // update PK position and continue to read the table + read_args.current_pos = Some(current_pk_pos); + } } - yield None; } async fn current_cdc_offset(&self) -> StreamExecutorResult> { @@ -163,3 +140,73 @@ impl UpstreamTableRead for UpstreamTableReader { Ok(Some(binlog)) } } +#[cfg(test)] +mod tests { + + use futures::pin_mut; + use futures_async_stream::{for_await, try_stream}; + use maplit::{convert_args, hashmap}; + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::util::chunk_coalesce::DataChunkBuilder; + use risingwave_connector::source::cdc::external::{ + ExternalTableReader, MySqlExternalTableReader, SchemaTableName, + }; + + use crate::executor::backfill::utils::{get_new_pos, iter_chunks}; + + #[ignore] + #[tokio::test] + async fn test_mysql_table_reader() { + let columns = vec![ + ColumnDesc::named("o_orderkey", ColumnId::new(1), DataType::Int64), + ColumnDesc::named("o_custkey", ColumnId::new(2), DataType::Int64), + ColumnDesc::named("o_orderstatus", ColumnId::new(3), 
From 833a8be8c194b6a19ccbd45b8f3fccdc85b0e611 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Thu, 25 Apr 2024 18:38:06 +0800
Subject: [PATCH 09/11] fix state commit

---
 src/connector/src/source/cdc/external/mod.rs  |  4 +-
 .../src/executor/backfill/cdc/cdc_backfill.rs | 67 +++++++++++--------
 .../backfill/cdc/upstream_table/snapshot.rs   |  2 +-
 3 files changed, 42 insertions(+), 31 deletions(-)

diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs
index 6be1f86098a6c..ca704895906d5 100644
--- a/src/connector/src/source/cdc/external/mod.rs
+++ b/src/connector/src/source/cdc/external/mod.rs
@@ -576,12 +576,10 @@ impl ExternalTableReaderImpl {
 mod tests {
 
     use futures::pin_mut;
-    use futures_async_stream::{for_await, try_stream};
+    use futures_async_stream::for_await;
     use maplit::{convert_args, hashmap};
     use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
-    use risingwave_common::row::OwnedRow;
     use risingwave_common::types::{DataType, ScalarImpl};
-    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 
     use crate::source::cdc::external::{
         CdcOffset, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, SchemaTableName,
diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
index c92a4b995e293..7655abac857e6 100644
--- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -333,20 +333,10 @@ impl CdcBackfillExecutor {
                                 cur_barrier_upstream_processed_rows,
                             );
 
-                            // update and persist current backfill progress
-                            state_impl
-                                .mutate_state(
-                                    current_pk_pos.clone(),
-                                    last_binlog_offset.clone(),
-                                    total_snapshot_row_count,
-                                    false,
-                                )
-                                .await?;
-                            state_impl.commit_state(barrier.epoch).await?;
-
                             // when processing a barrier, check whether can start a new snapshot
                             // if the number of barriers reaches the snapshot interval
                             if can_start_new_snapshot {
+                                // stage the barrier
                                 pending_barrier = Some(barrier);
 
                                 tracing::debug!(
                                     upstream_table_id,
                                     ?current_pk_pos,
                                     ?pk_in_output_indices,
                                     "Prepare to start a new snapshot"
                                 );
                                 // Break the loop for consuming snapshot and prepare to start a new snapshot
                                 break;
                             } else {
+                                // update and persist current backfill progress
+                                state_impl
+                                    .mutate_state(
+                                        current_pk_pos.clone(),
+                                        last_binlog_offset.clone(),
+                                        total_snapshot_row_count,
+                                        false,
+                                    )
+                                    .await?;
+
+                                state_impl.commit_state(barrier.epoch).await?;
+
                                 // emit barrier and continue consuming the backfill stream
                                 yield Message::Barrier(barrier);
                             }
@@ -420,16 +422,7 @@ impl CdcBackfillExecutor {
                                 ));
                             }
 
-                            state_impl
-                                .mutate_state(
-                                    current_pk_pos.clone(),
-                                    last_binlog_offset.clone(),
-                                    total_snapshot_row_count,
-                                    true,
-                                )
-                                .await?;
-
-                            // backfill has finished
+                            // backfill has finished, exit the backfill loop and persist the state when we receive a barrier
                             break 'backfill_loop;
                         }
                         Some(chunk) => {
@@ -458,6 +451,8 @@ impl CdcBackfillExecutor {
                     }
 
                     assert!(pending_barrier.is_some(), "pending_barrier must exist");
+                    let pending_barrier = pending_barrier.unwrap();
+
                     // Here we have to ensure the snapshot stream is consumed at least once,
                     // since the barrier event can kick in anytime.
                     // Otherwise, the result set of the new snapshot stream may become empty.
@@ -492,9 +487,8 @@ impl CdcBackfillExecutor {
                                     .await?;
 
                                 // commit state because we have received a barrier message
-                                let barrier = pending_barrier.unwrap();
-                                state_impl.commit_state(barrier.epoch).await?;
-                                yield Message::Barrier(barrier);
+                                state_impl.commit_state(pending_barrier.epoch).await?;
+                                yield Message::Barrier(pending_barrier);
                                 // end of backfill loop, since backfill has finished
                                 break 'backfill_loop;
                             }
@@ -560,8 +554,18 @@ impl CdcBackfillExecutor {
                         cur_barrier_upstream_processed_rows,
                     );
 
-                    let barrier = pending_barrier.unwrap();
-                    yield Message::Barrier(barrier);
+                    // update and persist current backfill progress
+                    state_impl
+                        .mutate_state(
+                            current_pk_pos.clone(),
+                            last_binlog_offset.clone(),
+                            total_snapshot_row_count,
+                            false,
+                        )
+                        .await?;
+
+                    state_impl.commit_state(pending_barrier.epoch).await?;
+                    yield Message::Barrier(pending_barrier);
                 }
             } else if self.disable_backfill {
                 // If backfill is disabled, we just mark the backfill as finished
@@ -595,7 +599,16 @@ impl CdcBackfillExecutor {
             if let Some(msg) = mapping_message(msg, &self.output_indices) {
                 // If not finished then we need to update state, otherwise no need.
                 if let Message::Barrier(barrier) = &msg {
-                    // persist the backfill state
+                    // finalize the backfill state
+                    // TODO: unify `mutate_state` and `commit_state` into one method
+                    state_impl
+                        .mutate_state(
+                            current_pk_pos.clone(),
+                            last_binlog_offset.clone(),
+                            total_snapshot_row_count,
+                            true,
+                        )
+                        .await?;
                     state_impl.commit_state(barrier.epoch).await?;
 
                     // mark progress as finished
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
index fa9d11d44f8d1..48a4b569ec09e 100644
--- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
+++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -191,7 +191,7 @@ impl UpstreamTableRead for UpstreamTableReader {
 mod tests {
 
     use futures::pin_mut;
-    use futures_async_stream::{for_await, try_stream};
+    use futures_async_stream::for_await;
     use maplit::{convert_args, hashmap};
     use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
     use risingwave_common::row::OwnedRow;
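Editor's note: patch 09 above changes when backfill progress is persisted. On an ordinary barrier the state is mutated and committed immediately; on a snapshot-interval barrier the barrier is only staged (`pending_barrier`), and the commit happens later, at that staged barrier's epoch, after the buffered upstream chunks and the forced snapshot chunk have been handled. A simplified, self-contained model of that ordering is sketched below; `Epoch`, `MockState`, and `on_barrier` are hypothetical stand-ins rather than the executor's real API.

// Editorial sketch (not part of the patches): the barrier-staged commit ordering
// that patch 09 enforces, reduced to plain Rust.
type Epoch = u64;

#[derive(Default)]
struct MockState {
    committed_pos: Option<i64>,
    committed_epoch: Option<Epoch>,
}

impl MockState {
    // Stand-in for `mutate_state`: record the latest backfill position.
    fn mutate(&mut self, pos: Option<i64>) {
        self.committed_pos = pos;
    }
    // Stand-in for `commit_state`: persist at the given barrier epoch.
    fn commit(&mut self, epoch: Epoch) {
        self.committed_epoch = Some(epoch);
    }
}

// On an ordinary barrier, persist progress right away and forward the barrier.
// On a snapshot-interval barrier, return the epoch so the caller can stage it
// and commit only after the buffered chunks have been handled.
fn on_barrier(
    state: &mut MockState,
    epoch: Epoch,
    current_pos: Option<i64>,
    can_start_new_snapshot: bool,
) -> Option<Epoch> {
    if can_start_new_snapshot {
        Some(epoch) // staged as `pending_barrier` in the real executor
    } else {
        state.mutate(current_pos);
        state.commit(epoch);
        None
    }
}

fn main() {
    let mut state = MockState::default();
    // Ordinary barrier: progress is persisted immediately.
    assert_eq!(on_barrier(&mut state, 1, Some(42), false), None);
    // Snapshot-interval barrier: staged now, committed after the snapshot round.
    let pending = on_barrier(&mut state, 2, Some(84), true).expect("staged epoch");
    state.mutate(Some(84));
    state.commit(pending);
    println!("pos={:?}, epoch={:?}", state.committed_pos, state.committed_epoch);
}

The point of the staging is that the state-table commit always happens at the epoch of the barrier that is eventually emitted, so a recovery after that barrier sees a backfill offset consistent with the chunks already sent downstream.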
From 3f3535694d8fa9f67d8e1d2e795a0eb34cc38c87 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Thu, 25 Apr 2024 18:51:38 +0800
Subject: [PATCH 10/11] minor

---
 src/stream/src/executor/backfill/cdc/cdc_backfill.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
index 7655abac857e6..8b32fe63320a0 100644
--- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -479,7 +479,7 @@ impl CdcBackfillExecutor {
                                 // mark backfill has finished
                                 state_impl
                                     .mutate_state(
-                                        current_pk_pos,
+                                        current_pk_pos.clone(),
                                         last_binlog_offset.clone(),
                                         total_snapshot_row_count,
                                         true,
@@ -576,7 +576,7 @@ impl CdcBackfillExecutor {
                     );
                     state_impl
                         .mutate_state(
-                            current_pk_pos,
+                            current_pk_pos.clone(),
                             last_binlog_offset.clone(),
                             total_snapshot_row_count,
                             true,

From d5b5a9f94afbe6a96b85a6121c66699c83f4db21 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Thu, 25 Apr 2024 20:18:25 +0800
Subject: [PATCH 11/11] minor

---
 src/connector/src/source/cdc/external/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs
index ca704895906d5..bd2b623731c7c 100644
--- a/src/connector/src/source/cdc/external/mod.rs
+++ b/src/connector/src/source/cdc/external/mod.rs
@@ -579,7 +579,7 @@ mod tests {
     use futures_async_stream::for_await;
     use maplit::{convert_args, hashmap};
     use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
-    use risingwave_common::types::{DataType, ScalarImpl};
+    use risingwave_common::types::DataType;
 
     use crate::source::cdc::external::{
         CdcOffset, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, SchemaTableName,