Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(jdbc-sink): execute statements in batch and set isolation level to RC #12250

Merged
merged 10 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
getTestTableSchema());
assertEquals(tableName, sink.getTableName());
Connection conn = sink.getConn();
Statement stmt = conn.createStatement();

ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM test");
assertTrue(rs.next());
assertEquals(0, rs.getInt(1));

sink.write(
Iterators.forArray(
Expand All @@ -158,7 +163,15 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
new Time(1000000000),
new Timestamp(1000000000),
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes()),
"I want to sleep".getBytes())));

// chunk will commit after sink.write()
rs = stmt.executeQuery("SELECT COUNT(*) FROM test");
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));

sink.write(
Iterators.forArray(
new ArraySinkRow(
Op.UPDATE_DELETE,
1,
Expand Down Expand Up @@ -186,11 +199,13 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
new Timestamp(1000000000),
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes())));
sink.sync();

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM test");
rs.next();
rs = stmt.executeQuery("SELECT COUNT(*) FROM test");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));

rs = stmt.executeQuery("SELECT * FROM test");
assertTrue(rs.next());

// check if rows are inserted
assertEquals(1, rs.getInt(1));
Expand Down Expand Up @@ -237,8 +252,8 @@ public void testPostgres() throws SQLException {
.withUrlParam("user", "postgres")
.withUrlParam("password", "password");
pg.start();
testJDBCSync(pg, TestType.TestPg);
testJDBCWrite(pg, TestType.TestPg);
testJDBCSync(pg, TestType.TestPg);
testJDBCDrop(pg, TestType.TestPg);
pg.stop();
}
Expand All @@ -254,8 +269,8 @@ public void testMySQL() throws SQLException {
.withUrlParam("user", "postgres")
.withUrlParam("password", "password");
mysql.start();
testJDBCSync(mysql, TestType.TestMySQL);
testJDBCWrite(mysql, TestType.TestMySQL);
testJDBCSync(mysql, TestType.TestMySQL);
testJDBCDrop(mysql, TestType.TestMySQL);
mysql.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
import com.risingwave.proto.Data;
import io.grpc.Status;
import java.sql.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCSink extends SinkWriterBase {
// The order of variants matters here, should not change it
enum OpType {
DELETE,
UPSERT,
INSERT,
}

private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s";

private final JdbcDialect jdbcDialect;
Expand All @@ -42,6 +47,7 @@ public class JDBCSink extends SinkWriterBase {
private PreparedStatement deletePreparedStmt;

private boolean updateFlag = false;

private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class);

public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
Expand All @@ -59,9 +65,17 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
this.config = config;
try {
this.conn = DriverManager.getConnection(config.getJdbcUrl());
this.conn.setAutoCommit(false);
this.pkColumnNames =
getPkColumnNames(conn, config.getTableName(), config.getSchemaName());
// disable auto commit can improve performance
this.conn.setAutoCommit(false);
// use the lowest isolation level since we don't guarantee exactly-once
this.conn.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
StrikeW marked this conversation as resolved.
Show resolved Hide resolved

LOG.info(
"JDBC connection: autoCommit = {}, trxn = {}",
conn.getAutoCommit(),
conn.getTransactionIsolation());
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -133,6 +147,7 @@ private PreparedStatement prepareInsertStatement(SinkRow row) {
try {
var preparedStmt = insertPreparedStmt;
jdbcDialect.bindInsertIntoStatement(preparedStmt, conn, getTableSchema(), row);
preparedStmt.addBatch();
return preparedStmt;
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
Expand All @@ -149,7 +164,7 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) {
switch (row.getOp()) {
case INSERT:
jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row);
return preparedStmt;
break;
case UPDATE_INSERT:
if (!updateFlag) {
throw Status.FAILED_PRECONDITION
Expand All @@ -158,12 +173,14 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) {
}
jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row);
updateFlag = false;
return preparedStmt;
break;
default:
throw Status.FAILED_PRECONDITION
.withDescription("unexpected op type: " + row.getOp())
.asRuntimeException();
}
preparedStmt.addBatch();
return preparedStmt;
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -192,6 +209,7 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) {
Object fromRow = getTableSchema().getFromRow(primaryKey, row);
deletePreparedStmt.setObject(placeholderIdx++, fromRow);
}
deletePreparedStmt.addBatch();
return deletePreparedStmt;
} catch (SQLException e) {
throw Status.INTERNAL
Expand All @@ -203,48 +221,51 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) {

@Override
public void write(Iterator<SinkRow> rows) {
var stagingStatements = new TreeMap<OpType, PreparedStatement>();
while (rows.hasNext()) {
try (SinkRow row = rows.next()) {
PreparedStatement stmt;
if (row.getOp() == Data.Op.UPDATE_DELETE) {
updateFlag = true;
continue;
}

if (config.isUpsertSink()) {
stmt = prepareForUpsert(row);
if (row.getOp() == Data.Op.DELETE) {
var stmt = prepareDeleteStatement(row);
stagingStatements.put(OpType.DELETE, stmt);
} else {
var stmt = prepareUpsertStatement(row);
stagingStatements.put(OpType.UPSERT, stmt);
}
} else {
stmt = prepareForAppendOnly(row);
}

try {
LOG.debug("Executing statement: {}", stmt);
stmt.executeUpdate();
stmt.clearParameters();
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, stmt, e.getMessage()))
.asRuntimeException();
var stmt = prepareInsertStatement(row);
stagingStatements.put(OpType.INSERT, stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

private PreparedStatement prepareForUpsert(SinkRow row) {
PreparedStatement stmt;
if (row.getOp() == Data.Op.DELETE) {
stmt = prepareDeleteStatement(row);
} else {
stmt = prepareUpsertStatement(row);
try {
// Execute staging statements after all rows are prepared.
// Note that we execute statement in the order of DELETE, UPSERT, INSERT as defined in
// OpType.
for (var entry : stagingStatements.entrySet()) {
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
executeStatement(entry.getValue());
}

conn.commit();
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
return stmt;
}

private PreparedStatement prepareForAppendOnly(SinkRow row) {
return prepareInsertStatement(row);
private void executeStatement(PreparedStatement stmt) throws SQLException {
LOG.debug("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
}

@Override
Expand All @@ -255,14 +276,6 @@ public void sync() {
"expected UPDATE_INSERT to complete an UPDATE operation, got `sync`")
.asRuntimeException();
}
try {
conn.commit();
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
}

@Override
Expand Down