Skip to content

Commit

Permalink
refactor(jdbc-sink): execute statements in batch and set isolation le…
Browse files Browse the repository at this point in the history
…vel to RC (#12250)
  • Loading branch information
StrikeW authored and Li0k committed Sep 15, 2023
1 parent c4b58b5 commit b8d9ad5
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ static TableSchema getTestTableSchema() {

static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
throws SQLException {
String tableName = "test";
String tableName = "test2";
createMockTable(container.getJdbcUrl(), tableName, testType);
JDBCSink sink =
new JDBCSink(
Expand All @@ -97,12 +97,13 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
sink.sync();

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM test");
int count;
for (count = 0; rs.next(); ) {
count++;
try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) {
int count;
for (count = 0; rs.next(); ) {
count++;
}
assertEquals(1, count);
}
assertEquals(1, count);

sink.write(
Iterators.forArray(
Expand All @@ -116,20 +117,22 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes())));
sink.sync();
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT * FROM test");
for (count = 0; rs.next(); ) {
count++;
try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) {
int count;
for (count = 0; rs.next(); ) {
count++;
}
assertEquals(2, count);
}
assertEquals(2, count);
stmt.close();

sink.sync();
sink.drop();
}

static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
throws SQLException {
String tableName = "test";
String tableName = "test1";
createMockTable(container.getJdbcUrl(), tableName, testType);

JDBCSink sink =
Expand All @@ -138,6 +141,7 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
getTestTableSchema());
assertEquals(tableName, sink.getTableName());
Connection conn = sink.getConn();
Statement stmt = conn.createStatement();

sink.write(
Iterators.forArray(
Expand All @@ -158,7 +162,16 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
new Time(1000000000),
new Timestamp(1000000000),
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes()),
"I want to sleep".getBytes())));

// chunk will commit after sink.write()
try (var rs = stmt.executeQuery(String.format("SELECT COUNT(*) FROM %s", tableName))) {
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
}

sink.write(
Iterators.forArray(
new ArraySinkRow(
Op.UPDATE_DELETE,
1,
Expand Down Expand Up @@ -186,30 +199,30 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
new Timestamp(1000000000),
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes())));
sink.sync();

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM test");
rs.next();
try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) {
assertTrue(rs.next());

// check if rows are inserted
assertEquals(1, rs.getInt(1));
assertEquals("Clare", rs.getString(2));
assertEquals(new Date(2000000000).toString(), rs.getDate(3).toString());
assertEquals(new Time(2000000000).toString(), rs.getTime(4).toString());
assertEquals(new Timestamp(2000000000), rs.getTimestamp(5));
assertEquals(
"{\"key\": \"password\", \"value\": \"Singularity123123123123\"}", rs.getString(6));
assertEquals("I want to eat", new String(rs.getBytes(7)));
assertFalse(rs.next());
// check if rows are inserted
assertEquals(1, rs.getInt(1));
assertEquals("Clare", rs.getString(2));
assertEquals(new Date(2000000000).toString(), rs.getDate(3).toString());
assertEquals(new Time(2000000000).toString(), rs.getTime(4).toString());
assertEquals(new Timestamp(2000000000), rs.getTimestamp(5));
assertEquals(
"{\"key\": \"password\", \"value\": \"Singularity123123123123\"}",
rs.getString(6));
assertEquals("I want to eat", new String(rs.getBytes(7)));
assertFalse(rs.next());
}

sink.sync();
stmt.close();
}

static void testJDBCDrop(JdbcDatabaseContainer<?> container, TestType testType)
throws SQLException {
String tableName = "test";
String tableName = "test3";
createMockTable(container.getJdbcUrl(), tableName, testType);

JDBCSink sink =
Expand Down Expand Up @@ -237,8 +250,8 @@ public void testPostgres() throws SQLException {
.withUrlParam("user", "postgres")
.withUrlParam("password", "password");
pg.start();
testJDBCSync(pg, TestType.TestPg);
testJDBCWrite(pg, TestType.TestPg);
testJDBCSync(pg, TestType.TestPg);
testJDBCDrop(pg, TestType.TestPg);
pg.stop();
}
Expand All @@ -254,8 +267,8 @@ public void testMySQL() throws SQLException {
.withUrlParam("user", "postgres")
.withUrlParam("password", "password");
mysql.start();
testJDBCSync(mysql, TestType.TestMySQL);
testJDBCWrite(mysql, TestType.TestMySQL);
testJDBCSync(mysql, TestType.TestMySQL);
testJDBCDrop(mysql, TestType.TestMySQL);
mysql.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.risingwave.proto.Data;
import io.grpc.Status;
import java.sql.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,6 +40,7 @@ public class JDBCSink extends SinkWriterBase {
private PreparedStatement deletePreparedStmt;

private boolean updateFlag = false;

private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class);

public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
Expand All @@ -59,9 +58,17 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
this.config = config;
try {
this.conn = DriverManager.getConnection(config.getJdbcUrl());
this.conn.setAutoCommit(false);
this.pkColumnNames =
getPkColumnNames(conn, config.getTableName(), config.getSchemaName());
// disable auto commit can improve performance
this.conn.setAutoCommit(false);
// explicitly set isolation level to RC
this.conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);

LOG.info(
"JDBC connection: autoCommit = {}, trxn = {}",
conn.getAutoCommit(),
conn.getTransactionIsolation());
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -133,6 +140,7 @@ private PreparedStatement prepareInsertStatement(SinkRow row) {
try {
var preparedStmt = insertPreparedStmt;
jdbcDialect.bindInsertIntoStatement(preparedStmt, conn, getTableSchema(), row);
preparedStmt.addBatch();
return preparedStmt;
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
Expand All @@ -149,7 +157,7 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) {
switch (row.getOp()) {
case INSERT:
jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row);
return preparedStmt;
break;
case UPDATE_INSERT:
if (!updateFlag) {
throw Status.FAILED_PRECONDITION
Expand All @@ -158,12 +166,14 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) {
}
jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row);
updateFlag = false;
return preparedStmt;
break;
default:
throw Status.FAILED_PRECONDITION
.withDescription("unexpected op type: " + row.getOp())
.asRuntimeException();
}
preparedStmt.addBatch();
return preparedStmt;
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -192,6 +202,7 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) {
Object fromRow = getTableSchema().getFromRow(primaryKey, row);
deletePreparedStmt.setObject(placeholderIdx++, fromRow);
}
deletePreparedStmt.addBatch();
return deletePreparedStmt;
} catch (SQLException e) {
throw Status.INTERNAL
Expand All @@ -203,48 +214,53 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) {

@Override
public void write(Iterator<SinkRow> rows) {
PreparedStatement deleteStatement = null;
PreparedStatement upsertStatement = null;
PreparedStatement insertStatement = null;

while (rows.hasNext()) {
try (SinkRow row = rows.next()) {
PreparedStatement stmt;
if (row.getOp() == Data.Op.UPDATE_DELETE) {
updateFlag = true;
continue;
}

if (config.isUpsertSink()) {
stmt = prepareForUpsert(row);
if (row.getOp() == Data.Op.DELETE) {
deleteStatement = prepareDeleteStatement(row);
} else {
upsertStatement = prepareUpsertStatement(row);
}
} else {
stmt = prepareForAppendOnly(row);
}

try {
LOG.debug("Executing statement: {}", stmt);
stmt.executeUpdate();
stmt.clearParameters();
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, stmt, e.getMessage()))
.asRuntimeException();
insertStatement = prepareInsertStatement(row);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

private PreparedStatement prepareForUpsert(SinkRow row) {
PreparedStatement stmt;
if (row.getOp() == Data.Op.DELETE) {
stmt = prepareDeleteStatement(row);
} else {
stmt = prepareUpsertStatement(row);
try {
// Execute staging statements after all rows are prepared.
// We execute DELETE statement before to avoid accidentally deletion.
executeStatement(deleteStatement);
executeStatement(upsertStatement);
executeStatement(insertStatement);

conn.commit();
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
return stmt;
}

private PreparedStatement prepareForAppendOnly(SinkRow row) {
return prepareInsertStatement(row);
private void executeStatement(PreparedStatement stmt) throws SQLException {
if (stmt == null) {
return;
}
LOG.debug("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
}

@Override
Expand All @@ -255,14 +271,6 @@ public void sync() {
"expected UPDATE_INSERT to complete an UPDATE operation, got `sync`")
.asRuntimeException();
}
try {
conn.commit();
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
}

@Override
Expand Down

0 comments on commit b8d9ad5

Please sign in to comment.