Skip to content

Commit

Permalink
Refactor DriverExecutor (#31422)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored May 28, 2024
1 parent 79f4760 commit 5c394e4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,40 +93,27 @@ public Optional<ResultSet> executeAdvanceQuery(final ShardingSphereMetaData meta
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
if (trafficInstanceId.isPresent()) {
TrafficExecutorCallback<ResultSet> trafficCallback = JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
? Statement::executeQuery
: ((statement, sql) -> ((PreparedStatement) statement).executeQuery());
return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback));
return Optional.of(trafficExecutor.execute(
connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, getTrafficExecutorCallback(prepareEngine)));
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
ExecuteQueryCallback sqlFederationCallback = JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
? new StatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown())
: new PreparedStatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(), queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
return Optional.of(sqlFederationEngine.executeQuery(prepareEngine, sqlFederationCallback, new SQLFederationContext(false, queryContext, metaData, connection.getProcessId())));
return Optional.of(sqlFederationEngine.executeQuery(
prepareEngine, getSQLFederationCallback(database, queryContext, prepareEngine), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId())));
}
return Optional.empty();
}

/**
* Execute advance update.
*
* @param metaData meta data
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
* @return updated row count
* @throws SQLException SQL exception
*/
public Optional<Integer> executeAdvanceUpdate(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
if (trafficInstanceId.isPresent()) {
return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(),
trafficInstanceId.get(), queryContext, prepareEngine, (statement, sql) -> ((PreparedStatement) statement).executeUpdate()));
}
return Optional.empty();
private TrafficExecutorCallback<ResultSet> getTrafficExecutorCallback(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) {
return JDBCDriverType.STATEMENT.equals(prepareEngine.getType()) ? Statement::executeQuery : ((statement, sql) -> ((PreparedStatement) statement).executeQuery());
}

private ExecuteQueryCallback getSQLFederationCallback(final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) {
return JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
? new StatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown())
: new PreparedStatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
}

/**
Expand Down Expand Up @@ -171,12 +158,8 @@ public Optional<Boolean> executeAdvance(final ShardingSphereMetaData metaData, f
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
executeType = ExecuteType.FEDERATION;
ExecuteQueryCallback sqlFederationCallback = JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
? new StatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown())
: new PreparedStatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
ResultSet resultSet = sqlFederationEngine.executeQuery(prepareEngine, sqlFederationCallback, new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
ResultSet resultSet = sqlFederationEngine.executeQuery(
prepareEngine, getSQLFederationCallback(database, queryContext, prepareEngine), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
return Optional.of(null != resultSet);
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ public int executeUpdate() throws SQLException {
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName);
Optional<Integer> updatedCount = executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database));
Optional<Integer> updatedCount = executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, queryContext, createDriverExecutionPrepareEngine(database),
(statement, sql) -> ((PreparedStatement) statement).executeUpdate());
if (updatedCount.isPresent()) {
return updatedCount.get();
}
Expand Down

0 comments on commit 5c394e4

Please sign in to comment.