diff --git a/src/common/src/array/stream_chunk_builder.rs b/src/common/src/array/stream_chunk_builder.rs index 866bb225438e..a13cc3676792 100644 --- a/src/common/src/array/stream_chunk_builder.rs +++ b/src/common/src/array/stream_chunk_builder.rs @@ -74,11 +74,12 @@ impl StreamChunkBuilder { .iter() .map(|datatype| datatype.create_array_builder(initial_capacity)) .collect(); + let vis_builder = BitmapBuilder::with_capacity(initial_capacity); Self { ops, column_builders, data_types, - vis_builder: BitmapBuilder::default(), + vis_builder, max_chunk_size: Some(max_chunk_size), initial_capacity, size: 0, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index ddcbc3558b90..f42e870196e8 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -89,6 +89,7 @@ pub struct SourceStreamChunkBuilder { descs: Vec<SourceColumnDesc>, builders: Vec<ArrayBuilderImpl>, op_builder: Vec<Op>, + vis_builder: BitmapBuilder, } impl SourceStreamChunkBuilder { @@ -102,6 +103,7 @@ impl SourceStreamChunkBuilder { descs, builders, op_builder: Vec::with_capacity(cap), + vis_builder: BitmapBuilder::with_capacity(cap), } } @@ -110,18 +112,21 @@ impl SourceStreamChunkBuilder { descs: &self.descs, builders: &mut self.builders, op_builder: &mut self.op_builder, + vis_builder: &mut self.vis_builder, + visible: true, // write visible rows by default row_meta: None, } } /// Consumes the builder and returns a [`StreamChunk`]. pub fn finish(self) -> StreamChunk { - StreamChunk::new( + StreamChunk::with_visibility( self.op_builder, self.builders .into_iter() .map(|builder| builder.finish().into()) .collect(), + self.vis_builder.finish(), ) } @@ -129,12 +134,12 @@ impl SourceStreamChunkBuilder { /// the builders of the next [`StreamChunk`].
#[must_use] pub fn take(&mut self, next_cap: usize) -> StreamChunk { - let descs = std::mem::take(&mut self.descs); + let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish` let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap)); builder.finish() } - pub fn op_num(&self) -> usize { + pub fn len(&self) -> usize { self.op_builder.len() } @@ -143,63 +148,6 @@ impl SourceStreamChunkBuilder { } } -/// A builder for building a [`StreamChunk`] that contains only heartbeat rows. -/// Some connectors may emit heartbeat messages to the downstream, and the cdc source -/// rely on the heartbeat messages to keep the source offset up-to-date with upstream. -pub struct HeartbeatChunkBuilder { - builder: SourceStreamChunkBuilder, -} - -impl HeartbeatChunkBuilder { - fn with_capacity(descs: Vec<SourceColumnDesc>, cap: usize) -> Self { - let builders = descs - .iter() - .map(|desc| desc.data_type.create_array_builder(cap)) - .collect(); - - Self { - builder: SourceStreamChunkBuilder { - descs, - builders, - op_builder: Vec::with_capacity(cap), - }, - } - } - - fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> { - self.builder.row_writer() - } - - /// Consumes the builder and returns a [`StreamChunk`] with all rows marked as invisible - fn finish(self) -> StreamChunk { - // heartbeat chunk should be invisible - let builder = self.builder; - let visibility = BitmapBuilder::zeroed(builder.op_builder.len()); - StreamChunk::with_visibility( - builder.op_builder, - builder - .builders - .into_iter() - .map(|builder| builder.finish().into()) - .collect(), - visibility.finish(), - ) - } - - /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for - /// the builders of the next [`StreamChunk`].
- #[must_use] - fn take(&mut self, next_cap: usize) -> StreamChunk { - let descs = std::mem::take(&mut self.builder.descs); - let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap)); - builder.finish() - } - - fn is_empty(&self) -> bool { - self.builder.is_empty() - } -} - /// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`], /// where each contains either one row (Insert/Delete) or two rows (Update) that can be written atomically. /// @@ -213,6 +161,10 @@ pub struct SourceStreamChunkRowWriter<'a> { descs: &'a [SourceColumnDesc], builders: &'a mut [ArrayBuilderImpl], op_builder: &'a mut Vec<Op>, + vis_builder: &'a mut BitmapBuilder, + + /// Whether the rows written by this writer should be visible in output `StreamChunk`. + visible: bool, /// An optional meta data of the original message. /// @@ -220,6 +172,22 @@ pub struct SourceStreamChunkRowWriter<'a> { row_meta: Option<MessageMeta<'a>>, } +impl<'a> SourceStreamChunkRowWriter<'a> { + /// Set the meta data of the original message for this row writer. + /// + /// This should always be called except for tests. + fn with_meta(mut self, row_meta: MessageMeta<'a>) -> Self { + self.row_meta = Some(row_meta); + self + } + + /// Convert the row writer to invisible row writer. + fn invisible(mut self) -> Self { + self.visible = false; + self + } +} + /// The meta data of the original message for a row writer. /// /// Extracted from the `SourceMessage`.
@@ -306,7 +274,7 @@ impl OpAction for OpActionInsert { #[inline(always)] fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.op_builder.push(Op::Insert); + writer.append_op(Op::Insert); } } @@ -332,7 +300,7 @@ impl OpAction for OpActionDelete { #[inline(always)] fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.op_builder.push(Op::Delete); + writer.append_op(Op::Delete); } } @@ -360,24 +328,17 @@ impl OpAction for OpActionUpdate { #[inline(always)] fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.op_builder.push(Op::UpdateDelete); - writer.op_builder.push(Op::UpdateInsert); + writer.append_op(Op::UpdateDelete); + writer.append_op(Op::UpdateInsert); } } -impl<'a> SourceStreamChunkRowWriter<'a> { - /// Set the meta data of the original message for this row writer. - /// - /// This should always be called except for tests. - fn with_meta(self, row_meta: MessageMeta<'a>) -> Self { - Self { - row_meta: Some(row_meta), - ..self - } +impl SourceStreamChunkRowWriter<'_> { + fn append_op(&mut self, op: Op) { + self.op_builder.push(op); + self.vis_builder.append(self.visible); } -} -impl SourceStreamChunkRowWriter<'_> { fn do_action( &mut self, mut f: impl FnMut(&SourceColumnDesc) -> AccessResult, @@ -679,7 +640,7 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096; async fn into_chunk_stream(mut parser: P, data_stream: BoxSourceStream) { let columns = parser.columns().to_vec(); - let mut heartbeat_builder = HeartbeatChunkBuilder::with_capacity(columns.clone(), 0); + let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0); let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); struct Transaction { @@ -687,13 +648,13 @@ async fn into_chunk_stream(mut parser: P, data_stream len: usize, } let mut current_transaction = None; - let mut yield_asap = false; // whether we should yield the chunk as soon as possible (txn commits) #[for_await] for batch in data_stream { let batch = batch?; let 
batch_len = batch.len(); + let mut last_batch_not_yielded = false; if let Some(Transaction { len, id }) = &mut current_transaction { // Dirty state. The last batch is not yielded due to uncommitted transaction. if *len > MAX_ROWS_FOR_TRANSACTION { @@ -704,17 +665,13 @@ async fn into_chunk_stream(mut parser: P, data_stream "transaction is larger than {MAX_ROWS_FOR_TRANSACTION} rows, force commit" ); *len = 0; // reset `len` while keeping `id` - yield_asap = false; yield builder.take(batch_len); } else { - // Normal transaction. After the transaction is committed, we should yield the last - // batch immediately, so set `yield_asap` to true. - yield_asap = true; + last_batch_not_yielded = true } } else { // Clean state. Reserve capacity for the builder. assert!(builder.is_empty()); - assert!(!yield_asap); let _ = builder.take(batch_len); } @@ -725,11 +682,14 @@ async fn into_chunk_stream(mut parser: P, data_stream offset = msg.offset, "got a empty message, could be a heartbeat" ); - parser.emit_empty_row(heartbeat_builder.row_writer().with_meta(MessageMeta { - meta: &msg.meta, - split_id: &msg.split_id, - offset: &msg.offset, - })); + // Emit an empty invisible row for the heartbeat message. + parser.emit_empty_row(heartbeat_builder.row_writer().invisible().with_meta( + MessageMeta { + meta: &msg.meta, + split_id: &msg.split_id, + offset: &msg.offset, + }, + )); continue; } @@ -743,7 +703,7 @@ async fn into_chunk_stream(mut parser: P, data_stream .observe(lag_ms as f64); } - let old_op_num = builder.op_num(); + let old_len = builder.len(); match parser .parse_one_with_txn( msg.key, @@ -759,12 +719,10 @@ async fn into_chunk_stream(mut parser: P, data_stream // It's possible that parsing multiple rows in a single message PARTIALLY failed. // We still have to maintain the row number in this case. res @ (Ok(ParseResult::Rows) | Err(_)) => { - // The number of rows added to the builder. 
- let num = builder.op_num() - old_op_num; - - // Aggregate the number of rows in the current transaction. + // Aggregate the number of new rows into the current transaction. if let Some(Transaction { len, .. }) = &mut current_transaction { - *len += num; + let n_new_rows = builder.len() - old_len; + *len += n_new_rows; } if let Err(error) = res { @@ -793,32 +751,28 @@ async fn into_chunk_stream(mut parser: P, data_stream } } - Ok(ParseResult::TransactionControl(txn_ctl)) => { - match txn_ctl { - TransactionControl::Begin { id } => { - if let Some(Transaction { id: current_id, .. }) = &current_transaction { - tracing::warn!(current_id, id, "already in transaction"); - } - tracing::debug!("begin upstream transaction: id={}", id); - current_transaction = Some(Transaction { id, len: 0 }); - } - TransactionControl::Commit { id } => { - let current_id = current_transaction.as_ref().map(|t| &t.id); - if current_id != Some(&id) { - tracing::warn!(?current_id, id, "transaction id mismatch"); - } - tracing::debug!("commit upstream transaction: id={}", id); - current_transaction = None; + Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl { + TransactionControl::Begin { id } => { + if let Some(Transaction { id: current_id, .. }) = &current_transaction { + tracing::warn!(current_id, id, "already in transaction"); } + tracing::debug!(id, "begin upstream transaction"); + current_transaction = Some(Transaction { id, len: 0 }); } + TransactionControl::Commit { id } => { + let current_id = current_transaction.as_ref().map(|t| &t.id); + if current_id != Some(&id) { + tracing::warn!(?current_id, id, "transaction id mismatch"); + } + tracing::debug!(id, "commit upstream transaction"); + current_transaction = None; - // Not in a transaction anymore and `yield_asap` is set, so we should yield the - // chunk now.
- if current_transaction.is_none() && yield_asap { - yield_asap = false; - yield builder.take(batch_len - (i + 1)); + if last_batch_not_yielded { + yield builder.take(batch_len - (i + 1)); + last_batch_not_yielded = false; + } } - } + }, } } @@ -831,8 +785,6 @@ async fn into_chunk_stream(mut parser: P, data_stream // If we are not in a transaction, we should yield the chunk now. if current_transaction.is_none() { - yield_asap = false; - yield builder.take(0); } }