Skip to content

Commit

Permalink
fix(mysql-cdc): fix auto schema mapping (#17402)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored and StrikeW committed Jun 23, 2024
1 parent 3283862 commit e24a7cc
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
20 changes: 19 additions & 1 deletion e2e_test/source/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"
system ok
mysql --protocol=tcp -u root mytest -e "
DROP TABLE IF EXISTS mysql_types_test;
CREATE TABLE customers(
id BIGINT PRIMARY KEY,
modified DATETIME,
custinfo JSON
);
ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
CREATE TABLE IF NOT EXISTS mysql_types_test(
c_boolean boolean,
c_bit bit,
Expand Down Expand Up @@ -34,7 +40,6 @@ mysql --protocol=tcp -u root mytest -e "
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]');
"


statement ok
create source mysql_source with (
connector = 'mysql-cdc',
Expand All @@ -46,6 +51,19 @@ create source mysql_source with (
server.id = '5601'
);

statement ok
create table rw_customers (*) from mysql_source table 'mytest.customers';

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

statement ok
create table rw_mysql_types_test (*) from mysql_source table 'mytest.mysql_types_test';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,15 @@ var record = event.value();
byte[] key =
keyConverter.fromConnectData(
record.topic(), record.keySchema(), record.key());
String msgPayload =
payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
// key can be null if the table has no primary key
String msgKey = key == null ? "" : new String(key, StandardCharsets.UTF_8);
var message =
msgBuilder
.setFullTableName(fullTableName)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setKey(new String(key, StandardCharsets.UTF_8))
.setPayload(msgPayload)
.setKey(msgKey)
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug(
Expand Down
31 changes: 18 additions & 13 deletions src/connector/src/source/cdc/external/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, OFFSET_COLUMN_NAM
use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use sea_schema::mysql::def::{CharSet, Collation, ColumnKey, ColumnType, StorageEngine, TableInfo};
use sea_schema::mysql::def::{ColumnKey, ColumnType};
use sea_schema::mysql::discovery::SchemaDiscovery;
use sea_schema::mysql::query::SchemaQueryBuilder;
use sea_schema::sea_query::{Alias, IntoIden};
use serde_derive::{Deserialize, Serialize};
use sqlx::mysql::MySqlConnectOptions;
use sqlx::MySqlPool;
Expand Down Expand Up @@ -89,22 +91,21 @@ impl MySqlExternalTable {
});

let connection = MySqlPool::connect_with(options).await?;
let schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());

let table_schema = schema_discovery
.discover_table(TableInfo {
name: config.table.clone(),
engine: StorageEngine::InnoDb,
auto_increment: None,
char_set: CharSet::Utf8Mb4,
collation: Collation::Utf8Mb40900AiCi,
comment: "".to_string(),
})
let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());

// discover system version first
let system_info = schema_discovery.discover_system().await?;
schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());

let schema = Alias::new(config.database.as_str()).into_iden();
let table = Alias::new(config.table.as_str()).into_iden();
let columns = schema_discovery
.discover_columns(schema, table, &system_info)
.await?;

let mut column_descs = vec![];
let mut pk_names = vec![];
for col in &table_schema.columns {
for col in columns {
let data_type = type_to_rw_type(&col.col_type)?;
column_descs.push(ColumnDesc::named(
col.name.clone(),
Expand All @@ -116,6 +117,10 @@ impl MySqlExternalTable {
}
}

if pk_names.is_empty() {
return Err(anyhow!("MySQL table doesn't define the primary key").into());
}

Ok(Self {
column_descs,
pk_names,
Expand Down

0 comments on commit e24a7cc

Please sign in to comment.