Skip to content

Commit

Permalink
fix(sink): set query timeout for jdbc sink to avoid stuck (#18430)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored and StrikeW committed Sep 6, 2024
1 parent 1a310e4 commit dc48fdc
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
.collect(Collectors.toList());

LOG.info(
"schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}",
"schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}, queryTimeout = {}",
config.getSchemaName(),
config.getTableName(),
tableSchema,
columnSqlTypes,
pkIndices);
pkIndices,
config.getQueryTimeout());

if (factory.isPresent()) {
this.jdbcDialect = factory.get().create(columnSqlTypes, pkIndices);
Expand All @@ -92,7 +93,7 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
// Commit the `getTransactionIsolation`
conn.commit();

jdbcStatements = new JdbcStatements(conn);
jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout());
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -173,7 +174,7 @@ public boolean write(Iterable<SinkRow> rows) {
conn = JdbcUtils.getConnection(config.getJdbcUrl());
// reset the flag since we will retry to prepare the batch again
updateFlag = false;
jdbcStatements = new JdbcStatements(conn);
jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout());
} else {
throw io.grpc.Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -206,13 +207,15 @@ public boolean write(Iterable<SinkRow> rows) {
* across multiple batches if only the JDBC connection is valid.
*/
class JdbcStatements implements AutoCloseable {
private final int queryTimeoutSecs;
private PreparedStatement deleteStatement;
private PreparedStatement upsertStatement;
private PreparedStatement insertStatement;

private final Connection conn;

public JdbcStatements(Connection conn) throws SQLException {
public JdbcStatements(Connection conn, int queryTimeoutSecs) throws SQLException {
this.queryTimeoutSecs = queryTimeoutSecs;
this.conn = conn;
var schemaTableName =
jdbcDialect.createSchemaTableName(
Expand Down Expand Up @@ -339,6 +342,9 @@ private void executeStatement(PreparedStatement stmt) throws SQLException {
if (stmt == null) {
return;
}
// if timeout occurs, a SQLTimeoutException will be thrown
// and we will retry to write the stream chunk in `JDBCSink.write`
stmt.setQueryTimeout(queryTimeoutSecs);
LOG.debug("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class JDBCSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "schema.name")
private String schemaName;

@JsonProperty(value = "jdbc.query.timeout")
private int queryTimeoutSeconds = 600;

@JsonCreator
public JDBCSinkConfig(
@JsonProperty(value = "jdbc.url") String jdbcUrl,
Expand Down Expand Up @@ -62,4 +65,8 @@ public String getSinkType() {
public boolean isUpsertSink() {
return this.isUpsertSink;
}

public int getQueryTimeout() {
return queryTimeoutSeconds;
}
}

0 comments on commit dc48fdc

Please sign in to comment.