fix(cdc-backfill): also persist the finish flag even though snapshot is empty (#12002)
StrikeW authored Aug 31, 2023
1 parent 96a49f7 commit 8ea014f
Showing 1 changed file with 105 additions and 72 deletions: src/stream/src/executor/backfill/cdc_backfill.rs
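
In short: the backfill finish flag used to be persisted only when a non-empty snapshot read stream ended; with an empty snapshot (or no split assigned to this parallelism), the flag was never written. The diff extracts the persistence logic into a write_backfill_state helper and calls it on both exit paths. A minimal runnable sketch of the idea, with a toy StateTable and an assumed key-suffix value (the diff only names the constant BACKFILL_STATE_KEY_SUFFIX):

    use std::collections::HashMap;

    // Toy stand-in for the source state table; the real executor uses
    // SourceStateTableHandler with JSONB values.
    struct StateTable(HashMap<String, String>);

    // Assumed value; the diff references the constant by name only.
    const BACKFILL_STATE_KEY_SUFFIX: &str = "_backfill";

    fn write_backfill_state(state: &mut StateTable, split_id: &str) {
        let key = format!("{split_id}{BACKFILL_STATE_KEY_SUFFIX}");
        state.0.insert(key, "true".to_string());
    }

    fn backfill(state: &mut StateTable, split_id: &str, snapshot: Option<&[i64]>) {
        match snapshot {
            Some(rows) => {
                for _row in rows { /* replay snapshot rows downstream */ }
                write_backfill_state(state, split_id); // pre-existing path
            }
            // The fix: the bypass path (empty snapshot / no split assigned)
            // now persists the finish flag as well.
            None => write_backfill_state(state, split_id),
        }
    }

    fn main() {
        let mut state = StateTable(HashMap::new());
        backfill(&mut state, "split-0", None); // bypassed backfill
        assert_eq!(state.0.get("split-0_backfill").map(String::as_str), Some("true"));
    }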
@@ -27,8 +27,9 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{JsonbVal, ScalarRefImpl};
+use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::external::{CdcOffset, DebeziumOffset, DebeziumSourceOffset};
-use risingwave_connector::source::{SplitImpl, SplitMetaData};
+use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
use risingwave_storage::StateStore;
use serde_json::Value;

@@ -43,7 +44,7 @@ use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor,
ExecutorInfo, Message, Mutation, PkIndices, PkIndicesRef, SourceStateTableHandler,
-StreamExecutorError,
+StreamExecutorError, StreamExecutorResult,
};
use crate::task::{ActorId, CreateMviewProgress};

@@ -109,9 +110,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
async fn execute_inner(mut self) {
// The primary key columns, in the output columns of the upstream_table scan.
let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();

tracing::info!("pk_in_output_indices: {:?}", pk_in_output_indices);

let pk_order = self.upstream_table.pk_order_types().to_vec();

let upstream_table_id = self.upstream_table.table_id().table_id;
@@ -123,14 +121,16 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// `None` means it starts from the beginning.
let mut current_pk_pos: Option<OwnedRow>;

+tracing::info!(upstream_table_id, ?pk_in_output_indices);

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
let init_epoch = first_barrier.epoch.prev;

// Check whether this parallelism has been assigned splits,
// if not, we should bypass the backfill directly.
let mut invalid_backfill = false;
-let mut split_id: Option<Arc<str>> = None;
+let mut split_id: Option<SplitId> = None;
let mut cdc_split: Option<SplitImpl> = None;
if let Some(mutation) = first_barrier.mutation.as_ref() {
match mutation.as_ref() {
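
For context on the bypass decision above: the first barrier's mutation carries the split assignment, and an empty assignment means this parallelism skips the snapshot read entirely, which is exactly the path that previously never wrote the finish flag. A reduced model (the enum here is illustrative, not the actual Mutation type):

    // Illustrative stand-in: the real Mutation enum lives in the stream executor.
    enum Mutation {
        Add { splits: Vec<String> },
        Other,
    }

    fn is_invalid_backfill(m: &Mutation) -> bool {
        match m {
            // No split assigned to this parallelism: bypass the backfill.
            Mutation::Add { splits } => splits.is_empty(),
            Mutation::Other => true,
        }
    }

    fn main() {
        assert!(is_invalid_backfill(&Mutation::Add { splits: vec![] }));
        assert!(!is_invalid_backfill(&Mutation::Add { splits: vec!["split-0".into()] }));
        assert!(is_invalid_backfill(&Mutation::Other));
    }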
@@ -257,9 +257,9 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let _ = Pin::new(&mut upstream).peek().await;

tracing::info!(
"start the bacfill loop: [initial] binlog offset {:?}",
last_binlog_offset,
);
upstream_table_id,
initial_binlog_offset = ?last_binlog_offset,
"start the bacfill loop");

'backfill_loop: loop {
let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
@@ -337,10 +337,11 @@ }
}

// seal current epoch even though there is no data
-self.source_state_handler
-.state_store
-.commit(barrier.epoch)
-.await?;
+Self::persist_state(
+&mut self.source_state_handler,
+barrier.epoch,
+)
+.await?;

yield Message::Barrier(barrier);
// Break the for loop and start a new snapshot read stream.
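
The hunk above routes the per-barrier commit through the new persist_state helper. The invariant being preserved: the state table must be committed at every barrier epoch, even when no rows were written in between. A toy model of that contract (types are illustrative stand-ins, not RisingWave's):

    // Mirrors risingwave_common::util::epoch::EpochPair in shape only.
    #[derive(Clone, Copy, Debug)]
    struct EpochPair { prev: u64, curr: u64 }

    struct MockStateTable { sealed: u64 }

    impl MockStateTable {
        // Commit must run for every barrier's epoch pair; skipping one
        // leaves a gap between the table's sealed epoch and the stream.
        fn commit(&mut self, epoch: EpochPair) -> Result<(), String> {
            if epoch.prev != self.sealed {
                return Err(format!("epoch gap: sealed {}, got {:?}", self.sealed, epoch));
            }
            self.sealed = epoch.curr;
            Ok(())
        }
    }

    fn main() {
        let mut table = MockStateTable { sealed: 100 };
        table.commit(EpochPair { prev: 100, curr: 200 }).unwrap(); // barrier with no data
        table.commit(EpochPair { prev: 200, curr: 300 }).unwrap();
        assert_eq!(table.sealed, 300);
    }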
@@ -386,8 +387,10 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
match msg? {
None => {
tracing::info!(
"snapshot read stream ends: last_binlog_offset {:?}, current_pk_pos {:?}",
last_binlog_offset, current_pk_pos
upstream_table_id,
?last_binlog_offset,
?current_pk_pos,
"snapshot read stream ends"
);
// End of the snapshot read stream.
// We should not mark the chunk anymore,
@@ -404,59 +407,14 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
));
}

-// When snapshot read stream ends, we should persist two states:
-// 1) a backfill finish flag to denote the backfill has done
-// 2) a consumed binlog offset to denote the last binlog offset
-if let Some(split_id) = split_id.as_ref() {
-let mut key = split_id.to_string();
-key.push_str(BACKFILL_STATE_KEY_SUFFIX);
-self.source_state_handler
-.set(key.into(), JsonbVal::from(Value::Bool(true)))
-.await?;
-
-if let Some(SplitImpl::MySqlCdc(split)) = cdc_split.as_mut()
-&& let Some(s) = split.mysql_split.as_mut() {
-let start_offset =
-last_binlog_offset.as_ref().map(|cdc_offset| {
-let source_offset =
-if let CdcOffset::MySql(o) = cdc_offset
-{
-DebeziumSourceOffset {
-file: Some(o.filename.clone()),
-pos: Some(o.position),
-..Default::default()
-}
-} else {
-DebeziumSourceOffset::default()
-};
-
-let mut server = "RW_CDC_".to_string();
-server.push_str(
-upstream_table_id.to_string().as_str(),
-);
-DebeziumOffset {
-source_partition: hashmap! {
-"server".to_string() => server
-},
-source_offset,
-}
-});
-
-// persist the last binlog offset into split state
-s.inner.start_offset = start_offset.map(|o| {
-let value = serde_json::to_value(o).unwrap();
-value.to_string()
-});
-s.inner.snapshot_done = true;
-}
-
-if let Some(split_impl) = cdc_split {
-self.source_state_handler
-.set(split_impl.id(), split_impl.encode_to_json())
-.await?
-}
-}
-
+Self::write_backfill_state(
+&mut self.source_state_handler,
+upstream_table_id,
+&split_id,
+&mut cdc_split,
+last_binlog_offset.clone(),
+)
+.await?;
break 'backfill_loop;
}
Some(chunk) => {
@@ -483,6 +441,15 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
}
}
+} else {
+Self::write_backfill_state(
+&mut self.source_state_handler,
+upstream_table_id,
+&split_id,
+&mut cdc_split,
+None,
+)
+.await?;
}
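
Note the bypass branch passes None as the last binlog offset, while the snapshot-end path in the earlier hunk passes the offset it actually consumed. A reduced sketch of the effect on the split state (the struct is a simplified stand-in for the MySQL split's inner state shown in the diff):

    // Simplified stand-in for the MySQL CDC split's inner state.
    #[derive(Debug, Default)]
    struct MySqlSplitState {
        start_offset: Option<String>, // serialized Debezium offset JSON
        snapshot_done: bool,
    }

    fn write_backfill_state(s: &mut MySqlSplitState, last_binlog_offset: Option<String>) {
        s.start_offset = last_binlog_offset; // stays None on the bypass path
        s.snapshot_done = true;              // now set on both paths
    }

    fn main() {
        // Bypass path (empty snapshot): no offset was consumed.
        let mut bypassed = MySqlSplitState::default();
        write_backfill_state(&mut bypassed, None);
        assert!(bypassed.snapshot_done && bypassed.start_offset.is_none());

        // Snapshot-end path: record where binlog consumption should resume.
        let mut finished = MySqlSplitState::default();
        write_backfill_state(&mut finished, Some(r#"{"file":"mysql-bin.000003","pos":4}"#.into()));
        assert!(finished.snapshot_done && finished.start_offset.is_some());
    }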

tracing::debug!(
@@ -500,15 +467,81 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
if let Some(msg) = mapping_message(msg?, &self.output_indices) {
// persist the backfill state if any
if let Message::Barrier(barrier) = &msg {
-self.source_state_handler
-.state_store
-.commit(barrier.epoch)
-.await?;
+Self::persist_state(&mut self.source_state_handler, barrier.epoch).await?;
}
yield msg;
}
}
}

+/// When the snapshot read stream ends, we should persist two states:
+/// 1) a backfill finish flag to denote that the backfill is done
+/// 2) a consumed binlog offset to denote the last binlog offset,
+/// which will be committed to the state store upon the next barrier.
+async fn write_backfill_state(
+source_state_handler: &mut SourceStateTableHandler<S>,
+upstream_table_id: u32,
+split_id: &Option<SplitId>,
+cdc_split: &mut Option<SplitImpl>,
+last_binlog_offset: Option<CdcOffset>,
+) -> StreamExecutorResult<()> {
+if let Some(split_id) = split_id.as_ref() {
+let mut key = split_id.to_string();
+key.push_str(BACKFILL_STATE_KEY_SUFFIX);
+source_state_handler
+.set(key.into(), JsonbVal::from(Value::Bool(true)))
+.await?;
+
+if let Some(SplitImpl::MySqlCdc(split)) = cdc_split.as_mut()
+&& let Some(s) = split.mysql_split.as_mut() {
+let start_offset =
+last_binlog_offset.as_ref().map(|cdc_offset| {
+let source_offset =
+if let CdcOffset::MySql(o) = cdc_offset
+{
+DebeziumSourceOffset {
+file: Some(o.filename.clone()),
+pos: Some(o.position),
+..Default::default()
+}
+} else {
+DebeziumSourceOffset::default()
+};
+
+let mut server = "RW_CDC_".to_string();
+server.push_str(
+upstream_table_id.to_string().as_str(),
+);
+DebeziumOffset {
+source_partition: hashmap! {
+"server".to_string() => server
+},
+source_offset,
+}
+});
+
+// persist the last binlog offset into the split state
+s.inner.start_offset = start_offset.map(|o| {
+let value = serde_json::to_value(o).unwrap();
+value.to_string()
+});
+s.inner.snapshot_done = true;
+}
+if let Some(split_impl) = cdc_split {
+source_state_handler
+.set(split_impl.id(), split_impl.encode_to_json())
+.await?
+}
+}
+Ok(())
+}
+
+async fn persist_state(
+source_state_handler: &mut SourceStateTableHandler<S>,
+new_epoch: EpochPair,
+) -> StreamExecutorResult<()> {
+source_state_handler.state_store.commit(new_epoch).await
+}
}
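
For reference, write_backfill_state serializes a DebeziumOffset into the split's start_offset. A self-contained approximation of the payload it builds (toy structs; the real DebeziumOffset and DebeziumSourceOffset carry serde attributes and extra fields not reproduced here):

    use std::collections::HashMap;
    use serde::Serialize;
    // Cargo deps assumed: serde = { version = "1", features = ["derive"] }, serde_json = "1".

    #[derive(Serialize)]
    struct SourceOffset { file: Option<String>, pos: Option<u64> }

    #[derive(Serialize)]
    struct Offset {
        source_partition: HashMap<String, String>,
        source_offset: SourceOffset,
    }

    fn main() {
        let upstream_table_id = 1002u32;
        let offset = Offset {
            // Matches the `server` entry built above, e.g. "RW_CDC_1002".
            source_partition: HashMap::from([
                ("server".to_string(), format!("RW_CDC_{upstream_table_id}")),
            ]),
            source_offset: SourceOffset { file: Some("mysql-bin.000003".into()), pos: Some(4) },
        };
        // A string like this is what lands in s.inner.start_offset.
        println!("{}", serde_json::to_string(&offset).unwrap());
    }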

impl<S: StateStore> Executor for CdcBackfillExecutor<S> {
