diff --git a/e2e_test/sink/remote/jdbc.check.pg.slt b/e2e_test/sink/remote/jdbc.check.pg.slt index d532610c391e..1ec8c827d939 100644 --- a/e2e_test/sink/remote/jdbc.check.pg.slt +++ b/e2e_test/sink/remote/jdbc.check.pg.slt @@ -46,7 +46,7 @@ select * from t1_uuid; 221 74605c5a-a7bb-4b3b-8742-2a12e9709dea hello world -query T +query TIT select * from sk_t1_uuid ---- -21189447-8736-44bd-b254-26b5dec91da9 +21189447-8736-44bd-b254-26b5dec91da9 2 bb diff --git a/e2e_test/sink/remote/jdbc.load.slt b/e2e_test/sink/remote/jdbc.load.slt index 97bdabaa4751..9a4ede4e032e 100644 --- a/e2e_test/sink/remote/jdbc.load.slt +++ b/e2e_test/sink/remote/jdbc.load.slt @@ -166,22 +166,22 @@ INSERT INTO t1_uuid values (221, '74605c5a-a7bb-4b3b-8742-2a12e9709dea', 'hello statement ok -CREATE TABLE t1_test_uuid_delete (id varchar, primary key(id)); +CREATE TABLE t1_test_uuid_delete (id varchar, v1 int, v2 varchar, primary key(id, v2)); statement ok -INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8'), ('21189447-8736-44bd-b254-26b5dec91da9'); +INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8', 1, 'aa'), ('21189447-8736-44bd-b254-26b5dec91da9', 2, 'bb'); statement ok CREATE SINK sk_t1_uuid FROM t1_test_uuid_delete WITH ( connector='jdbc', jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', table.name='sk_t1_uuid', - primary_key='id', + primary_key='id, v2', type='upsert' ); statement ok -DELETE FROM t1_test_uuid_delete WHERE ID='fb48ecc1-917f-4f4b-ab6d-d8e37809caf8'; +DELETE FROM t1_test_uuid_delete WHERE id='fb48ecc1-917f-4f4b-ab6d-d8e37809caf8' AND v2='aa'; statement ok diff --git a/e2e_test/sink/remote/pg_create_table.sql b/e2e_test/sink/remote/pg_create_table.sql index dab753ee05d6..ee272ef747a7 100644 --- a/e2e_test/sink/remote/pg_create_table.sql +++ b/e2e_test/sink/remote/pg_create_table.sql @@ -84,4 +84,4 @@ CREATE TABLE biz.t2 ( "aBc" INTEGER PRIMARY KEY ); -CREATE TABLE sk_t1_uuid (id uuid, primary key(id)); +CREATE TABLE sk_t1_uuid (id uuid, v1 int, v2 varchar, primary key(id, v2)); diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 4672f628c176..d854e561878f 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -53,9 +53,8 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { this.config = config; try { conn = JdbcUtils.getConnection(config.getJdbcUrl()); - // Retrieve primary keys and column type mappings from the database - this.pkColumnNames = - getPkColumnNames(conn, config.getTableName(), config.getSchemaName()); + // Table schema has been validated before, so we get the PK from it directly + this.pkColumnNames = tableSchema.getPrimaryKeys(); // column name -> java.sql.Types Map columnTypeMapping = getColumnTypeMapping(conn, config.getTableName(), config.getSchemaName()); @@ -72,9 +71,10 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { .collect(Collectors.toList()); LOG.info( - "schema = {}, table = {}, columnSqlTypes = {}, pkIndices = {}", + "schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}", config.getSchemaName(), config.getTableName(), + tableSchema, columnSqlTypes, pkIndices); @@ -125,28 +125,6 @@ private static Map getColumnTypeMapping( return columnTypeMap; } - private static List getPkColumnNames( - Connection conn, String tableName, String schemaName) { - List pkColumnNames = new ArrayList<>(); - try { - var pks = conn.getMetaData().getPrimaryKeys(null, schemaName, tableName); - while (pks.next()) { - pkColumnNames.add(pks.getString(JDBC_COLUMN_NAME_KEY)); - } - } catch (SQLException e) { - throw Status.INTERNAL - .withDescription( - String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage())) - .asRuntimeException(); - } - LOG.info( - "schema = {}, table = {}: detected pk column = {}", - schemaName, - tableName, - pkColumnNames); - return pkColumnNames; - } - @Override public boolean write(Iterable rows) { final int maxRetryCount = 4; @@ -311,7 +289,7 @@ public void prepareDelete(SinkRow row) { .asRuntimeException(); } try { - jdbcDialect.bindDeleteStatement(deleteStatement, row); + jdbcDialect.bindDeleteStatement(deleteStatement, tableSchema, row); deleteStatement.addBatch(); } catch (SQLException e) { throw Status.INTERNAL @@ -362,7 +340,7 @@ private void executeStatement(PreparedStatement stmt) throws SQLException { if (stmt == null) { return; } - LOG.debug("Executing statement: {}", stmt); + LOG.info("Executing statement: {}", stmt); stmt.executeBatch(); stmt.clearParameters(); } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/JdbcDialect.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/JdbcDialect.java index 3a091ff33f89..308f9927457a 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/JdbcDialect.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/JdbcDialect.java @@ -112,5 +112,6 @@ void bindInsertIntoStatement( throws SQLException; /** Bind the values of primary key fields to the {@code DELETE} statement. */ - void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException; + void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row) + throws SQLException; } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/MySqlDialect.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/MySqlDialect.java index b79a2f9f3509..2c4ea73448a6 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/MySqlDialect.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/MySqlDialect.java @@ -26,18 +26,17 @@ public class MySqlDialect implements JdbcDialect { - private final int[] columnSqlTypes; private final int[] pkIndices; private final int[] pkColumnSqlTypes; public MySqlDialect(List columnSqlTypes, List pkIndices) { - this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray(); + var columnSqlTypesArr = columnSqlTypes.stream().mapToInt(i -> i).toArray(); this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray(); // derive sql types for pk columns var pkColumnSqlTypes = new int[pkIndices.size()]; for (int i = 0; i < pkIndices.size(); i++) { - pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]]; + pkColumnSqlTypes[i] = columnSqlTypesArr[this.pkIndices[i]]; } this.pkColumnSqlTypes = pkColumnSqlTypes; } @@ -118,12 +117,13 @@ public void bindInsertIntoStatement( } @Override - public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException { + public void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row) + throws SQLException { // set the values of primary key fields int placeholderIdx = 1; - for (int idx : pkIndices) { - Object pkField = row.get(idx); - stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]); + for (int i = 0; i < pkIndices.length; ++i) { + Object pkField = row.get(pkIndices[i]); + stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[i]); } } } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java index 022a54f8c356..6264d9c5eac1 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java @@ -31,18 +31,10 @@ public class PostgresDialect implements JdbcDialect { private final int[] columnSqlTypes; private final int[] pkIndices; - private final int[] pkColumnSqlTypes; public PostgresDialect(List columnSqlTypes, List pkIndices) { this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray(); this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray(); - - // derive sql types for pk columns - var pkColumnSqlTypes = new int[pkIndices.size()]; - for (int i = 0; i < pkIndices.size(); i++) { - pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]]; - } - this.pkColumnSqlTypes = pkColumnSqlTypes; } private static final HashMap RW_TYPE_TO_JDBC_TYPE_NAME; @@ -166,12 +158,13 @@ public void bindInsertIntoStatement( } @Override - public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException { + public void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row) + throws SQLException { // set the values of primary key fields int placeholderIdx = 1; - for (int idx : pkIndices) { - Object pkField = row.get(idx); - stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]); + for (int pkIdx : pkIndices) { + Object pkField = row.get(pkIdx); + stmt.setObject(placeholderIdx++, pkField, columnSqlTypes[pkIdx]); } } }