Skip to content

Commit

Permalink
remove mili seconds part from temporal column
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed May 15, 2024
1 parent 0f9e657 commit b0a2292
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void prepareDelete(SinkRow row) {
.asRuntimeException();
}
try {
jdbcDialect.bindDeleteStatement(deleteStatement, row);
jdbcDialect.bindDeleteStatement(deleteStatement, tableSchema, row);
deleteStatement.addBatch();
} catch (SQLException e) {
throw Status.INTERNAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,6 @@ void bindInsertIntoStatement(
throws SQLException;

/** Bind the values of primary key fields to the {@code DELETE} statement. */
void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException;
void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row)
throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlDialect implements JdbcDialect {

private final int[] pkIndices;
private final int[] pkColumnSqlTypes;
static final Logger LOG = LoggerFactory.getLogger(MySqlDialect.class);

public MySqlDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
var columnSqlTypesArr = columnSqlTypes.stream().mapToInt(i -> i).toArray();
Expand Down Expand Up @@ -117,11 +123,29 @@ public void bindInsertIntoStatement(
}

@Override
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
public void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row)
throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
var columnDescs = tableSchema.getColumnDescs();
for (int i = 0; i < pkIndices.length; ++i) {
Object pkField = row.get(pkIndices[i]);
int pkIdx = pkIndices[i];
Object pkField = row.get(pkIdx);
// remove the milliseconds part from the timestamp/timestamptz
switch (columnDescs.get(pkIdx).getDataType().getTypeName()) {
case TIMESTAMP:
LocalDateTime ldt = (LocalDateTime) pkField;
pkField =
LocalDateTime.ofEpochSecond(
ldt.toEpochSecond(ZoneOffset.UTC), 0, ZoneOffset.UTC);
break;
case TIMESTAMPTZ:
OffsetDateTime odt = (OffsetDateTime) pkField;
pkField = LocalDateTime.ofEpochSecond(odt.toEpochSecond(), 0, ZoneOffset.UTC);
break;
default:
break;
}
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[i]);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public void bindInsertIntoStatement(
}

@Override
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
public void bindDeleteStatement(PreparedStatement stmt, TableSchema tableSchema, SinkRow row)
throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int colIndex : pkIndices) {
Expand Down

0 comments on commit b0a2292

Please sign in to comment.