Skip to content

Commit

Permalink
fix(mysql-cdc): validate column names in case-insensitive way (#17310) (
Browse files Browse the repository at this point in the history
#17319)

Co-authored-by: StrikeW <[email protected]>
  • Loading branch information
github-actions[bot] and StrikeW authored Jun 18, 2024
1 parent 2fb2e37 commit a7e0b2c
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 24 deletions.
6 changes: 3 additions & 3 deletions e2e_test/source/cdc/mysql_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"jacket","water resistant black wind breaker"),
(default,"spare tire","24 inch spare tire");


CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
`cusTomer_Name` VARCHAR(255) NOT NULL,
`priCE` DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,18 @@ private void validateTableSchema() throws SQLException {
stmt.setString(1, dbName);
stmt.setString(2, tableName);

// Field name in lower case -> data type
var schema = new HashMap<String, String>();
// Field name in lower case -> data type, because MySQL column name is case-insensitive
// https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html
var upstreamSchema = new HashMap<String, String>();
var pkFields = new HashSet<String>();
var res = stmt.executeQuery();
while (res.next()) {
var field = res.getString(1);
var dataType = res.getString(2);
var key = res.getString(3);
schema.put(field, dataType);
upstreamSchema.put(field.toLowerCase(), dataType);
if (key.equalsIgnoreCase("PRI")) {
pkFields.add(field);
pkFields.add(field.toLowerCase());
}
}

Expand All @@ -207,7 +208,7 @@ private void validateTableSchema() throws SQLException {
if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) {
continue;
}
var dataType = schema.get(e.getKey());
var dataType = upstreamSchema.get(e.getKey().toLowerCase());
if (dataType == null) {
throw ValidatorUtils.invalidArgument(
"Column '" + e.getKey() + "' not found in the upstream database");
Expand All @@ -218,7 +219,7 @@ private void validateTableSchema() throws SQLException {
}
}

if (!ValidatorUtils.isPrimaryKeyMatch(tableSchema, pkFields)) {
if (!isPrimaryKeyMatch(tableSchema, pkFields)) {
throw ValidatorUtils.invalidArgument("Primary key mismatch");
}
}
Expand All @@ -231,6 +232,18 @@ public void close() throws Exception {
}
}

private boolean isPrimaryKeyMatch(TableSchema sourceSchema, Set<String> pkFields) {
if (sourceSchema.getPrimaryKeys().size() != pkFields.size()) {
return false;
}
for (var colName : sourceSchema.getPrimaryKeys()) {
if (!pkFields.contains(colName.toLowerCase())) {
return false;
}
}
return true;
}

private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeName typeName) {
int val = typeName.getNumber();
switch (mysqlDataType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private void validateTableSchema() throws SQLException {
pkFields.add(name);
}

if (!ValidatorUtils.isPrimaryKeyMatch(tableSchema, pkFields)) {
if (!isPrimaryKeyMatch(tableSchema, pkFields)) {
throw ValidatorUtils.invalidArgument("Primary key mismatch");
}
}
Expand Down Expand Up @@ -227,6 +227,19 @@ private void validateTableSchema() throws SQLException {
}
}

private boolean isPrimaryKeyMatch(TableSchema sourceSchema, Set<String> pkFields) {
if (sourceSchema.getPrimaryKeys().size() != pkFields.size()) {
return false;
}
// postgres column name is case-sensitive
for (var colName : sourceSchema.getPrimaryKeys()) {
if (!pkFields.contains(colName)) {
return false;
}
}
return true;
}

private void validatePrivileges() throws SQLException {
boolean isSuperUser = false;
if (this.isAwsRds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

package com.risingwave.connector.source.common;

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.source.SourceTypeE;
import io.grpc.Status;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -72,16 +70,4 @@ public static String getJdbcUrl(
throw ValidatorUtils.invalidArgument("Unknown source type: " + sourceType);
}
}

public static boolean isPrimaryKeyMatch(TableSchema sourceSchema, Set<String> pkFields) {
if (sourceSchema.getPrimaryKeys().size() != pkFields.size()) {
return false;
}
for (var colName : sourceSchema.getPrimaryKeys()) {
if (!pkFields.contains(colName)) {
return false;
}
}
return true;
}
}

0 comments on commit a7e0b2c

Please sign in to comment.