Skip to content

Commit

Permalink
Move execute to DriverExecutor (#31513)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored May 31, 2024
1 parent e383088 commit c27fa6c
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.driver.executor;

import lombok.Getter;
import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
import org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
Expand Down Expand Up @@ -63,12 +64,16 @@
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.api.TransactionType;
import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
import org.apache.shardingsphere.transaction.rule.TransactionRule;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -259,21 +264,21 @@ private List<ResultSet> getResultSets() throws SQLException {
* @param prepareEngine prepare engine
* @param trafficCallback traffic callback
* @param updateCallback update callback
* @param isNeedImplicitCommitTransaction is need implicit commit transaction
* @param statementReplayCallback statement replay callback
* @param executionContext execution context
* @return updated row count
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public int executeUpdate(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Integer> trafficCallback, final ExecuteUpdateCallback updateCallback, final StatementReplayCallback statementReplayCallback,
final boolean isNeedImplicitCommitTransaction, final ExecutionContext executionContext) throws SQLException {
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final TrafficExecutorCallback<Integer> trafficCallback,
final ExecuteUpdateCallback updateCallback, final StatementReplayCallback statementReplayCallback) throws SQLException {
ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext);
Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
if (trafficInstanceId.isPresent()) {
return trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback);
}
boolean isNeedImplicitCommitTransaction = isNeedImplicitCommitTransaction(
connection, queryContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1);
return database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
? executeUpdate(database, updateCallback, queryContext.getSqlStatementContext(), executionContext, prepareEngine, isNeedImplicitCommitTransaction, statementReplayCallback)
: accumulate(rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext), queryContext, new RawSQLExecutorCallback()));
Expand Down Expand Up @@ -361,24 +366,92 @@ private int accumulate(final Collection<ExecuteResult> results) {
* @param queryContext query context
* @param prepareEngine prepare engine
* @param trafficCallback traffic callback
* @param executeCallback execute callback
* @param statementReplayCallback statement replay callback
* @return execute result
* @throws SQLException SQL exception
*/
public Optional<Boolean> executeAdvance(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database,
final QueryContext queryContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
@SuppressWarnings("rawtypes")
public boolean executeAdvance(final ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final TrafficExecutorCallback<Boolean> trafficCallback,
final ExecuteCallback executeCallback, final StatementReplayCallback statementReplayCallback) throws SQLException {
Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
if (trafficInstanceId.isPresent()) {
executeType = ExecuteType.TRAFFIC;
return Optional.of(trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback));
return trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, trafficCallback);
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
executeType = ExecuteType.FEDERATION;
ResultSet resultSet = sqlFederationEngine.executeQuery(
prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
return Optional.of(null != resultSet);
return null != resultSet;
}
ExecutionContext executionContext = createExecutionContext(metaData, database, queryContext);
if (!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) {
Collection<ExecuteResult> results = rawExecutor.execute(createRawExecutionGroupContext(metaData, database, executionContext), queryContext, new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
boolean isNeedImplicitCommitTransaction = isNeedImplicitCommitTransaction(
connection, queryContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1);
return executeWithExecutionContext(database, executeCallback, executionContext, prepareEngine, isNeedImplicitCommitTransaction, statementReplayCallback);
}

@SuppressWarnings("rawtypes")
private boolean executeWithExecutionContext(final ShardingSphereDatabase database, final ExecuteCallback executeCallback, final ExecutionContext executionContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final boolean isNeedImplicitCommitTransaction, final StatementReplayCallback statementReplayCallback) throws SQLException {
return isNeedImplicitCommitTransaction
? executeWithImplicitCommitTransaction(() -> useDriverToExecute(database, executeCallback, executionContext, prepareEngine, statementReplayCallback), connection,
database.getProtocolType())
: useDriverToExecute(database, executeCallback, executionContext, prepareEngine, statementReplayCallback);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private boolean useDriverToExecute(final ShardingSphereDatabase database, final ExecuteCallback callback, final ExecutionContext executionContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final StatementReplayCallback statementReplayCallback) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(database, executionContext, prepareEngine);
for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
statements.addAll(getStatements(each));
if (JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
parameterSets.addAll(getParameterSets(each));
}
}
return Optional.empty();
statementReplayCallback.replay(statements, parameterSets);
JDBCExecutorCallback<Boolean> jdbcExecutorCallback = createExecuteCallback(database, callback, executionContext.getSqlStatementContext().getSqlStatement(), prepareEngine.getType());
return regularExecutor.execute(executionGroupContext, executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
}

private JDBCExecutorCallback<Boolean> createExecuteCallback(final ShardingSphereDatabase database, final ExecuteCallback executeCallback,
final SQLStatement sqlStatement, final String jdbcDriverType) {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
return new JDBCExecutorCallback<Boolean>(database.getProtocolType(), database.getResourceMetaData(), sqlStatement, isExceptionThrown) {

@Override
protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? executeCallback.execute(sql, statement) : ((PreparedStatement) statement).execute();
}

@Override
protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement1, final SQLException ex) {
return Optional.empty();
}
};
}

private boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final SQLStatement sqlStatement, final boolean multiExecutionUnits) {
if (!connection.getAutoCommit()) {
return false;
}
TransactionType transactionType = connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType();
boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction();
if (!TransactionType.isDistributedTransaction(transactionType) || isInTransaction) {
return false;
}
return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
}

private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,8 @@
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.transaction.api.TransactionType;
import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
import org.apache.shardingsphere.transaction.rule.TransactionRule;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
Expand All @@ -61,39 +53,6 @@ public abstract class AbstractStatementAdapter extends WrapperAdapter implements

private boolean closeOnCompletion;

protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final SQLStatement sqlStatement, final boolean multiExecutionUnits) {
if (!connection.getAutoCommit()) {
return false;
}
TransactionType transactionType = connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType();
boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction();
if (!TransactionType.isDistributedTransaction(transactionType) || isInTransaction) {
return false;
}
return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
}

protected final <T> T executeWithImplicitCommitTransaction(final ImplicitTransactionCallback<T> callback, final Connection connection, final DatabaseType databaseType) throws SQLException {
T result;
try {
connection.setAutoCommit(false);
result = callback.execute();
connection.commit();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, databaseType);
} finally {
connection.setAutoCommit(true);
}
return result;
}

private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement);
}

protected final void handleExceptionInTransaction(final ShardingSphereConnection connection, final MetaDataContexts metaDataContexts) {
if (connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction()) {
DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType();
Expand Down
Loading

0 comments on commit c27fa6c

Please sign in to comment.