Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed May 15, 2024
1 parent 34c732a commit 67f8778
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 24 deletions.
4 changes: 2 additions & 2 deletions e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ select * from t1_uuid;
221 74605c5a-a7bb-4b3b-8742-2a12e9709dea hello world


query T
query TIT
select * from sk_t1_uuid
----
21189447-8736-44bd-b254-26b5dec91da9
21189447-8736-44bd-b254-26b5dec91da9 2 bb
8 changes: 4 additions & 4 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ INSERT INTO t1_uuid values (221, '74605c5a-a7bb-4b3b-8742-2a12e9709dea', 'hello


statement ok
CREATE TABLE t1_test_uuid_delete (id varchar, primary key(id));
CREATE TABLE t1_test_uuid_delete (id varchar, v1 int, v2 varchar primary key(id, v2));

statement ok
INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8'), ('21189447-8736-44bd-b254-26b5dec91da9');
INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8', 1, 'aa'), ('21189447-8736-44bd-b254-26b5dec91da9', 2, 'bb');

statement ok
CREATE SINK sk_t1_uuid FROM t1_test_uuid_delete WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='sk_t1_uuid',
primary_key='id',
primary_key='id, v2',
type='upsert'
);

statement ok
DELETE FROM t1_test_uuid_delete WHERE ID='fb48ecc1-917f-4f4b-ab6d-d8e37809caf8';
DELETE FROM t1_test_uuid_delete WHERE id='fb48ecc1-917f-4f4b-ab6d-d8e37809caf8' AND v2='aa';


statement ok
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,4 @@ CREATE TABLE biz.t2 (
"aBc" INTEGER PRIMARY KEY
);

CREATE TABLE sk_t1_uuid (id uuid, primary key(id));
CREATE TABLE sk_t1_uuid (id uuid, v1 int, v2 varchar primary key(id, v2));
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@

public class MySqlDialect implements JdbcDialect {

private final int[] columnSqlTypes;
private final int[] pkIndices;
private final int[] pkColumnSqlTypes;

public MySqlDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray();
var columnSqlTypesArr = columnSqlTypes.stream().mapToInt(i -> i).toArray();
this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray();

// derive sql types for pk columns
var pkColumnSqlTypes = new int[pkIndices.size()];
for (int i = 0; i < pkIndices.size(); i++) {
pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]];
pkColumnSqlTypes[i] = columnSqlTypesArr[this.pkIndices[i]];
}
this.pkColumnSqlTypes = pkColumnSqlTypes;
}
Expand Down Expand Up @@ -121,9 +120,9 @@ public void bindInsertIntoStatement(
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int idx : pkIndices) {
Object pkField = row.get(idx);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]);
for (int i = 0; i < pkIndices.length; ++i) {
Object pkField = row.get(pkIndices[i]);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[i]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,10 @@ public class PostgresDialect implements JdbcDialect {

private final int[] columnSqlTypes;
private final int[] pkIndices;
private final int[] pkColumnSqlTypes;

public PostgresDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray();
this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray();

// derive sql types for pk columns
var pkColumnSqlTypes = new int[pkIndices.size()];
for (int i = 0; i < pkIndices.size(); i++) {
pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]];
}
this.pkColumnSqlTypes = pkColumnSqlTypes;
}

private static final HashMap<TypeName, String> RW_TYPE_TO_JDBC_TYPE_NAME;
Expand Down Expand Up @@ -169,9 +161,9 @@ public void bindInsertIntoStatement(
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int idx : pkIndices) {
Object pkField = row.get(idx);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]);
for (int colIndex : pkIndices) {
Object pkField = row.get(colIndex);
stmt.setObject(placeholderIdx++, pkField, columnSqlTypes[colIndex]);
}
}
}

0 comments on commit 67f8778

Please sign in to comment.