From 0f65f187cbf9008454501aa1b2f22c24ca584f5a Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sat, 1 Jun 2024 23:19:05 +0800 Subject: [PATCH] Add metaData as field for DriverExecutor --- .../driver/executor/DriverExecutor.java | 44 +++++++++---------- .../ShardingSpherePreparedStatement.java | 7 ++- .../statement/ShardingSphereStatement.java | 7 ++- 3 files changed, 26 insertions(+), 32 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java index 23bcb365ba790..8d2430f191c64 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java @@ -62,7 +62,6 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee; import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute; import org.apache.shardingsphere.infra.session.query.QueryContext; -import org.apache.shardingsphere.mode.metadata.MetaDataContexts; import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement; @@ -95,6 +94,8 @@ public final class DriverExecutor implements AutoCloseable { private final ShardingSphereConnection connection; + private final ShardingSphereMetaData metaData; + private final DriverJDBCExecutor regularExecutor; private final RawExecutor rawExecutor; @@ -115,21 +116,20 @@ public final class DriverExecutor implements AutoCloseable { public DriverExecutor(final ShardingSphereConnection connection) { this.connection = connection; - MetaDataContexts metaDataContexts = connection.getContextManager().getMetaDataContexts(); + metaData = connection.getContextManager().getMetaDataContexts().getMetaData(); ExecutorEngine executorEngine = connection.getContextManager().getExecutorEngine(); JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connection.getDatabaseConnectionManager().getConnectionContext()); regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(), connection.getContextManager(), jdbcExecutor); rawExecutor = new RawExecutor(executorEngine, connection.getDatabaseConnectionManager().getConnectionContext()); - String schemaName = new DatabaseTypeRegistry(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName()); + String schemaName = new DatabaseTypeRegistry(metaData.getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName()); trafficExecutor = new TrafficExecutor(); - sqlFederationEngine = new SQLFederationEngine(connection.getDatabaseName(), schemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor); + sqlFederationEngine = new SQLFederationEngine(connection.getDatabaseName(), schemaName, metaData, connection.getContextManager().getMetaDataContexts().getStatistics(), jdbcExecutor); kernelProcessor = new KernelProcessor(); } /** * Execute query. * - * @param metaData meta data * @param database database * @param queryContext query context * @param prepareEngine prepare engine @@ -140,7 +140,7 @@ public DriverExecutor(final ShardingSphereConnection connection) { * @throws SQLException SQL exception */ @SuppressWarnings("rawtypes") - public ResultSet executeQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, + public ResultSet executeQuery(final ShardingSphereDatabase database, final QueryContext queryContext, final DriverExecutionPrepareEngine prepareEngine, final Statement statement, final Map columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback) throws SQLException { Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); @@ -152,8 +152,8 @@ public ResultSet executeQuery(final ShardingSphereMetaData metaData, final Shard return sqlFederationEngine.executeQuery( prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId())); } - List queryResults = executePushDownQuery(metaData, database, queryContext, prepareEngine, statementReplayCallback); - MergedResult mergedResult = mergeQuery(metaData, database, queryResults, queryContext.getSqlStatementContext()); + List queryResults = executePushDownQuery(database, queryContext, prepareEngine, statementReplayCallback); + MergedResult mergedResult = mergeQuery(database, queryResults, queryContext.getSqlStatementContext()); boolean selectContainsEnhancedTable = queryContext.getSqlStatementContext() instanceof SelectStatementContext && ((SelectStatementContext) queryContext.getSqlStatementContext()).isContainsEnhancedTable(); List resultSets = getResultSets(); @@ -176,12 +176,12 @@ private ExecuteQueryCallback getExecuteQueryCallback(final ShardingSphereDatabas } @SuppressWarnings({"rawtypes", "unchecked"}) - private List executePushDownQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, + private List executePushDownQuery(final ShardingSphereDatabase database, final QueryContext queryContext, final DriverExecutionPrepareEngine prepareEngine, final StatementReplayCallback statementReplayCallback) throws SQLException { - ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext); + ExecutionContext executionContext = createExecutionContext(database, queryContext); if (hasRawExecutionRule(database)) { - return rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext), + return rawExecutor.execute(createRawExecutionGroupContext(database, executionContext), queryContext, new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList()); } ExecutionGroupContext executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), @@ -216,7 +216,7 @@ private Collection> getParameterSets(final ExecutionGroup createRawExecutionGroupContext(final ShardingSphereMetaData metaData, - final ShardingSphereDatabase database, final ExecutionContext executionContext) throws SQLException { + private ExecutionGroupContext createRawExecutionGroupContext(final ShardingSphereDatabase database, final ExecutionContext executionContext) throws SQLException { int maxConnectionsSizePerQuery = metaData.getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY); return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, database.getRuleMetaData().getRules()).prepare( executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new Grantee("", ""))); } - private MergedResult mergeQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, - final List queryResults, final SQLStatementContext sqlStatementContext) throws SQLException { + private MergedResult mergeQuery(final ShardingSphereDatabase database, final List queryResults, final SQLStatementContext sqlStatementContext) throws SQLException { MergeEngine mergeEngine = new MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), connection.getDatabaseConnectionManager().getConnectionContext()); return mergeEngine.merge(queryResults, sqlStatementContext); } @@ -256,7 +254,6 @@ private List getResultSets() throws SQLException { /** * Execute update. * - * @param metaData meta data * @param database database * @param queryContext query context * @param prepareEngine prepare engine @@ -267,20 +264,20 @@ private List getResultSets() throws SQLException { * @throws SQLException SQL exception */ @SuppressWarnings("rawtypes") - public int executeUpdate(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, + public int executeUpdate(final ShardingSphereDatabase database, final QueryContext queryContext, final DriverExecutionPrepareEngine prepareEngine, final TrafficExecutorCallback trafficCallback, final ExecuteUpdateCallback updateCallback, final StatementReplayCallback statementReplayCallback) throws SQLException { Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); if (trafficInstanceId.isPresent()) { return trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback); } - ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext); + ExecutionContext executionContext = createExecutionContext(database, queryContext); return database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty() ? executeUpdate(database, updateCallback, queryContext.getSqlStatementContext(), executionContext, prepareEngine, isNeedImplicitCommitTransaction(connection, queryContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1), statementReplayCallback) - : accumulate(rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext), queryContext, new RawSQLExecutorCallback())); + : accumulate(rawExecutor.execute(createRawExecutionGroupContext(database, executionContext), queryContext, new RawSQLExecutorCallback())); } @SuppressWarnings("rawtypes") @@ -360,7 +357,6 @@ private int accumulate(final Collection results) { /** * Execute advance. * - * @param metaData meta data * @param database database * @param queryContext query context * @param prepareEngine prepare engine @@ -371,7 +367,7 @@ private int accumulate(final Collection results) { * @throws SQLException SQL exception */ @SuppressWarnings("rawtypes") - public boolean executeAdvance(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext, + public boolean executeAdvance(final ShardingSphereDatabase database, final QueryContext queryContext, final DriverExecutionPrepareEngine prepareEngine, final TrafficExecutorCallback trafficCallback, final ExecuteCallback executeCallback, final StatementReplayCallback statementReplayCallback) throws SQLException { Optional trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext); @@ -385,9 +381,9 @@ public boolean executeAdvance(final ShardingSphereMetaData metaData, final Shard prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId())); return null != resultSet; } - ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext); + ExecutionContext executionContext = createExecutionContext(database, queryContext); if (!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { - Collection results = rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext), queryContext, new RawSQLExecutorCallback()); + Collection results = rawExecutor.execute(createRawExecutionGroupContext(database, executionContext), queryContext, new RawSQLExecutorCallback()); return results.iterator().next() instanceof QueryResult; } boolean isNeedImplicitCommitTransaction = isNeedImplicitCommitTransaction( diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index a10c9e17b2a10..198fe9bb813c7 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -210,7 +210,7 @@ public ResultSet executeQuery() throws SQLException { handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); ShardingSphereDatabase database = metaData.getDatabase(databaseName); findGeneratedKey().ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues())); - currentResultSet = executor.executeQuery(metaData, database, queryContext, createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap, + currentResultSet = executor.executeQuery(database, queryContext, createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap, (StatementReplayCallback) this::replay); if (currentResultSet instanceof ShardingSphereResultSet) { columnLabelAndIndexMap = ((ShardingSphereResultSet) currentResultSet).getColumnLabelAndIndexMap(); @@ -260,7 +260,7 @@ public int executeUpdate() throws SQLException { QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); ShardingSphereDatabase database = metaData.getDatabase(databaseName); - final int result = executor.executeUpdate(metaData, database, queryContext, createDriverExecutionPrepareEngine(database), + final int result = executor.executeUpdate(database, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeUpdate(), null, (StatementReplayCallback) this::replay); for (Statement each : executor.getStatements()) { statements.add((PreparedStatement) each); @@ -289,8 +289,7 @@ public boolean execute() throws SQLException { QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); ShardingSphereDatabase database = metaData.getDatabase(databaseName); - final boolean result = executor.executeAdvance( - metaData, database, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute(), + final boolean result = executor.executeAdvance(database, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute(), null, (StatementReplayCallback) this::replay); for (Statement each : executor.getStatements()) { statements.add((PreparedStatement) each); diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index 9c63028f24f95..7cb715a0b5fa6 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -129,7 +129,7 @@ public ResultSet executeQuery(final String sql) throws SQLException { connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); ShardingSphereDatabase database = metaData.getDatabase(databaseName); sqlStatementContext = queryContext.getSqlStatementContext(); - currentResultSet = executor.executeQuery(metaData, database, queryContext, createDriverExecutionPrepareEngine(database), this, null, + currentResultSet = executor.executeQuery(database, queryContext, createDriverExecutionPrepareEngine(database), this, null, (StatementReplayCallback) (statements, parameterSets) -> replay(statements)); statements.addAll(executor.getStatements()); return currentResultSet; @@ -218,8 +218,7 @@ private int executeUpdate(final String sql, final ExecuteUpdateCallback updateCa ShardingSphereDatabase database = metaData.getDatabase(databaseName); sqlStatementContext = queryContext.getSqlStatementContext(); clearStatements(); - int result = executor.executeUpdate( - metaData, database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback, updateCallback, + int result = executor.executeUpdate(database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback, updateCallback, (StatementReplayCallback) (statements, parameterSets) -> replay(statements)); statements.addAll(executor.getStatements()); replay(statements); @@ -288,7 +287,7 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback ShardingSphereDatabase database = metaData.getDatabase(databaseName); sqlStatementContext = queryContext.getSqlStatementContext(); clearStatements(); - boolean result = executor.executeAdvance(metaData, database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback, + boolean result = executor.executeAdvance(database, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback, executeCallback, (StatementReplayCallback) (statements, parameterSets) -> replay(statements)); statements.addAll(executor.getStatements()); return result;