Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(jdbc-sink): execute statements in batch and set isolation level to RC #12250

Merged
merged 10 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ static TableSchema getTestTableSchema() {

static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
throws SQLException {
String tableName = "test";
String tableName = "test2";
createMockTable(container.getJdbcUrl(), tableName, testType);
JDBCSink sink =
new JDBCSink(
Expand All @@ -97,12 +97,13 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
sink.sync();

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM test");
int count;
for (count = 0; rs.next(); ) {
count++;
try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) {
int count;
for (count = 0; rs.next(); ) {
count++;
}
assertEquals(1, count);
}
assertEquals(1, count);

sink.write(
Iterators.forArray(
Expand All @@ -116,20 +117,22 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes())));
sink.sync();
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT * FROM test");
for (count = 0; rs.next(); ) {
count++;
try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) {
int count;
for (count = 0; rs.next(); ) {
count++;
}
assertEquals(2, count);
}
assertEquals(2, count);
stmt.close();

sink.sync();
sink.drop();
}

static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
throws SQLException {
String tableName = "test";
String tableName = "test1";
createMockTable(container.getJdbcUrl(), tableName, testType);

JDBCSink sink =
Expand All @@ -138,6 +141,7 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
getTestTableSchema());
assertEquals(tableName, sink.getTableName());
Connection conn = sink.getConn();
Statement stmt = conn.createStatement();

sink.write(
Iterators.forArray(
Expand All @@ -158,7 +162,16 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
new Time(1000000000),
new Timestamp(1000000000),
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes()),
"I want to sleep".getBytes())));

// chunk will commit after sink.write()
try (var rs = stmt.executeQuery(String.format("SELECT COUNT(*) FROM %s", tableName))) {
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
}

sink.write(
Iterators.forArray(
new ArraySinkRow(
Op.UPDATE_DELETE,
1,
Expand Down Expand Up @@ -186,30 +199,30 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
new Timestamp(1000000000),
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes())));
sink.sync();

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM test");
rs.next();
try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) {
assertTrue(rs.next());

// check if rows are inserted
assertEquals(1, rs.getInt(1));
assertEquals("Clare", rs.getString(2));
assertEquals(new Date(2000000000).toString(), rs.getDate(3).toString());
assertEquals(new Time(2000000000).toString(), rs.getTime(4).toString());
assertEquals(new Timestamp(2000000000), rs.getTimestamp(5));
assertEquals(
"{\"key\": \"password\", \"value\": \"Singularity123123123123\"}", rs.getString(6));
assertEquals("I want to eat", new String(rs.getBytes(7)));
assertFalse(rs.next());
// check if rows are inserted
assertEquals(1, rs.getInt(1));
assertEquals("Clare", rs.getString(2));
assertEquals(new Date(2000000000).toString(), rs.getDate(3).toString());
assertEquals(new Time(2000000000).toString(), rs.getTime(4).toString());
assertEquals(new Timestamp(2000000000), rs.getTimestamp(5));
assertEquals(
"{\"key\": \"password\", \"value\": \"Singularity123123123123\"}",
rs.getString(6));
assertEquals("I want to eat", new String(rs.getBytes(7)));
assertFalse(rs.next());
}

sink.sync();
stmt.close();
}

static void testJDBCDrop(JdbcDatabaseContainer<?> container, TestType testType)
throws SQLException {
String tableName = "test";
String tableName = "test3";
createMockTable(container.getJdbcUrl(), tableName, testType);

JDBCSink sink =
Expand Down Expand Up @@ -237,8 +250,8 @@ public void testPostgres() throws SQLException {
.withUrlParam("user", "postgres")
.withUrlParam("password", "password");
pg.start();
testJDBCSync(pg, TestType.TestPg);
testJDBCWrite(pg, TestType.TestPg);
testJDBCSync(pg, TestType.TestPg);
testJDBCDrop(pg, TestType.TestPg);
pg.stop();
}
Expand All @@ -254,8 +267,8 @@ public void testMySQL() throws SQLException {
.withUrlParam("user", "postgres")
.withUrlParam("password", "password");
mysql.start();
testJDBCSync(mysql, TestType.TestMySQL);
testJDBCWrite(mysql, TestType.TestMySQL);
testJDBCSync(mysql, TestType.TestMySQL);
testJDBCDrop(mysql, TestType.TestMySQL);
mysql.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.risingwave.proto.Data;
import io.grpc.Status;
import java.sql.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,6 +40,7 @@ public class JDBCSink extends SinkWriterBase {
private PreparedStatement deletePreparedStmt;

private boolean updateFlag = false;

private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class);

public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
Expand All @@ -59,9 +58,17 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
this.config = config;
try {
this.conn = DriverManager.getConnection(config.getJdbcUrl());
this.conn.setAutoCommit(false);
this.pkColumnNames =
getPkColumnNames(conn, config.getTableName(), config.getSchemaName());
// disable auto commit can improve performance
this.conn.setAutoCommit(false);
// explicitly set isolation level to RC
this.conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);

LOG.info(
"JDBC connection: autoCommit = {}, trxn = {}",
conn.getAutoCommit(),
conn.getTransactionIsolation());
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -133,6 +140,7 @@ private PreparedStatement prepareInsertStatement(SinkRow row) {
try {
var preparedStmt = insertPreparedStmt;
jdbcDialect.bindInsertIntoStatement(preparedStmt, conn, getTableSchema(), row);
preparedStmt.addBatch();
return preparedStmt;
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
Expand All @@ -149,7 +157,7 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) {
switch (row.getOp()) {
case INSERT:
jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row);
return preparedStmt;
break;
case UPDATE_INSERT:
if (!updateFlag) {
throw Status.FAILED_PRECONDITION
Expand All @@ -158,12 +166,14 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) {
}
jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row);
updateFlag = false;
return preparedStmt;
break;
default:
throw Status.FAILED_PRECONDITION
.withDescription("unexpected op type: " + row.getOp())
.asRuntimeException();
}
preparedStmt.addBatch();
return preparedStmt;
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -192,6 +202,7 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) {
Object fromRow = getTableSchema().getFromRow(primaryKey, row);
deletePreparedStmt.setObject(placeholderIdx++, fromRow);
}
deletePreparedStmt.addBatch();
return deletePreparedStmt;
} catch (SQLException e) {
throw Status.INTERNAL
Expand All @@ -203,48 +214,53 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) {

@Override
public void write(Iterator<SinkRow> rows) {
PreparedStatement deleteStatement = null;
PreparedStatement upsertStatement = null;
PreparedStatement insertStatement = null;

while (rows.hasNext()) {
try (SinkRow row = rows.next()) {
PreparedStatement stmt;
if (row.getOp() == Data.Op.UPDATE_DELETE) {
updateFlag = true;
continue;
}

if (config.isUpsertSink()) {
stmt = prepareForUpsert(row);
if (row.getOp() == Data.Op.DELETE) {
deleteStatement = prepareDeleteStatement(row);
} else {
upsertStatement = prepareUpsertStatement(row);
}
} else {
stmt = prepareForAppendOnly(row);
}

try {
LOG.debug("Executing statement: {}", stmt);
stmt.executeUpdate();
stmt.clearParameters();
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, stmt, e.getMessage()))
.asRuntimeException();
insertStatement = prepareInsertStatement(row);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

private PreparedStatement prepareForUpsert(SinkRow row) {
PreparedStatement stmt;
if (row.getOp() == Data.Op.DELETE) {
stmt = prepareDeleteStatement(row);
} else {
stmt = prepareUpsertStatement(row);
try {
// Execute staging statements after all rows are prepared.
// We execute DELETE statement before to avoid accidentally deletion.
executeStatement(deleteStatement);
executeStatement(upsertStatement);
executeStatement(insertStatement);

conn.commit();
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
return stmt;
}

private PreparedStatement prepareForAppendOnly(SinkRow row) {
return prepareInsertStatement(row);
private void executeStatement(PreparedStatement stmt) throws SQLException {
if (stmt == null) {
return;
}
LOG.debug("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
}

@Override
Expand All @@ -255,14 +271,6 @@ public void sync() {
"expected UPDATE_INSERT to complete an UPDATE operation, got `sync`")
.asRuntimeException();
}
try {
conn.commit();
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
}

@Override
Expand Down