From 8ea014fece04db92aeea8a1a72c203f8965ce7af Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 31 Aug 2023 21:07:13 +0800 Subject: [PATCH] fix(cdc-backfill): also persist the finish flag even though snapshot is empty (#12002) --- .../src/executor/backfill/cdc_backfill.rs | 177 +++++++++++------- 1 file changed, 105 insertions(+), 72 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc_backfill.rs index 10eba2adce9ed..55742348bbf03 100644 --- a/src/stream/src/executor/backfill/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc_backfill.rs @@ -27,8 +27,9 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{JsonbVal, ScalarRefImpl}; +use risingwave_common::util::epoch::EpochPair; use risingwave_connector::source::external::{CdcOffset, DebeziumOffset, DebeziumSourceOffset}; -use risingwave_connector::source::{SplitImpl, SplitMetaData}; +use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; use risingwave_storage::StateStore; use serde_json::Value; @@ -43,7 +44,7 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::{ expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, ExecutorInfo, Message, Mutation, PkIndices, PkIndicesRef, SourceStateTableHandler, - StreamExecutorError, + StreamExecutorError, StreamExecutorResult, }; use crate::task::{ActorId, CreateMviewProgress}; @@ -109,9 +110,6 @@ impl CdcBackfillExecutor { async fn execute_inner(mut self) { // The primary key columns, in the output columns of the upstream_table scan. let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap(); - - tracing::info!("pk_in_output_indices: {:?}", pk_in_output_indices); - let pk_order = self.upstream_table.pk_order_types().to_vec(); let upstream_table_id = self.upstream_table.table_id().table_id; @@ -123,6 +121,8 @@ impl CdcBackfillExecutor { // `None` means it starts from the beginning. let mut current_pk_pos: Option; + tracing::info!(upstream_table_id, ?pk_in_output_indices); + // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; let init_epoch = first_barrier.epoch.prev; @@ -130,7 +130,7 @@ impl CdcBackfillExecutor { // Check whether this parallelism has been assigned splits, // if not, we should bypass the backfill directly. let mut invalid_backfill = false; - let mut split_id: Option> = None; + let mut split_id: Option = None; let mut cdc_split: Option = None; if let Some(mutation) = first_barrier.mutation.as_ref() { match mutation.as_ref() { @@ -257,9 +257,9 @@ impl CdcBackfillExecutor { let _ = Pin::new(&mut upstream).peek().await; tracing::info!( - "start the bacfill loop: [initial] binlog offset {:?}", - last_binlog_offset, - ); + upstream_table_id, + initial_binlog_offset = ?last_binlog_offset, + "start the bacfill loop"); 'backfill_loop: loop { let mut upstream_chunk_buffer: Vec = vec![]; @@ -337,10 +337,11 @@ impl CdcBackfillExecutor { } // seal current epoch even though there is no data - self.source_state_handler - .state_store - .commit(barrier.epoch) - .await?; + Self::persist_state( + &mut self.source_state_handler, + barrier.epoch, + ) + .await?; yield Message::Barrier(barrier); // Break the for loop and start a new snapshot read stream. @@ -386,8 +387,10 @@ impl CdcBackfillExecutor { match msg? { None => { tracing::info!( - "snapshot read stream ends: last_binlog_offset {:?}, current_pk_pos {:?}", - last_binlog_offset, current_pk_pos + upstream_table_id, + ?last_binlog_offset, + ?current_pk_pos, + "snapshot read stream ends" ); // End of the snapshot read stream. // We should not mark the chunk anymore, @@ -404,59 +407,14 @@ impl CdcBackfillExecutor { )); } - // When snapshot read stream ends, we should persist two states: - // 1) a backfill finish flag to denote the backfill has done - // 2) a consumed binlog offset to denote the last binlog offset - if let Some(split_id) = split_id.as_ref() { - let mut key = split_id.to_string(); - key.push_str(BACKFILL_STATE_KEY_SUFFIX); - self.source_state_handler - .set(key.into(), JsonbVal::from(Value::Bool(true))) - .await?; - - if let Some(SplitImpl::MySqlCdc(split)) = cdc_split.as_mut() - && let Some(s) = split.mysql_split.as_mut() { - let start_offset = - last_binlog_offset.as_ref().map(|cdc_offset| { - let source_offset = - if let CdcOffset::MySql(o) = cdc_offset - { - DebeziumSourceOffset { - file: Some(o.filename.clone()), - pos: Some(o.position), - ..Default::default() - } - } else { - DebeziumSourceOffset::default() - }; - - let mut server = "RW_CDC_".to_string(); - server.push_str( - upstream_table_id.to_string().as_str(), - ); - DebeziumOffset { - source_partition: hashmap! { - "server".to_string() => server - }, - source_offset, - } - }); - - // persist the last binlog offset into split state - s.inner.start_offset = start_offset.map(|o| { - let value = serde_json::to_value(o).unwrap(); - value.to_string() - }); - s.inner.snapshot_done = true; - } - - if let Some(split_impl) = cdc_split { - self.source_state_handler - .set(split_impl.id(), split_impl.encode_to_json()) - .await? - } - } - + Self::write_backfill_state( + &mut self.source_state_handler, + upstream_table_id, + &split_id, + &mut cdc_split, + last_binlog_offset.clone(), + ) + .await?; break 'backfill_loop; } Some(chunk) => { @@ -483,6 +441,15 @@ impl CdcBackfillExecutor { } } } + } else { + Self::write_backfill_state( + &mut self.source_state_handler, + upstream_table_id, + &split_id, + &mut cdc_split, + None, + ) + .await?; } tracing::debug!( @@ -500,15 +467,81 @@ impl CdcBackfillExecutor { if let Some(msg) = mapping_message(msg?, &self.output_indices) { // persist the backfill state if any if let Message::Barrier(barrier) = &msg { - self.source_state_handler - .state_store - .commit(barrier.epoch) - .await?; + Self::persist_state(&mut self.source_state_handler, barrier.epoch).await?; } yield msg; } } } + + /// When snapshot read stream ends, we should persist two states: + /// 1) a backfill finish flag to denote the backfill has done + /// 2) a consumed binlog offset to denote the last binlog offset + /// which will be committed to the state store upon next barrier. + async fn write_backfill_state( + source_state_handler: &mut SourceStateTableHandler, + upstream_table_id: u32, + split_id: &Option, + cdc_split: &mut Option, + last_binlog_offset: Option, + ) -> StreamExecutorResult<()> { + if let Some(split_id) = split_id.as_ref() { + let mut key = split_id.to_string(); + key.push_str(BACKFILL_STATE_KEY_SUFFIX); + source_state_handler + .set(key.into(), JsonbVal::from(Value::Bool(true))) + .await?; + + if let Some(SplitImpl::MySqlCdc(split)) = cdc_split.as_mut() + && let Some(s) = split.mysql_split.as_mut() { + let start_offset = + last_binlog_offset.as_ref().map(|cdc_offset| { + let source_offset = + if let CdcOffset::MySql(o) = cdc_offset + { + DebeziumSourceOffset { + file: Some(o.filename.clone()), + pos: Some(o.position), + ..Default::default() + } + } else { + DebeziumSourceOffset::default() + }; + + let mut server = "RW_CDC_".to_string(); + server.push_str( + upstream_table_id.to_string().as_str(), + ); + DebeziumOffset { + source_partition: hashmap! { + "server".to_string() => server + }, + source_offset, + } + }); + + // persist the last binlog offset into split state + s.inner.start_offset = start_offset.map(|o| { + let value = serde_json::to_value(o).unwrap(); + value.to_string() + }); + s.inner.snapshot_done = true; + } + if let Some(split_impl) = cdc_split { + source_state_handler + .set(split_impl.id(), split_impl.encode_to_json()) + .await? + } + } + Ok(()) + } + + async fn persist_state( + source_state_handler: &mut SourceStateTableHandler, + new_epoch: EpochPair, + ) -> StreamExecutorResult<()> { + source_state_handler.state_store.commit(new_epoch).await + } } impl Executor for CdcBackfillExecutor {