move transform upstream chunk from merge exec to cdc backfill exec
StrikeW committed Sep 21, 2023
1 parent 9d7e16b commit 7d684b9
Showing 7 changed files with 212 additions and 127 deletions.
@@ -107,11 +107,11 @@ var record = event.value();
var message =
CdcMessage.newBuilder()
.setOffset(offsetStr)
.setTableName(fullTableName)
.setFullTableName(fullTableName)
.setPartition(String.valueOf(sourceId))
.setPayload(new String(payload, StandardCharsets.UTF_8))
.build();
LOG.debug("record => {}", message.getPayload());
LOG.info("record => {}", message.getPayload());
builder.addEvents(message);
committer.markProcessed(event);
}
202 changes: 193 additions & 9 deletions src/stream/src/executor/backfill/cdc_backfill.rs
@@ -20,16 +20,23 @@ use anyhow::anyhow;
use either::Either;
use futures::stream::select_with_strategy;
use futures::{pin_mut, stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use futures_async_stream::{for_await, try_stream};
use itertools::Itertools;
use maplit::hashmap;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{JsonbVal, ScalarRefImpl};
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, JsonbVal, ScalarRefImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::parser::{
DebeziumParser, EncodingProperties, JsonProperties, ProtocolProperties,
SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::external::{CdcOffset, DebeziumOffset, DebeziumSourceOffset};
use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
use risingwave_connector::source::{
SourceColumnDesc, SourceContext, SplitId, SplitImpl, SplitMetaData,
};
use risingwave_storage::StateStore;
use serde_json::Value;

@@ -43,8 +50,8 @@ use crate::executor::backfill::utils::{
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor,
ExecutorInfo, Message, Mutation, PkIndices, PkIndicesRef, SourceStateTableHandler,
StreamExecutorError, StreamExecutorResult,
ExecutorInfo, Message, MessageStream, Mutation, PkIndices, PkIndicesRef,
SourceStateTableHandler, StreamExecutorError, StreamExecutorResult,
};
use crate::task::{ActorId, CreateMviewProgress};

@@ -70,6 +77,8 @@ pub struct CdcBackfillExecutor<S: StateStore> {
/// Stores the backfill done flag
source_state_handler: SourceStateTableHandler<S>,

/// Whether the upstream is a shared CDC source. If so, its chunks carry raw Debezium
/// payloads and are transformed by this executor before backfilling.
upstream_is_shared: bool,

metrics: Arc<StreamingMetrics>,

chunk_size: usize,
@@ -87,6 +96,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
pk_indices: PkIndices,
metrics: Arc<StreamingMetrics>,
source_state_handler: SourceStateTableHandler<S>,
upstream_is_shared: bool,
chunk_size: usize,
) -> Self {
Self {
@@ -103,6 +113,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
chunk_size,
actor_ctx,
source_state_handler,
upstream_is_shared,
}
}

@@ -115,7 +126,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let upstream_table_id = self.upstream_table.table_id().table_id;
let upstream_table_reader = UpstreamTableReader::new(self.upstream_table);

let mut upstream = self.upstream.execute().peekable();
let mut upstream = self.upstream.execute();

// Current position of the upstream_table storage primary key.
// `None` means it starts from the beginning.
@@ -161,6 +172,14 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
}

// A shared CDC source emits raw Debezium payloads, so transform its chunks into rows
// matching the downstream table schema before consuming them.
let mut upstream = if self.upstream_is_shared {
transform_upstream(upstream, &self.info.schema)
.boxed()
.peekable()
} else {
upstream.peekable()
};

if invalid_backfill {
// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);
@@ -544,6 +563,106 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
}

/// Transform chunks of raw Debezium JSON payloads from a shared CDC source into chunks
/// that match the downstream table schema (see `parse_debezium_chunk`).
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) {
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
}),
protocol_config: ProtocolProperties::Debezium,
};
let mut parser = DebeziumParser::new(
props,
get_rw_columns(schema),
Arc::new(SourceContext::default()),
)
.await
.map_err(StreamExecutorError::connector_error)?;

pin_mut!(upstream);
#[for_await]
for msg in upstream {
let mut msg = msg?;
if let Message::Chunk(chunk) = &mut msg {
let parsed_chunk = parse_debezium_chunk(&mut parser, chunk, schema).await?;
*chunk = parsed_chunk;
}
yield msg;
}
}

async fn parse_debezium_chunk(
parser: &mut DebeziumParser,
chunk: &StreamChunk,
schema: &Schema,
) -> StreamExecutorResult<StreamChunk> {
// Here we transform the input chunk, whose schema is (payload varchar, _rw_offset varchar, _rw_table_name varchar),
// into a chunk that matches the downstream table schema (the `info.schema` of the MergeNode,
// i.e. the schema of the table job) with `_rw_offset` appended at the end.
// See `gen_create_table_plan_for_cdc_source` for details.
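// For example (hypothetical table with columns `id` and `name`), an upstream row of
// (payload = '{"before": null, "after": {"id": 1, "name": "foo"}, ...}', _rw_offset = '<offset>', _rw_table_name = 'mydb.t1')
// is re-assembled into a downstream row of (1, 'foo', '<offset>'):
// the parsed Debezium payload columns followed by the original `_rw_offset` value.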
let column_descs = get_rw_columns(schema);
let mut builder = SourceStreamChunkBuilder::with_capacity(column_descs, chunk.capacity());

// The schema of the input chunk is (payload varchar, _rw_offset varchar, _rw_table_name varchar).
// We use the Debezium parser to parse the first column,
// then chain each parsed row with its `_rw_offset` value to form a new row.
let payloads = chunk.data_chunk().project(vec![0].as_slice());
let offset_columns = chunk.data_chunk().project(vec![1].as_slice());

// TODO: preserve the transaction semantics
for payload in payloads.rows() {
let ScalarRefImpl::Utf8(str) = payload.datum_at(0).expect("payload must exist") else {
unreachable!("payload must be utf8 string");
};

parser
.parse_inner(None, Some(str.as_bytes().to_vec()), builder.row_writer())
.await
.unwrap();
}

let parsed_chunk = builder.finish();
let (data_chunk, ops) = parsed_chunk.into_parts();

// Concatenate each row in the parsed chunk with its `_rw_offset` column; the `Op` column is retained as well.
let mut new_rows = Vec::with_capacity(chunk.capacity());
for (data_row, offset_row) in data_chunk
.rows_with_holes()
.zip_eq_fast(offset_columns.rows_with_holes())
{
let combined = data_row.chain(offset_row);
new_rows.push(combined);
}

let data_types = schema
.fields
.iter()
.map(|field| field.data_type.clone())
.chain(std::iter::once(DataType::Varchar)) // _rw_offset column
.collect_vec();

Ok(StreamChunk::from_parts(
ops,
DataChunk::from_rows(new_rows.as_slice(), data_types.as_slice()),
))
}

fn get_rw_columns(schema: &Schema) -> Vec<SourceColumnDesc> {
schema
.fields
.iter()
.map(|field| {
let column_desc = ColumnDesc::named(
field.name.clone(),
ColumnId::placeholder(),
field.data_type.clone(),
);
SourceColumnDesc::from(&column_desc)
})
.collect_vec()
}

impl<S: StateStore> Executor for CdcBackfillExecutor<S> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.execute_inner().boxed()
@@ -561,3 +680,68 @@
&self.info.identity
}
}

#[cfg(test)]
mod tests {
use futures::{pin_mut, StreamExt};
use risingwave_common::array::{DataChunk, Op, StreamChunk, Vis};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::iter_util::ZipEqFast;

use crate::executor::backfill::cdc_backfill::transform_upstream;
use crate::executor::test_utils::MockSource;
use crate::executor::Executor;

#[tokio::test]
async fn test_transform_upstream_chunk() {
let schema = Schema::new(vec![
Field::unnamed(DataType::Varchar), // debezium json payload
Field::unnamed(DataType::Varchar), // _rw_offset
Field::unnamed(DataType::Varchar), // _rw_table_name
]);
let pk_indices = vec![1];
let (mut tx, source) = MockSource::channel(schema.clone(), pk_indices.clone());
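// A Debezium snapshot-read event ("op": "r") for a single row of the `orders_new` table.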
let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string();

let mut datums: Vec<Datum> = vec![
Some(payload.into()),
Some("file: 1.binlog, pos: 100".to_string().into()),
Some("mydb.orders".to_string().into()),
];

println!("datums: {:?}", datums[1]);

let mut builders = schema.create_array_builders(8);
for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
builder.append(datum.clone());
}
let columns = builders
.into_iter()
.map(|builder| builder.finish().into())
.collect();

// one row chunk
let chunk =
StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, Vis::Compact(1)));

tx.push_chunk(chunk);
let upstream = Box::new(source).execute();

// schema of the CDC table
let rw_schema = Schema::new(vec![
Field::with_name(DataType::Int64, "O_ORDERKEY"), // orderkey
Field::with_name(DataType::Int64, "O_CUSTKEY"), // custkey
Field::with_name(DataType::Varchar, "O_ORDERSTATUS"), // orderstatus
Field::with_name(DataType::Decimal, "O_TOTALPRICE"), // totalprice
Field::with_name(DataType::Date, "O_ORDERDATE"), // orderdate
]);

let parsed_stream = transform_upstream(upstream, &rw_schema);
pin_mut!(parsed_stream);
// the output chunk must contain the offset column
if let Some(message) = parsed_stream.next().await {
println!("chunk: {:#?}", message.unwrap());
}
}
}
2 changes: 2 additions & 0 deletions src/stream/src/executor/dispatch.rs
@@ -939,6 +939,7 @@ mod tests {
vec![0, 1, 2],
hash_mapping,
0,
None,
);

let chunk = StreamChunk::from_pretty(
@@ -1174,6 +1175,7 @@
(0..dimension).collect(),
hash_mapping.clone(),
0,
None,
);

let mut ops = Vec::new();