fix(source): parse message with empty key and payload (#15678)
Co-authored-by: tabVersion <[email protected]>
2 people authored and StrikeW committed Mar 25, 2024
1 parent 266b008 commit c588a1b
Showing 5 changed files with 112 additions and 2 deletions.
10 changes: 10 additions & 0 deletions ci/scripts/e2e-source-test.sh
@@ -86,10 +86,20 @@ echo "--- mysql & postgres cdc validate test"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt'

echo "--- cdc share source test"
# cdc share stream test cases
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt'

# create a share source and check whether heartbeat messages are received
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.create_source_job.slt'
table_id=`psql -U root -h localhost -p 4566 -d dev -t -c "select id from rw_internal_tables where name like '%mysql_source%';" | xargs`;
table_count=`psql -U root -h localhost -p 4566 -d dev -t -c "select count(*) from rw_table(${table_id}, public);" | xargs`;
if [ $table_count -eq 0 ]; then
echo "ERROR: internal table of cdc share source is empty!"
exit 1
fi

echo "--- mysql & postgres load and check"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt'
# wait for cdc loading
15 changes: 15 additions & 0 deletions e2e_test/source/cdc/cdc.create_source_job.slt
@@ -0,0 +1,15 @@
control substitution on

# create a cdc source job, whose format is fixed to `FORMAT PLAIN ENCODE JSON`
statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'rwcdc',
password = '${MYSQL_PWD:}',
database.name = 'mytest',
server.id = '5001'
);

sleep 2s
@@ -95,6 +95,11 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

boolean isCdcSourceJob = request.getIsSourceJob();
boolean isBackfillTable = request.getIsBackfillTable();
LOG.info(
"source_id: {}, is_cdc_source_job: {}, is_backfill_table: {}",
request.getSourceId(),
isCdcSourceJob,
isBackfillTable);

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
80 changes: 79 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -80,6 +80,7 @@ mod upsert_parser;
mod util;

pub use debezium::DEBEZIUM_IGNORE_KEY;
use risingwave_common::buffer::BitmapBuilder;
pub use unified::{AccessError, AccessResult};

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
@@ -141,6 +142,63 @@ impl SourceStreamChunkBuilder {
}
}

/// A builder for building a [`StreamChunk`] that contains only heartbeat rows.
/// Some connectors may emit heartbeat messages downstream, and the CDC source
/// relies on these heartbeat messages to keep the source offset up to date with the upstream.
pub struct HeartbeatChunkBuilder {
builder: SourceStreamChunkBuilder,
}

impl HeartbeatChunkBuilder {
fn with_capacity(descs: Vec<SourceColumnDesc>, cap: usize) -> Self {
let builders = descs
.iter()
.map(|desc| desc.data_type.create_array_builder(cap))
.collect();

Self {
builder: SourceStreamChunkBuilder {
descs,
builders,
op_builder: Vec::with_capacity(cap),
},
}
}

fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
self.builder.row_writer()
}

/// Consumes the builder and returns a [`StreamChunk`] with all rows marked as invisible
fn finish(self) -> StreamChunk {
// heartbeat chunk should be invisible
let builder = self.builder;
let visibility = BitmapBuilder::zeroed(builder.op_builder.len());
StreamChunk::with_visibility(
builder.op_builder,
builder
.builders
.into_iter()
.map(|builder| builder.finish().into())
.collect(),
visibility.finish(),
)
}

/// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for
/// the builders of the next [`StreamChunk`].
#[must_use]
fn take(&mut self, next_cap: usize) -> StreamChunk {
let descs = std::mem::take(&mut self.builder.descs);
let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
builder.finish()
}

fn is_empty(&self) -> bool {
self.builder.is_empty()
}
}
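
As a rough, self-contained sketch of what this builder produces (plain Rust, not the RisingWave API; the `Chunk` struct, its fields, and the sample values below are hypothetical stand-ins for `StreamChunk` and its visibility bitmap): heartbeat rows are materialized like any other rows, but the finished chunk marks every row invisible, so it can carry the hidden split-id and offset columns downstream without emitting any visible data.

// Hypothetical, simplified model of an all-invisible heartbeat chunk.
struct Chunk {
    split_ids: Vec<String>,
    offsets: Vec<String>,
    visibility: Vec<bool>, // stand-in for the visibility bitmap
}

struct HeartbeatBuilder {
    split_ids: Vec<String>,
    offsets: Vec<String>,
}

impl HeartbeatBuilder {
    fn new() -> Self {
        Self { split_ids: Vec::new(), offsets: Vec::new() }
    }

    // Append one heartbeat row: the user-facing columns would all be NULL;
    // only the hidden split/offset columns matter.
    fn push(&mut self, split_id: &str, offset: &str) {
        self.split_ids.push(split_id.to_owned());
        self.offsets.push(offset.to_owned());
    }

    fn is_empty(&self) -> bool {
        self.offsets.is_empty()
    }

    // Analogous to `HeartbeatChunkBuilder::finish`: every row is marked invisible.
    fn finish(self) -> Chunk {
        let n = self.offsets.len();
        Chunk {
            split_ids: self.split_ids,
            offsets: self.offsets,
            visibility: vec![false; n],
        }
    }
}

fn main() {
    let mut builder = HeartbeatBuilder::new();
    builder.push("split-0", "42");
    let chunk = builder.finish();
    assert!(chunk.visibility.iter().all(|visible| !*visible)); // nothing is visible downstream
    assert_eq!(chunk.split_ids, vec!["split-0".to_owned()]); // but the split id and offset
    assert_eq!(chunk.offsets, vec!["42".to_owned()]); // still travel with the chunk
}
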

/// `SourceStreamChunkRowWriter` is responsible for writing one or more records to the [`StreamChunk`],
/// where each record contains either one row (Insert/Delete) or two rows (Update) that can be written atomically.
///
@@ -560,6 +618,10 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
self.parse_one(key, payload, writer)
.map_ok(|_| ParseResult::Rows)
}

fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
_ = writer.insert(|_column| Ok(None));
}
}

#[try_stream(ok = Vec<SourceMessage>, error = ConnectorError)]
@@ -616,6 +678,7 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096;
async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream: BoxSourceStream) {
let columns = parser.columns().to_vec();

let mut heartbeat_builder = HeartbeatChunkBuilder::with_capacity(columns.clone(), 0);
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);

struct Transaction {
@@ -657,7 +720,15 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
let process_time_ms = chrono::Utc::now().timestamp_millis();
for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
tracing::debug!(
offset = msg.offset,
"got an empty message, could be a heartbeat"
);
parser.emit_empty_row(heartbeat_builder.row_writer().with_meta(MessageMeta {
meta: &msg.meta,
split_id: &msg.split_id,
offset: &msg.offset,
}));
continue;
}

@@ -750,6 +821,13 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
}
}

// Emit a heartbeat chunk for each message batch.
// We must emit the heartbeat chunk before the data chunk;
// otherwise the source offset could move backward due to the heartbeat.
if !heartbeat_builder.is_empty() {
yield heartbeat_builder.take(0);
}

// If we are not in a transaction, we should yield the chunk now.
if current_transaction.is_none() {
yield_asap = false;
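
To see why the ordering matters, here is a minimal, self-contained sketch (plain Rust, not RisingWave code; the split name and offsets are made up). Updating the per-split source offset is last-write-wins, and a heartbeat message can sit earlier in a batch than the data messages that follow it, so yielding the heartbeat chunk after the data chunk would move the recorded offset backward:

use std::collections::HashMap;

// Last-write-wins offset tracking, analogous to updating the source offset per split.
fn apply_chunk(mapping: &mut HashMap<String, u64>, chunk: &[(&str, u64)]) {
    for (split, offset) in chunk {
        mapping.insert((*split).to_owned(), *offset);
    }
}

fn main() {
    // Offsets within one batch increase: the heartbeat arrived before the data message.
    let heartbeat_chunk = [("split-0", 10)];
    let data_chunk = [("split-0", 20)];

    // Heartbeat first, data last: the recorded offset ends at 20.
    let mut correct = HashMap::new();
    apply_chunk(&mut correct, &heartbeat_chunk);
    apply_chunk(&mut correct, &data_chunk);
    assert_eq!(correct["split-0"], 20);

    // Data first, heartbeat last: the recorded offset regresses to 10.
    let mut regressed = HashMap::new();
    apply_chunk(&mut regressed, &data_chunk);
    apply_chunk(&mut regressed, &heartbeat_chunk);
    assert_eq!(regressed["split-0"], 10);
}
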
4 changes: 3 additions & 1 deletion src/stream/src/executor/source/mod.rs
@@ -60,7 +60,9 @@ pub fn get_split_offset_mapping_from_chunk(
offset_idx: usize,
) -> Option<HashMap<SplitId, String>> {
let mut split_offset_mapping = HashMap::new();
for (_, row) in chunk.rows() {
// All rows, whether visible or invisible, are used to update the source offset.
for i in 0..chunk.capacity() {
let (_, row, _) = chunk.row_at(i);
let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
split_offset_mapping.insert(split_id, offset.to_string());
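
A minimal, self-contained illustration of this change (plain Rust with hypothetical `Row` and `Chunk` types, not the real `StreamChunk` API): a heartbeat chunk is entirely invisible, so iterating only over visible rows would never pick up its offset, while iterating over every row, as the `0..chunk.capacity()` loop with `row_at` does, keeps the split-to-offset mapping moving forward:

use std::collections::HashMap;

struct Row {
    split_id: String,
    offset: u64,
}

struct Chunk {
    rows: Vec<Row>,
    visibility: Vec<bool>, // stand-in for the visibility bitmap
}

// Old behaviour: only visible rows contribute, so an all-invisible heartbeat chunk is ignored.
fn mapping_visible_only(chunk: &Chunk) -> HashMap<String, u64> {
    chunk
        .rows
        .iter()
        .zip(&chunk.visibility)
        .filter(|(_, visible)| **visible)
        .map(|(row, _)| (row.split_id.clone(), row.offset))
        .collect()
}

// New behaviour: every row, visible or invisible, updates the mapping.
fn mapping_all_rows(chunk: &Chunk) -> HashMap<String, u64> {
    chunk
        .rows
        .iter()
        .map(|row| (row.split_id.clone(), row.offset))
        .collect()
}

fn main() {
    let heartbeat_chunk = Chunk {
        rows: vec![Row { split_id: "split-0".to_owned(), offset: 42 }],
        visibility: vec![false],
    };
    assert!(mapping_visible_only(&heartbeat_chunk).is_empty()); // the offset would never advance
    assert_eq!(mapping_all_rows(&heartbeat_chunk)["split-0"], 42); // the heartbeat offset is picked up
}
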
