From b0a2292d56baf13276ac1b715b6477bf79162a79 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 15 May 2024 22:39:13 +0800 Subject: [PATCH] remove mili seconds part from temporal column --- .../com/risingwave/connector/JDBCSink.java | 2 +- .../connector/jdbc/JdbcDialect.java | 3 +- .../connector/jdbc/MySqlDialect.java | 28 +++++++++++++++++-- .../connector/jdbc/PostgresDialect.java | 3 +- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 180e0fd96efc..d854e561878f 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -289,7 +289,7 @@ public void prepareDelete(SinkRow row) { .asRuntimeException(); } try { - jdbcDialect.bindDeleteStatement(deleteStatement, row); + jdbcDialect.bindDeleteStatement(deleteStatement, tableSchema, row); deleteStatement.addBatch(); } catch (SQLException e) { throw Status.INTERNAL diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/JdbcDialect.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/JdbcDialect.java index 3a091ff33f89..308f9927457a 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/JdbcDialect.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/JdbcDialect.java @@ -112,5 +112,6 @@ void bindInsertIntoStatement( throws SQLException; /** Bind the values of primary key fields to the {@code DELETE} statement. */ - void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException; + void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row) + throws SQLException; } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/MySqlDialect.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/MySqlDialect.java index 8ff251af41ef..335a8b8fc5ed 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/MySqlDialect.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/MySqlDialect.java @@ -19,15 +19,21 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MySqlDialect implements JdbcDialect { private final int[] pkIndices; private final int[] pkColumnSqlTypes; + static final Logger LOG = LoggerFactory.getLogger(MySqlDialect.class); public MySqlDialect(List columnSqlTypes, List pkIndices) { var columnSqlTypesArr = columnSqlTypes.stream().mapToInt(i -> i).toArray(); @@ -117,11 +123,29 @@ public void bindInsertIntoStatement( } @Override - public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException { + public void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row) + throws SQLException { // set the values of primary key fields int placeholderIdx = 1; + var columnDescs = tableSchema.getColumnDescs(); for (int i = 0; i < pkIndices.length; ++i) { - Object pkField = row.get(pkIndices[i]); + int pkIdx = pkIndices[i]; + Object pkField = row.get(pkIdx); + // remove the milliseconds part from the timestamp/timestamptz + switch (columnDescs.get(pkIdx).getDataType().getTypeName()) { + case TIMESTAMP: + LocalDateTime ldt = (LocalDateTime) pkField; + pkField = + LocalDateTime.ofEpochSecond( + ldt.toEpochSecond(ZoneOffset.UTC), 0, ZoneOffset.UTC); + break; + case TIMESTAMPTZ: + OffsetDateTime odt = (OffsetDateTime) pkField; + pkField = LocalDateTime.ofEpochSecond(odt.toEpochSecond(), 0, ZoneOffset.UTC); + break; + default: + break; + } stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[i]); } } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java index a869d8cc2fdc..72a8cace1773 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java @@ -158,7 +158,8 @@ public void bindInsertIntoStatement( } @Override - public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException { + public void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row) + throws SQLException { // set the values of primary key fields int placeholderIdx = 1; for (int colIndex : pkIndices) {