diff --git a/README.md b/README.md index 62fcef3..4b357ca 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,238 @@ -# tcc +## TCC + +tcc框架用于解决分布式场景多服务间的分布式事务,该框架不建立在dubbo、http 、grpc等协议基础上面,服务可以任意选择暴露的协议, + +只要遵循Try 、Commit、 Cancel规范即可。已解决悬挂、幂等、空回滚问题,业务层面无需关注这部分处理。 + +**tcc分为以下几个阶段:** + +1. 执行前置动作 +2. Try +3. 执行本地事务 +4. Commit\Cancel (根据本地事务的执行的成员与否,进行commit还是cancel) + +## 示例 + +该示例主要用于用户下单的同时,需要扣减用户积分的场景,订单服务和积分服务分别是独立服务部署,它们之间存在分布式事务的问题, 我们通过当前框架展示是如何解决以上问题的。 + + + +步骤1:运行 com.damon.sample.points.PointsApplication + +步骤2:运行 com.damon.sample.order.TestRun + +### 下单服务 + +下单服务继承TccMainService服务 + +```java +package com.damon.sample.order.app; + +import cn.hutool.core.util.IdUtil; +import com.damon.sample.order.client.IOrderSubmitAppService; +import com.damon.sample.order.domain.IPointsGateway; +import com.damon.sample.order.domain.Order; +import com.damon.tcc.TccMainConfig; +import com.damon.tcc.TccMainService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.Map; + +@Service +public class OrderSubmitAppService extends TccMainService, Order> implements IOrderSubmitAppService { + private final JdbcTemplate jdbcTemplate; + private final IPointsGateway pointsGateway; + + @Autowired + public OrderSubmitAppService(TccMainConfig config, IPointsGateway pointsGateway) { + super(config); + this.jdbcTemplate = new JdbcTemplate(config.getDataSource()); + this.pointsGateway = pointsGateway; + } + + /** + * 检查失败的日志,已纠正事务是否需要回顾还是提交 + */ + public void executeFailedLogCheck() { + super.executeFailedLogCheck(); + } + + /** + * 检查死亡的日志,已纠正事务是否需要回顾还是提交 + */ + public void executeDeadLogCheck() { + super.executeDeadLogCheck(); + } + /** + * 执行失败日志检查的时候需要回查请求参数(因为事务日志未记录方法请求参数,所以需要回查一下) + * @param bizId 实体对象id(业务id) + * @return + */ + @Override + protected Order callbackParameter(Long bizId) { + return jdbcTemplate.queryForObject("select * from tcc_demo_order where order_id = ? ", new BeanPropertyRowMapper<>(Order.class), bizId); + } + + /** + * 创建订单 (1 预先创建订单 2 执行try动作) + * + * @param userId + * @param points + * @return + */ + @Override + public Long submitOrder(Long userId, Long points) { + Long orderId = IdUtil.getSnowflakeNextId(); + jdbcTemplate.update("insert into tcc_demo_order(order_id, user_id, status, deduction_points) values (?, ?, ? ,? )", orderId, userId, 0, points); + Order order = new Order(orderId, 0, userId, points); + return super.process(order); + } + + /** + * try执行用户积分扣除 + * + * @param order + * @return + */ + @Override + protected Map attempt(Order order) { + Boolean result = pointsGateway.tryDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints()); + Map map = new HashMap<>(); + map.put("flag", result); + return map; + } + + @Override + protected Long executeLocalTransaction(Order object, Map map) { + int result = jdbcTemplate.update("update tcc_demo_order set status = ? where order_id = ? ", 1, object.getOrderId()); + if (result == 0) { + throw new RuntimeException("无效的订单id : " + object.getOrderId()); + } + return object.getOrderId(); + } + + /** + * commit积分 + * + * @param order + */ + @Override + protected void commit(Order order) { + pointsGateway.commitDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints()); + } + + /** + * cancel回滚积分 + * + * @param order + */ + @Override + protected void cancel(Order order) { + pointsGateway.cancelDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints()); + } +} + +``` + +### 积分服务 + +积分服务继承TccSubService服务 + +```java +package com.damon.sample.points.app; + +import com.damon.sample.points.client.IPointsDeductionAppService; +import com.damon.sample.points.client.PointsDeductCmd; +import com.damon.tcc.TccSubConfig; +import com.damon.tcc.TccSubService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +@Service +public class PointsDeductionAppService extends TccSubService implements IPointsDeductionAppService { + private final Logger log = LoggerFactory.getLogger(PointsDeductionAppService.class); + private final JdbcTemplate jdbcTemplate; + + @Autowired + public PointsDeductionAppService(TccSubConfig config) { + super(config); + this.jdbcTemplate = new JdbcTemplate(config.getDataSource()); + } + + /** + * try执行积分扣减 + * @param parameter + * @return + */ + @Override + public boolean attempt(PointsDeductCmd parameter) { + return super.attempt(parameter, cmd -> { + int result = jdbcTemplate.update("update tcc_demo_user_points set points = points - ? where user_id = ? and points - ? >= 0", + cmd.getDeductionPoints(), cmd.getUserId(), cmd.getDeductionPoints()); + boolean transactionActive = TransactionSynchronizationManager.isActualTransactionActive(); + + if (result == 0) { + throw new RuntimeException("用户积分不足 || 用户不存在"); + } + + int result2 = jdbcTemplate.update("insert tcc_demo_points_changing_Log (user_id, change_points, change_type, biz_id, status) values(?,?,?,?,?)", + cmd.getUserId(), cmd.getDeductionPoints(), 1, cmd.getOrderId(), 0); + + return true; + }); + } + + /** + * commit提交积分扣减 + * @param parameter + */ + @Override + public void commit(PointsDeductCmd parameter) { + super.commit(parameter, cmd -> { + int result = jdbcTemplate.update("update tcc_demo_points_changing_Log set status = 1 where biz_id = ?", cmd.getBizId()); + if (result == 0) { + throw new RuntimeException("无效的业务id,无法积分commit"); + } + }); + } + /** + * cancel回顾积分扣减 + * @param parameter + */ + @Override + public void cancel(PointsDeductCmd parameter) { + super.cancel(parameter, cmd -> { + int result = jdbcTemplate.update("update tcc_demo_points_changing_Log set status = 2 where biz_id = ?", cmd.getBizId()); + if (result == 0) { + log.error("无效的业务id : {},无法进行积分cancel", cmd.getBizId()); + return; + } + + int result2 = jdbcTemplate.update("update tcc_demo_user_points set points = points + ? where user_id = ?", + cmd.getDeductionPoints(), cmd.getUserId() + ); + if (result2 == 0) { + throw new RuntimeException("无效的用户id,无法进行积分rollback"); + } + }); + } + + +} + +``` + + -已解决悬挂、幂等、空回滚问题,业务层面无需关注这部分处理。 -# 使用示例 -https://github.com/654894017/tcc/tree/master/src/test/java/com/damon/sample -### 步骤1:运行 com.damon.sample.points.PointsApplication -### 步骤2:运行 com.damon.sample.order.TestRun \ No newline at end of file diff --git a/src/main/java/com/damon/tcc/TccMainService.java b/src/main/java/com/damon/tcc/TccMainService.java index 8e771aa..748beac 100644 --- a/src/main/java/com/damon/tcc/TccMainService.java +++ b/src/main/java/com/damon/tcc/TccMainService.java @@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit; -public abstract class TccMainService { +public abstract class TccMainService { private final Logger log = LoggerFactory.getLogger(TccMainService.class); private final ExecutorService asyncCommitExecutorService; private final ExecutorService asyncCheckExecutorService; @@ -96,17 +96,17 @@ protected R process(O parameter) throws TccTryException { TccMainLog tccMainLog = new TccMainLog(parameter.getBizId()); tccLogService.create(tccMainLog); log.info("业务类型: {}, 业务id : {}, 创建事务日志成功", bizType, parameter.getBizId()); - this.executeTry(parameter); + PD processData = this.executeTry(parameter); log.info("业务类型: {}, 业务id : {}, 预执行成功", bizType, parameter.getBizId()); - R result = this.executeLocalTransaction(parameter, tccMainLog); + R result = this.executeLocalTransaction(parameter, tccMainLog, processData); log.info("业务类型: {}, 业务id : {}, 本地事务成功", bizType, parameter.getBizId()); this.executeCommit(parameter, tccMainLog); return result; } - private void executeTry(O parameter) throws TccTryException { + private PD executeTry(O parameter) throws TccTryException { try { - attempt(parameter); + return attempt(parameter); } catch (Exception exception) { log.error("业务类型: {}, 业务id : {}, 预执行失败", bizType, parameter.getBizId(), exception); asyncCommitExecutorService.execute( @@ -122,10 +122,10 @@ private void executeCommit(O parameter, TccMainLog tccMainLog) { ); } - private R executeLocalTransaction(O parameter, TccMainLog tccMainLog) { + private R executeLocalTransaction(O parameter, TccMainLog tccMainLog, PD processData) { try { return localTransactionService.execute( - new TccLocalTransactionSupplier<>(tccLogService, tccMainLog, this::executeLocalTransaction, parameter) + new TccLocalTransactionSupplier<>(tccLogService, tccMainLog, this::executeLocalTransaction, parameter, processData) ); } catch (Exception exception) { log.error("业务类型: {}, 业务id : {}, 本地事务执行失败", bizType, parameter.getBizId(), exception); @@ -150,16 +150,17 @@ private R executeLocalTransaction(O parameter, TccMainLog tccMainLog) { * 服务调用者可以比较好的应对较复杂的业务逻辑 * * @param object + * @return 返回的结果会作为本地事务方法的入参(processData) */ - protected abstract void attempt(O object); + protected abstract PD attempt(O object); /** * 执行本地事务方法和tcc事务日志在一个事务域内处理 - * * @param object + * @param processData attempt 方法返回的结果 * @return */ - protected abstract R executeLocalTransaction(O object); + protected abstract R executeLocalTransaction(O object, PD processData); protected abstract void commit(O object); diff --git a/src/main/java/com/damon/tcc/transaction/DefaultLocalTransactionService.java b/src/main/java/com/damon/tcc/transaction/DefaultLocalTransactionService.java index 05e73f7..ee7f51d 100644 --- a/src/main/java/com/damon/tcc/transaction/DefaultLocalTransactionService.java +++ b/src/main/java/com/damon/tcc/transaction/DefaultLocalTransactionService.java @@ -9,6 +9,7 @@ public class DefaultLocalTransactionService implements ILocalTransactionService { private final Logger log = LoggerFactory.getLogger(DefaultLocalTransactionService.class); + @Transactional(rollbackFor = Exception.class) public R execute(Supplier supplier) { boolean isTransactionActive = TransactionSynchronizationManager.isActualTransactionActive(); diff --git a/src/main/java/com/damon/tcc/transaction/TccLocalTransactionSupplier.java b/src/main/java/com/damon/tcc/transaction/TccLocalTransactionSupplier.java index 43f1ce6..3983c99 100644 --- a/src/main/java/com/damon/tcc/transaction/TccLocalTransactionSupplier.java +++ b/src/main/java/com/damon/tcc/transaction/TccLocalTransactionSupplier.java @@ -4,26 +4,28 @@ import com.damon.tcc.main_log.ITccMainLogService; import com.damon.tcc.main_log.TccMainLog; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.function.Supplier; -public class TccLocalTransactionSupplier implements Supplier { +public class TccLocalTransactionSupplier implements Supplier { private final ITccMainLogService tccLogService; private final TccMainLog tccMainLog; - private final Function localTransactionPhaseFunction; + private final BiFunction localTransactionPhaseFunction; private final O parameter; + private final PD processData; - public TccLocalTransactionSupplier(ITccMainLogService tccLogService, TccMainLog tccMainLog, Function localTransactionPhaseFunction, O parameter) { + public TccLocalTransactionSupplier(ITccMainLogService tccLogService, TccMainLog tccMainLog, BiFunction localTransactionPhaseFunction, O parameter, PD processData) { this.tccMainLog = tccMainLog; this.tccLogService = tccLogService; this.localTransactionPhaseFunction = localTransactionPhaseFunction; this.parameter = parameter; + this.processData = processData; } @Override public R get() { tccMainLog.commitLocal(); tccLogService.update(tccMainLog); - return localTransactionPhaseFunction.apply(parameter); + return localTransactionPhaseFunction.apply(parameter, processData); } } diff --git a/src/test/java/com/damon/sample/order/app/OrderSubmitAppService.java b/src/test/java/com/damon/sample/order/app/OrderSubmitAppService.java index 3b7afe4..1e89145 100644 --- a/src/test/java/com/damon/sample/order/app/OrderSubmitAppService.java +++ b/src/test/java/com/damon/sample/order/app/OrderSubmitAppService.java @@ -11,32 +11,52 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; +import java.util.HashMap; +import java.util.Map; + @Service -public class OrderSubmitAppService extends TccMainService implements IOrderSubmitAppService { +public class OrderSubmitAppService extends TccMainService, Order> implements IOrderSubmitAppService { private final JdbcTemplate jdbcTemplate; private final IPointsGateway pointsGateway; @Autowired - public OrderSubmitAppService(TccMainConfig config, - IPointsGateway pointsGateway) { + public OrderSubmitAppService(TccMainConfig config, IPointsGateway pointsGateway) { super(config); this.jdbcTemplate = new JdbcTemplate(config.getDataSource()); this.pointsGateway = pointsGateway; } + /** + * 检查失败的日志,已纠正事务是否需要回顾还是提交 + */ public void executeFailedLogCheck() { super.executeFailedLogCheck(); } + /** + * 检查死亡的日志,已纠正事务是否需要回顾还是提交 + */ public void executeDeadLogCheck() { super.executeDeadLogCheck(); } + /** + * 执行失败日志检查的时候需要回查请求参数(因为事务日志未记录方法请求参数,所以需要回查一下) + * @param bizId 实体对象id(业务id) + * @return + */ @Override protected Order callbackParameter(Long bizId) { return jdbcTemplate.queryForObject("select * from tcc_demo_order where order_id = ? ", new BeanPropertyRowMapper<>(Order.class), bizId); } + /** + * 创建订单 (1 预先创建订单 2 执行try动作) + * + * @param userId + * @param points + * @return + */ @Override public Long submitOrder(Long userId, Long points) { Long orderId = IdUtil.getSnowflakeNextId(); @@ -44,22 +64,45 @@ public Long submitOrder(Long userId, Long points) { Order order = new Order(orderId, 0, userId, points); return super.process(order); } + + /** + * try执行用户积分扣除 + * + * @param order + * @return + */ @Override - protected void attempt(Order order) { - pointsGateway.tryDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints()); + protected Map attempt(Order order) { + Boolean result = pointsGateway.tryDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints()); + Map map = new HashMap<>(); + map.put("flag", result); + return map; } + @Override - protected Long executeLocalTransaction(Order object) { + protected Long executeLocalTransaction(Order object, Map map) { int result = jdbcTemplate.update("update tcc_demo_order set status = ? where order_id = ? ", 1, object.getOrderId()); if (result == 0) { throw new RuntimeException("无效的订单id : " + object.getOrderId()); } return object.getOrderId(); } + + /** + * commit积分 + * + * @param order + */ @Override protected void commit(Order order) { pointsGateway.commitDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints()); } + + /** + * cancel回滚积分 + * + * @param order + */ @Override protected void cancel(Order order) { pointsGateway.cancelDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints()); diff --git a/src/test/java/com/damon/sample/points/adapter/PointsController.java b/src/test/java/com/damon/sample/points/adapter/PointsController.java index 2f01333..2e9bccb 100644 --- a/src/test/java/com/damon/sample/points/adapter/PointsController.java +++ b/src/test/java/com/damon/sample/points/adapter/PointsController.java @@ -13,14 +13,17 @@ public class PointsController { @Autowired private PointsDeductionAppService pointsDeductionAppService; + @PostMapping("try_deduction") public void deductionTry(@RequestBody PointsDeductCmd cmd) { pointsDeductionAppService.attempt(cmd); } + @PostMapping("commit_deduction") public void deductionCommit(@RequestBody PointsDeductCmd cmd) { pointsDeductionAppService.commit(cmd); } + @PostMapping("cancel_deduction") public void deductionCancel(@RequestBody PointsDeductCmd cmd) { pointsDeductionAppService.cancel(cmd); diff --git a/src/test/java/com/damon/sample/points/app/PointsDeductionAppService.java b/src/test/java/com/damon/sample/points/app/PointsDeductionAppService.java index c8d0e49..22acad9 100644 --- a/src/test/java/com/damon/sample/points/app/PointsDeductionAppService.java +++ b/src/test/java/com/damon/sample/points/app/PointsDeductionAppService.java @@ -15,11 +15,18 @@ public class PointsDeductionAppService extends TccSubService implements IPointsDeductionAppService { private final Logger log = LoggerFactory.getLogger(PointsDeductionAppService.class); private final JdbcTemplate jdbcTemplate; + @Autowired public PointsDeductionAppService(TccSubConfig config) { super(config); this.jdbcTemplate = new JdbcTemplate(config.getDataSource()); } + + /** + * try执行积分扣减 + * @param parameter + * @return + */ @Override public boolean attempt(PointsDeductCmd parameter) { return super.attempt(parameter, cmd -> { @@ -38,6 +45,10 @@ public boolean attempt(PointsDeductCmd parameter) { }); } + /** + * commit提交积分扣减 + * @param parameter + */ @Override public void commit(PointsDeductCmd parameter) { super.commit(parameter, cmd -> { @@ -47,7 +58,10 @@ public void commit(PointsDeductCmd parameter) { } }); } - + /** + * cancel回顾积分扣减 + * @param parameter + */ @Override public void cancel(PointsDeductCmd parameter) { super.cancel(parameter, cmd -> { @@ -58,7 +72,7 @@ public void cancel(PointsDeductCmd parameter) { } int result2 = jdbcTemplate.update("update tcc_demo_user_points set points = points + ? where user_id = ?", - cmd.getDeductionPoints(), cmd.getUserId() + cmd.getDeductionPoints(), cmd.getUserId() ); if (result2 == 0) { throw new RuntimeException("无效的用户id,无法进行积分rollback");