diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java index 084d1ac29ffff..c44cc8a245633 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java @@ -18,17 +18,20 @@ package org.apache.shardingsphere.driver.jdbc.core.connection; import lombok.Getter; +import org.apache.shardingsphere.driver.exception.ConnectionClosedException; import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter; import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData; import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement; import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement; -import org.apache.shardingsphere.driver.exception.ConnectionClosedException; import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine; import org.apache.shardingsphere.infra.metadata.user.Grantee; import org.apache.shardingsphere.infra.session.connection.ConnectionContext; +import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.traffic.engine.TrafficEngine; +import org.apache.shardingsphere.traffic.rule.TrafficRule; import org.apache.shardingsphere.transaction.api.TransactionType; import org.apache.shardingsphere.transaction.rule.TransactionRule; @@ -40,6 +43,7 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Savepoint; import java.sql.Statement; +import java.util.Optional; /** * ShardingSphere connection. @@ -212,6 +216,29 @@ public void handleAutoCommit() throws SQLException { } } + /** + * Get traffic tnstance ID. + * + * @param trafficRule traffic rule + * @param queryContext query context + * @return traffic tnstance ID + */ + public Optional getTrafficInstanceId(final TrafficRule trafficRule, final QueryContext queryContext) { + if (null == trafficRule || trafficRule.getStrategyRules().isEmpty()) { + return Optional.empty(); + } + Optional existedTrafficInstanceId = databaseConnectionManager.getConnectionContext().getTrafficInstanceId(); + if (existedTrafficInstanceId.isPresent()) { + return existedTrafficInstanceId; + } + boolean isHoldTransaction = isHoldTransaction(); + Optional result = new TrafficEngine(trafficRule, contextManager.getComputeNodeInstanceContext()).dispatch(queryContext, isHoldTransaction); + if (isHoldTransaction && result.isPresent()) { + databaseConnectionManager.getConnectionContext().setTrafficInstanceId(result.get()); + } + return result; + } + @Override public void commit() throws SQLException { try { diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index b7dc4c334e54e..fecffc9e8e193 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -68,7 +68,6 @@ import org.apache.shardingsphere.infra.hint.HintManager; import org.apache.shardingsphere.infra.hint.HintValueContext; import org.apache.shardingsphere.infra.hint.SQLHintUtils; -import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.infra.merge.MergeEngine; import org.apache.shardingsphere.infra.merge.result.MergedResult; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; @@ -85,7 +84,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; -import org.apache.shardingsphere.traffic.engine.TrafficEngine; import org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException; import org.apache.shardingsphere.traffic.rule.TrafficRule; import org.apache.shardingsphere.transaction.util.AutoCommitUtils; @@ -228,10 +226,10 @@ public ResultSet executeQuery() throws SQLException { QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); - if (null != trafficInstanceId) { + Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); + if (trafficInstanceId.isPresent()) { currentResultSet = executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, - trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeQuery()); + trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeQuery()); return currentResultSet; } if (decide(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData())) { @@ -281,24 +279,6 @@ private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanc return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new); } - private Optional getInstanceIdAndSet(final QueryContext queryContext) { - Optional result = connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId(); - if (!result.isPresent()) { - result = getInstanceId(queryContext); - } - if (connection.isHoldTransaction() && result.isPresent()) { - connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get()); - } - return result; - } - - private Optional getInstanceId(final QueryContext queryContext) { - ComputeNodeInstanceContext computeNodeInstanceContext = connection.getContextManager().getComputeNodeInstanceContext(); - return null != trafficRule && !trafficRule.getStrategyRules().isEmpty() - ? new TrafficEngine(trafficRule, computeNodeInstanceContext).dispatch(queryContext, connection.isHoldTransaction()) - : Optional.empty(); - } - private void resetParameters() throws SQLException { parameterSets.clear(); parameterSets.add(getParameters()); @@ -342,11 +322,11 @@ public int executeUpdate() throws SQLException { clearPrevious(); QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); - String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); - if (null != trafficInstanceId) { + Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); + if (trafficInstanceId.isPresent()) { ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); return executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, - trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); + trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); } executionContext = createExecutionContext(queryContext); if (hasRawExecutionRule()) { @@ -407,11 +387,11 @@ public boolean execute() throws SQLException { clearPrevious(); QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); - String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); - if (null != trafficInstanceId) { + Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); + if (trafficInstanceId.isPresent()) { ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); boolean result = executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, - trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute()); + trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute()); currentResultSet = executor.getTrafficExecutor().getResultSet(); return result; } @@ -624,8 +604,8 @@ private String getGeneratedKeysColumnName(final String columnName) { public void addBatch() { try { QueryContext queryContext = createQueryContext(); - String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); - executionContext = null == trafficInstanceId ? createExecutionContext(queryContext) : createExecutionContext(queryContext, trafficInstanceId); + executionContext = connection.getTrafficInstanceId(trafficRule, queryContext) + .map(optional -> createExecutionContext(queryContext, optional)).orElseGet(() -> createExecutionContext(queryContext)); batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits()); } finally { currentResultSet = null; diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index d2b2e12dfe2c0..9becb70f884af 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -62,7 +62,6 @@ import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine; import org.apache.shardingsphere.infra.hint.HintValueContext; import org.apache.shardingsphere.infra.hint.SQLHintUtils; -import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.infra.merge.MergeEngine; import org.apache.shardingsphere.infra.merge.result.MergedResult; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; @@ -76,7 +75,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; -import org.apache.shardingsphere.traffic.engine.TrafficEngine; import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback; import org.apache.shardingsphere.traffic.rule.TrafficRule; import org.apache.shardingsphere.transaction.util.AutoCommitUtils; @@ -158,10 +156,10 @@ public ResultSet executeQuery(final String sql) throws SQLException { databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); - if (null != trafficInstanceId) { + Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); + if (trafficInstanceId.isPresent()) { currentResultSet = executor.getTrafficExecutor().execute( - connection.getProcessId(), databaseName, trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), Statement::executeQuery); + connection.getProcessId(), databaseName, trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), Statement::executeQuery); return currentResultSet; } if (decide(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData())) { @@ -192,24 +190,6 @@ private boolean decide(final QueryContext queryContext, final ShardingSphereData return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData); } - private Optional getInstanceIdAndSet(final QueryContext queryContext) { - Optional result = connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId(); - if (!result.isPresent()) { - result = getInstanceId(queryContext); - } - if (connection.isHoldTransaction() && result.isPresent()) { - connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get()); - } - return result; - } - - private Optional getInstanceId(final QueryContext queryContext) { - ComputeNodeInstanceContext computeNodeInstanceContext = connection.getContextManager().getComputeNodeInstanceContext(); - return null != trafficRule && !trafficRule.getStrategyRules().isEmpty() - ? new TrafficEngine(trafficRule, computeNodeInstanceContext).dispatch(queryContext, connection.isHoldTransaction()) - : Optional.empty(); - } - private List executeQuery0(final ExecutionContext executionContext) throws SQLException { if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { return executor.getRawExecutor().execute( @@ -311,11 +291,11 @@ private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateC handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); - String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); - if (null != trafficInstanceId) { + Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); + if (trafficInstanceId.isPresent()) { ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); return executor.getTrafficExecutor().execute( - connection.getProcessId(), databaseName, trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); + connection.getProcessId(), databaseName, trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); } executionContext = createExecutionContext(queryContext); if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { @@ -417,11 +397,11 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); - String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); - if (null != trafficInstanceId) { + Optional trafficInstanceId = connection.getTrafficInstanceId(trafficRule, queryContext); + if (trafficInstanceId.isPresent()) { ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); boolean result = executor.getTrafficExecutor().execute( - connection.getProcessId(), databaseName, trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); + connection.getProcessId(), databaseName, trafficInstanceId.get(), queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); currentResultSet = executor.getTrafficExecutor().getResultSet(); return result; } diff --git a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java index fddefd4740f5d..7706ac2365005 100644 --- a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java +++ b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java @@ -47,23 +47,6 @@ public final class TrafficExecutor implements AutoCloseable { @Getter private ResultSet resultSet; - /** - * Execute. - * - * @param executionUnit execution unit - * @param callback traffic executor callback - * @param return type - * @return execute result - * @throws SQLException SQL exception - */ - public T execute(final JDBCExecutionUnit executionUnit, final TrafficExecutorCallback callback) throws SQLException { - SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit(); - cacheStatement(sqlUnit.getParameters(), executionUnit.getStorageResource()); - T result = callback.execute(statement, sqlUnit.getSql()); - resultSet = statement.getResultSet(); - return result; - } - /** * Execute. *