Skip to content

Commit

Permalink
chore(cdc): enhance cdc primary key error message (#18829)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Oct 10, 2024
1 parent dce14c9 commit 9a71862
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ private void validateTableSchema() throws SQLException {
}
}

if (!isPrimaryKeyMatch(tableSchema, pkFields)) {
throw ValidatorUtils.invalidArgument("Primary key mismatch");
}
primaryKeyCheck(tableSchema, pkFields);
}
}

Expand All @@ -240,16 +238,24 @@ public void close() throws Exception {
}
}

private boolean isPrimaryKeyMatch(TableSchema sourceSchema, Set<String> pkFields) {
private static void primaryKeyCheck(TableSchema sourceSchema, Set<String> pkFields)
throws RuntimeException {
if (sourceSchema.getPrimaryKeys().size() != pkFields.size()) {
return false;
throw ValidatorUtils.invalidArgument(
"Primary key mismatch: the SQL schema defines "
+ sourceSchema.getPrimaryKeys().size()
+ " primary key columns, but the source table in MySQL has "
+ pkFields.size()
+ " columns.");
}
for (var colName : sourceSchema.getPrimaryKeys()) {
if (!pkFields.contains(colName.toLowerCase())) {
return false;
throw ValidatorUtils.invalidArgument(
"Primary key mismatch: The primary key list of the source table in MySQL does not contain '"
+ colName
+ "'.");
}
}
return true;
}

private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeName typeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,7 @@ private void validateTableSchema() throws SQLException {
var name = res.getString(1);
pkFields.add(name);
}

if (!isPrimaryKeyMatch(tableSchema, pkFields)) {
throw ValidatorUtils.invalidArgument("Primary key mismatch");
}
primaryKeyCheck(tableSchema, pkFields);
}

// Check whether source schema match table schema on upstream
Expand Down Expand Up @@ -233,17 +230,24 @@ private void validateTableSchema() throws SQLException {
}
}

private boolean isPrimaryKeyMatch(TableSchema sourceSchema, Set<String> pkFields) {
private static void primaryKeyCheck(TableSchema sourceSchema, Set<String> pkFields)
throws RuntimeException {
if (sourceSchema.getPrimaryKeys().size() != pkFields.size()) {
return false;
throw ValidatorUtils.invalidArgument(
"Primary key mismatch: the SQL schema defines "
+ sourceSchema.getPrimaryKeys().size()
+ " primary key columns, but the source table in Postgres has "
+ pkFields.size()
+ " columns.");
}
// postgres column name is case-sensitive
for (var colName : sourceSchema.getPrimaryKeys()) {
if (!pkFields.contains(colName)) {
return false;
throw ValidatorUtils.invalidArgument(
"Primary key mismatch: The primary key list of the source table in Postgres does not contain '"
+ colName
+ "'.\nHint: If your primary key contains uppercase letters, please ensure that the primary key in the DML of RisingWave uses the same uppercase format and is wrapped with double quotes (\"\").");
}
}
return true;
}

private void validatePrivileges() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,9 @@ private void validateTableSchema() throws SQLException {
var name = res.getString(1);
pkFields.add(name);
}

if (!isPrimaryKeyMatch(tableSchema, pkFields)) {
throw ValidatorUtils.invalidArgument("Primary key mismatch");
}
primaryKeyCheck(tableSchema, pkFields);
}

// Check whether the db is case-sensitive
boolean isCaseSensitive = false;
try (var stmt =
Expand Down Expand Up @@ -266,16 +264,24 @@ public void close() throws Exception {
}
}

public static boolean isPrimaryKeyMatch(TableSchema sourceSchema, Set<String> pkFields) {
private static void primaryKeyCheck(TableSchema sourceSchema, Set<String> pkFields)
throws RuntimeException {
if (sourceSchema.getPrimaryKeys().size() != pkFields.size()) {
return false;
throw ValidatorUtils.invalidArgument(
"Primary key mismatch: the SQL schema defines "
+ sourceSchema.getPrimaryKeys().size()
+ " primary key columns, but the source table in SQL Server has "
+ pkFields.size()
+ " columns.");
}
for (var colName : sourceSchema.getPrimaryKeys()) {
if (!pkFields.contains(colName)) {
return false;
throw ValidatorUtils.invalidArgument(
"Primary key mismatch: The primary key list of the source table in SQL Server does not contain '"
+ colName
+ "'.\nHint: If your primary key contains uppercase letters, please ensure that the primary key in the DML of RisingWave uses the same uppercase format and is wrapped with double quotes (\"\").");
}
}
return true;
}

private boolean isDataTypeCompatible(String ssDataType, Data.DataType.TypeName typeName) {
Expand Down

0 comments on commit 9a71862

Please sign in to comment.