From f42d3db26f5655c94c06ad61dacfb4b9be5309d0 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 24 Sep 2023 13:51:05 +0800 Subject: [PATCH] WIP: test e2e process without persist state of backfill --- .../source/SourceValidateHandler.java | 3 +- .../source/common/DbzConnectorConfig.java | 16 ++-- .../source/core/DbzCdcEventConsumer.java | 7 +- proto/stream_plan.proto | 1 - src/connector/src/source/cdc/mod.rs | 2 +- src/connector/src/source/external.rs | 25 ++++-- src/frontend/src/handler/create_source.rs | 7 +- src/frontend/src/handler/create_table.rs | 8 +- src/frontend/src/handler/mod.rs | 2 +- .../optimizer/plan_node/stream_table_scan.rs | 11 --- src/meta/src/stream/stream_graph/fragment.rs | 2 + src/sqlparser/src/ast/mod.rs | 4 +- src/sqlparser/src/parser.rs | 2 +- .../src/executor/backfill/cdc_backfill.rs | 83 +++++++++-------- src/stream/src/from_proto/chain.rs | 88 +++++++++++++++---- 15 files changed, 168 insertions(+), 93 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index 38fd23ae1c1aa..27f2b7c5103a5 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -79,13 +79,13 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re ensurePropNotNull(props, DbzConnectorConfig.HOST); ensurePropNotNull(props, DbzConnectorConfig.PORT); ensurePropNotNull(props, DbzConnectorConfig.DB_NAME); - ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME); ensurePropNotNull(props, DbzConnectorConfig.USER); ensurePropNotNull(props, DbzConnectorConfig.PASSWORD); TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema()); switch (request.getSourceType()) { case POSTGRES: + ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME); ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME); ensurePropNotNull(props, DbzConnectorConfig.PG_SLOT_NAME); ensurePropNotNull(props, DbzConnectorConfig.PG_PUB_NAME); @@ -96,6 +96,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re break; case CITUS: + ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME); ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME); try (var coordinatorValidator = new PostgresValidator(props, tableSchema)) { coordinatorValidator.validateDistributedTable(); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 598c7334480c0..cc5a918fee00b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -130,12 +130,8 @@ public DbzConnectorConfig( mysqlProps.setProperty("snapshot.locking.mode", "none"); } - // disable table filtering for the shared cdc stream - if (!userProps.containsKey(CDC_SHARING_MODE)) { - dbzProps.remove("table.include.list"); - } - dbzProps.putAll(mysqlProps); + } else if (source == 
SourceTypeE.POSTGRES || source == SourceTypeE.CITUS) { var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor); @@ -169,11 +165,21 @@ public DbzConnectorConfig( dbzProps.putIfAbsent(entry.getKey(), entry.getValue()); } + if (userProps.containsKey(CDC_SHARING_MODE)) { + adjustConfigForSharedCdcStream(dbzProps); + } + this.sourceId = sourceId; this.sourceType = source; this.resolvedDbzProps = dbzProps; } + private void adjustConfigForSharedCdcStream(Properties dbzProps) { + // disable table filtering for the shared cdc stream + LOG.info("Disable table filtering for the shared cdc stream"); + dbzProps.remove("table.include.list"); + } + private Properties initiateDbConfig(String fileName, StringSubstitutor substitutor) { var dbProps = new Properties(); try (var input = getClass().getClassLoader().getResourceAsStream(fileName)) { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java index 9ffab0c421b75..093c34afb4ceb 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java @@ -55,7 +55,7 @@ public class DbzCdcEventConsumer // only serialize the value part configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); // include record schema - configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true); jsonConverter.configure(configs); this.converter = jsonConverter; } @@ -103,7 +103,7 @@ var record = event.value(); // - PG: serverName.schemaName.tableName // - MySQL: serverName.databaseName.tableName // We can extract the full table name from the topic - var fullTableName = record.topic().substring(record.topic().indexOf('.')); + var fullTableName = record.topic().substring(record.topic().indexOf('.') + 1); var message = CdcMessage.newBuilder() .setOffset(offsetStr) @@ -111,7 +111,8 @@ var record = event.value(); .setPartition(String.valueOf(sourceId)) .setPayload(new String(payload, StandardCharsets.UTF_8)) .build(); - LOG.info("record => {}", message.getPayload()); + LOG.info("fullTableName => {}", fullTableName); + LOG.debug("record => {}", message.getPayload()); builder.addEvents(message); committer.markProcessed(event); } diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 8cbd525e0909f..8c5d60ae27de7 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -409,7 +409,6 @@ message MergeNode { repeated plan_common.Field fields = 4; // Whether the upstream is a CDC stream, used in chaining a Table job with a CDC source job. 
   bool cdc_upstream = 5;
-  optional map<string, string> connector_properties = 6;
 }
 // passed from frontend to meta, used by fragmenter to generate `MergeNode`
diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs
index 937e6e63b8f4c..2167b3d277f49 100644
--- a/src/connector/src/source/cdc/mod.rs
+++ b/src/connector/src/source/cdc/mod.rs
@@ -30,7 +30,7 @@ use crate::source::{ConnectorProperties, SourceProperties, SplitImpl};
 pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
 pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
-pub const CDC_LATEST_OFFSET_MODE: &str = "rw_cdc_backfill";
+pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
 pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode";
 pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs
index baf2b2bbce685..7e990d14d02a4 100644
--- a/src/connector/src/source/external.rs
+++ b/src/connector/src/source/external.rs
@@ -81,12 +81,14 @@ impl ExternalTableType {
 #[derive(Debug, Clone)]
 pub struct SchemaTableName {
+    // namespace of the table, e.g. database in mysql, schema in postgres
     pub schema_name: String,
     pub table_name: String,
 }
 pub const TABLE_NAME_KEY: &str = "table.name";
 pub const SCHEMA_NAME_KEY: &str = "schema.name";
+pub const DATABASE_NAME_KEY: &str = "database.name";
 impl SchemaTableName {
     pub fn new(schema_name: String, table_name: String) -> Self {
@@ -97,15 +99,25 @@ impl SchemaTableName {
     }
     pub fn from_properties(properties: &HashMap<String, String>) -> Self {
+        let table_type = ExternalTableType::from_properties(properties);
         let table_name = properties
             .get(TABLE_NAME_KEY)
-            .map(|c| c.to_ascii_lowercase())
+            .map(|c| c.clone())
             .unwrap_or_default();
-        let schema_name = properties
-            .get(SCHEMA_NAME_KEY)
-            .map(|c| c.to_ascii_lowercase())
-            .unwrap_or_default();
+        let schema_name = match table_type {
+            ExternalTableType::MySql => properties
+                .get(DATABASE_NAME_KEY)
+                .map(|c| c.clone())
+                .unwrap_or_default(),
+            ExternalTableType::Postgres | ExternalTableType::Citus => properties
+                .get(SCHEMA_NAME_KEY)
+                .map(|c| c.clone())
+                .unwrap_or_default(),
+            _ => {
+                unreachable!("invalid external table type: {:?}", table_type);
+            }
+        };
         Self {
             schema_name,
@@ -245,7 +257,8 @@ pub struct ExternalTableConfig {
 impl ExternalTableReader for MySqlExternalTableReader {
     fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String {
-        format!("`{}`", table_name.table_name)
+        // schema name is the database name in mysql
+        format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
     }
     async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 0d9a6b3ae8cd2..67f72a1f0b84c 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -32,7 +32,8 @@ use risingwave_connector::parser::{
     ProtobufParserConfig, SpecificParserConfig,
 };
 use risingwave_connector::source::cdc::{
-    CDC_SHARING_MODE_KEY, CITUS_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
+    CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CITUS_CDC_CONNECTOR,
+    MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
 };
 use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
 use risingwave_connector::source::external::TABLE_NAME_KEY;
@@ -1142,8 +1143,8 @@ pub async fn handle_create_source(
         .await?;
     if create_cdc_source_job {
-        // TODO: set connector to latest offset mode
-        // with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_LATEST_OFFSET_MODE.into());
+        // set connector to backfill mode
+        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
         // ignore table name option, default to capture all tables
         with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
     }
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 9015cf2f1fcdd..bb9129ba16918 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -25,7 +25,7 @@ use risingwave_common::catalog::{
 use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND;
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
-use risingwave_connector::source::cdc::{CDC_LATEST_OFFSET_MODE, CDC_SNAPSHOT_MODE_KEY};
+use risingwave_connector::source::cdc::{CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY};
 use risingwave_connector::source::external::ExternalTableType;
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, WatermarkDesc};
@@ -503,7 +503,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
     columns.push(offset_column);
     // debezium connector will only consume changelogs from latest offset on this mode
-    properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_LATEST_OFFSET_MODE.into());
+    properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
     let pk_column_indices = {
         let mut id_to_idx = HashMap::new();
@@ -530,7 +530,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
         stream_key: pk_column_indices,
         append_only,
         retention_seconds: TABLE_OPTION_DUMMY_RETENTION_SECOND,
-        value_indices: (0..columns.len()).collect_vec(),
+        value_indices: (0..columns.len()).collect_vec(), /* FIXME: maybe we can remove `_rw_offset` from TableDesc */
         read_prefix_len_hint: 0,
         watermark_columns: Default::default(),
         versioned: false,
@@ -808,6 +808,8 @@ fn gen_create_table_plan_for_cdc_source(
         table_name: Some(external_table_name),
     };
+    tracing::debug!(target: "cdc_table", ?external_table_desc, "create table");
+
     // TODO: we'll parse the information on Meta to construct the fragment plan
     let scan_external: PlanRef = LogicalScan::create(
         source_name,
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index c5177fcec971f..794750d4449f0 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -266,7 +266,7 @@ pub async fn handle(
                 append_only,
                 notice,
                 cdc_source_name,
-                external_table_name.map(|name| name.real_value()),
+                external_table_name,
             )
             .await
         }
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index fd06574ae7259..425ae19cf56db 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -43,7 +43,6 @@ pub struct StreamTableScan {
     logical: generic::Scan,
     batch_plan_id: PlanNodeId,
     chain_type: ChainType,
-    connector_properties: Option<HashMap<String, String>>,
 }
 impl StreamTableScan {
@@ -51,15 +50,6 @@ impl StreamTableScan {
         Self::new_with_chain_type(logical, ChainType::Backfill)
     }
-    pub fn new_for_cdc_scan(
-        logical: generic::Scan,
-        connector_properties: HashMap<String, String>,
-    ) -> Self {
-        let mut plan = Self::new_with_chain_type(logical, ChainType::CdcBackfill);
-        plan.connector_properties = Some(connector_properties);
- plan - } - pub fn new_with_chain_type(logical: generic::Scan, chain_type: ChainType) -> Self { let batch_plan_id = logical.ctx.next_plan_node_id(); @@ -281,7 +271,6 @@ impl StreamTableScan { PbStreamNode { node_body: Some(PbNodeBody::Merge(MergeNode { cdc_upstream, - connector_properties: self.connector_properties.clone(), ..Default::default() })), identity: "Upstream".into(), diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 5b241b06ad802..ff15ef0d8b739 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -578,6 +578,8 @@ impl CompleteStreamFragmentGraph { .expect("table name column not found") }; + tracing::debug!(target: "cdc_table", ?full_table_name, ?source_fragment_id, ?rw_table_name_index, ?output_columns, "chain with upstream source fragment"); + let edge = StreamFragmentEdge { id: EdgeId::UpstreamExternal { upstream_table_id, diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 0ecf58d2b99ac..b966c2ddf6b7a 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1087,7 +1087,7 @@ pub enum Statement { cdc_source_name: Option, - external_table_name: Option, + external_table_name: Option, }, /// CREATE INDEX CreateIndex { @@ -1556,7 +1556,7 @@ impl fmt::Display for Statement { write!(f, " FROM {}", source)?; } if let Some(table_name) = external_table_name { - write!(f, " TABLE {}", table_name.real_value())?; + write!(f, " TABLE {}", table_name)?; } Ok(()) } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 4664f6fabbb24..f70e71428616e 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2508,7 +2508,7 @@ impl Parser { }; let external_table = if self.parse_keyword(Keyword::TABLE) { - Some(self.parse_identifier()?) + Some(self.parse_literal_string()?) } else { None }; diff --git a/src/stream/src/executor/backfill/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc_backfill.rs index c1c183affe6d4..64c7c1aeceef8 100644 --- a/src/stream/src/executor/backfill/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc_backfill.rs @@ -195,7 +195,7 @@ impl CdcBackfillExecutor { tracing::debug!("start cdc backfill: actor {:?}", self.actor_ctx.id); - self.source_state_handler.init_epoch(first_barrier.epoch); + // self.source_state_handler.init_epoch(first_barrier.epoch); // start from the beginning // TODO(siyuan): restore backfill offset from state store @@ -208,13 +208,14 @@ impl CdcBackfillExecutor { if let Some(split_id) = split_id.as_ref() { let mut key = split_id.to_string(); key.push_str(BACKFILL_STATE_KEY_SUFFIX); - match self.source_state_handler.get(key.into()).await? { - Some(row) => match row.datum_at(1) { - Some(ScalarRefImpl::Jsonb(jsonb_ref)) => jsonb_ref.as_bool()?, - _ => unreachable!("invalid backfill persistent state"), - }, - None => false, - } + // match self.source_state_handler.get(key.into()).await? 
{ + // Some(row) => match row.datum_at(1) { + // Some(ScalarRefImpl::Jsonb(jsonb_ref)) => jsonb_ref.as_bool()?, + // _ => unreachable!("invalid backfill persistent state"), + // }, + // None => false, + // } + false } else { false } @@ -356,11 +357,11 @@ impl CdcBackfillExecutor { } // seal current epoch even though there is no data - Self::persist_state( - &mut self.source_state_handler, - barrier.epoch, - ) - .await?; + // Self::persist_state( + // &mut self.source_state_handler, + // barrier.epoch, + // ) + // .await?; yield Message::Barrier(barrier); // Break the for loop and start a new snapshot read stream. @@ -426,14 +427,14 @@ impl CdcBackfillExecutor { )); } - Self::write_backfill_state( - &mut self.source_state_handler, - upstream_table_id, - &split_id, - &mut cdc_split, - last_binlog_offset.clone(), - ) - .await?; + // Self::write_backfill_state( + // &mut self.source_state_handler, + // upstream_table_id, + // &split_id, + // &mut cdc_split, + // last_binlog_offset.clone(), + // ) + // .await?; break 'backfill_loop; } Some(chunk) => { @@ -461,14 +462,14 @@ impl CdcBackfillExecutor { } } } else { - Self::write_backfill_state( - &mut self.source_state_handler, - upstream_table_id, - &split_id, - &mut cdc_split, - None, - ) - .await?; + // Self::write_backfill_state( + // &mut self.source_state_handler, + // upstream_table_id, + // &split_id, + // &mut cdc_split, + // None, + // ) + // .await?; } tracing::debug!( @@ -486,7 +487,7 @@ impl CdcBackfillExecutor { if let Some(msg) = mapping_message(msg?, &self.output_indices) { // persist the backfill state if any if let Message::Barrier(barrier) = &msg { - Self::persist_state(&mut self.source_state_handler, barrier.epoch).await?; + // Self::persist_state(&mut self.source_state_handler, barrier.epoch).await?; } yield msg; } @@ -612,12 +613,17 @@ async fn parse_debezium_chunk( // TODO: preserve the transaction semantics for payload in payloads.rows() { - let ScalarRefImpl::Utf8(str) = payload.datum_at(0).expect("payload must exist") else { - unreachable!("payload must be utf8 string"); + let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist") + else { + unreachable!("payload must be jsonb"); }; parser - .parse_inner(None, Some(str.as_bytes().to_vec()), builder.row_writer()) + .parse_inner( + None, + Some(jsonb_ref.to_string().as_bytes().to_vec()), + builder.row_writer(), + ) .await .unwrap(); } @@ -683,10 +689,12 @@ impl Executor for CdcBackfillExecutor { #[cfg(test)] mod tests { + use std::str::FromStr; + use futures::{pin_mut, StreamExt}; use risingwave_common::array::{DataChunk, Op, StreamChunk, Vis}; use risingwave_common::catalog::{Field, Schema}; - use risingwave_common::types::{DataType, Datum}; + use risingwave_common::types::{DataType, Datum, JsonbVal}; use risingwave_common::util::iter_util::ZipEqFast; use crate::executor::backfill::cdc_backfill::transform_upstream; @@ -696,16 +704,17 @@ mod tests { #[tokio::test] async fn test_transform_upstream_chunk() { let schema = Schema::new(vec![ - Field::unnamed(DataType::Varchar), // debezium json payload + Field::unnamed(DataType::Jsonb), // debezium json payload Field::unnamed(DataType::Varchar), // _rw_offset Field::unnamed(DataType::Varchar), // _rw_table_name ]); let pk_indices = vec![1]; let (mut tx, source) = MockSource::channel(schema.clone(), pk_indices.clone()); - let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": 
"1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string(); + // let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string(); + let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#; let mut datums: Vec = vec![ - Some(payload.into()), + Some(JsonbVal::from_str(payload).unwrap().into()), Some("file: 1.binlog, pos: 100".to_string().into()), Some("mydb.orders".to_string().into()), ]; diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index d73195cbd9bfc..e872526e72cfb 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; -use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; +use maplit::hashmap; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId, TableOption}; use risingwave_common::util::sort_util::OrderType; +use risingwave_connector::source::external::{ExternalTableType, SchemaTableName}; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan::{ChainNode, ChainType}; use risingwave_storage::table::batch_table::storage_table::StorageTable; @@ -26,7 +29,7 @@ use crate::common::table::state_table::StateTable; use crate::executor::external::ExternalStorageTable; use crate::executor::{ BackfillExecutor, CdcBackfillExecutor, ChainExecutor, FlowControlExecutor, - RearrangedChainExecutor, + RearrangedChainExecutor, SourceStateTableHandler, }; pub struct ChainExecutorBuilder; @@ -41,7 +44,7 @@ impl ExecutorBuilder for ChainExecutorBuilder { state_store: impl StateStore, stream: &mut LocalStreamManagerCore, ) -> StreamResult { - let [mview, snapshot]: [_; 2] = params.input.try_into().unwrap(); + let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap(); // For reporting the progress. 
let progress = stream .context @@ -73,7 +76,7 @@ impl ExecutorBuilder for ChainExecutorBuilder { let upstream_only = matches!(node.chain_type(), ChainType::UpstreamOnly); ChainExecutor::new( snapshot, - mview, + upstream, progress, schema, params.pk_indices, @@ -81,18 +84,61 @@ impl ExecutorBuilder for ChainExecutorBuilder { ) .boxed() } - ChainType::Rearrange => { - RearrangedChainExecutor::new(snapshot, mview, progress, schema, params.pk_indices) - .boxed() - } + ChainType::Rearrange => RearrangedChainExecutor::new( + snapshot, + upstream, + progress, + schema, + params.pk_indices, + ) + .boxed(), ChainType::CdcBackfill => { - // todo!("CdcBackfill is not supported yet") + // TODO: pass the connector properties via barrier + let properties: HashMap = hashmap!( + "connector" => "mysql-cdc", + "host" => "localhost", + "port" => "8306", + "username" => "root", + "password" => "123456", + "database.name" => "mydb", + "table.name" => "products", + "server.id" => "5085" + ) + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + let table_desc: &StorageTableDesc = node.get_table_desc()?; + // last column may be `_rw_offset` + let schema = Schema::new( + table_desc + .columns + .iter() + .map(|col| Field::from(col)) + .collect_vec(), + ); + let table_type = ExternalTableType::from_properties(&properties); + // TODO: use fixed properties for test, then pass the properties via barrier let table_reader = - table_type.create_table_reader(source.properties.clone(), schema.clone())?; + table_type.create_table_reader(properties.clone(), schema.clone())?; + + let order_types = table_desc + .pk + .iter() + .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap())) + .collect_vec(); + + let pk_indices = table_desc + .pk + .iter() + .map(|k| k.column_index as usize) + .collect_vec(); + + let schema_table_name = SchemaTableName::from_properties(&properties); let external_table = ExternalStorageTable::new( - TableId::new(source.source_id), - upstream_table_name, + TableId::new(table_desc.table_id), + schema_table_name, table_reader, schema.clone(), order_types, @@ -100,18 +146,24 @@ impl ExecutorBuilder for ChainExecutorBuilder { (0..table_desc.columns.len()).collect_vec(), ); - let cdc_backfill = CdcBackfillExecutor::new( + let source_state_handler = SourceStateTableHandler::from_table_catalog( + node.get_state_table().as_ref().unwrap(), + state_store.clone(), + ) + .await; + CdcBackfillExecutor::new( params.actor_context.clone(), external_table, - Box::new(source_exec), - (0..source.columns.len()).collect_vec(), /* eliminate the last column (_rw_offset) */ + upstream, + (0..table_desc.columns.len()).collect_vec(), /* eliminate the last column (_rw_offset) */ None, schema.clone(), pk_indices, params.executor_stats, source_state_handler, - source_ctrl_opts.chunk_size, - ); + true, + params.env.config().developer.chunk_size, + ).boxed() } ChainType::Backfill => { let table_desc: &StorageTableDesc = node.get_table_desc()?; @@ -194,7 +246,7 @@ impl ExecutorBuilder for ChainExecutorBuilder { BackfillExecutor::new( upstream_table, - mview, + upstream, state_table, output_indices, progress,