Skip to content

Commit

Permalink
fix(jdbc-sink): fix primary key bindings for the DELETE statement (#1…
Browse files Browse the repository at this point in the history
…6770) (#16775)

Co-authored-by: StrikeW <[email protected]>
  • Loading branch information
github-actions[bot] and StrikeW authored May 16, 2024
1 parent b24c5e7 commit 5b73c68
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 55 deletions.
4 changes: 2 additions & 2 deletions e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ select * from t1_uuid;
221 74605c5a-a7bb-4b3b-8742-2a12e9709dea hello world


query T
query TIT
select * from sk_t1_uuid
----
21189447-8736-44bd-b254-26b5dec91da9
21189447-8736-44bd-b254-26b5dec91da9 2 bb
8 changes: 4 additions & 4 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ INSERT INTO t1_uuid values (221, '74605c5a-a7bb-4b3b-8742-2a12e9709dea', 'hello


statement ok
CREATE TABLE t1_test_uuid_delete (id varchar, primary key(id));
CREATE TABLE t1_test_uuid_delete (id varchar, v1 int, v2 varchar, primary key(id, v2));

statement ok
INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8'), ('21189447-8736-44bd-b254-26b5dec91da9');
INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8', 1, 'aa'), ('21189447-8736-44bd-b254-26b5dec91da9', 2, 'bb');

statement ok
CREATE SINK sk_t1_uuid FROM t1_test_uuid_delete WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='sk_t1_uuid',
primary_key='id',
primary_key='id, v2',
type='upsert'
);

statement ok
DELETE FROM t1_test_uuid_delete WHERE ID='fb48ecc1-917f-4f4b-ab6d-d8e37809caf8';
DELETE FROM t1_test_uuid_delete WHERE id='fb48ecc1-917f-4f4b-ab6d-d8e37809caf8' AND v2='aa';


statement ok
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,4 @@ CREATE TABLE biz.t2 (
"aBc" INTEGER PRIMARY KEY
);

CREATE TABLE sk_t1_uuid (id uuid, primary key(id));
CREATE TABLE sk_t1_uuid (id uuid, v1 int, v2 varchar, primary key(id, v2));
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
this.config = config;
try {
conn = JdbcUtils.getConnection(config.getJdbcUrl());
// Retrieve primary keys and column type mappings from the database
this.pkColumnNames =
getPkColumnNames(conn, config.getTableName(), config.getSchemaName());
// Table schema has been validated before, so we get the PK from it directly
this.pkColumnNames = tableSchema.getPrimaryKeys();
// column name -> java.sql.Types
Map<String, Integer> columnTypeMapping =
getColumnTypeMapping(conn, config.getTableName(), config.getSchemaName());
Expand All @@ -72,9 +71,10 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
.collect(Collectors.toList());

LOG.info(
"schema = {}, table = {}, columnSqlTypes = {}, pkIndices = {}",
"schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}",
config.getSchemaName(),
config.getTableName(),
tableSchema,
columnSqlTypes,
pkIndices);

Expand Down Expand Up @@ -125,28 +125,6 @@ private static Map<String, Integer> getColumnTypeMapping(
return columnTypeMap;
}

private static List<String> getPkColumnNames(
Connection conn, String tableName, String schemaName) {
List<String> pkColumnNames = new ArrayList<>();
try {
var pks = conn.getMetaData().getPrimaryKeys(null, schemaName, tableName);
while (pks.next()) {
pkColumnNames.add(pks.getString(JDBC_COLUMN_NAME_KEY));
}
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
LOG.info(
"schema = {}, table = {}: detected pk column = {}",
schemaName,
tableName,
pkColumnNames);
return pkColumnNames;
}

@Override
public boolean write(Iterable<SinkRow> rows) {
final int maxRetryCount = 4;
Expand Down Expand Up @@ -311,7 +289,7 @@ public void prepareDelete(SinkRow row) {
.asRuntimeException();
}
try {
jdbcDialect.bindDeleteStatement(deleteStatement, row);
jdbcDialect.bindDeleteStatement(deleteStatement, tableSchema, row);
deleteStatement.addBatch();
} catch (SQLException e) {
throw Status.INTERNAL
Expand Down Expand Up @@ -362,7 +340,7 @@ private void executeStatement(PreparedStatement stmt) throws SQLException {
if (stmt == null) {
return;
}
LOG.debug("Executing statement: {}", stmt);
LOG.info("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,6 @@ void bindInsertIntoStatement(
throws SQLException;

/** Bind the values of primary key fields to the {@code DELETE} statement. */
void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException;
void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row)
throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@

public class MySqlDialect implements JdbcDialect {

private final int[] columnSqlTypes;
private final int[] pkIndices;
private final int[] pkColumnSqlTypes;

public MySqlDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray();
var columnSqlTypesArr = columnSqlTypes.stream().mapToInt(i -> i).toArray();
this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray();

// derive sql types for pk columns
var pkColumnSqlTypes = new int[pkIndices.size()];
for (int i = 0; i < pkIndices.size(); i++) {
pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]];
pkColumnSqlTypes[i] = columnSqlTypesArr[this.pkIndices[i]];
}
this.pkColumnSqlTypes = pkColumnSqlTypes;
}
Expand Down Expand Up @@ -118,12 +117,13 @@ public void bindInsertIntoStatement(
}

@Override
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
public void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row)
throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int idx : pkIndices) {
Object pkField = row.get(idx);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]);
for (int i = 0; i < pkIndices.length; ++i) {
Object pkField = row.get(pkIndices[i]);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[i]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,10 @@ public class PostgresDialect implements JdbcDialect {

private final int[] columnSqlTypes;
private final int[] pkIndices;
private final int[] pkColumnSqlTypes;

public PostgresDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray();
this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray();

// derive sql types for pk columns
var pkColumnSqlTypes = new int[pkIndices.size()];
for (int i = 0; i < pkIndices.size(); i++) {
pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]];
}
this.pkColumnSqlTypes = pkColumnSqlTypes;
}

private static final HashMap<TypeName, String> RW_TYPE_TO_JDBC_TYPE_NAME;
Expand Down Expand Up @@ -166,12 +158,13 @@ public void bindInsertIntoStatement(
}

@Override
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
public void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row)
throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int idx : pkIndices) {
Object pkField = row.get(idx);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]);
for (int pkIdx : pkIndices) {
Object pkField = row.get(pkIdx);
stmt.setObject(placeholderIdx++, pkField, columnSqlTypes[pkIdx]);
}
}
}

0 comments on commit 5b73c68

Please sign in to comment.