Skip to content

Commit

Permalink
fix primary key
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed May 15, 2024
1 parent e22b358 commit 0f9e657
Showing 1 changed file with 5 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
this.config = config;
try {
conn = JdbcUtils.getConnection(config.getJdbcUrl());
// Retrieve primary keys and column type mappings from the database
this.pkColumnNames =
getPkColumnNames(conn, config.getTableName(), config.getSchemaName());
// Table schema has been validated before, so we get the PK from it directly
this.pkColumnNames = tableSchema.getPrimaryKeys();
// column name -> java.sql.Types
Map<String, Integer> columnTypeMapping =
getColumnTypeMapping(conn, config.getTableName(), config.getSchemaName());
Expand All @@ -72,9 +71,10 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
.collect(Collectors.toList());

LOG.info(
"schema = {}, table = {}, columnSqlTypes = {}, pkIndices = {}",
"schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}",
config.getSchemaName(),
config.getTableName(),
tableSchema,
columnSqlTypes,
pkIndices);

Expand Down Expand Up @@ -125,28 +125,6 @@ private static Map<String, Integer> getColumnTypeMapping(
return columnTypeMap;
}

private static List<String> getPkColumnNames(
Connection conn, String tableName, String schemaName) {
List<String> pkColumnNames = new ArrayList<>();
try {
var pks = conn.getMetaData().getPrimaryKeys(null, schemaName, tableName);
while (pks.next()) {
pkColumnNames.add(pks.getString(JDBC_COLUMN_NAME_KEY));
}
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
LOG.info(
"schema = {}, table = {}: detected pk column = {}",
schemaName,
tableName,
pkColumnNames);
return pkColumnNames;
}

@Override
public boolean write(Iterable<SinkRow> rows) {
final int maxRetryCount = 4;
Expand Down Expand Up @@ -362,7 +340,7 @@ private void executeStatement(PreparedStatement stmt) throws SQLException {
if (stmt == null) {
return;
}
LOG.debug("Executing statement: {}", stmt);
LOG.info("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
}
Expand Down

0 comments on commit 0f9e657

Please sign in to comment.