diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index 2f8a035911f24..ae5c417a9b08c 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -193,7 +193,10 @@ public void drop() { private String createInsertStatement(String tableName, TableSchema tableSchema) { String[] columnNames = tableSchema.getColumnNames(); - String columnNamesString = String.join(", ", columnNames); + String columnNamesString = + Arrays.stream(columnNames) + .map(columnName -> "\"" + columnName + "\"") + .collect(Collectors.joining(", ")); String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?")); return String.format( "INSERT INTO %s (%s) VALUES (%s)", @@ -204,11 +207,11 @@ private String createUpdateStatement(String tableName, TableSchema tableSchema) List primaryKeys = tableSchema.getPrimaryKeys(); String setClause = // cassandra does not allow SET on primary keys nonKeyColumns.stream() - .map(columnName -> columnName + " = ?") + .map(columnName -> "\"" + columnName + "\" = ?") .collect(Collectors.joining(", ")); String whereClause = primaryKeys.stream() - .map(columnName -> columnName + " = ?") + .map(columnName -> "\"" + columnName + "\" = ?") .collect(Collectors.joining(" AND ")); return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause); } @@ -217,7 +220,7 @@ private static String createDeleteStatement(String tableName, TableSchema tableS List primaryKeys = tableSchema.getPrimaryKeys(); String whereClause = primaryKeys.stream() - .map(columnName -> columnName + " = ?") + .map(columnName -> "\"" + columnName + "\" = ?") .collect(Collectors.joining(" AND ")); return String.format("DELETE FROM %s WHERE %s", tableName, whereClause); }