Skip to content

Commit

Permalink
Refactor DriverExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed May 29, 2024
1 parent 32d4e57 commit acdda07
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,36 @@
import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
import org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
Expand All @@ -45,7 +65,13 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Driver executor.
Expand All @@ -66,6 +92,14 @@ public final class DriverExecutor implements AutoCloseable {

private ExecuteType executeType = ExecuteType.REGULAR;

private final KernelProcessor kernelProcessor;

@Getter
private final Collection<Statement> statements = new LinkedList<>();

@Getter
private final Collection<List<Object>> parameterSets = new LinkedList<>();

public DriverExecutor(final ShardingSphereConnection connection) {
this.connection = connection;
MetaDataContexts metaDataContexts = connection.getContextManager().getMetaDataContexts();
Expand All @@ -76,6 +110,7 @@ public DriverExecutor(final ShardingSphereConnection connection) {
String schemaName = new DatabaseTypeRegistry(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
trafficExecutor = new TrafficExecutor();
sqlFederationEngine = new SQLFederationEngine(connection.getDatabaseName(), schemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
kernelProcessor = new KernelProcessor();
}

/**
Expand All @@ -85,35 +120,126 @@ public DriverExecutor(final ShardingSphereConnection connection) {
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
* @param statement statement
* @param columnLabelAndIndexMap column label and index map
* @return result set
* @throws SQLException SQL exception
*/
public Optional<ResultSet> executeAdvanceQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
public ResultSet executeAdvanceQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final Statement statement,
final Map<String, Integer> columnLabelAndIndexMap) throws SQLException {
Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
if (trafficInstanceId.isPresent()) {
return Optional.of(trafficExecutor.execute(
connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, getTrafficExecutorCallback(prepareEngine)));
return trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, getTrafficExecutorCallback(prepareEngine));
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
return Optional.of(sqlFederationEngine.executeQuery(
prepareEngine, getSQLFederationCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId())));
return sqlFederationEngine.executeQuery(
prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
}
return Optional.empty();
return doExecuteQuery(metaData, database, queryContext, prepareEngine, statement, columnLabelAndIndexMap);
}

private TrafficExecutorCallback<ResultSet> getTrafficExecutorCallback(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) {
return JDBCDriverType.STATEMENT.equals(prepareEngine.getType()) ? Statement::executeQuery : ((statement, sql) -> ((PreparedStatement) statement).executeQuery());
}

private ExecuteQueryCallback getSQLFederationCallback(final ShardingSphereDatabase database, final QueryContext queryContext, final String jdbcDriverType) {
private ExecuteQueryCallback getExecuteQueryCallback(final ShardingSphereDatabase database, final QueryContext queryContext, final String jdbcDriverType) {
return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
? new StatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown())
: new PreparedStatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
}

private ShardingSphereResultSet doExecuteQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final Statement statement,
final Map<String, Integer> columnLabelAndIndexMap) throws SQLException {
List<QueryResult> queryResults = executeQuery0(metaData, database, queryContext, prepareEngine);
MergedResult mergedResult = mergeQuery(metaData, database, queryResults, queryContext.getSqlStatementContext());
boolean selectContainsEnhancedTable = queryContext.getSqlStatementContext() instanceof SelectStatementContext
&& ((SelectStatementContext) queryContext.getSqlStatementContext()).isContainsEnhancedTable();
List<ResultSet> resultSets = getResultSets();
return new ShardingSphereResultSet(resultSets, mergedResult, statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
null == columnLabelAndIndexMap
? ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(), selectContainsEnhancedTable, resultSets.get(0).getMetaData())
: columnLabelAndIndexMap);
}

private List<QueryResult> executeQuery0(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext);
if (hasRawExecutionRule(database)) {
return rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext),
queryContext, new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
new ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new Grantee("", "")));
for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
statements.addAll(getStatements(each));
if (JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
parameterSets.addAll(getParameterSets(each));
}
}
return regularExecutor.executeQuery(executionGroupContext, queryContext, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
}

private Collection<Statement> getStatements(final ExecutionGroup<JDBCExecutionUnit> executionGroup) {
Collection<Statement> result = new LinkedList<>();
for (JDBCExecutionUnit each : executionGroup.getInputs()) {
result.add(each.getStorageResource());
}
return result;
}

private Collection<List<Object>> getParameterSets(final ExecutionGroup<JDBCExecutionUnit> executionGroup) {
Collection<List<Object>> result = new LinkedList<>();
for (JDBCExecutionUnit each : executionGroup.getInputs()) {
result.add(each.getExecutionUnit().getSqlUnit().getParameters());
}
return result;
}

private ExecutionContext createExecutionContext(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext) throws SQLException {
clearStatements();
RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, database, null, queryContext.getHintValueContext());
return kernelProcessor.generateExecutionContext(queryContext, database, globalRuleMetaData, metaData.getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
}

private void clearStatements() throws SQLException {
for (Statement each : statements) {
each.close();
}
statements.clear();
}

private boolean hasRawExecutionRule(final ShardingSphereDatabase database) {
return !database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
}

private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionGroupContext(final ShardingSphereMetaData metaData,
final ShardingSphereDatabase database, final ExecutionContext executionContext) throws SQLException {
int maxConnectionsSizePerQuery = metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, database.getRuleMetaData().getRules()).prepare(
executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new Grantee("", "")));
}

private MergedResult mergeQuery(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database,
final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
MergeEngine mergeEngine = new MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
return mergeEngine.merge(queryResults, sqlStatementContext);
}

private List<ResultSet> getResultSets() throws SQLException {
List<ResultSet> result = new ArrayList<>(statements.size());
for (Statement each : statements) {
if (null != each.getResultSet()) {
result.add(each.getResultSet());
}
}
return result;
}

/**
* Execute advance update.
*
Expand Down Expand Up @@ -157,7 +283,7 @@ public Optional<Boolean> executeAdvance(final ShardingSphereMetaData metaData, f
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
executeType = ExecuteType.FEDERATION;
ResultSet resultSet = sqlFederationEngine.executeQuery(
prepareEngine, getSQLFederationCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
return Optional.of(null != resultSet);
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.driver.jdbc.core.resultset;

import lombok.Getter;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractResultSetAdapter;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
Expand Down Expand Up @@ -62,6 +63,7 @@ public final class ShardingSphereResultSet extends AbstractResultSetAdapter {

private final MergedResult mergeResultSet;

@Getter
private final Map<String, Integer> columnLabelAndIndexMap;

public ShardingSphereResultSet(final List<ResultSet> resultSets, final MergedResult mergeResultSet, final Statement statement, final boolean selectContainsEnhancedTable,
Expand Down
Loading

0 comments on commit acdda07

Please sign in to comment.