Seata 是 Spring Cloud Alibaba 中一款开源的分布式事务解决方案,本文具体就 Seata 的 TCC 模式进行介绍、实践
abstract.png
环境配置 基础环境 首先通过 Docker Compose 搭建基础环境 ——Nacos、MySQL 服务,具体如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 version: '3.8' services: Nacos-Service-1: image: nacos/nacos-server:1.4.2 container_name: Nacos-Service-1 ports: - "9848:8848" environment: MODE: standalone networks: seata_tcc_net: ipv4_address: 130.130.130.32 PayService-DB: image: mysql:5.7 container_name: PayService-DB ports: - "9306:3306" environment: MYSQL_ROOT_PASSWORD: 12345 networks: seata_tcc_net: ipv4_address: 130.130.130.36 StorageService-DB: image: mysql:5.7 container_name: StorageService-DB ports: - "9307:3306" environment: MYSQL_ROOT_PASSWORD: 12345 networks: seata_tcc_net: ipv4_address: 130.130.130.37 networks: seata_tcc_net: ipam: config: - subnet: 130.130.130.0/24
配置 Seata Server 通过 Github 下载 Seata Server,命令如下
1 wget https://github.com/seata/seata/releases/download/v1.3.0/seata-server-1.3.0.zip
修改 Seata Server 下 conf 目录的 registry.conf 文件,将注册中心、配置中心均设置 Nacos。需要注意的是如果没有 logs 目录,则需要手动创建该目录
figure 1.jpeg
对于 Seata Server 而言,其配置信息支持两种形式:本地文件、配置中心。对于后者而言,我们需要将 Seata 的相关配置项导入到配置中心。同样,我们需要通过 Github 来下载配置文件 config.txt 及相应的导入脚本 nacos-config.sh
1 2 3 4 5 https://github.com/seata/seata/blob/1.3.0/script/config-center/config.txt https://github.com/seata/seata/blob/1.3.0/script/config-center/nacos/nacos-config.sh
在通过 Shell 脚本导入配置至 Nacos 过程中,配置文件 config.txt 应与 Shell 脚本的上一级目录保持平行。然后在 Shell 脚本所在目录中执行如下命令即可
1 2 sh nacos-config.sh -h localhost -p 9848
该 Shell 脚本支持的选项如下所示
-h : Nacos 服务的 IP 地址,默认为 localhost
-p : Nacos 服务的 Port 端口,默认为 8848
-g : Nacos 分组名,默认为 SEATA_GROUP
-t : Nacos 命名空间 ID。默认为””,即使用 public 命名空间
-u : Nacos 服务的用户名
-w : Nacos 服务的密码
效果如下所示
figure 2.jpeg
至此 Seata Server 相关环境及配置就完成了,只需通过 Seata Server 下 bin 目录的 seata-server.sh 脚本启动服务即可。其中 - p 选项指定服务使用的端口,默认为 8091
figure 3.jpeg
搭建库存服务 POM 依赖 通过 SpringBoot 搭建库存服务 StorageService。这里给出关键性的依赖及版本,如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-dependencies</artifactId > <version > 2.3.2.RELEASE</version > <type > pom</type > <scope > import</scope > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-dependencies</artifactId > <version > Hoxton.SR8</version > <type > pom</type > <scope > import</scope > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-alibaba-dependencies</artifactId > <version > 2.2.3.RELEASE</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > <dependencies > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > <exclusions > <exclusion > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > <version > 1.3.0</version > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.76</version > </dependency > <dependency > <groupId > com.baomidou</groupId > <artifactId > mybatis-plus-boot-starter</artifactId > <version > 3.4.1</version > </dependency > </dependencies >
服务配置 该服务的配置文件如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 server: port: 8080 spring: application: name: StorageService datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:9307/StorageDb?allowPublicKeyRetrieval=true&useSSL=false username: root password: 12345 cloud: nacos: discovery: server-addr: 127.0 .0 .1 :9848 alibaba: seata: tx-service-group: my_test_tx_group mybatis-plus: mapper-locations: classpath:mapper/*.xml seata: registry: type: nacos nacos: application: seata-server server-addr: 127.0 .0 .1 :9848 username: nacos password: nacos group: SEATA_GROUP config: type: nacos nacos: server-addr: 127.0 .0 .1 :9848 username: nacos password: nacos group: SEATA_GROUP enable-auto-data-source-proxy: true management: endpoints: web: exposure: include: "*" base-path: /actuator
服务实现 这里直接添加一个 Controller 类用以实现库存扣减,核心代码实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @RestController @RequestMapping("goods") @Slf4j public class GoodsController { @Autowired private GoodsStorageService goodsStorageService; @RequestMapping("/sell") public String sell (@RequestBody GoodsDto goodsDto) { String msg = "success" ; goodsStorageService.sell(null , goodsDto); return msg; } } ... @Data @Builder @AllArgsConstructor @NoArgsConstructor public class GoodsDto { private String goodsName; private int num; }
作为 TCC 的分布式事务方案来说,对于一个售卖商品扣减库存的过程。需要根据 Try-Confirm-Cancel 的设计要求,将库存的扣减分为两阶段完成。首先在 GoodsStorageService 接口中定义 sell、confirmSell、cancelSell 方法,然后在接口上添加 @LocalTCC 注解,最后在 Try 方法上添加 @TwoPhaseBusinessAction 注解。具体地,@TwoPhaseBusinessAction 注解的 name 属性只需保证唯一性即可、commitMethod/rollbackMethod 属性用来设置两阶段调用的方法名。TCC 中各方法的 BusinessActionContext 参数是 TCC 两阶段之间用来传递参数的 context 上下文,故在 Controller 中调用 Try 方法时 BusinessActionContext 参数只需传 null 值即可。并将通过 @BusinessActionContextParameter 注解将相关参数以指定名称存入 context 上下文。与此同时还在接口中提供了一个默认方法 getParamByContext,以便于二阶段时从 context 上下文获取参数。为了保证 TCC 二阶段的 Confirm、Cancel 接口的幂等性,这里在实现类中通过向 resultHolder 存入 xid 全局事务 ID 进行幂等控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 @LocalTCC public interface GoodsStorageService { String paramName = "params" ; default GoodsDto getParamByContext (BusinessActionContext context) { JSONObject jsonObject = (JSONObject) context.getActionContext(paramName); GoodsDto goodsDto = jsonObject.toJavaObject(GoodsDto.class); return goodsDto; } @TwoPhaseBusinessAction(name = "sell", commitMethod = "confirmSell", rollbackMethod = "cancelSell") int sell (BusinessActionContext context, @BusinessActionContextParameter(paramName = paramName) GoodsDto goodsDto) ; void confirmSell (BusinessActionContext context) ; void cancelSell (BusinessActionContext context) ; } ... @Service @Slf4j public class GoodsStorageServiceImpl implements GoodsStorageService { private static Set<String> resultHolder = new ConcurrentHashSet <>(); @Autowired private GoodsStorageMapper goodsStorageMapper; @Override public int sell (BusinessActionContext context, GoodsDto goodsDto) { String xid = context.getXid(); int result = goodsStorageMapper.sell(goodsDto); log.info("[Goods Storage Service]: result: {}" , result); if ( result != 1 ) { throw new RuntimeException ("商品库存不足" ); } resultHolder.add( xid ); return result; } @Override public void confirmSell (BusinessActionContext context) { String xid = context.getXid(); if ( !resultHolder.contains(xid) ) { return ; } GoodsDto goodsDto = getParamByContext(context); goodsStorageMapper.confirmSell(goodsDto); resultHolder.remove(xid); log.info("[Goods Storage Service]: confirm sell" ); } @Override public void cancelSell (BusinessActionContext context) { String xid = context.getXid(); if ( !resultHolder.contains(xid) ) { return ; } GoodsDto goodsDto = getParamByContext(context); goodsStorageMapper.cancelSell(goodsDto); resultHolder.remove(xid); log.info("[Goods Storage Service]: cancel sell" ); } }
DB 层面 商品库存表对应的实体类 GoodsStorage 如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Data @Builder @AllArgsConstructor @NoArgsConstructor @TableName("goods_storage") public class GoodsStorage { @TableId private int id; private String goodsName; private int stock; private int soldNum; private int freezeNum; }
而售卖商品扣减库存的各阶段方法所使用的 SQL 如下所示,至此就可以明白 freezeNum 冻结库存数这一中间状态的含义。这也是 TCC 方案两阶段的具体体现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <update id ="sell" parameterType ="com.aaron.StorageService.dto.GoodsDto" > update goods_storage set stock = stock - #{num}, freeze_num = freeze_num + #{num} where goods_name = #{goodsName} and (stock - #{num}) >= 0 </update > <update id ="confirmSell" parameterType ="com.aaron.StorageService.dto.GoodsDto" > update goods_storage set sold_num = sold_num + #{num}, freeze_num = freeze_num - #{num} where goods_name = #{goodsName} </update > <update id ="cancelSell" parameterType ="com.aaron.StorageService.dto.GoodsDto" > update goods_storage set stock = stock + #{num}, freeze_num = freeze_num - #{num} where goods_name = #{goodsName} </update >
搭建支付服务 为了验证分布式事务,自然不能只有一个微服务。故这里类似地我们再搭建一个 PayService 支付服务。当然基本搭建过程与 StorageService 服务并无明显差异。首先在 POM 依赖方面,PayService 服务的 POM 依赖与 StorageService 服务一致,同样也需要引入 Seata、Nacos 等相关依赖。其次在服务配置方面,PayService 服务的 application.yml 配置文件中关于 Seata、Nacos 相关的配置自然与 StorageService 服务并无二致。但需调整修改其所连接的数据库信息,部分配置如下所示
1 2 3 4 5 6 7 8 9 10 11 12 server: port: 90 spring: application: name: PayService datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:9306/PayDb?allowPublicKeyRetrieval=true&useSSL=false username: root password: 12345
这里添加一个 Controller 用于进行余额、库存的扣减。作为分布式事务的发起者,这里需要添加一个 @GlobalTransactional 注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @RestController @RequestMapping("pay") @Slf4j public class PayController { @Autowired private RestTemplate restTemplate; @Autowired private PayService payService; @GlobalTransactional @RequestMapping("/buy") public String buy () { PayDto payDto = new PayDto ("Aaron" ,2000 ); payService.pay(null , payDto); String url = "http://StorageService/goods/sell" ; GoodsDto goodsDto = new GoodsDto ("iPhone" , 5 ); String response = restTemplate.postForObject(url, goodsDto, String.class); return "complete" ; } } ... @Configuration public class RestTemplateConfig { @Bean @LoadBalanced public RestTemplate restTemplate () { return new RestTemplate (); } } ... @Data @Builder @AllArgsConstructor @NoArgsConstructor public class PayDto { private String name; private int money; }
类似地,Service 层同样按照 TCC 的设计原则进行设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 @LocalTCC public interface PayService { String paramName = "params" ; default PayDto getParamByContext (BusinessActionContext context) { JSONObject jsonObject = (JSONObject) context.getActionContext(paramName); PayDto payDto = jsonObject.toJavaObject(PayDto.class); return payDto; } @TwoPhaseBusinessAction(name = "pay", commitMethod = "confirmPay", rollbackMethod = "cancelPay") int pay (BusinessActionContext context, @BusinessActionContextParameter(paramName = paramName) PayDto payDto ) ; void confirmPay (BusinessActionContext context) ; void cancelPay (BusinessActionContext context) ; } ... @Service @Slf4j public class PayServiceImpl implements PayService { private static Set<String> resultHolder = new ConcurrentHashSet <>(); @Autowired private PayMapper payMapper; @Override public int pay (BusinessActionContext context, PayDto payDto) { String xid = context.getXid(); int result = payMapper.pay(payDto); log.info("[Pay Service]: result: {}" , result); if ( result != 1 ) { throw new RuntimeException ("账户余额不足" ); } resultHolder.add( xid ); return result; } @Override public void confirmPay (BusinessActionContext context) { String xid = context.getXid(); if ( !resultHolder.contains(xid) ) { return ; } PayDto payDto = getParamByContext(context); payMapper.confirmPay( payDto ); resultHolder.remove(xid); log.info("[Pay Service]: confirm pay" ); } @Override public void cancelPay (BusinessActionContext context) { String xid = context.getXid(); if ( !resultHolder.contains(xid) ) { return ; } PayDto payDto = getParamByContext(context); payMapper.cancelPay( payDto ); resultHolder.remove(xid); log.info("[Pay Service]: cancel pay" ); } }
余额表对应的实体类 Pay 如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Data @Builder @AllArgsConstructor @NoArgsConstructor @TableName("member_balance") public class Pay { @TableId private int id; private String name; private int balance; private int freeze; }
而支付的各阶段方法所使用的 SQL 如下所示,其同样通过中间状态 freeze 冻结金额这一中间状态实现 TCC 的两阶段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <update id ="pay" parameterType ="com.aaron.PayService.dto.PayDto" > update member_balance set balance = balance - #{money}, freeze = freeze + #{money} where name = #{name} and (balance - #{money}) >= 0 </update > <update id ="confirmPay" parameterType ="com.aaron.PayService.dto.PayDto" > update member_balance set freeze = freeze - #{money} where name = #{name} </update > <update id ="cancelPay" parameterType ="com.aaron.PayService.dto.PayDto" > update member_balance set balance = balance + #{money}, freeze = freeze - #{money} where name = #{name} </update >
测试 在测试之前,需要对 PayService 支付服务、StorageService 库存服务各自的数据库完成表的建立及数据初始化工作,如下图所示
figure 4.jpeg
分别在 90、8080 端口启动 PayService 支付服务、StorageService 库存服务,当我们第一次调用 PayService 支付服务的 buy 接口时,可以看到余额、库存均被正常扣减
figure 5.jpeg
而当第二次调用该接口时,由于商品库存不足。则会导致整个分布式事务进行回滚。可以看到余额、库存的数据由于被正常回滚,故未发生意外扣除
figure 6.jpeg
v1.5.2