fix(source): parse message with empty key and payload #15678

Merged · 14 commits · Mar 25, 2024
10 changes: 10 additions & 0 deletions ci/scripts/e2e-source-test.sh
@@ -72,10 +72,20 @@ echo "--- mysql & postgres cdc validate test"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt'

echo "--- cdc share source test"
# cdc share stream test cases
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt'

# create a share source and check whether heartbeat message is received
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.create_source_job.slt'
table_id=`psql -U root -h localhost -p 4566 -d dev -t -c "select id from rw_internal_tables where name like '%mysql_source%';" | xargs`;
table_count=`psql -U root -h localhost -p 4566 -d dev -t -c "select count(*) from rw_table(${table_id}, public);" | xargs`;
if [ $table_count -eq 0 ]; then
echo "ERROR: internal table of cdc share source is empty!"
exit 1
fi

echo "--- mysql & postgres load and check"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt'
# wait for cdc loading
15 changes: 15 additions & 0 deletions e2e_test/source/cdc/cdc.create_source_job.slt
@@ -0,0 +1,15 @@
control substitution on

# create a cdc source job, whose format is fixed to `FORMAT PLAIN ENCODE JSON`
statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'rwcdc',
password = '${MYSQL_PWD:}',
database.name = 'mytest',
server.id = '5001'
);

sleep 2s
@@ -95,6 +95,11 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

boolean isCdcSourceJob = request.getIsSourceJob();
boolean isBackfillTable = request.getIsBackfillTable();
LOG.info(
"source_id: {}, isCdcSourceJob: {}, isBackfillTable: {}",
request.getSourceId(),
isCdcSourceJob,
isBackfillTable);

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
42 changes: 41 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -80,6 +80,7 @@ mod upsert_parser;
mod util;

pub use debezium::DEBEZIUM_IGNORE_KEY;
use risingwave_common::buffer::BitmapBuilder;
pub use unified::{AccessError, AccessResult};

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
@@ -132,6 +133,25 @@ impl SourceStreamChunkBuilder {
builder.finish()
}

fn finish_heartbeat(self) -> StreamChunk {
// heartbeat chunk should be invisible
let visibility = BitmapBuilder::zeroed(self.op_builder.len());
StreamChunk::with_visibility(
self.op_builder,
self.builders
.into_iter()
.map(|builder| builder.finish().into())
.collect(),
visibility.finish(),
)
}

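/// Swaps in a fresh builder with capacity `next_cap` and finishes the
/// current one as an all-invisible heartbeat chunk.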
fn take_heartbeat(&mut self, next_cap: usize) -> StreamChunk {
let descs = std::mem::take(&mut self.descs);
let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
builder.finish_heartbeat()
}

pub fn op_num(&self) -> usize {
self.op_builder.len()
}
@@ -560,6 +580,10 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
self.parse_one(key, payload, writer)
.map_ok(|_| ParseResult::Rows)
}

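/// Writes a single row with NULL in every data column; used for messages
/// with an empty key and payload (e.g. CDC heartbeats).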
fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
_ = writer.insert(|_column| Ok(None));
}
}

#[try_stream(ok = Vec<SourceMessage>, error = ConnectorError)]
@@ -616,6 +640,7 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096;
async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream: BoxSourceStream) {
let columns = parser.columns().to_vec();

let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0);
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);

struct Transaction {
@@ -657,7 +682,15 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
let process_time_ms = chrono::Utc::now().timestamp_millis();
for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
tracing::debug!(
offset = msg.offset,
"got a empty message, could be a heartbeat"
);
parser.emit_empty_row(heartbeat_builder.row_writer().with_meta(MessageMeta {
meta: &msg.meta,
split_id: &msg.split_id,
offset: &msg.offset,
}));
continue;
}

@@ -750,6 +783,13 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
}
}

// emit the heartbeat chunk for each message batch
// we must emit the heartbeat chunk before the data chunk,
// otherwise the source offset could go backward due to the heartbeat
if !heartbeat_builder.is_empty() {
yield heartbeat_builder.take_heartbeat(0);
}

// If we are not in a transaction, we should yield the chunk now.
if current_transaction.is_none() {
yield_asap = false;
20 changes: 16 additions & 4 deletions src/stream/src/executor/source/mod.rs
@@ -60,10 +60,22 @@ pub fn get_split_offset_mapping_from_chunk(
offset_idx: usize,
) -> Option<HashMap<SplitId, String>> {
let mut split_offset_mapping = HashMap::new();
for (_, row) in chunk.rows() {
let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
split_offset_mapping.insert(split_id, offset.to_string());
if chunk.cardinality() == 0 {
// assumes the chunk is a heartbeat chunk
Member:
It appears that the current approach is less type-safe than the original ChunkWithState approach. Essentially we are mixing another kind of message into the output type (StreamChunk) of into_chunk_stream, so the branching here looks fragile to me. cc @xxchan What's your idea on this?

Member:
Haven't fully checked it yet. It seems the old approach is equally unsafe as the current one? Both rely on invisible rows 🤡

StrikeW (Contributor, author), Mar 21, 2024:
This is the best solution I could come up with after the refactor of #14524: emit an invisible chunk to pass the offset to the source.

StrikeW (Contributor, author):
Hi, I have resolved the above comments. Any more concerns about the "invisible chunk" approach? cc @BugenZhao @xxchan

fuyufjh (Member), Mar 25, 2024:
Is StreamChunkWithState or the heartbeat chunk here only passed from the connector to SourceExecutor? If so, is it possible to pass an Either<StreamChunk, SourceOffset> instead?

Btw, substituting StreamChunk with an offset column for StreamChunkWithState (#14524) seems to be an orthogonal problem to me, because neither of them can pass the source offset when there is no input, i.e. a heartbeat.

StrikeW (Contributor, author):
> If so, is it possible to pass an Either<StreamChunk, SourceOffset> instead?

I am confused. In the current framework (main), the offset of a source message is already parsed into a column of the StreamChunk. So do you mean we need to add a new type (SourceOffset) to Message?

pub enum Message {
Chunk(StreamChunk),
Barrier(Barrier),
Watermark(Watermark),
}

That way we would also need to refactor other parts to support adding a new message type.
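
For illustration, a variant along the lines of fuyufjh's suggestion might look like the sketch below; the SourceOffset name and its payload are assumptions for this sketch, not code from the PR:

// hypothetical sketch only: a dedicated message kind would let a
// heartbeat carry offset updates without any data chunk, at the cost
// of touching every consumer of `Message`
pub enum Message {
    Chunk(StreamChunk),
    // offset updates only (e.g. from a heartbeat), keyed by split id
    SourceOffset(HashMap<SplitId, String>),
    Barrier(Barrier),
    Watermark(Watermark),
}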

BugenZhao (Member), Mar 25, 2024:
What looks weird to me is that we have a specific path for the chunk with cardinality of zero. What about defining the convention like this:

  • All visible rows will be forwarded to the downstream.
  • All rows (including those visible or invisible) will be used to update the offset.

The behavior will be similar to what we implement in this PR, but does not require the ad-hoc path and is more clearly defined. We can iterate over all rows ("with holes") of the chunk to apply the offset updates and yield data optionally.
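
A minimal sketch of that convention, assuming the row_at/datum_at accessors used elsewhere in this diff (illustrative only, not the merged code):

// walk every slot of the chunk ("with holes"): invisible rows still
// carry split/offset metadata even though they produce no data
for i in 0..chunk.capacity() {
    let (_op, row, visible) = chunk.row_at(i);
    // every row, visible or invisible, contributes an offset update
    let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
    split_offset_mapping.insert(split_id, offset.to_string());
    if visible {
        // only visible rows would be forwarded downstream
    }
}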

StrikeW (Contributor, author):
> All rows (including those visible or invisible) will be used to update the offset.

+1. I refactored the code. Does it look good to you? @tabVersion

Member:
> I am confused. In the current framework (main), the offset of a source message is already parsed into a column of the StreamChunk. So do you mean we need to add a new type (SourceOffset) to Message?

> What looks weird to me is that we have a specific path for the chunk with cardinality of zero.

You guys are right. That would be two code paths, which looks even worse.

for i in 0..chunk.capacity() {
let (_, row, _) = chunk.row_at(i);
Member:
Why don't we use .rows() iteration here?

StrikeW (Contributor, author):
rows() cannot iterate over invisible rows, and the rows in a heartbeat chunk are marked as invisible.

Contributor:
I don't quite get the logic either. In my imagination, the reader emits an empty but otherwise normal row, and the executor detects the empty row and changes its visibility.

StrikeW (Contributor, author), Mar 21, 2024:
> I don't quite get the logic either. In my imagination, the reader emits an empty but otherwise normal row, and the executor detects the empty row and changes its visibility.

  1. The row is not fully empty: the meta columns (split_idx, offset_idx) hold non-null values.
  2. There is no O(1) way to check whether a row is fully empty; you must inspect every data column.

let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
split_offset_mapping
.entry(split_id)
.or_insert(offset.to_string());
}
} else {
for (_, row) in chunk.rows() {
let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
split_offset_mapping.insert(split_id, offset.to_string());
}
}
Some(split_offset_mapping)
}