From b8d9ad5147345134d33121edbbd16b5ab57a483a Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 14 Sep 2023 18:42:34 +0800 Subject: [PATCH] refactor(jdbc-sink): execute statements in batch and set isolation level to RC (#12250) --- .../connector/sink/jdbc/JDBCSinkTest.java | 73 +++++++++------- .../com/risingwave/connector/JDBCSink.java | 86 ++++++++++--------- 2 files changed, 90 insertions(+), 69 deletions(-) diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java index f38cd83b10e7c..da9b9d866583b 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java @@ -74,7 +74,7 @@ static TableSchema getTestTableSchema() { static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) throws SQLException { - String tableName = "test"; + String tableName = "test2"; createMockTable(container.getJdbcUrl(), tableName, testType); JDBCSink sink = new JDBCSink( @@ -97,12 +97,13 @@ static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) sink.sync(); Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT * FROM test"); - int count; - for (count = 0; rs.next(); ) { - count++; + try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) { + int count; + for (count = 0; rs.next(); ) { + count++; + } + assertEquals(1, count); } - assertEquals(1, count); sink.write( Iterators.forArray( @@ -116,12 +117,14 @@ static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) "{\"key\": \"password\", \"value\": \"Singularity123\"}", "I want to sleep".getBytes()))); sink.sync(); - stmt = conn.createStatement(); - rs = stmt.executeQuery("SELECT * FROM test"); - for (count = 0; rs.next(); ) { - count++; + try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) { + int count; + for (count = 0; rs.next(); ) { + count++; + } + assertEquals(2, count); } - assertEquals(2, count); + stmt.close(); sink.sync(); sink.drop(); @@ -129,7 +132,7 @@ static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) throws SQLException { - String tableName = "test"; + String tableName = "test1"; createMockTable(container.getJdbcUrl(), tableName, testType); JDBCSink sink = @@ -138,6 +141,7 @@ static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) getTestTableSchema()); assertEquals(tableName, sink.getTableName()); Connection conn = sink.getConn(); + Statement stmt = conn.createStatement(); sink.write( Iterators.forArray( @@ -158,7 +162,16 @@ static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) new Time(1000000000), new Timestamp(1000000000), "{\"key\": \"password\", \"value\": \"Singularity123\"}", - "I want to sleep".getBytes()), + "I want to sleep".getBytes()))); + + // chunk will commit after sink.write() + try (var rs = stmt.executeQuery(String.format("SELECT COUNT(*) FROM %s", tableName))) { + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + } + + sink.write( + Iterators.forArray( new ArraySinkRow( Op.UPDATE_DELETE, 1, @@ -186,22 +199,22 @@ static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) new Timestamp(1000000000), "{\"key\": \"password\", \"value\": \"Singularity123\"}", "I want to sleep".getBytes()))); - sink.sync(); - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT * FROM test"); - rs.next(); + try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) { + assertTrue(rs.next()); - // check if rows are inserted - assertEquals(1, rs.getInt(1)); - assertEquals("Clare", rs.getString(2)); - assertEquals(new Date(2000000000).toString(), rs.getDate(3).toString()); - assertEquals(new Time(2000000000).toString(), rs.getTime(4).toString()); - assertEquals(new Timestamp(2000000000), rs.getTimestamp(5)); - assertEquals( - "{\"key\": \"password\", \"value\": \"Singularity123123123123\"}", rs.getString(6)); - assertEquals("I want to eat", new String(rs.getBytes(7))); - assertFalse(rs.next()); + // check if rows are inserted + assertEquals(1, rs.getInt(1)); + assertEquals("Clare", rs.getString(2)); + assertEquals(new Date(2000000000).toString(), rs.getDate(3).toString()); + assertEquals(new Time(2000000000).toString(), rs.getTime(4).toString()); + assertEquals(new Timestamp(2000000000), rs.getTimestamp(5)); + assertEquals( + "{\"key\": \"password\", \"value\": \"Singularity123123123123\"}", + rs.getString(6)); + assertEquals("I want to eat", new String(rs.getBytes(7))); + assertFalse(rs.next()); + } sink.sync(); stmt.close(); @@ -209,7 +222,7 @@ static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) static void testJDBCDrop(JdbcDatabaseContainer container, TestType testType) throws SQLException { - String tableName = "test"; + String tableName = "test3"; createMockTable(container.getJdbcUrl(), tableName, testType); JDBCSink sink = @@ -237,8 +250,8 @@ public void testPostgres() throws SQLException { .withUrlParam("user", "postgres") .withUrlParam("password", "password"); pg.start(); - testJDBCSync(pg, TestType.TestPg); testJDBCWrite(pg, TestType.TestPg); + testJDBCSync(pg, TestType.TestPg); testJDBCDrop(pg, TestType.TestPg); pg.stop(); } @@ -254,8 +267,8 @@ public void testMySQL() throws SQLException { .withUrlParam("user", "postgres") .withUrlParam("password", "password"); mysql.start(); - testJDBCSync(mysql, TestType.TestMySQL); testJDBCWrite(mysql, TestType.TestMySQL); + testJDBCSync(mysql, TestType.TestMySQL); testJDBCDrop(mysql, TestType.TestMySQL); mysql.stop(); } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 216503b8d824b..fe23c7db5d846 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -22,9 +22,7 @@ import com.risingwave.proto.Data; import io.grpc.Status; import java.sql.*; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +40,7 @@ public class JDBCSink extends SinkWriterBase { private PreparedStatement deletePreparedStmt; private boolean updateFlag = false; + private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class); public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { @@ -59,9 +58,17 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { this.config = config; try { this.conn = DriverManager.getConnection(config.getJdbcUrl()); - this.conn.setAutoCommit(false); this.pkColumnNames = getPkColumnNames(conn, config.getTableName(), config.getSchemaName()); + // disable auto commit can improve performance + this.conn.setAutoCommit(false); + // explicitly set isolation level to RC + this.conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + + LOG.info( + "JDBC connection: autoCommit = {}, trxn = {}", + conn.getAutoCommit(), + conn.getTransactionIsolation()); } catch (SQLException e) { throw Status.INTERNAL .withDescription( @@ -133,6 +140,7 @@ private PreparedStatement prepareInsertStatement(SinkRow row) { try { var preparedStmt = insertPreparedStmt; jdbcDialect.bindInsertIntoStatement(preparedStmt, conn, getTableSchema(), row); + preparedStmt.addBatch(); return preparedStmt; } catch (SQLException e) { throw io.grpc.Status.INTERNAL @@ -149,7 +157,7 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) { switch (row.getOp()) { case INSERT: jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row); - return preparedStmt; + break; case UPDATE_INSERT: if (!updateFlag) { throw Status.FAILED_PRECONDITION @@ -158,12 +166,14 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) { } jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row); updateFlag = false; - return preparedStmt; + break; default: throw Status.FAILED_PRECONDITION .withDescription("unexpected op type: " + row.getOp()) .asRuntimeException(); } + preparedStmt.addBatch(); + return preparedStmt; } catch (SQLException e) { throw io.grpc.Status.INTERNAL .withDescription( @@ -192,6 +202,7 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) { Object fromRow = getTableSchema().getFromRow(primaryKey, row); deletePreparedStmt.setObject(placeholderIdx++, fromRow); } + deletePreparedStmt.addBatch(); return deletePreparedStmt; } catch (SQLException e) { throw Status.INTERNAL @@ -203,48 +214,53 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) { @Override public void write(Iterator rows) { + PreparedStatement deleteStatement = null; + PreparedStatement upsertStatement = null; + PreparedStatement insertStatement = null; + while (rows.hasNext()) { try (SinkRow row = rows.next()) { - PreparedStatement stmt; if (row.getOp() == Data.Op.UPDATE_DELETE) { updateFlag = true; continue; } - if (config.isUpsertSink()) { - stmt = prepareForUpsert(row); + if (row.getOp() == Data.Op.DELETE) { + deleteStatement = prepareDeleteStatement(row); + } else { + upsertStatement = prepareUpsertStatement(row); + } } else { - stmt = prepareForAppendOnly(row); - } - - try { - LOG.debug("Executing statement: {}", stmt); - stmt.executeUpdate(); - stmt.clearParameters(); - } catch (SQLException e) { - throw Status.INTERNAL - .withDescription( - String.format(ERROR_REPORT_TEMPLATE, stmt, e.getMessage())) - .asRuntimeException(); + insertStatement = prepareInsertStatement(row); } } catch (Exception e) { throw new RuntimeException(e); } } - } - private PreparedStatement prepareForUpsert(SinkRow row) { - PreparedStatement stmt; - if (row.getOp() == Data.Op.DELETE) { - stmt = prepareDeleteStatement(row); - } else { - stmt = prepareUpsertStatement(row); + try { + // Execute staging statements after all rows are prepared. + // We execute DELETE statement before to avoid accidentally deletion. + executeStatement(deleteStatement); + executeStatement(upsertStatement); + executeStatement(insertStatement); + + conn.commit(); + } catch (SQLException e) { + throw io.grpc.Status.INTERNAL + .withDescription( + String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage())) + .asRuntimeException(); } - return stmt; } - private PreparedStatement prepareForAppendOnly(SinkRow row) { - return prepareInsertStatement(row); + private void executeStatement(PreparedStatement stmt) throws SQLException { + if (stmt == null) { + return; + } + LOG.debug("Executing statement: {}", stmt); + stmt.executeBatch(); + stmt.clearParameters(); } @Override @@ -255,14 +271,6 @@ public void sync() { "expected UPDATE_INSERT to complete an UPDATE operation, got `sync`") .asRuntimeException(); } - try { - conn.commit(); - } catch (SQLException e) { - throw io.grpc.Status.INTERNAL - .withDescription( - String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage())) - .asRuntimeException(); - } } @Override