Skip to content

Commit

Permalink
Move execute to DriverExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed May 31, 2024
1 parent e383088 commit 41dbae5
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.driver.executor;

import lombok.Getter;
import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
import org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
Expand Down Expand Up @@ -167,9 +168,9 @@ private TrafficExecutorCallback<ResultSet> getTrafficExecuteQueryCallback(final
private ExecuteQueryCallback getExecuteQueryCallback(final ShardingSphereDatabase database, final QueryContext queryContext, final String jdbcDriverType) {
return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
? new StatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown())
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown())
: new PreparedStatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down Expand Up @@ -285,7 +286,7 @@ private int executeUpdate(final ShardingSphereDatabase database, final ExecuteUp
final StatementReplayCallback statementReplayCallback) throws SQLException {
return isNeedImplicitCommitTransaction
? executeWithImplicitCommitTransaction(() -> useDriverToExecuteUpdate(
database, updateCallback, sqlStatementContext, executionContext, prepareEngine, statementReplayCallback), connection, database.getProtocolType())
database, updateCallback, sqlStatementContext, executionContext, prepareEngine, statementReplayCallback), connection, database.getProtocolType())
: useDriverToExecuteUpdate(database, updateCallback, sqlStatementContext, executionContext, prepareEngine, statementReplayCallback);
}

Expand Down Expand Up @@ -361,24 +362,76 @@ private int accumulate(final Collection<ExecuteResult> results) {
* @param queryContext query context
* @param prepareEngine prepare engine
* @param trafficCallback traffic callback
* @param isNeedImplicitCommitTransaction is need implicit commit transaction
* @param executeCallback execute callback
* @param statementReplayCallback statement replay callback
* @param executionContext execution context
* @return execute result
* @throws SQLException SQL exception
*/
public Optional<Boolean> executeAdvance(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database,
final QueryContext queryContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
@SuppressWarnings("rawtypes")
public boolean executeAdvance(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database,
final QueryContext queryContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Boolean> trafficCallback, final boolean isNeedImplicitCommitTransaction,
final ExecuteCallback executeCallback, final StatementReplayCallback statementReplayCallback, final ExecutionContext executionContext) throws SQLException {
Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
if (trafficInstanceId.isPresent()) {
executeType = ExecuteType.TRAFFIC;
return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback));
return trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback);
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
executeType = ExecuteType.FEDERATION;
ResultSet resultSet = sqlFederationEngine.executeQuery(
prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
return Optional.of(null != resultSet);
return null != resultSet;
}
if (!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) {
Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext), queryContext, new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
return executeWithExecutionContext(database, executeCallback, executionContext, prepareEngine, isNeedImplicitCommitTransaction, statementReplayCallback);
}

@SuppressWarnings("rawtypes")
private boolean executeWithExecutionContext(final ShardingSphereDatabase database, final ExecuteCallback executeCallback, final ExecutionContext executionContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final boolean isNeedImplicitCommitTransaction, final StatementReplayCallback statementReplayCallback) throws SQLException {
return isNeedImplicitCommitTransaction
? executeWithImplicitCommitTransaction(() -> useDriverToExecute(database, executeCallback, executionContext, prepareEngine, statementReplayCallback), connection,
database.getProtocolType())
: useDriverToExecute(database, executeCallback, executionContext, prepareEngine, statementReplayCallback);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private boolean useDriverToExecute(final ShardingSphereDatabase database, final ExecuteCallback callback, final ExecutionContext executionContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final StatementReplayCallback statementReplayCallback) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(database, executionContext, prepareEngine);
for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
statements.addAll(getStatements(each));
if (JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
parameterSets.addAll(getParameterSets(each));
}
}
return Optional.empty();
statementReplayCallback.replay(statements, parameterSets);
JDBCExecutorCallback<Boolean> jdbcExecutorCallback = createExecuteCallback(database, callback, executionContext.getSqlStatementContext().getSqlStatement(), prepareEngine.getType());
return regularExecutor.execute(executionGroupContext, executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
}

private JDBCExecutorCallback<Boolean> createExecuteCallback(final ShardingSphereDatabase database, final ExecuteCallback executeCallback,
final SQLStatement sqlStatement, final String jdbcDriverType) {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
return new JDBCExecutorCallback<Boolean>(database.getProtocolType(), database.getResourceMetaData(), sqlStatement, isExceptionThrown) {

@Override
protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? executeCallback.execute(sql, statement) : ((PreparedStatement) statement).execute();
}

@Override
protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement1, final SQLException ex) {
return Optional.empty();
}
};
}

/**
Expand Down
Loading

0 comments on commit 41dbae5

Please sign in to comment.