Skip to content

Commit

Permalink
refactor(cdc source): simplify CDC transaction and heartbeat messages handling (#16505)
Browse files Browse the repository at this point in the history

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Apr 28, 2024
1 parent 1b4f7ed commit 9be74ae
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 118 deletions.
3 changes: 2 additions & 1 deletion src/common/src/array/stream_chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ impl StreamChunkBuilder {
.iter()
.map(|datatype| datatype.create_array_builder(initial_capacity))
.collect();
let vis_builder = BitmapBuilder::with_capacity(initial_capacity);
Self {
ops,
column_builders,
data_types,
vis_builder: BitmapBuilder::default(),
vis_builder,
max_chunk_size: Some(max_chunk_size),
initial_capacity,
size: 0,
Expand Down
186 changes: 69 additions & 117 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub struct SourceStreamChunkBuilder {
descs: Vec<SourceColumnDesc>,
builders: Vec<ArrayBuilderImpl>,
op_builder: Vec<Op>,
vis_builder: BitmapBuilder,
}

impl SourceStreamChunkBuilder {
Expand All @@ -102,6 +103,7 @@ impl SourceStreamChunkBuilder {
descs,
builders,
op_builder: Vec::with_capacity(cap),
vis_builder: BitmapBuilder::with_capacity(cap),
}
}

Expand All @@ -110,31 +112,34 @@ impl SourceStreamChunkBuilder {
descs: &self.descs,
builders: &mut self.builders,
op_builder: &mut self.op_builder,
vis_builder: &mut self.vis_builder,
visible: true, // write visible rows by default
row_meta: None,
}
}

/// Consumes the builder and returns a [`StreamChunk`].
pub fn finish(self) -> StreamChunk {
StreamChunk::new(
StreamChunk::with_visibility(
self.op_builder,
self.builders
.into_iter()
.map(|builder| builder.finish().into())
.collect(),
self.vis_builder.finish(),
)
}

/// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for
/// the builders of the next [`StreamChunk`].
#[must_use]
pub fn take(&mut self, next_cap: usize) -> StreamChunk {
let descs = std::mem::take(&mut self.descs);
let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish`
let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
builder.finish()
}

pub fn op_num(&self) -> usize {
pub fn len(&self) -> usize {
self.op_builder.len()
}

Expand All @@ -143,63 +148,6 @@ impl SourceStreamChunkBuilder {
}
}

/// A builder for building a [`StreamChunk`] that contains only heartbeat rows.
/// Some connectors may emit heartbeat messages to the downstream, and the cdc source
/// relies on the heartbeat messages to keep the source offset up-to-date with upstream.
pub struct HeartbeatChunkBuilder {
    // Delegates storage to an inner `SourceStreamChunkBuilder`; this wrapper only
    // changes how the final chunk's visibility bitmap is produced (all-invisible).
    builder: SourceStreamChunkBuilder,
}

impl HeartbeatChunkBuilder {
    /// Creates a builder whose per-column array builders are pre-sized for `cap` rows.
    fn with_capacity(descs: Vec<SourceColumnDesc>, cap: usize) -> Self {
        let builders = descs
            .iter()
            .map(|desc| desc.data_type.create_array_builder(cap))
            .collect();

        Self {
            builder: SourceStreamChunkBuilder {
                descs,
                builders,
                op_builder: Vec::with_capacity(cap),
            },
        }
    }

    /// Returns a row writer that appends rows to the inner builder.
    fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
        self.builder.row_writer()
    }

    /// Consumes the builder and returns a [`StreamChunk`] with all rows marked as invisible
    fn finish(self) -> StreamChunk {
        // heartbeat chunk should be invisible
        let builder = self.builder;
        // A zeroed bitmap of length == number of ops marks every row invisible.
        let visibility = BitmapBuilder::zeroed(builder.op_builder.len());
        StreamChunk::with_visibility(
            builder.op_builder,
            builder
                .builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect(),
            visibility.finish(),
        )
    }

    /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for
    /// the builders of the next [`StreamChunk`].
    #[must_use]
    fn take(&mut self, next_cap: usize) -> StreamChunk {
        // Move `descs` out and hand it back through `with_capacity`, avoiding a clone.
        let descs = std::mem::take(&mut self.builder.descs);
        let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
        builder.finish()
    }

    /// Whether no rows have been appended to the inner builder yet.
    fn is_empty(&self) -> bool {
        self.builder.is_empty()
    }
}

/// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`],
/// where each contains either one row (Insert/Delete) or two rows (Update) that can be written atomically.
///
Expand All @@ -213,13 +161,33 @@ pub struct SourceStreamChunkRowWriter<'a> {
descs: &'a [SourceColumnDesc],
builders: &'a mut [ArrayBuilderImpl],
op_builder: &'a mut Vec<Op>,
vis_builder: &'a mut BitmapBuilder,

/// Whether the rows written by this writer should be visible in output `StreamChunk`.
visible: bool,

/// An optional meta data of the original message.
///
/// When this is set by `with_meta`, it'll be used to fill the columns of types other than [`SourceColumnType::Normal`].
row_meta: Option<MessageMeta<'a>>,
}

impl<'a> SourceStreamChunkRowWriter<'a> {
    /// Set the meta data of the original message for this row writer.
    ///
    /// This should always be called except for tests.
    fn with_meta(self, row_meta: MessageMeta<'a>) -> Self {
        Self {
            row_meta: Some(row_meta),
            ..self
        }
    }

    /// Convert the row writer to one that writes invisible rows.
    fn invisible(self) -> Self {
        Self {
            visible: false,
            ..self
        }
    }
}

/// The meta data of the original message for a row writer.
///
/// Extracted from the `SourceMessage`.
Expand Down Expand Up @@ -306,7 +274,7 @@ impl OpAction for OpActionInsert {

#[inline(always)]
fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
writer.op_builder.push(Op::Insert);
writer.append_op(Op::Insert);
}
}

Expand All @@ -332,7 +300,7 @@ impl OpAction for OpActionDelete {

#[inline(always)]
fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
writer.op_builder.push(Op::Delete);
writer.append_op(Op::Delete);
}
}

Expand Down Expand Up @@ -360,24 +328,17 @@ impl OpAction for OpActionUpdate {

#[inline(always)]
fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
writer.op_builder.push(Op::UpdateDelete);
writer.op_builder.push(Op::UpdateInsert);
writer.append_op(Op::UpdateDelete);
writer.append_op(Op::UpdateInsert);
}
}

impl<'a> SourceStreamChunkRowWriter<'a> {
/// Set the meta data of the original message for this row writer.
///
/// This should always be called except for tests.
fn with_meta(self, row_meta: MessageMeta<'a>) -> Self {
Self {
row_meta: Some(row_meta),
..self
}
impl SourceStreamChunkRowWriter<'_> {
    /// Pushes one `Op` and records this writer's current visibility bit for that row,
    /// keeping `op_builder` and `vis_builder` the same length.
    fn append_op(&mut self, op: Op) {
        self.op_builder.push(op);
        self.vis_builder.append(self.visible);
    }
}

impl SourceStreamChunkRowWriter<'_> {
fn do_action<A: OpAction>(
&mut self,
mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output>,
Expand Down Expand Up @@ -679,21 +640,21 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096;
async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream: BoxSourceStream) {
let columns = parser.columns().to_vec();

let mut heartbeat_builder = HeartbeatChunkBuilder::with_capacity(columns.clone(), 0);
let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0);
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);

struct Transaction {
id: Box<str>,
len: usize,
}
let mut current_transaction = None;
let mut yield_asap = false; // whether we should yield the chunk as soon as possible (txn commits)

#[for_await]
for batch in data_stream {
let batch = batch?;
let batch_len = batch.len();

let mut last_batch_not_yielded = false;
if let Some(Transaction { len, id }) = &mut current_transaction {
// Dirty state. The last batch is not yielded due to uncommitted transaction.
if *len > MAX_ROWS_FOR_TRANSACTION {
Expand All @@ -704,17 +665,13 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
"transaction is larger than {MAX_ROWS_FOR_TRANSACTION} rows, force commit"
);
*len = 0; // reset `len` while keeping `id`
yield_asap = false;
yield builder.take(batch_len);
} else {
// Normal transaction. After the transaction is committed, we should yield the last
// batch immediately, so set `yield_asap` to true.
yield_asap = true;
last_batch_not_yielded = true
}
} else {
// Clean state. Reserve capacity for the builder.
assert!(builder.is_empty());
assert!(!yield_asap);
let _ = builder.take(batch_len);
}

Expand All @@ -725,11 +682,14 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
offset = msg.offset,
"got a empty message, could be a heartbeat"
);
parser.emit_empty_row(heartbeat_builder.row_writer().with_meta(MessageMeta {
meta: &msg.meta,
split_id: &msg.split_id,
offset: &msg.offset,
}));
// Emit an empty invisible row for the heartbeat message.
parser.emit_empty_row(heartbeat_builder.row_writer().invisible().with_meta(
MessageMeta {
meta: &msg.meta,
split_id: &msg.split_id,
offset: &msg.offset,
},
));
continue;
}

Expand All @@ -743,7 +703,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
.observe(lag_ms as f64);
}

let old_op_num = builder.op_num();
let old_len = builder.len();
match parser
.parse_one_with_txn(
msg.key,
Expand All @@ -759,12 +719,10 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
// It's possible that parsing multiple rows in a single message PARTIALLY failed.
// We still have to maintain the row number in this case.
res @ (Ok(ParseResult::Rows) | Err(_)) => {
// The number of rows added to the builder.
let num = builder.op_num() - old_op_num;

// Aggregate the number of rows in the current transaction.
// Aggregate the number of new rows into the current transaction.
if let Some(Transaction { len, .. }) = &mut current_transaction {
*len += num;
let n_new_rows = builder.len() - old_len;
*len += n_new_rows;
}

if let Err(error) = res {
Expand Down Expand Up @@ -793,32 +751,28 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
}
}

Ok(ParseResult::TransactionControl(txn_ctl)) => {
match txn_ctl {
TransactionControl::Begin { id } => {
if let Some(Transaction { id: current_id, .. }) = &current_transaction {
tracing::warn!(current_id, id, "already in transaction");
}
tracing::debug!("begin upstream transaction: id={}", id);
current_transaction = Some(Transaction { id, len: 0 });
}
TransactionControl::Commit { id } => {
let current_id = current_transaction.as_ref().map(|t| &t.id);
if current_id != Some(&id) {
tracing::warn!(?current_id, id, "transaction id mismatch");
}
tracing::debug!("commit upstream transaction: id={}", id);
current_transaction = None;
Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
TransactionControl::Begin { id } => {
if let Some(Transaction { id: current_id, .. }) = &current_transaction {
tracing::warn!(current_id, id, "already in transaction");
}
tracing::debug!(id, "begin upstream transaction");
current_transaction = Some(Transaction { id, len: 0 });
}
TransactionControl::Commit { id } => {
let current_id = current_transaction.as_ref().map(|t| &t.id);
if current_id != Some(&id) {
tracing::warn!(?current_id, id, "transaction id mismatch");
}
tracing::debug!(id, "commit upstream transaction");
current_transaction = None;

// Not in a transaction anymore and `yield_asap` is set, so we should yield the
// chunk now.
if current_transaction.is_none() && yield_asap {
yield_asap = false;
yield builder.take(batch_len - (i + 1));
if last_batch_not_yielded {
yield builder.take(batch_len - (i + 1));
last_batch_not_yielded = false;
}
}
}
},
}
}

Expand All @@ -831,8 +785,6 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream

// If we are not in a transaction, we should yield the chunk now.
if current_transaction.is_none() {
yield_asap = false;

yield builder.take(0);
}
}
Expand Down

0 comments on commit 9be74ae

Please sign in to comment.