diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index ca3a5165184ad..0995292e0dbb9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -135,7 +135,10 @@ private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer
     
     private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            connection.setAutoCommit(false);
+            boolean enableTransaction = buffer.size() > 1;
+            if (enableTransaction) {
+                connection.setAutoCommit(false);
+            }
             switch (buffer.get(0).getType()) {
                 case IngestDataChangeType.INSERT:
                     if (null != rateLimitAlgorithm) {
@@ -158,45 +161,12 @@ private void doFlush(final DataSource dataSource, final List<DataRecord> buffer)
                 default:
                     break;
             }
-            connection.commit();
-        }
-    }
-    
-    private void doFlush(final Connection connection, final List<DataRecord> buffer) {
-        // TODO it's better use transaction, but execute delete maybe not effect when open transaction of PostgreSQL sometimes
-        for (DataRecord each : buffer) {
-            try {
-                doFlush(connection, each);
-            } catch (final SQLException ex) {
-                throw new PipelineImporterJobWriteException(String.format("Write failed, record=%s", each), ex);
+            if (enableTransaction) {
+                connection.commit();
             }
         }
     }
     
-    private void doFlush(final Connection connection, final DataRecord dataRecord) throws SQLException {
-        switch (dataRecord.getType()) {
-            case IngestDataChangeType.INSERT:
-                if (null != rateLimitAlgorithm) {
-                    rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
-                }
-                executeBatchInsert(connection, Collections.singletonList(dataRecord));
-                break;
-            case IngestDataChangeType.UPDATE:
-                if (null != rateLimitAlgorithm) {
-                    rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
-                }
-                executeUpdate(connection, dataRecord);
-                break;
-            case IngestDataChangeType.DELETE:
-                if (null != rateLimitAlgorithm) {
-                    rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
-                }
-                executeBatchDelete(connection, Collections.singletonList(dataRecord));
-                break;
-            default:
-        }
-    }
-    
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
         String insertSql = importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
@@ -279,11 +249,11 @@ private void executeBatchDelete(final Connection connection, final List<DataRec
     }
     
     private void sequentialFlush(final DataSource dataSource, final List<DataRecord> buffer) {
-        if (buffer.isEmpty()) {
-            return;
-        }
-        try (Connection connection = dataSource.getConnection()) {
-            doFlush(connection, buffer);
+        // TODO It is better to use a transaction, but executing DELETE sometimes takes no effect on PostgreSQL when a transaction is open
+        try {
+            for (DataRecord each : buffer) {
+                doFlush(dataSource, Collections.singletonList(each));
+            }
         } catch (final SQLException ex) {
             throw new PipelineImporterJobWriteException(ex);
         }
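
Note on the change, with a hedged sketch: the core idea is that doFlush(DataSource, List<DataRecord>) now opens an explicit transaction only when the buffer holds more than one record, since a single DML statement is already atomic under autocommit, while sequentialFlush replays order-sensitive records one at a time through that same path. The standalone JDBC sketch below illustrates the conditional-transaction idiom only; the class name, the flush helper, and the use of raw SQL strings are illustrative assumptions, not ShardingSphere APIs.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import javax.sql.DataSource;

// Illustrative sketch (hypothetical names): flush a batch of pre-built DML
// statements, wrapping them in a transaction only when there is more than one.
public final class ConditionalTransactionFlushSketch {
    
    public static void flush(final DataSource dataSource, final List<String> statements) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            // A single statement is atomic under autocommit, so skip the explicit
            // transaction and save the setAutoCommit/commit round trips.
            boolean enableTransaction = statements.size() > 1;
            if (enableTransaction) {
                connection.setAutoCommit(false);
            }
            try {
                for (String each : statements) {
                    try (PreparedStatement preparedStatement = connection.prepareStatement(each)) {
                        preparedStatement.executeUpdate();
                    }
                }
                if (enableTransaction) {
                    connection.commit();
                }
            } catch (final SQLException ex) {
                // Roll back the partial batch before propagating; a single
                // autocommitted statement needs no rollback.
                if (enableTransaction) {
                    connection.rollback();
                }
                throw ex;
            }
        }
    }
}

The payoff is presumably on the incremental path, where single-record buffers are common: those flushes now skip two extra round trips per record, while multi-record buffers keep their all-or-nothing semantics.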