Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix auto commit in jdbc adapter #28660

Merged
merged 5 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
private boolean closed;

protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final ExecutionContext executionContext) {
return isInDistributedTransaction(connection) && isModifiedSQL(executionContext) && executionContext.getExecutionUnits().size() > 1;
return connection.getAutoCommit() && isNotInDistributedTransaction(connection) && isModifiedSQL(executionContext) && executionContext.getExecutionUnits().size() > 1;
}

private boolean isInDistributedTransaction(final ShardingSphereConnection connection) {
private boolean isNotInDistributedTransaction(final ShardingSphereConnection connection) {
ConnectionTransaction connectionTransaction = connection.getDatabaseConnectionManager().getConnectionTransaction();
boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionContext().getTransactionContext().isInTransaction();
return TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) && !isInTransaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.transaction.api.TransactionType;

import java.sql.Array;
import java.sql.CallableStatement;
Expand Down Expand Up @@ -152,7 +153,7 @@ public void setAutoCommit(final boolean autoCommit) throws SQLException {
if (databaseConnectionManager.getConnectionTransaction().isLocalTransaction()) {
processLocalTransaction();
} else {
processDistributeTransaction();
processDistributedTransaction();
}
}

Expand All @@ -163,12 +164,10 @@ private void processLocalTransaction() throws SQLException {
}
}

private void processDistributeTransaction() throws SQLException {
private void processDistributedTransaction() throws SQLException {
switch (databaseConnectionManager.getConnectionTransaction().getDistributedTransactionOperationType(autoCommit)) {
case BEGIN:
databaseConnectionManager.close();
databaseConnectionManager.getConnectionTransaction().begin();
getConnectionContext().getTransactionContext().setInTransaction(true);
beginDistributedTransaction();
break;
case COMMIT:
databaseConnectionManager.getConnectionTransaction().commit();
Expand All @@ -178,6 +177,24 @@ private void processDistributeTransaction() throws SQLException {
}
}

private void beginDistributedTransaction() throws SQLException {
databaseConnectionManager.close();
databaseConnectionManager.getConnectionTransaction().begin();
getConnectionContext().getTransactionContext().setInTransaction(true);
}

/**
* Handle auto commit.
*
* @throws SQLException SQL exception
*/
public void handleAutoCommit() throws SQLException {
if (!autoCommit && TransactionType.isDistributedTransaction(databaseConnectionManager.getConnectionTransaction().getTransactionType())
&& !databaseConnectionManager.getConnectionTransaction().isInTransaction()) {
beginDistributedTransaction();
}
}

@Override
public void commit() throws SQLException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;

import java.sql.Connection;
import java.sql.ParameterMetaData;
Expand Down Expand Up @@ -267,6 +268,12 @@ private boolean decide(final QueryContext queryContext, final ShardingSphereData
return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData);
}

private void handleAutoCommit(final QueryContext queryContext) throws SQLException {
if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) {
connection.handleAutoCommit();
}
}

private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
Expand Down Expand Up @@ -338,6 +345,7 @@ public int executeUpdate() throws SQLException {
}
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
Expand Down Expand Up @@ -400,6 +408,7 @@ public boolean execute() throws SQLException {
}
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
Expand Down Expand Up @@ -454,6 +463,8 @@ private boolean executeWithImplicitCommitTransaction() throws SQLException {
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
Expand All @@ -469,6 +480,8 @@ private int executeUpdateWithImplicitCommitTransaction() throws SQLException {
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;

import java.sql.Connection;
import java.sql.ResultSet;
Expand Down Expand Up @@ -310,6 +311,7 @@ private int executeUpdate(final ExecuteUpdateCallback updateCallback, final SQLS

private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws SQLException {
QueryContext queryContext = createQueryContext(sql);
handleAutoCommit(queryContext);
databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
Expand All @@ -335,6 +337,8 @@ private int executeUpdateWithImplicitCommitTransaction(final ExecuteUpdateCallba
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
Expand Down Expand Up @@ -428,6 +432,7 @@ public boolean execute(final String sql, final String[] columnNames) throws SQLE
private boolean execute0(final String sql, final ExecuteCallback executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
try {
QueryContext queryContext = createQueryContext(sql);
handleAutoCommit(queryContext);
databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
Expand All @@ -452,6 +457,12 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback
}
}

private void handleAutoCommit(final QueryContext queryContext) throws SQLException {
if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) {
connection.handleAutoCommit();
}
}

private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
Expand Down Expand Up @@ -527,6 +538,8 @@ private boolean executeWithImplicitCommitTransaction(final ExecuteCallback callb
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ private Collection<ExecutionContext> generateExecutionContexts() {
}

private boolean isNeedImplicitCommitTransaction(final Collection<ExecutionContext> executionContexts) {
if (!databaseConnectionManager.getConnectionSession().isAutoCommit()) {
return false;
}
TransactionStatus transactionStatus = databaseConnectionManager.getConnectionSession().getTransactionStatus();
if (!TransactionType.isDistributedTransaction(transactionStatus.getTransactionType()) || transactionStatus.isInTransaction()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
package org.apache.shardingsphere.test.e2e.transaction.cases.alterresource;

import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionBaseE2EIT;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

Expand All @@ -36,8 +34,8 @@
@TransactionTestCase(adapters = TransactionTestConstants.PROXY, scenario = "addResource")
public final class AddResourceTestCase extends BaseTransactionTestCase {

public AddResourceTestCase(final TransactionBaseE2EIT baseTransactionITCase, final DataSource dataSource) {
super(baseTransactionITCase, dataSource);
public AddResourceTestCase(final TransactionTestCaseParameter testCaseParam) {
super(testCaseParam);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionBaseE2EIT;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

Expand All @@ -35,8 +33,8 @@
@Slf4j
public final class CloseResourceTestCase extends BaseTransactionTestCase {

public CloseResourceTestCase(final TransactionBaseE2EIT baseTransactionITCase, final DataSource dataSource) {
super(baseTransactionITCase, dataSource);
public CloseResourceTestCase(final TransactionTestCaseParameter testCaseParam) {
super(testCaseParam);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.e2e.transaction.cases.autocommit;

import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Auto commit transaction integration test.
*/
public abstract class AutoCommitTestCase extends BaseTransactionTestCase {

protected AutoCommitTestCase(final TransactionTestCaseParameter testCaseParam) {
super(testCaseParam);
}

protected void assertAutoCommitWithStatement() throws SQLException {
try (Connection connection = getDataSource().getConnection()) {
connection.setAutoCommit(false);
executeWithLog(connection, "DELETE FROM account");
assertFalse(connection.getAutoCommit());
executeWithLog(connection, "INSERT INTO account VALUES (1, 1, 1)");
connection.commit();
assertFalse(connection.getAutoCommit());
executeUpdateWithLog(connection, "INSERT INTO account VALUES (2, 2, 2)");
connection.commit();
assertFalse(connection.getAutoCommit());
executeWithLog(connection, "INSERT INTO account VALUES (3, 3, 3)");
connection.rollback();
assertFalse(connection.getAutoCommit());
assertAccountRowCount(connection, 2);
connection.setAutoCommit(true);
assertTrue(connection.getAutoCommit());
executeWithLog(connection, "INSERT INTO account VALUES (4, 4, 4)");
assertAccountRowCount(connection, 3);
}
}

protected void assertAutoCommitWithPrepareStatement() throws SQLException {
try (Connection connection = getDataSource().getConnection()) {
connection.setAutoCommit(false);
executeWithLog(connection, "DELETE FROM account");
assertFalse(connection.getAutoCommit());
PreparedStatement prepareStatement = connection.prepareStatement("INSERT INTO account VALUES(?, ?, ?)");
setPrepareStatementParameters(prepareStatement, 1);
prepareStatement.execute();
connection.commit();
assertFalse(connection.getAutoCommit());
setPrepareStatementParameters(prepareStatement, 2);
prepareStatement.executeUpdate();
connection.commit();
assertFalse(connection.getAutoCommit());
setPrepareStatementParameters(prepareStatement, 3);
prepareStatement.execute();
connection.rollback();
assertFalse(connection.getAutoCommit());
assertAccountRowCount(connection, 2);
connection.setAutoCommit(true);
assertTrue(connection.getAutoCommit());
setPrepareStatementParameters(prepareStatement, 4);
prepareStatement.execute();
assertAccountRowCount(connection, 3);
}
}

private void setPrepareStatementParameters(final PreparedStatement prepareStatement, final int value) throws SQLException {
prepareStatement.setInt(1, value);
prepareStatement.setInt(2, value);
prepareStatement.setInt(3, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@

package org.apache.shardingsphere.test.e2e.transaction.cases.autocommit;

import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionBaseE2EIT;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants;
import org.apache.shardingsphere.transaction.api.TransactionType;
import org.awaitility.Awaitility;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,16 +33,20 @@
/**
* MySQL auto commit transaction integration test.
*/
@TransactionTestCase(dbTypes = TransactionTestConstants.MYSQL, transactionTypes = TransactionType.LOCAL)
public final class MySQLAutoCommitTestCase extends BaseTransactionTestCase {
@TransactionTestCase(dbTypes = TransactionTestConstants.MYSQL)
public final class MySQLAutoCommitTestCase extends AutoCommitTestCase {

public MySQLAutoCommitTestCase(final TransactionBaseE2EIT baseTransactionITCase, final DataSource dataSource) {
super(baseTransactionITCase, dataSource);
public MySQLAutoCommitTestCase(final TransactionTestCaseParameter testCaseParam) {
super(testCaseParam);
}

@Override
public void executeTest(final TransactionContainerComposer containerComposer) throws SQLException {
assertAutoCommit();
if (TransactionType.LOCAL == getTransactionType()) {
assertAutoCommit();
}
assertAutoCommitWithStatement();
assertAutoCommitWithPrepareStatement();
}

private void assertAutoCommit() throws SQLException {
Expand Down
Loading