Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mysql-cdc): fix auto schema mapping (#17402) #17405

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion e2e_test/source/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"
system ok
mysql --protocol=tcp -u root mytest -e "
DROP TABLE IF EXISTS mysql_types_test;
CREATE TABLE customers(
id BIGINT PRIMARY KEY,
modified DATETIME,
custinfo JSON
);
ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
CREATE TABLE IF NOT EXISTS mysql_types_test(
c_boolean boolean,
c_bit bit,
Expand Down Expand Up @@ -34,7 +40,6 @@ mysql --protocol=tcp -u root mytest -e "
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]');
"


statement ok
create source mysql_source with (
connector = 'mysql-cdc',
Expand All @@ -46,6 +51,19 @@ create source mysql_source with (
server.id = '5601'
);

statement ok
create table rw_customers (*) from mysql_source table 'mytest.customers';

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

statement ok
create table rw_mysql_types_test (*) from mysql_source table 'mytest.mysql_types_test';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,15 @@ var record = event.value();
byte[] key =
keyConverter.fromConnectData(
record.topic(), record.keySchema(), record.key());
String msgPayload =
payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
// key can be null if the table has no primary key
String msgKey = key == null ? "" : new String(key, StandardCharsets.UTF_8);
var message =
msgBuilder
.setFullTableName(fullTableName)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setKey(new String(key, StandardCharsets.UTF_8))
.setPayload(msgPayload)
.setKey(msgKey)
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug(
Expand Down
31 changes: 18 additions & 13 deletions src/connector/src/source/cdc/external/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, OFFSET_COLUMN_NAM
use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use sea_schema::mysql::def::{CharSet, Collation, ColumnKey, ColumnType, StorageEngine, TableInfo};
use sea_schema::mysql::def::{ColumnKey, ColumnType};
use sea_schema::mysql::discovery::SchemaDiscovery;
use sea_schema::mysql::query::SchemaQueryBuilder;
use sea_schema::sea_query::{Alias, IntoIden};
use serde_derive::{Deserialize, Serialize};
use sqlx::mysql::MySqlConnectOptions;
use sqlx::MySqlPool;
Expand Down Expand Up @@ -89,22 +91,21 @@ impl MySqlExternalTable {
});

let connection = MySqlPool::connect_with(options).await?;
let schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());

let table_schema = schema_discovery
.discover_table(TableInfo {
name: config.table.clone(),
engine: StorageEngine::InnoDb,
auto_increment: None,
char_set: CharSet::Utf8Mb4,
collation: Collation::Utf8Mb40900AiCi,
comment: "".to_string(),
})
let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());

// discover system version first
let system_info = schema_discovery.discover_system().await?;
schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());

let schema = Alias::new(config.database.as_str()).into_iden();
let table = Alias::new(config.table.as_str()).into_iden();
let columns = schema_discovery
.discover_columns(schema, table, &system_info)
.await?;

let mut column_descs = vec![];
let mut pk_names = vec![];
for col in &table_schema.columns {
for col in columns {
let data_type = type_to_rw_type(&col.col_type)?;
column_descs.push(ColumnDesc::named(
col.name.clone(),
Expand All @@ -116,6 +117,10 @@ impl MySqlExternalTable {
}
}

if pk_names.is_empty() {
return Err(anyhow!("MySQL table doesn't define the primary key").into());
}

Ok(Self {
column_descs,
pk_names,
Expand Down
Loading