Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor DriverExecutor #31446

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public DriverExecutor(final ShardingSphereConnection connection) {
}

/**
* Execute advance query.
* Execute query.
*
* @param metaData meta data
* @param database database
Expand All @@ -128,9 +128,9 @@ public DriverExecutor(final ShardingSphereConnection connection) {
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public ResultSet executeAdvanceQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final Statement statement,
final Map<String, Integer> columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback) throws SQLException {
public ResultSet executeQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final Statement statement,
final Map<String, Integer> columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback) throws SQLException {
Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
if (trafficInstanceId.isPresent()) {
return trafficExecutor.execute(
Expand All @@ -140,7 +140,15 @@ public ResultSet executeAdvanceQuery(final ShardingSphereMetaData metaData, fina
return sqlFederationEngine.executeQuery(
prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
}
return doExecuteQuery(metaData, database, queryContext, prepareEngine, statement, columnLabelAndIndexMap, statementReplayCallback);
List<QueryResult> queryResults = executePushDownQuery(metaData, database, queryContext, prepareEngine, statementReplayCallback);
MergedResult mergedResult = mergeQuery(metaData, database, queryResults, queryContext.getSqlStatementContext());
boolean selectContainsEnhancedTable = queryContext.getSqlStatementContext() instanceof SelectStatementContext
&& ((SelectStatementContext) queryContext.getSqlStatementContext()).isContainsEnhancedTable();
List<ResultSet> resultSets = getResultSets();
return new ShardingSphereResultSet(resultSets, mergedResult, statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
null == columnLabelAndIndexMap
? ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(), selectContainsEnhancedTable, resultSets.get(0).getMetaData())
: columnLabelAndIndexMap);
}

private TrafficExecutorCallback<ResultSet> getTrafficExecuteQueryCallback(final String jdbcDriverType) {
Expand All @@ -155,25 +163,10 @@ private ExecuteQueryCallback getExecuteQueryCallback(final ShardingSphereDatabas
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
}

@SuppressWarnings("rawtypes")
private ShardingSphereResultSet doExecuteQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final Statement statement,
final Map<String, Integer> columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback) throws SQLException {
List<QueryResult> queryResults = executeQuery0(metaData, database, queryContext, prepareEngine, statementReplayCallback);
MergedResult mergedResult = mergeQuery(metaData, database, queryResults, queryContext.getSqlStatementContext());
boolean selectContainsEnhancedTable = queryContext.getSqlStatementContext() instanceof SelectStatementContext
&& ((SelectStatementContext) queryContext.getSqlStatementContext()).isContainsEnhancedTable();
List<ResultSet> resultSets = getResultSets();
return new ShardingSphereResultSet(resultSets, mergedResult, statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
null == columnLabelAndIndexMap
? ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(), selectContainsEnhancedTable, resultSets.get(0).getMetaData())
: columnLabelAndIndexMap);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private List<QueryResult> executeQuery0(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementReplayCallback statementReplayCallback) throws SQLException {
private List<QueryResult> executePushDownQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementReplayCallback statementReplayCallback) throws SQLException {
ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext);
if (hasRawExecutionRule(database)) {
return rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext),
Expand All @@ -191,6 +184,10 @@ private List<QueryResult> executeQuery0(final ShardingSphereMetaData metaData, f
return regularExecutor.executeQuery(executionGroupContext, queryContext, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
}

private boolean hasRawExecutionRule(final ShardingSphereDatabase database) {
return !database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
}

private Collection<Statement> getStatements(final ExecutionGroup<JDBCExecutionUnit> executionGroup) {
Collection<Statement> result = new LinkedList<>();
for (JDBCExecutionUnit each : executionGroup.getInputs()) {
Expand Down Expand Up @@ -221,10 +218,6 @@ private void clearStatements() throws SQLException {
statements.clear();
}

private boolean hasRawExecutionRule(final ShardingSphereDatabase database) {
return !database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
}

private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionGroupContext(final ShardingSphereMetaData metaData,
final ShardingSphereDatabase database, final ExecutionContext executionContext) throws SQLException {
int maxConnectionsSizePerQuery = metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public ResultSet executeQuery() throws SQLException {
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName);
findGeneratedKey().ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
currentResultSet = executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap,
currentResultSet = executor.executeQuery(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap,
(StatementReplayCallback<PreparedStatement>) this::replay);
if (currentResultSet instanceof ShardingSphereResultSet) {
columnLabelAndIndexMap = ((ShardingSphereResultSet) currentResultSet).getColumnLabelAndIndexMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public ResultSet executeQuery(final String sql) throws SQLException {
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName);
sqlStatementContext = queryContext.getSqlStatementContext();
currentResultSet = executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), this, null,
currentResultSet = executor.executeQuery(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database), this, null,
(StatementReplayCallback<Statement>) (statements, parameterSets) -> replay(statements));
statements.addAll(executor.getStatements());
return currentResultSet;
Expand Down
Loading