diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java index 655c5f8836e46..4e097b0daccbf 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.infra.executor.sql.prepare.driver; +import lombok.Getter; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup; import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit; @@ -45,6 +46,9 @@ public final class DriverExecutionPrepareEngine @SuppressWarnings("rawtypes") private static final Map TYPE_TO_BUILDER_MAP = new ConcurrentHashMap<>(8, 1F); + @Getter + private final String type; + private final DatabaseConnectionManager databaseConnectionManager; private final ExecutorStatementManager statementManager; @@ -60,6 +64,7 @@ public DriverExecutionPrepareEngine(final String type, final int maxConnectionsS final ExecutorStatementManager statementManager, final StorageResourceOption option, final Collection rules, final Map storageUnits) { super(maxConnectionsSizePerQuery, rules); + this.type = type; this.databaseConnectionManager = databaseConnectionManager; this.statementManager = statementManager; this.option = option; diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java index 510665e241a7c..ad349f07178dc 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java @@ -18,33 +18,56 @@ package org.apache.shardingsphere.driver.executor; import lombok.Getter; +import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback; +import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback; +import org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback; import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor; +import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; +import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; +import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; +import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import org.apache.shardingsphere.traffic.executor.TrafficExecutor; +import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback; +import org.apache.shardingsphere.traffic.rule.TrafficRule; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; +import java.util.Optional; /** * Driver executor. */ -@Getter public final class DriverExecutor implements AutoCloseable { + private final ShardingSphereConnection connection; + + @Getter private final DriverJDBCExecutor regularExecutor; + @Getter private final RawExecutor rawExecutor; + private final TrafficExecutor trafficExecutor; + private final SQLFederationEngine sqlFederationEngine; - private final TrafficExecutor trafficExecutor; + private ExecuteType executeType = ExecuteType.REGULAR; public DriverExecutor(final ShardingSphereConnection connection) { + this.connection = connection; MetaDataContexts metaDataContexts = connection.getContextManager().getMetaDataContexts(); ExecutorEngine executorEngine = connection.getContextManager().getExecutorEngine(); JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connection.getDatabaseConnectionManager().getConnectionContext()); @@ -52,18 +75,137 @@ public DriverExecutor(final ShardingSphereConnection connection) { rawExecutor = new RawExecutor(executorEngine, connection.getDatabaseConnectionManager().getConnectionContext()); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()); String schemaName = new DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(connection.getDatabaseName()); - sqlFederationEngine = new SQLFederationEngine(connection.getDatabaseName(), schemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor); trafficExecutor = new TrafficExecutor(); + sqlFederationEngine = new SQLFederationEngine(connection.getDatabaseName(), schemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor); } /** - * Close. + * Execute advance query. * + * @param metaData meta data + * @param database database + * @param queryContext query context + * @param prepareEngine prepare engine + * @return result set * @throws SQLException SQL exception */ + public Optional executeAdvanceQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, + final DriverExecutionPrepareEngine prepareEngine) throws SQLException { + Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); + if (trafficInstanceId.isPresent()) { + TrafficExecutorCallback trafficCallback = JDBCDriverType.STATEMENT.equals(prepareEngine.getType()) + ? Statement::executeQuery + : ((statement, sql) -> ((PreparedStatement) statement).executeQuery()); + return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback)); + } + if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) { + ExecuteQueryCallback sqlFederationCallback = JDBCDriverType.STATEMENT.equals(prepareEngine.getType()) + ? new StatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(), + queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown()) + : new PreparedStatementExecuteQueryCallback(database.getProtocolType(), + database.getResourceMetaData(), queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown()); + return Optional.of(sqlFederationEngine.executeQuery(prepareEngine, sqlFederationCallback, new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()))); + } + return Optional.empty(); + } + + /** + * Execute advance update. + * + * @param metaData meta data + * @param database database + * @param queryContext query context + * @param prepareEngine prepare engine + * @return updated row count + * @throws SQLException SQL exception + */ + public Optional executeAdvanceUpdate(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, + final DriverExecutionPrepareEngine prepareEngine) throws SQLException { + Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); + if (trafficInstanceId.isPresent()) { + return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(), + trafficInstanceId.get(), queryContext, prepareEngine, (statement, sql) -> ((PreparedStatement) statement).executeUpdate())); + } + return Optional.empty(); + } + + /** + * Execute advance update. + * + * @param metaData meta data + * @param database database + * @param queryContext query context + * @param prepareEngine prepare engine + * @param trafficCallback traffic callback + * @return updated row count + * @throws SQLException SQL exception + */ + public Optional executeAdvanceUpdate(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, + final DriverExecutionPrepareEngine prepareEngine, + final TrafficExecutorCallback trafficCallback) throws SQLException { + Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); + if (trafficInstanceId.isPresent()) { + return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback)); + } + return Optional.empty(); + } + + /** + * Execute advance. + * + * @param metaData meta data + * @param database database + * @param queryContext query context + * @param prepareEngine prepare engine + * @param trafficCallback traffic callback + * @return execute result + * @throws SQLException SQL exception + */ + public Optional executeAdvance(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, + final QueryContext queryContext, final DriverExecutionPrepareEngine prepareEngine, + final TrafficExecutorCallback trafficCallback) throws SQLException { + Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); + if (trafficInstanceId.isPresent()) { + executeType = ExecuteType.TRAFFIC; + return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback)); + } + if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) { + executeType = ExecuteType.FEDERATION; + ExecuteQueryCallback sqlFederationCallback = JDBCDriverType.STATEMENT.equals(prepareEngine.getType()) + ? new StatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(), + queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown()) + : new PreparedStatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(), + queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown()); + ResultSet resultSet = sqlFederationEngine.executeQuery(prepareEngine, sqlFederationCallback, new SQLFederationContext(false, queryContext, metaData, connection.getProcessId())); + return Optional.of(null != resultSet); + } + return Optional.empty(); + } + + /** + * Get advanced result set. + * + * @return advanced result set + */ + public Optional getAdvancedResultSet() { + switch (executeType) { + case TRAFFIC: + return Optional.of(trafficExecutor.getResultSet()); + case FEDERATION: + return Optional.of(sqlFederationEngine.getResultSet()); + default: + return Optional.empty(); + } + } + @Override public void close() throws SQLException { sqlFederationEngine.close(); trafficExecutor.close(); } + + public enum ExecuteType { + + TRAFFIC, FEDERATION, REGULAR + } } diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index fecffc9e8e193..35bf260aeda7a 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -83,8 +83,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule; import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; -import org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException; import org.apache.shardingsphere.traffic.rule.TrafficRule; import org.apache.shardingsphere.transaction.util.AutoCommitUtils; @@ -226,14 +224,9 @@ public ResultSet executeQuery() throws SQLException { QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); - if (trafficInstanceId.isPresent()) { - currentResultSet = executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, - trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeQuery()); - return currentResultSet; - } - if (decide(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData())) { - currentResultSet = executeFederationQuery(queryContext); + Optional advancedResultSet = executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database)); + if (advancedResultSet.isPresent()) { + currentResultSet = advancedResultSet.get(); return currentResultSet; } executionContext = createExecutionContext(queryContext); @@ -260,25 +253,12 @@ private ShardingSphereResultSet doExecuteQuery(final ExecutionContext executionC return new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContext, columnLabelAndIndexMap); } - private boolean decide(final QueryContext queryContext, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) { - return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData); - } - private void handleAutoCommit(final SQLStatement sqlStatement) throws SQLException { if (AutoCommitUtils.needOpenTransaction(sqlStatement)) { connection.handleAutoCommit(); } } - private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(database); - ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters())); - ExecutionGroupContext context = - prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); - return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new); - } - private void resetParameters() throws SQLException { parameterSets.clear(); parameterSets.add(getParameters()); @@ -298,14 +278,6 @@ private List executeQuery0(final ExecutionContext executionContext) SQLExecutorExceptionHandler.isExceptionThrown())); } - private ResultSet executeFederationQuery(final QueryContext queryContext) { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(database.getProtocolType(), - database.getResourceMetaData(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown()); - SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), connection.getProcessId()); - return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database), callback, context); - } - private DriverExecutionPrepareEngine createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) { int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY); return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager, statementOption, @@ -322,11 +294,10 @@ public int executeUpdate() throws SQLException { clearPrevious(); QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); - Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); - if (trafficInstanceId.isPresent()) { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - return executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, - trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + Optional updatedCount = executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database)); + if (updatedCount.isPresent()) { + return updatedCount.get(); } executionContext = createExecutionContext(queryContext); if (hasRawExecutionRule()) { @@ -387,20 +358,12 @@ public boolean execute() throws SQLException { clearPrevious(); QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); - Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); - if (trafficInstanceId.isPresent()) { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - boolean result = executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, - trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute()); - currentResultSet = executor.getTrafficExecutor().getResultSet(); - return result; - } - if (decide(queryContext, metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData())) { - ResultSet resultSet = executeFederationQuery(queryContext); - currentResultSet = resultSet; - return null != resultSet; + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + Optional advancedResult = executor.executeAdvance( + metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute()); + if (advancedResult.isPresent()) { + return advancedResult.get(); } - currentResultSet = null; executionContext = createExecutionContext(queryContext); if (hasRawExecutionRule()) { Collection results = @@ -414,8 +377,7 @@ public boolean execute() throws SQLException { handleExceptionInTransaction(connection, metaDataContexts); throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType()); } finally { - batchPreparedStatementExecutor.clear(); - clearParameters(); + clearBatch(); } } @@ -477,6 +439,10 @@ public ResultSet getResultSet() throws SQLException { if (null != currentResultSet) { return currentResultSet; } + Optional advancedResultSet = executor.getAdvancedResultSet(); + if (advancedResultSet.isPresent()) { + return advancedResultSet.get(); + } if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) { List resultSets = getResultSets(); if (resultSets.isEmpty()) { diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index 9becb70f884af..55edd1785d902 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -74,9 +74,7 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule; import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback; -import org.apache.shardingsphere.traffic.rule.TrafficRule; import org.apache.shardingsphere.transaction.util.AutoCommitUtils; import java.sql.Connection; @@ -111,8 +109,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter { private final KernelProcessor kernelProcessor; - private final TrafficRule trafficRule; - @Getter(AccessLevel.PROTECTED) private final StatementManager statementManager; @@ -141,7 +137,6 @@ public ShardingSphereStatement(final ShardingSphereConnection connection, final statementOption = new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability); executor = new DriverExecutor(connection); kernelProcessor = new KernelProcessor(); - trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class); statementManager = new StatementManager(); batchStatementExecutor = new BatchStatementExecutor(this); databaseName = connection.getDatabaseName(); @@ -156,14 +151,9 @@ public ResultSet executeQuery(final String sql) throws SQLException { databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); - if (trafficInstanceId.isPresent()) { - currentResultSet = executor.getTrafficExecutor().execute( - connection.getProcessId(), databaseName, trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), Statement::executeQuery); - return currentResultSet; - } - if (decide(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData())) { - currentResultSet = executeFederationQuery(queryContext); + Optional advancedResultSet = executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database)); + if (advancedResultSet.isPresent()) { + currentResultSet = advancedResultSet.get(); return currentResultSet; } executionContext = createExecutionContext(queryContext); @@ -186,10 +176,6 @@ private ShardingSphereResultSet doExecuteQuery(final ExecutionContext executionC return new ShardingSphereResultSet(getResultSets(), mergedResult, this, selectContainsEnhancedTable, executionContext); } - private boolean decide(final QueryContext queryContext, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) { - return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData); - } - private List executeQuery0(final ExecutionContext executionContext) throws SQLException { if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { return executor.getRawExecutor().execute( @@ -203,14 +189,6 @@ private List executeQuery0(final ExecutionContext executionContext) return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(), callback); } - private ResultSet executeFederationQuery(final QueryContext queryContext) { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(database.getProtocolType(), - database.getResourceMetaData(), queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown()); - SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), connection.getProcessId()); - return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database), callback, context); - } - private DriverExecutionPrepareEngine createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) { int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY); return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager, statementOption, @@ -291,11 +269,10 @@ private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateC handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); - Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); - if (trafficInstanceId.isPresent()) { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - return executor.getTrafficExecutor().execute( - connection.getProcessId(), databaseName, trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + Optional updatedCount = executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); + if (updatedCount.isPresent()) { + return updatedCount.get(); } executionContext = createExecutionContext(queryContext); if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { @@ -393,24 +370,16 @@ public boolean execute(final String sql, final String[] columnNames) throws SQLE } private boolean execute0(final String sql, final ExecuteCallback executeCallback, final TrafficExecutorCallback trafficCallback) throws SQLException { + currentResultSet = null; QueryContext queryContext = createQueryContext(sql); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); - Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); - if (trafficInstanceId.isPresent()) { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - boolean result = executor.getTrafficExecutor().execute( - connection.getProcessId(), databaseName, trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); - currentResultSet = executor.getTrafficExecutor().getResultSet(); - return result; - } - if (decide(queryContext, metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData())) { - ResultSet resultSet = executeFederationQuery(queryContext); - currentResultSet = resultSet; - return null != resultSet; + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + Optional advancedResult = executor.executeAdvance(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); + if (advancedResult.isPresent()) { + return advancedResult.get(); } - currentResultSet = null; executionContext = createExecutionContext(queryContext); if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { Collection results = executor.getRawExecutor().execute(createRawExecutionContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback()); @@ -528,6 +497,10 @@ public ResultSet getResultSet() throws SQLException { if (null != currentResultSet) { return currentResultSet; } + Optional advancedResultSet = executor.getAdvancedResultSet(); + if (advancedResultSet.isPresent()) { + return advancedResultSet.get(); + } if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) { List resultSets = getResultSets(); if (resultSets.isEmpty()) {