diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java index a97bab3d0ba71..9ffab0c421b75 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java @@ -107,11 +107,11 @@ var record = event.value(); var message = CdcMessage.newBuilder() .setOffset(offsetStr) - .setTableName(fullTableName) + .setFullTableName(fullTableName) .setPartition(String.valueOf(sourceId)) .setPayload(new String(payload, StandardCharsets.UTF_8)) .build(); - LOG.debug("record => {}", message.getPayload()); + LOG.info("record => {}", message.getPayload()); builder.addEvents(message); committer.markProcessed(event); } diff --git a/src/stream/src/executor/backfill/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc_backfill.rs index 2f522ae8eeb0c..c1c183affe6d4 100644 --- a/src/stream/src/executor/backfill/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc_backfill.rs @@ -20,16 +20,23 @@ use anyhow::anyhow; use either::Either; use futures::stream::select_with_strategy; use futures::{pin_mut, stream, StreamExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures_async_stream::{for_await, try_stream}; use itertools::Itertools; use maplit::hashmap; -use risingwave_common::array::StreamChunk; -use risingwave_common::catalog::Schema; -use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::{JsonbVal, ScalarRefImpl}; +use risingwave_common::array::{DataChunk, StreamChunk}; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; +use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::types::{DataType, JsonbVal, ScalarRefImpl}; use risingwave_common::util::epoch::EpochPair; +use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_connector::parser::{ + DebeziumParser, EncodingProperties, JsonProperties, ProtocolProperties, + SourceStreamChunkBuilder, SpecificParserConfig, +}; use risingwave_connector::source::external::{CdcOffset, DebeziumOffset, DebeziumSourceOffset}; -use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; +use risingwave_connector::source::{ + SourceColumnDesc, SourceContext, SplitId, SplitImpl, SplitMetaData, +}; use risingwave_storage::StateStore; use serde_json::Value; @@ -43,8 +50,8 @@ use crate::executor::backfill::utils::{ use crate::executor::monitor::StreamingMetrics; use crate::executor::{ expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, - ExecutorInfo, Message, Mutation, PkIndices, PkIndicesRef, SourceStateTableHandler, - StreamExecutorError, StreamExecutorResult, + ExecutorInfo, Message, MessageStream, Mutation, PkIndices, PkIndicesRef, + SourceStateTableHandler, StreamExecutorError, StreamExecutorResult, }; use crate::task::{ActorId, CreateMviewProgress}; @@ -70,6 +77,8 @@ pub struct CdcBackfillExecutor { /// Stores the backfill done flag source_state_handler: SourceStateTableHandler, + upstream_is_shared: bool, + metrics: Arc, chunk_size: usize, @@ -87,6 +96,7 @@ impl CdcBackfillExecutor { pk_indices: PkIndices, metrics: Arc, source_state_handler: SourceStateTableHandler, + upstream_is_shared: bool, chunk_size: usize, ) -> Self { Self { @@ -103,6 +113,7 @@ impl CdcBackfillExecutor { chunk_size, actor_ctx, source_state_handler, + upstream_is_shared, } } @@ -115,7 +126,7 @@ impl CdcBackfillExecutor { let upstream_table_id = self.upstream_table.table_id().table_id; let upstream_table_reader = UpstreamTableReader::new(self.upstream_table); - let mut upstream = self.upstream.execute().peekable(); + let mut upstream = self.upstream.execute(); // Current position of the upstream_table storage primary key. // `None` means it starts from the beginning. @@ -161,6 +172,14 @@ impl CdcBackfillExecutor { } } + let mut upstream = if self.upstream_is_shared { + transform_upstream(upstream, &self.info.schema) + .boxed() + .peekable() + } else { + upstream.peekable() + }; + if invalid_backfill { // The first barrier message should be propagated. yield Message::Barrier(first_barrier); @@ -544,6 +563,106 @@ impl CdcBackfillExecutor { } } +#[try_stream(ok = Message, error = StreamExecutorError)] +async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { + let props = SpecificParserConfig { + key_encoding_config: None, + encoding_config: EncodingProperties::Json(JsonProperties { + use_schema_registry: false, + }), + protocol_config: ProtocolProperties::Debezium, + }; + let mut parser = DebeziumParser::new( + props, + get_rw_columns(schema), + Arc::new(SourceContext::default()), + ) + .await + .map_err(|err| StreamExecutorError::connector_error(err))?; + + pin_mut!(upstream); + #[for_await] + for msg in upstream { + let mut msg = msg?; + if let Message::Chunk(chunk) = &mut msg { + let mut parsed_chunk = parse_debezium_chunk(&mut parser, chunk, schema).await?; + *chunk = parsed_chunk; + } + yield msg; + } +} + +async fn parse_debezium_chunk( + parser: &mut DebeziumParser, + chunk: &StreamChunk, + schema: &Schema, +) -> StreamExecutorResult { + // here we transform the input chunk in (payload varchar, _rw_offset varchar, _rw_table_name varchar) schema + // to chunk with downstream table schema `info.schema` of MergeNode contains the schema of the + // table job with `_rw_offset` in the end + // see `gen_create_table_plan_for_cdc_source` for details + let column_descs = get_rw_columns(schema); + let mut builder = SourceStreamChunkBuilder::with_capacity(column_descs, chunk.capacity()); + + // The schema of input chunk (payload varchar, _rw_offset varchar, _rw_table_name varchar) + // We should use the debezium parser to parse the first column, + // then chain the parsed row with `_rw_offset` row to get a new row. + let payloads = chunk.data_chunk().project(vec![0].as_slice()); + let offset_columns = chunk.data_chunk().project(vec![1].as_slice()); + + // TODO: preserve the transaction semantics + for payload in payloads.rows() { + let ScalarRefImpl::Utf8(str) = payload.datum_at(0).expect("payload must exist") else { + unreachable!("payload must be utf8 string"); + }; + + parser + .parse_inner(None, Some(str.as_bytes().to_vec()), builder.row_writer()) + .await + .unwrap(); + } + + let parsed_chunk = builder.finish(); + let (data_chunk, ops) = parsed_chunk.into_parts(); + + // concat the rows in the parsed chunk with the _rw_offset column, we should also retain the Op column + let mut new_rows = Vec::with_capacity(chunk.capacity()); + for (data_row, offset_row) in data_chunk + .rows_with_holes() + .zip_eq_fast(offset_columns.rows_with_holes()) + { + let combined = data_row.chain(offset_row); + new_rows.push(combined); + } + + let data_types = schema + .fields + .iter() + .map(|field| field.data_type.clone()) + .chain(std::iter::once(DataType::Varchar)) // _rw_offset column + .collect_vec(); + + Ok(StreamChunk::from_parts( + ops, + DataChunk::from_rows(new_rows.as_slice(), data_types.as_slice()), + )) +} + +fn get_rw_columns(schema: &Schema) -> Vec { + schema + .fields + .iter() + .map(|field| { + let column_desc = ColumnDesc::named( + field.name.clone(), + ColumnId::placeholder(), + field.data_type.clone(), + ); + SourceColumnDesc::from(&column_desc) + }) + .collect_vec() +} + impl Executor for CdcBackfillExecutor { fn execute(self: Box) -> BoxedMessageStream { self.execute_inner().boxed() @@ -561,3 +680,68 @@ impl Executor for CdcBackfillExecutor { &self.info.identity } } + +#[cfg(test)] +mod tests { + use futures::{pin_mut, StreamExt}; + use risingwave_common::array::{DataChunk, Op, StreamChunk, Vis}; + use risingwave_common::catalog::{Field, Schema}; + use risingwave_common::types::{DataType, Datum}; + use risingwave_common::util::iter_util::ZipEqFast; + + use crate::executor::backfill::cdc_backfill::transform_upstream; + use crate::executor::test_utils::MockSource; + use crate::executor::Executor; + + #[tokio::test] + async fn test_transform_upstream_chunk() { + let schema = Schema::new(vec![ + Field::unnamed(DataType::Varchar), // debezium json payload + Field::unnamed(DataType::Varchar), // _rw_offset + Field::unnamed(DataType::Varchar), // _rw_table_name + ]); + let pk_indices = vec![1]; + let (mut tx, source) = MockSource::channel(schema.clone(), pk_indices.clone()); + let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string(); + + let mut datums: Vec = vec![ + Some(payload.into()), + Some("file: 1.binlog, pos: 100".to_string().into()), + Some("mydb.orders".to_string().into()), + ]; + + println!("datums: {:?}", datums[1]); + + let mut builders = schema.create_array_builders(8); + for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) { + builder.append(datum.clone()); + } + let columns = builders + .into_iter() + .map(|builder| builder.finish().into()) + .collect(); + + // one row chunk + let chunk = + StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, Vis::Compact(1))); + + tx.push_chunk(chunk); + let upstream = Box::new(source).execute(); + + // schema of the CDC table + let rw_schema = Schema::new(vec![ + Field::with_name(DataType::Int64, "O_ORDERKEY"), // orderkey + Field::with_name(DataType::Int64, "O_CUSTKEY"), // custkey + Field::with_name(DataType::Varchar, "O_ORDERSTATUS"), // orderstatus + Field::with_name(DataType::Decimal, "O_TOTALPRICE"), // totalprice + Field::with_name(DataType::Date, "O_ORDERDATE"), // orderdate + ]); + + let parsed_stream = transform_upstream(upstream, &rw_schema); + pin_mut!(parsed_stream); + // the output chunk must contain the offset column + if let Some(message) = parsed_stream.next().await { + println!("chunk: {:#?}", message.unwrap()); + } + } +} diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index b1500f300c832..04c4311360fd5 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -939,6 +939,7 @@ mod tests { vec![0, 1, 2], hash_mapping, 0, + None, ); let chunk = StreamChunk::from_pretty( @@ -1174,6 +1175,7 @@ mod tests { (0..dimension).collect(), hash_mapping.clone(), 0, + None, ); let mut ops = Vec::new(); diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index e03938d392285..dad7bf4ea3abb 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cell::LazyCell; use std::collections::BTreeMap; use std::pin::Pin; use std::task::{Context, Poll}; @@ -21,16 +20,7 @@ use anyhow::anyhow; use futures::stream::{FusedStream, FuturesUnordered, StreamFuture}; use futures::{pin_mut, Stream, StreamExt}; use futures_async_stream::try_stream; -use risingwave_common::array::DataChunk; -use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; -use risingwave_common::row::{Row, RowExt}; -use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_connector::parser::{ - DebeziumParser, EncodingProperties, JsonProperties, ProtocolProperties, - SourceStreamChunkBuilder, SpecificParserConfig, -}; -use risingwave_connector::sink::encoder::{JsonEncoder, RowEncoder, SerTo, TimestampHandlingMode}; -use risingwave_connector::source::{SourceColumnDesc, SourceContext}; +use risingwave_common::catalog::Schema; use tokio::time::Instant; use super::error::StreamExecutorError; @@ -59,8 +49,6 @@ pub struct MergeExecutor { /// Logical Operator Info info: ExecutorInfo, - cdc_upstream: bool, - /// Shared context of the stream manager. context: Arc, @@ -72,7 +60,6 @@ impl MergeExecutor { #[allow(clippy::too_many_arguments)] pub fn new( schema: Schema, - cdc_upstream: bool, pk_indices: PkIndices, ctx: ActorContextRef, fragment_id: FragmentId, @@ -93,7 +80,6 @@ impl MergeExecutor { pk_indices, identity: format!("MergeExecutor {:X}", executor_id), }, - cdc_upstream, context, metrics, } @@ -106,7 +92,6 @@ impl MergeExecutor { Self::new( schema, - false, vec![], ActorContext::create(114), 514, @@ -123,21 +108,6 @@ impl MergeExecutor { ) } - fn get_rw_columns(schema: &Schema) -> Vec { - schema - .fields - .iter() - .map(|field| { - let column_desc = ColumnDesc::named( - field.name.clone(), - ColumnId::placeholder(), - field.data_type.clone(), - ); - SourceColumnDesc::from(&column_desc) - }) - .collect_vec() - } - #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self: Box) { // Futures of all active upstreams. @@ -146,27 +116,6 @@ impl MergeExecutor { let actor_id_str = actor_id.to_string(); let mut upstream_fragment_id_str = self.upstream_fragment_id.to_string(); - let mut parser = if self.cdc_upstream { - let props = SpecificParserConfig { - key_encoding_config: None, - encoding_config: EncodingProperties::Json(JsonProperties { - use_schema_registry: false, - }), - protocol_config: ProtocolProperties::Debezium, - }; - Some( - DebeziumParser::new( - props, - Self::get_rw_columns(&self.info.schema), - Arc::new(SourceContext::default()), - ) - .await - .map_err(|err| StreamExecutorError::connector_error(err))?, - ) - } else { - None - }; - // Channels that're blocked by the barrier to align. let mut start_time = Instant::now(); pin_mut!(select_all); @@ -186,68 +135,6 @@ impl MergeExecutor { .actor_in_record_cnt .with_label_values(&[&actor_id_str]) .inc_by(chunk.cardinality() as _); - - if self.cdc_upstream && let Some(parser) = parser.as_mut() { - let props = SpecificParserConfig { - key_encoding_config: None, - encoding_config: EncodingProperties::Json(JsonProperties { - use_schema_registry: false, - }), - protocol_config: ProtocolProperties::Debezium, - }; - - // here we transform the input chunk in [jsonb, varchar] schema to chunk with downstream table schema - // `info.schema` of MergeNode contains the schema of the table job with `_rw_offset` in the end - // see `gen_create_table_plan_for_cdc_source` for details - let column_descs = Self::get_rw_columns(&self.info.schema); - let mut builder = - SourceStreamChunkBuilder::with_capacity(column_descs, chunk.capacity()); - let encoder = - JsonEncoder::new(&self.info.schema, None, TimestampHandlingMode::Milli); - - // The schema of input chunk [jsonb, _rw_offset] - // We should use the debezium parser to parse the first column, - // then chain the parsed row with _rw_offset row to a new rows. - let data_columns = chunk.data_chunk().project(vec![0].as_slice()); - let offset_columns = chunk.data_chunk().project(vec![1].as_slice()); - for row in data_columns.rows() { - let json_record = encoder.encode(row)?; - let payload: Vec = json_record.ser_to()?; - // TODO: preserve the transaction semantics - parser - .parse_inner(None, Some(payload), builder.row_writer()) - .await - .unwrap(); - } - - let parsed_chunk = builder.finish(); - let (data_chunk, ops) = parsed_chunk.into_parts(); - - // concat the rows in the parsed chunk with the _rw_offset column, we should also retain the Op column - let mut new_rows = Vec::with_capacity(chunk.capacity()); - for (data_row, offset_row) in data_chunk - .rows_with_holes() - .zip_eq_fast(offset_columns.rows_with_holes()) - { - let combined = data_row.chain(offset_row); - new_rows.push(combined); - } - - let data_types = self - .info - .schema - .fields - .iter() - .map(|field| field.data_type.clone()) - .collect_vec(); - let mut transformed_chunk = StreamChunk::from_parts( - ops, - DataChunk::from_rows(new_rows.as_slice(), data_types.as_slice()), - ); - - // TODO: is it safe to swap the chunk? - std::mem::swap(chunk, &mut transformed_chunk); - } } Message::Barrier(barrier) => { tracing::trace!( diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index e682e114fc242..32c4506cd7cc4 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -84,7 +84,19 @@ impl ExecutorBuilder for ChainExecutorBuilder { .boxed() } ChainType::CdcBackfill => { - todo!("CdcBackfill") + todo!("CdcBackfill is not supported yet") + // let cdc_backfill = CdcBackfillExecutor::new( + // params.actor_context.clone(), + // external_table, + // Box::new(source_exec), + // (0..source.columns.len()).collect_vec(), /* eliminate the last column (_rw_offset) */ + // None, + // schema.clone(), + // pk_indices, + // params.executor_stats, + // source_state_handler, + // source_ctrl_opts.chunk_size, + // ); } ChainType::Backfill => { let table_desc: &StorageTableDesc = node.get_table_desc()?; diff --git a/src/stream/src/from_proto/merge.rs b/src/stream/src/from_proto/merge.rs index dd81a6654c9ec..c465c9705bd53 100644 --- a/src/stream/src/from_proto/merge.rs +++ b/src/stream/src/from_proto/merge.rs @@ -79,7 +79,6 @@ impl ExecutorBuilder for MergeExecutorBuilder { } else { Ok(MergeExecutor::new( schema, - cdc_upstream, params.pk_indices, actor_context, params.fragment_id, diff --git a/src/stream/src/from_proto/source.rs b/src/stream/src/from_proto/source.rs index 77bbcc53e69c5..985ae4e0feddd 100644 --- a/src/stream/src/from_proto/source.rs +++ b/src/stream/src/from_proto/source.rs @@ -184,6 +184,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { pk_indices, params.executor_stats, source_state_handler, + false, source_ctrl_opts.chunk_size ); cdc_backfill.boxed()