From 6867d040715d1850bad7ee5a2c2e375b46f5c79d Mon Sep 17 00:00:00 2001 From: Chuxin Chen Date: Thu, 19 Oct 2023 11:18:53 +0800 Subject: [PATCH] Optimize ShardingSpherePreparedStatement for multi executionContext (#28796) * Refactor ShardingSpherePreparedStatement for support multi executionContext. * Refactor ShardingSpherePreparedStatement for support multi executionContext. --- .../adapter/AbstractStatementAdapter.java | 16 +- .../ShardingSpherePreparedStatement.java | 8 +- .../statement/ShardingSphereStatement.java | 138 +++++++++++------- .../jdbc/adapter/StatementAdapterTest.java | 2 +- .../backend/connector/DatabaseConnector.java | 11 +- 5 files changed, 113 insertions(+), 62 deletions(-) diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java index dabd42d7d2a83..3ccbc6d1404b6 100644 --- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java +++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java @@ -57,7 +57,7 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat private boolean closed; - protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final ExecutionContext executionContext) { + protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final Collection executionContexts) { if (connection.getAutoCommit()) { return false; } @@ -66,11 +66,19 @@ protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConn if (!TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) || isInTransaction) { return false; } - return isModifiedSQL(executionContext) && executionContext.getExecutionUnits().size() > 1; + if (1 == executionContexts.size()) { + SQLStatement sqlStatement = executionContexts.iterator().next().getSqlStatementContext().getSqlStatement(); + return isWriteDMLStatement(sqlStatement) && executionContexts.iterator().next().getExecutionUnits().size() > 1; + } + for (ExecutionContext each : executionContexts) { + if (isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement())) { + return true; + } + } + return false; } - private boolean isModifiedSQL(final ExecutionContext executionContext) { - SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement(); + private boolean isWriteDMLStatement(final SQLStatement sqlStatement) { return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement); } diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index beae1457f7895..228e07a9bce72 100644 --- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -20,7 +20,6 @@ import com.google.common.base.Strings; import lombok.AccessLevel; import lombok.Getter; -import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine; import org.apache.shardingsphere.driver.executor.DriverExecutor; import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit; import org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor; @@ -33,15 +32,16 @@ import org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData; import org.apache.shardingsphere.driver.jdbc.exception.syntax.EmptySQLException; import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware; -import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine; import org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext; import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext; +import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine; import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey; import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; +import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine; import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; @@ -356,7 +356,7 @@ public int executeUpdate() throws SQLException { Collection executeResults = executor.getRawExecutor().execute(createRawExecutionGroupContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback()); return accumulate(executeResults); } - return isNeedImplicitCommitTransaction(connection, executionContext) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate(); + return isNeedImplicitCommitTransaction(connection, Collections.singleton(executionContext)) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate(); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -426,7 +426,7 @@ public boolean execute() throws SQLException { Collection executeResults = executor.getRawExecutor().execute(createRawExecutionGroupContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback()); return executeResults.iterator().next() instanceof QueryResult; } - return isNeedImplicitCommitTransaction(connection, executionContext) ? executeWithImplicitCommitTransaction() : useDriverToExecute(); + return isNeedImplicitCommitTransaction(connection, Collections.singleton(executionContext)) ? executeWithImplicitCommitTransaction() : useDriverToExecute(); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index 1533f0a93dbc4..d787b19f5599d 100644 --- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import lombok.AccessLevel; import lombok.Getter; @@ -123,7 +124,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter { private boolean returnGeneratedKeys; - private ExecutionContext executionContext; + private Collection executionContexts; private ResultSet currentResultSet; @@ -174,12 +175,8 @@ public ResultSet executeQuery(final String sql) throws SQLException { if (useFederation) { return executeFederationQuery(queryContext); } - executionContext = createExecutionContext(queryContext); - List queryResults = executeQuery0(); - MergedResult mergedResult = mergeQuery(queryResults); - boolean selectContainsEnhancedTable = - executionContext.getSqlStatementContext() instanceof SelectStatementContext && ((SelectStatementContext) executionContext.getSqlStatementContext()).isContainsEnhancedTable(); - result = new ShardingSphereResultSet(getResultSets(), mergedResult, this, selectContainsEnhancedTable, executionContext); + executionContexts = createExecutionContext(queryContext); + result = doExecuteQuery(executionContexts); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -192,6 +189,20 @@ public ResultSet executeQuery(final String sql) throws SQLException { return result; } + private ShardingSphereResultSet doExecuteQuery(final Collection executionContexts) throws SQLException { + ShardingSphereResultSet result = null; + for (ExecutionContext each : executionContexts) { + List queryResults = executeQuery0(each); + MergedResult mergedResult = mergeQuery(queryResults, each.getSqlStatementContext()); + boolean selectContainsEnhancedTable = + each.getSqlStatementContext() instanceof SelectStatementContext && ((SelectStatementContext) each.getSqlStatementContext()).isContainsEnhancedTable(); + if (null == result) { + result = new ShardingSphereResultSet(getResultSets(), mergedResult, this, selectContainsEnhancedTable, each); + } + } + return result; + } + private boolean decide(final QueryContext queryContext, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) { return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData); } @@ -214,12 +225,12 @@ private Optional getInstanceId(final QueryContext queryContext) { : Optional.empty(); } - private List executeQuery0() throws SQLException { + private List executeQuery0(final ExecutionContext executionContext) throws SQLException { if (metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance)) { return executor.getRawExecutor().execute( - createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList()); + createRawExecutionContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList()); } - ExecutionGroupContext executionGroupContext = createExecutionGroupContext(); + ExecutionGroupContext executionGroupContext = createExecutionGroupContext(executionContext); cacheStatements(executionGroupContext.getInputGroups()); StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(), metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), executionContext.getSqlStatementContext().getSqlStatement(), @@ -304,9 +315,9 @@ public int executeUpdate(final String sql, final String[] columnNames) throws SQ } } - private int executeUpdate(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext) throws SQLException { - return isNeedImplicitCommitTransaction(connection, executionContext) ? executeUpdateWithImplicitCommitTransaction(updateCallback, sqlStatementContext) - : useDriverToExecuteUpdate(updateCallback, sqlStatementContext); + private int executeUpdate(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext, final Collection executionContexts) throws SQLException { + return isNeedImplicitCommitTransaction(connection, executionContexts) ? executeUpdateWithImplicitCommitTransaction(updateCallback, sqlStatementContext, executionContexts) + : useDriverToExecuteUpdate(updateCallback, sqlStatementContext, executionContexts); } private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateCallback, final TrafficExecutorCallback trafficCallback) throws SQLException { @@ -319,18 +330,23 @@ private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateC JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); return executor.getTrafficExecutor().execute(executionUnit, trafficCallback); } - executionContext = createExecutionContext(queryContext); + executionContexts = createExecutionContext(queryContext); if (metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance)) { - return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback())); + Collection results = new LinkedList<>(); + for (ExecutionContext each : executionContexts) { + results.addAll(executor.getRawExecutor().execute(createRawExecutionContext(each), each.getQueryContext(), new RawSQLExecutorCallback())); + } + return accumulate(results); } - return executeUpdate(updateCallback, executionContext.getSqlStatementContext()); + return executeUpdate(updateCallback, queryContext.getSqlStatementContext(), executionContexts); } - private int executeUpdateWithImplicitCommitTransaction(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext) throws SQLException { + private int executeUpdateWithImplicitCommitTransaction(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext, + final Collection executionContexts) throws SQLException { int result; try { connection.setAutoCommit(false); - result = useDriverToExecuteUpdate(updateCallback, sqlStatementContext); + result = useDriverToExecuteUpdate(updateCallback, sqlStatementContext, executionContexts); connection.commit(); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { @@ -343,12 +359,21 @@ private int executeUpdateWithImplicitCommitTransaction(final ExecuteUpdateCallba return result; } - private int useDriverToExecuteUpdate(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext) throws SQLException { - ExecutionGroupContext executionGroupContext = createExecutionGroupContext(); - cacheStatements(executionGroupContext.getInputGroups()); - JDBCExecutorCallback callback = createExecuteUpdateCallback(updateCallback, sqlStatementContext); - return executor.getRegularExecutor().executeUpdate(executionGroupContext, - executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), callback); + private int useDriverToExecuteUpdate(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext, + final Collection executionContexts) throws SQLException { + Integer result = null; + Preconditions.checkArgument(!executionContexts.isEmpty()); + for (ExecutionContext each : executionContexts) { + ExecutionGroupContext executionGroupContext = createExecutionGroupContext(each); + cacheStatements(executionGroupContext.getInputGroups()); + JDBCExecutorCallback callback = createExecuteUpdateCallback(updateCallback, sqlStatementContext); + int effectedCount = executor.getRegularExecutor().executeUpdate(executionGroupContext, + each.getQueryContext(), each.getRouteContext().getRouteUnits(), callback); + if (null == result) { + result = effectedCount; + } + } + return result; } private JDBCExecutorCallback createExecuteUpdateCallback(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext) { @@ -446,12 +471,16 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback ResultSet resultSet = executeFederationQuery(queryContext); return null != resultSet; } - executionContext = createExecutionContext(queryContext); + executionContexts = createExecutionContext(queryContext); if (metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance)) { - Collection results = executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback()); + Collection results = new LinkedList<>(); + for (ExecutionContext each : executionContexts) { + results.addAll(executor.getRawExecutor().execute(createRawExecutionContext(each), each.getQueryContext(), new RawSQLExecutorCallback())); + } return results.iterator().next() instanceof QueryResult; } - return isNeedImplicitCommitTransaction(connection, executionContext) ? executeWithImplicitCommitTransaction(executeCallback) : useDriverToExecute(executeCallback); + return isNeedImplicitCommitTransaction(connection, executionContexts) ? executeWithImplicitCommitTransaction(executeCallback, executionContexts) + : useDriverToExecute(executeCallback, executionContexts); } finally { currentResultSet = null; } @@ -507,31 +536,31 @@ private DatabaseType getDatabaseType() { return protocolType.getTrunkDatabaseType().orElse(protocolType); } - private ExecutionContext createExecutionContext(final QueryContext queryContext) throws SQLException { + private Collection createExecutionContext(final QueryContext queryContext) throws SQLException { clearStatements(); RuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData(); ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(databaseName); SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext()); - return kernelProcessor.generateExecutionContext(queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), - connection.getDatabaseConnectionManager().getConnectionContext()); + return Collections.singleton(kernelProcessor.generateExecutionContext(queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), + connection.getDatabaseConnectionManager().getConnectionContext())); } - private ExecutionGroupContext createExecutionGroupContext() throws SQLException { + private ExecutionGroupContext createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException { DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(); return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(databaseName)); } - private ExecutionGroupContext createRawExecutionContext() throws SQLException { + private ExecutionGroupContext createRawExecutionContext(final ExecutionContext executionContext) throws SQLException { int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY); return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules()) .prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(databaseName)); } - private boolean executeWithImplicitCommitTransaction(final ExecuteCallback callback) throws SQLException { + private boolean executeWithImplicitCommitTransaction(final ExecuteCallback callback, final Collection executionContexts) throws SQLException { boolean result; try { connection.setAutoCommit(false); - result = useDriverToExecute(callback); + result = useDriverToExecute(callback, executionContexts); connection.commit(); // CHECKSTYLE:OFF } catch (final Exception ex) { @@ -544,12 +573,20 @@ private boolean executeWithImplicitCommitTransaction(final ExecuteCallback callb return result; } - private boolean useDriverToExecute(final ExecuteCallback callback) throws SQLException { - ExecutionGroupContext executionGroupContext = createExecutionGroupContext(); - cacheStatements(executionGroupContext.getInputGroups()); - JDBCExecutorCallback jdbcExecutorCallback = createExecuteCallback(callback, executionContext.getSqlStatementContext().getSqlStatement()); - return executor.getRegularExecutor().execute(executionGroupContext, - executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback); + private boolean useDriverToExecute(final ExecuteCallback callback, final Collection executionContexts) throws SQLException { + Boolean result = null; + Preconditions.checkArgument(!executionContexts.isEmpty()); + for (ExecutionContext each : executionContexts) { + ExecutionGroupContext executionGroupContext = createExecutionGroupContext(each); + cacheStatements(executionGroupContext.getInputGroups()); + JDBCExecutorCallback jdbcExecutorCallback = createExecuteCallback(callback, each.getSqlStatementContext().getSqlStatement()); + boolean isWrite = executor.getRegularExecutor().execute(executionGroupContext, + each.getQueryContext(), each.getRouteContext().getRouteUnits(), jdbcExecutorCallback); + if (null == result) { + result = isWrite; + } + } + return result; } private void cacheStatements(final Collection> executionGroups) throws SQLException { @@ -593,15 +630,16 @@ public ResultSet getResultSet() throws SQLException { if (useFederation) { return executor.getSqlFederationEngine().getResultSet(); } - if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) { + if (executionContexts.iterator().next().getSqlStatementContext() instanceof SelectStatementContext + || executionContexts.iterator().next().getSqlStatementContext().getSqlStatement() instanceof DALStatement) { List resultSets = getResultSets(); if (resultSets.isEmpty()) { return currentResultSet; } - MergedResult mergedResult = mergeQuery(getQueryResults(resultSets)); - boolean selectContainsEnhancedTable = - executionContext.getSqlStatementContext() instanceof SelectStatementContext && ((SelectStatementContext) executionContext.getSqlStatementContext()).isContainsEnhancedTable(); - currentResultSet = new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContext); + SQLStatementContext sqlStatementContext = executionContexts.iterator().next().getSqlStatementContext(); + MergedResult mergedResult = mergeQuery(getQueryResults(resultSets), sqlStatementContext); + boolean selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable(); + currentResultSet = new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContexts.iterator().next()); } return currentResultSet; } @@ -626,10 +664,10 @@ private List getQueryResults(final List resultSets) thro return result; } - private MergedResult mergeQuery(final List queryResults) throws SQLException { + private MergedResult mergeQuery(final List queryResults, final SQLStatementContext sqlStatementContext) throws SQLException { MergeEngine mergeEngine = new MergeEngine(metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext()); - return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext()); + return mergeEngine.merge(queryResults, sqlStatementContext); } @SuppressWarnings("MagicConstant") @@ -652,7 +690,7 @@ public int getResultSetHoldability() { @Override public boolean isAccumulate() { return metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().findRules(DataNodeContainedRule.class).stream() - .anyMatch(each -> each.isNeedAccumulate(executionContext.getSqlStatementContext().getTablesContext().getTableNames())); + .anyMatch(each -> each.isNeedAccumulate(executionContexts.iterator().next().getSqlStatementContext().getTablesContext().getTableNames())); } @Override @@ -678,8 +716,8 @@ public ResultSet getGeneratedKeys() throws SQLException { } private Optional findGeneratedKey() { - return executionContext.getSqlStatementContext() instanceof InsertStatementContext - ? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext() + return executionContexts.iterator().next().getSqlStatementContext() instanceof InsertStatementContext + ? ((InsertStatementContext) executionContexts.iterator().next().getSqlStatementContext()).getGeneratedKeyContext() : Optional.empty(); } diff --git a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java index e907a752eac05..816625d8614af 100644 --- a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java +++ b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java @@ -272,6 +272,6 @@ private ShardingSphereStatement mockShardingSphereStatementWithNeedAccumulate(fi @SneakyThrows(ReflectiveOperationException.class) private void setExecutionContext(final ShardingSphereStatement statement, final ExecutionContext executionContext) { - Plugins.getMemberAccessor().set(statement.getClass().getDeclaredField("executionContext"), statement, executionContext); + Plugins.getMemberAccessor().set(statement.getClass().getDeclaredField("executionContexts"), statement, Collections.singleton(executionContext)); } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java index 6b45eff2eda6a..871853cec2590 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.proxy.backend.connector; import com.google.common.base.Preconditions; -import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine; import org.apache.shardingsphere.infra.binder.context.aware.CursorDefinitionAware; import org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext; import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; @@ -31,6 +30,8 @@ import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor; import org.apache.shardingsphere.infra.connection.refresher.MetaDataRefreshEngine; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; +import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine; import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext; import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; @@ -46,7 +47,6 @@ import org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils; import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule; import org.apache.shardingsphere.infra.session.query.QueryContext; -import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallback; @@ -191,7 +191,12 @@ private boolean isNeedImplicitCommitTransaction(final Collection 1; } - return executionContexts.stream().anyMatch(each -> isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement())); + for (ExecutionContext each : executionContexts) { + if (isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement())) { + return true; + } + } + return false; } private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {