From c27fa6c1259aea2e5bd3674883a2d3e088be527d Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sat, 1 Jun 2024 03:04:59 +0800 Subject: [PATCH] Move execute to DriverExecutor (#31513) --- .../driver/executor/DriverExecutor.java | 95 ++++++++++++++-- .../adapter/AbstractStatementAdapter.java | 41 ------- .../ShardingSpherePreparedStatement.java | 99 ++--------------- .../statement/ShardingSphereStatement.java | 103 ++---------------- 4 files changed, 104 insertions(+), 234 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java index f111e3528280a..86cf6ff939964 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.driver.executor; import lombok.Getter; +import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback; import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback; import org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback; import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback; @@ -63,12 +64,16 @@ import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; +import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement; +import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import org.apache.shardingsphere.traffic.executor.TrafficExecutor; import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback; import org.apache.shardingsphere.traffic.rule.TrafficRule; +import org.apache.shardingsphere.transaction.api.TransactionType; import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback; +import org.apache.shardingsphere.transaction.rule.TransactionRule; import java.sql.Connection; import java.sql.PreparedStatement; @@ -259,21 +264,21 @@ private List getResultSets() throws SQLException { * @param prepareEngine prepare engine * @param trafficCallback traffic callback * @param updateCallback update callback - * @param isNeedImplicitCommitTransaction is need implicit commit transaction * @param statementReplayCallback statement replay callback - * @param executionContext execution context * @return updated row count * @throws SQLException SQL exception */ @SuppressWarnings("rawtypes") public int executeUpdate(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, - final DriverExecutionPrepareEngine prepareEngine, - final TrafficExecutorCallback trafficCallback, final ExecuteUpdateCallback updateCallback, final StatementReplayCallback statementReplayCallback, - final boolean isNeedImplicitCommitTransaction, final ExecutionContext executionContext) throws SQLException { + final DriverExecutionPrepareEngine prepareEngine, final TrafficExecutorCallback trafficCallback, + final ExecuteUpdateCallback updateCallback, final StatementReplayCallback statementReplayCallback) throws SQLException { + ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext); Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); if (trafficInstanceId.isPresent()) { return trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback); } + boolean isNeedImplicitCommitTransaction = isNeedImplicitCommitTransaction( + connection, queryContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1); return database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty() ? executeUpdate(database, updateCallback, queryContext.getSqlStatementContext(), executionContext, prepareEngine, isNeedImplicitCommitTransaction, statementReplayCallback) : accumulate(rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext), queryContext, new RawSQLExecutorCallback())); @@ -361,24 +366,92 @@ private int accumulate(final Collection results) { * @param queryContext query context * @param prepareEngine prepare engine * @param trafficCallback traffic callback + * @param executeCallback execute callback + * @param statementReplayCallback statement replay callback * @return execute result * @throws SQLException SQL exception */ - public Optional executeAdvance(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, - final QueryContext queryContext, final DriverExecutionPrepareEngine prepareEngine, - final TrafficExecutorCallback trafficCallback) throws SQLException { + @SuppressWarnings("rawtypes") + public boolean executeAdvance(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, + final DriverExecutionPrepareEngine prepareEngine, final TrafficExecutorCallback trafficCallback, + final ExecuteCallback executeCallback, final StatementReplayCallback statementReplayCallback) throws SQLException { Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); if (trafficInstanceId.isPresent()) { executeType = ExecuteType.TRAFFIC; - return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback)); + return trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback); } if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) { executeType = ExecuteType.FEDERATION; ResultSet resultSet = sqlFederationEngine.executeQuery( prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId())); - return Optional.of(null != resultSet); + return null != resultSet; + } + ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext); + if (!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { + Collection results = rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext), queryContext, new RawSQLExecutorCallback()); + return results.iterator().next() instanceof QueryResult; + } + boolean isNeedImplicitCommitTransaction = isNeedImplicitCommitTransaction( + connection, queryContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1); + return executeWithExecutionContext(database, executeCallback, executionContext, prepareEngine, isNeedImplicitCommitTransaction, statementReplayCallback); + } + + @SuppressWarnings("rawtypes") + private boolean executeWithExecutionContext(final ShardingSphereDatabase database, final ExecuteCallback executeCallback, final ExecutionContext executionContext, + final DriverExecutionPrepareEngine prepareEngine, + final boolean isNeedImplicitCommitTransaction, final StatementReplayCallback statementReplayCallback) throws SQLException { + return isNeedImplicitCommitTransaction + ? executeWithImplicitCommitTransaction(() -> useDriverToExecute(database, executeCallback, executionContext, prepareEngine, statementReplayCallback), connection, + database.getProtocolType()) + : useDriverToExecute(database, executeCallback, executionContext, prepareEngine, statementReplayCallback); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private boolean useDriverToExecute(final ShardingSphereDatabase database, final ExecuteCallback callback, final ExecutionContext executionContext, + final DriverExecutionPrepareEngine prepareEngine, final StatementReplayCallback statementReplayCallback) throws SQLException { + ExecutionGroupContext executionGroupContext = createExecutionGroupContext(database, executionContext, prepareEngine); + for (ExecutionGroup each : executionGroupContext.getInputGroups()) { + statements.addAll(getStatements(each)); + if (JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) { + parameterSets.addAll(getParameterSets(each)); + } } - return Optional.empty(); + statementReplayCallback.replay(statements, parameterSets); + JDBCExecutorCallback jdbcExecutorCallback = createExecuteCallback(database, callback, executionContext.getSqlStatementContext().getSqlStatement(), prepareEngine.getType()); + return regularExecutor.execute(executionGroupContext, executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback); + } + + private JDBCExecutorCallback createExecuteCallback(final ShardingSphereDatabase database, final ExecuteCallback executeCallback, + final SQLStatement sqlStatement, final String jdbcDriverType) { + boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown(); + return new JDBCExecutorCallback(database.getProtocolType(), database.getResourceMetaData(), sqlStatement, isExceptionThrown) { + + @Override + protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException { + return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? executeCallback.execute(sql, statement) : ((PreparedStatement) statement).execute(); + } + + @Override + protected Optional getSaneResult(final SQLStatement sqlStatement1, final SQLException ex) { + return Optional.empty(); + } + }; + } + + private boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final SQLStatement sqlStatement, final boolean multiExecutionUnits) { + if (!connection.getAutoCommit()) { + return false; + } + TransactionType transactionType = connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType(); + boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction(); + if (!TransactionType.isDistributedTransaction(transactionType) || isInTransaction) { + return false; + } + return isWriteDMLStatement(sqlStatement) && multiExecutionUnits; + } + + private boolean isWriteDMLStatement(final SQLStatement sqlStatement) { + return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement); } /** diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java index 75646a43cde3a..d024fc4a6f468 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java @@ -26,16 +26,8 @@ import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; -import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; -import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; -import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement; -import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement; -import org.apache.shardingsphere.transaction.api.TransactionType; -import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback; -import org.apache.shardingsphere.transaction.rule.TransactionRule; -import java.sql.Connection; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLWarning; @@ -61,39 +53,6 @@ public abstract class AbstractStatementAdapter extends WrapperAdapter implements private boolean closeOnCompletion; - protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final SQLStatement sqlStatement, final boolean multiExecutionUnits) { - if (!connection.getAutoCommit()) { - return false; - } - TransactionType transactionType = connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType(); - boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction(); - if (!TransactionType.isDistributedTransaction(transactionType) || isInTransaction) { - return false; - } - return isWriteDMLStatement(sqlStatement) && multiExecutionUnits; - } - - protected final T executeWithImplicitCommitTransaction(final ImplicitTransactionCallback callback, final Connection connection, final DatabaseType databaseType) throws SQLException { - T result; - try { - connection.setAutoCommit(false); - result = callback.execute(); - connection.commit(); - // CHECKSTYLE:OFF - } catch (final Exception ex) { - // CHECKSTYLE:ON - connection.rollback(); - throw SQLExceptionTransformEngine.toSQLException(ex, databaseType); - } finally { - connection.setAutoCommit(true); - } - return result; - } - - private boolean isWriteDMLStatement(final SQLStatement sqlStatement) { - return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement); - } - protected final void handleExceptionInTransaction(final ShardingSphereConnection connection, final MetaDataContexts metaDataContexts) { if (connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction()) { DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(); diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 1b6e78bee3eb0..151ffd1bfc226 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -38,31 +38,21 @@ import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine; import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey; import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor; -import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine; import org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException; import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine; -import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup; -import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext; import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit; import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback; -import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult; import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption; -import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine; import org.apache.shardingsphere.infra.hint.HintManager; import org.apache.shardingsphere.infra.hint.HintValueContext; import org.apache.shardingsphere.infra.hint.SQLHintUtils; @@ -74,7 +64,6 @@ import org.apache.shardingsphere.infra.parser.SQLParserEngine; import org.apache.shardingsphere.infra.route.context.RouteContext; import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute; -import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute; import org.apache.shardingsphere.infra.rule.attribute.resoure.StorageConnectorReusableRuleAttribute; import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; @@ -115,8 +104,6 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState private final List> parameterSets; - private final SQLStatement sqlStatement; - private final SQLStatementContext sqlStatementContext; private final String databaseName; @@ -190,7 +177,7 @@ private ShardingSpherePreparedStatement(final ShardingSphereConnection connectio parameterSets = new ArrayList<>(); SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class); SQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()); - sqlStatement = sqlParserEngine.parse(this.sql, true); + SQLStatement sqlStatement = sqlParserEngine.parse(this.sql, true); sqlStatementContext = new SQLBindEngine(metaDataContexts.getMetaData(), connection.getDatabaseName(), hintValueContext).bind(sqlStatement, Collections.emptyList()); databaseName = sqlStatementContext.getTablesContext().getDatabaseName().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); @@ -243,10 +230,6 @@ public ResultSet executeQuery() throws SQLException { } } - private boolean hasRawExecutionRule() { - return !metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty(); - } - private void handleAutoCommit(final SQLStatement sqlStatement) throws SQLException { if (AutoCommitUtils.needOpenTransaction(sqlStatement)) { connection.handleAutoCommit(); @@ -276,15 +259,13 @@ public int executeUpdate() throws SQLException { QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - ExecutionContext executionContext = createExecutionContext(queryContext); - boolean isNeedImplicitCommitTransaction = isNeedImplicitCommitTransaction(connection, sqlStatementContext.getSqlStatement(), executionContext.getExecutionUnits().size() > 1); - int result = executor.executeUpdate(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), - (statement, sql) -> ((PreparedStatement) statement).executeUpdate(), null, (StatementReplayCallback) this::replay, - isNeedImplicitCommitTransaction, executionContext); + final int result = executor.executeUpdate(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), + (statement, sql) -> ((PreparedStatement) statement).executeUpdate(), null, (StatementReplayCallback) this::replay); for (Statement each : executor.getStatements()) { statements.add((PreparedStatement) each); } parameterSets.addAll(executor.getParameterSets()); + findGeneratedKey().ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues())); return result; // CHECKSTYLE:OFF } catch (final RuntimeException ex) { @@ -307,18 +288,15 @@ public boolean execute() throws SQLException { QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - Optional advancedResult = executor.executeAdvance( - metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute()); - if (advancedResult.isPresent()) { - return advancedResult.get(); - } - ExecutionContext executionContext = createExecutionContext(queryContext); - if (hasRawExecutionRule()) { - Collection results = - executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback()); - return results.iterator().next() instanceof QueryResult; + final boolean result = executor.executeAdvance( + metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute(), + null, (StatementReplayCallback) this::replay); + for (Statement each : executor.getStatements()) { + statements.add((PreparedStatement) each); } - return executeWithExecutionContext(executionContext); + parameterSets.addAll(executor.getParameterSets()); + findGeneratedKey().ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues())); + return result; // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -329,43 +307,6 @@ public boolean execute() throws SQLException { } } - private boolean executeWithExecutionContext(final ExecutionContext executionContext) throws SQLException { - return isNeedImplicitCommitTransaction(connection, sqlStatementContext.getSqlStatement(), executionContext.getExecutionUnits().size() > 1) - ? executeWithImplicitCommitTransaction(() -> useDriverToExecute(executionContext), connection, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType()) - : useDriverToExecute(executionContext); - } - - private boolean useDriverToExecute(final ExecutionContext executionContext) throws SQLException { - ExecutionGroupContext executionGroupContext = createExecutionGroupContext(executionContext); - cacheStatements(executionGroupContext.getInputGroups()); - return executor.getRegularExecutor().execute(executionGroupContext, - executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback()); - } - - private JDBCExecutorCallback createExecuteCallback() { - boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown(); - return new JDBCExecutorCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(), - metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, isExceptionThrown) { - - @Override - protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException { - return ((PreparedStatement) statement).execute(); - } - - @Override - protected Optional getSaneResult(final SQLStatement sqlStatement, final SQLException ex) { - return Optional.empty(); - } - }; - } - - private ExecutionGroupContext createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(database); - return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), - new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); - } - @Override public ResultSet getResultSet() throws SQLException { if (null != currentResultSet) { @@ -424,12 +365,6 @@ private ExecutionContext createExecutionContext(final QueryContext queryContext, return new ExecutionContext(queryContext, Collections.singletonList(executionUnit), new RouteContext()); } - private ExecutionGroupContext createRawExecutionGroupContext(final ExecutionContext executionContext) throws SQLException { - int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY); - return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules()) - .prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); - } - private QueryContext createQueryContext() { List params = new ArrayList<>(getParameters()); if (sqlStatementContext instanceof ParameterAware) { @@ -444,16 +379,6 @@ private MergedResult mergeQuery(final List queryResults, final SQLS return mergeEngine.merge(queryResults, sqlStatementContext); } - private void cacheStatements(final Collection> executionGroups) throws SQLException { - for (ExecutionGroup each : executionGroups) { - each.getInputs().forEach(eachInput -> { - statements.add((PreparedStatement) eachInput.getStorageResource()); - parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters()); - }); - } - replay(statements, parameterSets); - } - private void replay(final List statements, final List> parameterSets) throws SQLException { replaySetParameter(statements, parameterSets); for (Statement each : statements) { diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index 8965c273fb172..6294f9cbe9b39 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -34,39 +34,22 @@ import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext; import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine; import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey; -import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor; -import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine; import org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException; -import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine; -import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup; -import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; -import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; -import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback; -import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult; import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption; -import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine; import org.apache.shardingsphere.infra.hint.HintValueContext; import org.apache.shardingsphere.infra.hint.SQLHintUtils; import org.apache.shardingsphere.infra.merge.MergeEngine; import org.apache.shardingsphere.infra.merge.result.MergedResult; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; -import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData; -import org.apache.shardingsphere.infra.metadata.user.Grantee; import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute; -import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute; import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; import org.apache.shardingsphere.parser.rule.SQLParserRule; @@ -85,7 +68,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; /** * ShardingSphere statement. @@ -105,8 +87,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter { @Getter(AccessLevel.PROTECTED) private final DriverExecutor executor; - private final KernelProcessor kernelProcessor; - @Getter(AccessLevel.PROTECTED) private final StatementManager statementManager; @@ -134,7 +114,6 @@ public ShardingSphereStatement(final ShardingSphereConnection connection, final statements = new LinkedList<>(); statementOption = new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability); executor = new DriverExecutor(connection); - kernelProcessor = new KernelProcessor(); statementManager = new StatementManager(); batchStatementExecutor = new BatchStatementExecutor(this); databaseName = connection.getDatabaseName(); @@ -238,11 +217,10 @@ private int executeUpdate(final String sql, final ExecuteUpdateCallback updateCa connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); sqlStatementContext = queryContext.getSqlStatementContext(); - ExecutionContext executionContext = createExecutionContext(queryContext); - boolean isNeedImplicitCommitTransaction = isNeedImplicitCommitTransaction(connection, sqlStatementContext.getSqlStatement(), executionContext.getExecutionUnits().size() > 1); + clearStatements(); int result = executor.executeUpdate( metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback, updateCallback, - (StatementReplayCallback) (statements, parameterSets) -> replay(statements), isNeedImplicitCommitTransaction, executionContext); + (StatementReplayCallback) (statements, parameterSets) -> replay(statements)); statements.addAll(executor.getStatements()); replay(statements); return result; @@ -309,16 +287,11 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); sqlStatementContext = queryContext.getSqlStatementContext(); - Optional advancedResult = executor.executeAdvance(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); - if (advancedResult.isPresent()) { - return advancedResult.get(); - } - ExecutionContext executionContext = createExecutionContext(queryContext); - if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { - Collection results = executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext), queryContext, new RawSQLExecutorCallback()); - return results.iterator().next() instanceof QueryResult; - } - return executeWithExecutionContext(executeCallback, executionContext); + clearStatements(); + boolean result = executor.executeAdvance(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback, + executeCallback, (StatementReplayCallback) (statements, parameterSets) -> replay(statements)); + statements.addAll(executor.getStatements()); + return result; } private void handleAutoCommit(final SQLStatement sqlStatement) throws SQLException { @@ -332,6 +305,7 @@ private void clearStatements() throws SQLException { each.close(); } statements.clear(); + executor.clear(); } @Override @@ -358,67 +332,6 @@ private QueryContext createQueryContext(final String originSQL) { return new QueryContext(sqlStatementContext, sql, Collections.emptyList(), hintValueContext); } - private ExecutionContext createExecutionContext(final QueryContext queryContext) throws SQLException { - clearStatements(); - RuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData(); - ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(databaseName); - SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext()); - return kernelProcessor.generateExecutionContext(queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), - connection.getDatabaseConnectionManager().getConnectionContext()); - } - - private ExecutionGroupContext createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(database); - return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), - new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); - } - - private ExecutionGroupContext createRawExecutionGroupContext(final ExecutionContext executionContext) throws SQLException { - int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY); - return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules()) - .prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); - } - - private boolean executeWithExecutionContext(final ExecuteCallback executeCallback, final ExecutionContext executionContext) throws SQLException { - return isNeedImplicitCommitTransaction(connection, sqlStatementContext.getSqlStatement(), executionContext.getExecutionUnits().size() > 1) - ? executeWithImplicitCommitTransaction(() -> useDriverToExecute(executeCallback, executionContext), connection, - metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType()) - : useDriverToExecute(executeCallback, executionContext); - } - - private boolean useDriverToExecute(final ExecuteCallback callback, final ExecutionContext executionContext) throws SQLException { - ExecutionGroupContext executionGroupContext = createExecutionGroupContext(executionContext); - cacheStatements(executionGroupContext.getInputGroups()); - JDBCExecutorCallback jdbcExecutorCallback = createExecuteCallback(callback, sqlStatementContext.getSqlStatement()); - return executor.getRegularExecutor().execute(executionGroupContext, - executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback); - } - - private void cacheStatements(final Collection> executionGroups) throws SQLException { - for (ExecutionGroup each : executionGroups) { - statements.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList())); - } - replay(statements); - } - - private JDBCExecutorCallback createExecuteCallback(final ExecuteCallback executeCallback, final SQLStatement sqlStatement) { - boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown(); - return new JDBCExecutorCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(), - metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, isExceptionThrown) { - - @Override - protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException { - return executeCallback.execute(sql, statement); - } - - @Override - protected Optional getSaneResult(final SQLStatement sqlStatement1, final SQLException ex) { - return Optional.empty(); - } - }; - } - private void replay(final List statements) throws SQLException { for (Statement each : statements) { getMethodInvocationRecorder().replay(each);