Seata 分布式事务处理
Seata介绍
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务
下载地址:https://seata.apache.org/zh-cn/unversioned/download/seata-server
Seata模式
Seata 有AT、TCC、Saga、XA四个模式
本文中,我们将重点介绍 Seata AT 模式的使用
AT模式官方文档:https://seata.apache.org/zh-cn/docs/dev/mode/at-mode
SpringCloud Alibaba Seata
启动Nacos
启动Seata
下载seata(本文档采用的1.8.8)
修改配置文件(seata/conf/application.yml)
seata: config: # 打开nacos配置中心 type: nacos nacos: # nacos地址 server-addr: 127.0.0.1:8848 namespace: # nacos分组 group: DEFAULT_GROUP # nacos文件datd-id,就需要在nacos上新建seata的同名配置文件。第4步骤 data-id: seataServer.properties registry: # 打开nacos注册中心配置 type: nacos nacos: application: seata-server # 注册中心地址 server-addr: 127.0.0.1:8848 # 分组 group: DEFAULT_GROUP namespace: cluster: default store: # 打开数据库模式 mode: db # 数据库具体参数信息 db: datasource: druid db-type: mysql driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.163.128:3306/seata?rewriteBatchedStatements=true user: root password: 123456 min-conn: 10 max-conn: 100 global-table: global_table branch-table: branch_table lock-table: lock_table distributed-lock-table: distributed_lock query-limit: 1000 max-wait: 5000
数据库初始化,创建数据库,执行seata/script/server/db/mysql.sql
nacos 新建配置,此处 Data ID 为 seataServer.properties,(Data ID可以自己修改,但是必须与第2步配置文件里面的data-id配置保持一致)。配置内容参考https://github.com/apache/incubator-seata/tree/develop/script/config-center 的 config.txt 并按需修改保存。
启动seata-server。seata/bin/seata-server.bat
查看seata控制台,localhost:7091。用户名密码默认为seata
查看nacos控制台,发现seata-server已注册进nacos
测试分布式事务
创建帐户服务
maven坐标
<!-- seata --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Nacos 服务发现 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>
application.yml
server: port: 9001 spring: application: name: cloud-alibaba-seata-account cloud: nacos: discovery: server-addr: localhost:8848 datasource: url: jdbc:mysql://192.168.163.128:3306/seata-demo?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456 # seata配置 seata: application-id: cloud-alibaba-seata-account # 通过nacos寻找seata服务,然后注册进seata服务端 registry: type: nacos nacos: server-addr: localhost:8848 #nacos地址 group: DEFAULT_GROUP #分组 application: seata-server #seata服务端服务名称 namespace: tx-service-group: seata_tx_group #这个通常使用默认的 service: vgroup-mapping: seata_tx_group: default data-source-proxy-mode: AT #AT模式
执行数据库脚本
创建undo_log表。
UNDO_LOG脚本https://github.com/apache/incubator-seata/tree/2.x/script/client/at/db
CREATE TABLE `undo_log` ( `branch_id` bigint NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) NOT NULL COMMENT 'global transaction id', `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`), INDEX `ix_log_created`(`log_created`) );
创建account表
CREATE TABLE `account` ( `user_id` int NOT NULL, `balance` int NULL DEFAULT NULL, PRIMARY KEY (`user_id`) ); SET FOREIGN_KEY_CHECKS = 1; INSERT INTO `account` (`user_id`, `balance`) VALUES (1, 100);
代码
@RestController class AccountController { @Autowired AccountService accountService; /** * 扣减帐户中的费用 * * @param userId 用户id * @param cost 费用 * @return */ @GetMapping("/debit") public String debit(Integer userId, Integer cost) { return accountService.debit(userId, cost); } } @Service class AccountService { @Autowired JdbcTemplate jdbcTemplate; public String debit(Integer userId, Integer cost) { // 此处需要考虑锁的问题,这里演示分布式事务,则没有加上 Map<String, Object> map = jdbcTemplate.queryForMap("SELECT balance FROM account WHERE user_id = ?", userId); if (null != map && map.containsKey("balance")) { Integer balance = (Integer) map.get("balance"); if (balance >= cost) { jdbcTemplate.update("UPDATE account SET balance = ? WHERE user_id = ?", (balance - cost), userId); return "SUCCESS"; } } throw new RuntimeException("余额不足"); } }
创建仓库服务
maven依赖和帐户服务一致
application.yml
server: port: 9003 spring: application: name: cloud-alibaba-seata-storage cloud: nacos: discovery: server-addr: localhost:8848 datasource: url: jdbc:mysql://192.168.163.128:3306/seata-demo?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456 seata: application-id: cloud-alibaba-seata-storage registry: type: nacos nacos: server-addr: localhost:8848 #nacos地址 group: DEFAULT_GROUP #分组 application: seata-server #seata服务端服务名称 namespace: tx-service-group: seata_tx_group #这个通常使用默认的 service: vgroup-mapping: seata_tx_group: default data-source-proxy-mode: AT #AT模式
创建库存表goods_storage
CREATE TABLE `goods_storage` ( `name` varchar(255) NOT NULL, `number` int NULL DEFAULT NULL, PRIMARY KEY (`name`) USING BTREE ); SET FOREIGN_KEY_CHECKS = 1; INSERT INTO `seata-demo`.`goods_storage` (`name`, `number`) VALUES ('book', 100);
代码
@RestController class StorageController { @Autowired StorageService storageService; /** * 扣减库存 * * @param name * @param count * @return */ @GetMapping("/deduct") public String deduct(String name, Integer count) { return storageService.deduct(name, count); } } @Service class StorageService { @Autowired JdbcTemplate jdbcTemplate; public String deduct(String name, Integer count) { //此处也应该考虑分布式锁的情况 Map<String, Object> map = jdbcTemplate.queryForMap("SELECT `number` FROM goods_storage WHERE `name` = ?", name); if (null != map && map.containsKey("number")) { Integer number = (Integer) map.get("number"); if (number >= count) { jdbcTemplate.update("UPDATE goods_storage SET `number` = ? WHERE `name` = ?", number - count, name); return "SUCCESS"; } } throw new RuntimeException("库存不足"); } }
创建订单服务
maven,比之前两个服务多了openfeign的依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Nacos 服务发现 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!-- 服务发现:OpenFeign服务调用 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <!-- 负载均衡器 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-loadbalancer</artifactId> </dependency>
application.yml
server: port: 9002 spring: application: name: cloud-alibaba-seata-order cloud: nacos: discovery: server-addr: localhost:8848 datasource: url: jdbc:mysql://192.168.163.128:3306/seata-demo?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456 seata: application-id: cloud-alibaba-seata-order registry: type: nacos nacos: server-addr: localhost:8848 #nacos地址 group: DEFAULT_GROUP #分组 application: seata-server #seata服务端服务名称 namespace: tx-service-group: seata_tx_group #这个通常使用默认的 service: vgroup-mapping: seata_tx_group: default data-source-proxy-mode: AT #AT模式
创建订单表
CREATE TABLE `order_list` ( `id` bigint NOT NULL, `name` varchar(255) DEFAULT NULL, `count` int NULL DEFAULT NULL, `price` int NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ; SET FOREIGN_KEY_CHECKS = 1;
代码。重点:加上全局事务注解@GlobalTransactional
//记得启动类上加上@EnableFeignClients注解。 @RestController class OrderController { @Autowired OrderService orderService; @GetMapping("/order/create") public String create(Integer userId, String name, Integer count, Integer price) { try { orderService.createOrder(userId, name, count, price); } catch (Exception e) { return "ERROR"; } return "SUCCESS"; } } @Service class OrderService { @Autowired StorageService storageService; @Autowired AccountService accountService; @Autowired JdbcTemplate jdbcTemplate; /* * 加上全局事务注解。 * 就不需要在加@Transaction注解了。库存和帐户service也不需要加 * seata会通过undo_log表进行反向补偿 */ @GlobalTransactional(rollbackFor = Exception.class) public String createOrder(Integer userId, String name, Integer count, Integer price) { // 创建订单 int update = jdbcTemplate.update("INSERT INTO `order_list` (`name`, `count`, `price`) VALUES (?, ?, ?)", name, count, price); if (update == 1) { // 扣减库存 storageService.deduct(name, count); // 扣减余额 accountService.debit(userId, count * price); } return "SUCCESS"; } } @FeignClient(value = "cloud-alibaba-seata-storage") interface StorageService { //扣减库存 @GetMapping("/deduct") public String deduct(@RequestParam("name") String name, @RequestParam("count") Integer count); } @FeignClient(value = "cloud-alibaba-seata-account") interface AccountService { //扣减余额 @GetMapping("/debit") public String debit(@RequestParam("userId") Integer userId, @RequestParam("cost") Integer cost); }
全局事务测试
启动帐户,仓库,订单服务
查看nacos控制台,seata、帐户、仓库、订单都注册进nacos,且都在一个GEOUP下
查看seata日志,三个服务也都注册进seata服务端
先测试一个正确的,确保流程能正确走完
创建一个订单,用户id1,商品为book,数量1,单价10
浏览器访问 localhost:9002/order/create?userId=1&name=book&count=1&price=10
返回结果SUCCESS,查看数据库
订单创建成功
库存成功扣减
余额成功扣减
错误的,直接将金额设置为1000
浏览器访问 localhost:9002/order/create?userId=1&name=book&count=1&price=1000
返回ERROR
查看数据库,订单,仓库,余额都没有变化。
订单表,没有新增订单
库存表也没有新增
账户表的余额也没有减少
从seata日志查看事务过程解析
开启一个全局事务
Begin new global transaction applicationId: cloud-alibaba-seata-order, transactionServiceGroup: seata_tx_group, transactionName: createOrder(java.lang.Integer, java.lang.String, java.lang.Integer, java.lang.Integer), timeout:60000, xid:192.168.163.1:8091:2972952912344027137
seata控制台
创建订单注册进分支事务
Register branch successfully, xid = 192.168.163.1:8091:2972952912344027137, branchId = 2972952912344027138, resourceId = jdbc:mysql://192.168.163.128:3306/seata-demo , lockKeys = order_list:13
扣减库存注册进分支事务
Register branch successfully, xid = 192.168.163.1:8091:2972952912344027137, branchId = 2972952912344027139, resourceId = jdbc:mysql://192.168.163.128:3306/seata-demo , lockKeys = goods_storage:book
仓库事务回滚成功
Rollback branch transaction successfully, xid = 192.168.163.1:8091:2972952912344027137 branchId = 2972952912344027139
订单事务回滚成功
Rollback branch transaction successfully, xid = 192.168.163.1:8091:2972952912344027137 branchId = 2972952912344027138
全局事务回滚成功
Rollback global transaction successfully, xid = 192.168.163.1:8091:2972952912344027137
原理解析
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
- 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = 'TXC')等相关的信息。
- 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
- 二阶段:
- 提交异步化,非常快速地完成。
- 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
- 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
- 回滚通过一阶段的回滚日志进行反向补偿。
- 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
- 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
- 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句
- 提交异步化,非常快速地完成。