diff --git a/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt b/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt index 5d27c73765e85..ea1e98c16c1dc 100644 --- a/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt +++ b/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt @@ -7,6 +7,12 @@ mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;" system ok mysql --protocol=tcp -u root mytest -e " DROP TABLE IF EXISTS mysql_types_test; + CREATE TABLE customers( + id BIGINT PRIMARY KEY, + modified DATETIME, + custinfo JSON + ); + ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) ); CREATE TABLE IF NOT EXISTS mysql_types_test( c_boolean boolean, c_bit bit, @@ -34,7 +40,6 @@ mysql --protocol=tcp -u root mytest -e " INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]'); " - statement ok create source mysql_source with ( connector = 'mysql-cdc', @@ -46,6 +51,19 @@ create source mysql_source with ( server.id = '5601' ); +statement ok +create table rw_customers (*) from mysql_source table 'mytest.customers'; + +# Name, Type, Is Hidden, Description +query TTTT +describe rw_customers; +---- +id bigint false NULL +modified timestamp without time zone false NULL +custinfo jsonb false NULL +primary key id NULL NULL +distribution key id NULL NULL +table description rw_customers NULL NULL statement ok create table rw_mysql_types_test (*) from mysql_source table 'mytest.mysql_types_test'; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java index cabf9e29a68aa..98a0a171ec4cc 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java @@ -202,11 +202,15 @@ var record = event.value(); byte[] key = keyConverter.fromConnectData( record.topic(), record.keySchema(), record.key()); + String msgPayload = + payload == null ? "" : new String(payload, StandardCharsets.UTF_8); + // key can be null if the table has no primary key + String msgKey = key == null ? "" : new String(key, StandardCharsets.UTF_8); var message = msgBuilder .setFullTableName(fullTableName) - .setPayload(new String(payload, StandardCharsets.UTF_8)) - .setKey(new String(key, StandardCharsets.UTF_8)) + .setPayload(msgPayload) + .setKey(msgKey) .setSourceTsMs(sourceTsMs) .build(); LOG.debug( diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 37d855a9513e3..6aff5e43692e4 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -27,8 +27,10 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, OFFSET_COLUMN_NAM use risingwave_common::row::OwnedRow; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; -use sea_schema::mysql::def::{CharSet, Collation, ColumnKey, ColumnType, StorageEngine, TableInfo}; +use sea_schema::mysql::def::{ColumnKey, ColumnType}; use sea_schema::mysql::discovery::SchemaDiscovery; +use sea_schema::mysql::query::SchemaQueryBuilder; +use sea_schema::sea_query::{Alias, IntoIden}; use serde_derive::{Deserialize, Serialize}; use sqlx::mysql::MySqlConnectOptions; use sqlx::MySqlPool; @@ -89,22 +91,21 @@ impl MySqlExternalTable { }); let connection = MySqlPool::connect_with(options).await?; - let schema_discovery = SchemaDiscovery::new(connection, config.database.as_str()); - - let table_schema = schema_discovery - .discover_table(TableInfo { - name: config.table.clone(), - engine: StorageEngine::InnoDb, - auto_increment: None, - char_set: CharSet::Utf8Mb4, - collation: Collation::Utf8Mb40900AiCi, - comment: "".to_string(), - }) + let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str()); + + // discover system version first + let system_info = schema_discovery.discover_system().await?; + schema_discovery.query = SchemaQueryBuilder::new(system_info.clone()); + + let schema = Alias::new(config.database.as_str()).into_iden(); + let table = Alias::new(config.table.as_str()).into_iden(); + let columns = schema_discovery + .discover_columns(schema, table, &system_info) .await?; let mut column_descs = vec![]; let mut pk_names = vec![]; - for col in &table_schema.columns { + for col in columns { let data_type = type_to_rw_type(&col.col_type)?; column_descs.push(ColumnDesc::named( col.name.clone(), @@ -116,6 +117,10 @@ impl MySqlExternalTable { } } + if pk_names.is_empty() { + return Err(anyhow!("MySQL table doesn't define the primary key").into()); + } + Ok(Self { column_descs, pk_names,