ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

分布式事务讲解 - Seata分布式事务框架(AT、TCC两种模式)

2022-01-22 11:59:57  阅读:258  来源: 互联网

标签:事务 SET Seata utf8 seata NULL id 分布式


分布式事务讲解 - Seata分布式事务框架(AT、TCC两种模式)


Seata分布式事务框架在公司中使用特别广泛,其中含有四种分布式事务模式:AT、TCC、SAGA、XA,其实与TX-LCN框架一样,都可以说是2PC分布式事务模型的一个变种,工作原理也都相似,值得一说的是,Seata框架的AT模式有对回滚数据的保存,但是TX-LCN这种框架现在还没有做到这么智能,所以现在公司大多是用Seata框架。
本篇博客内容太多,大家可先看理论部分,代码实战部分可以直接用最后分享的源码在本地跑一下,然后了解简单使用即可。

Seata原理

官方文档地址

这篇博客来源于本人在官网文档对这个框架解释的思考和总结,也看了很多其他人的博客。

http://seata.io/zh-cn/docs/overview/what-is-seata.html

Seata框架主要组成部分

Seata和TX-LCN框架的组成部分比较容易混淆,最好大家能理解这部分,区分这两个框架。

  1. TC (Transaction Coordinator) - 事务协调者
    维护全局和分支事务的状态,驱动全局事务提交或回滚。相当于TX-LCN的【TM】事务管理者。
  2. TM (Transaction Manager) - 事务管理器
    定义全局事务的范围:开始全局事务、提交或回滚全局事务。可以理解为【事务的发起者】。
    在我看来,之所以Seata框架把【事务发起者】作为【事务管理器】,是因为,事务发起者有创建事务组和提交或回滚全局事务的能力。
  3. RM (Resource Manager) - 资源管理器
    管理分支事务处理的资源,有向TC注册分支事务和报告分支事务的状态的能力,并驱动分支事务提交或回滚。

Seata工作流程

Seata原理图我就不展示了,可以说是与TX-LCN原理图一样的,大家有兴趣的话可以去看一下【分布式事务讲解 - TX-LNC分布式事务框架(含LCN、TCC、TXC三种模式)】这篇博客。

  1. 【事务管理器】发起事务,向【事务协调者】开启全局事务,并向【事务协调者】注册一个分支事务。
  2. 【事务管理器】调用【资源管理器A】,【资源管理器A】向【事务协调者】注册一个新的分支事务,【资源管理器A】执行业务,返回执行结果给【事务管理器】。
  3. 【资源管理器A】调用【资源管理器B】,【资源管理器B】向【事务协调者】注册一个新的分支事务,【资源管理器B】执行业务,返回执行结果给【事务管理器】。
  4. 等【事务管理器】调用的所有分支事务执行完毕并收到所有反馈信息,【事务管理器】会向【事务协调者】发送通知,表明分支事务全部执行完毕。
  5. 【事务协调者】根据【事务管理器】发送的分支事务执行情况,依次向所有【资源管理器】发出提交或回滚指令。
  6. 分布式事务结束。

全局事务和分支事务

Seata框架中把事务分成了全局事务和分支事务,正因为全局事务锁和分支事务锁的配合,解决了AT模式下的“脏读”和“脏写”问题。

全局事务

Seata框架有一个注解@GlobalTransactional,一个系统中加了这个注解的所有方法,属于一个共同的全局事务,这些方法执行时共同享有一个全局事务锁。同一个全局事务需要遵从AT模式的读写隔离机制。

分支事务(本地事务)

全局事务中会存在调用其他【资源管理器】的方法,每个方法就是一个分支事务,相同的方法属于同一个分支事务,同一个分支事务使用同一个分支事务锁。

AT模式(Automatic Transaction自动化事务)

这种事务模式使用起来比较简单,事务的回滚逻辑不用我们自行实现,我们只要实现分支事务逻辑即可,并且减少了数据库连接占用时长。

AT模式的工作流程是基于Seata框架流程的:

  1. 【事务管理器】发起事务,向【事务协调者】开启全局事务,并向【事务协调者】注册一个分支事务。
  2. 【事务管理器】调用【资源管理器A】,【资源管理器A】向【事务协调者】注册一个新的分支事务,【资源管理器A】执行本地事务SQL并提交事务以及保存回滚日志,释放本地占用的资源,返回执行结果给【事务管理器】。
  3. 【资源管理器A】调用【资源管理器B】,【资源管理器B】向【事务协调者】注册一个新的分支事务,【资源管理器B】执行本地事务SQL并提交事务以及保存回滚日志,释放对本地资源的占用,返回执行结果给【事务管理器】。
  4. 等【事务管理器】调用的所有分支事务执行完毕并收到所有反馈信息,【事务管理器】会向【事务协调者】发送通知,表明分支事务全部执行完毕。
  5. 【事务协调者】根据【事务管理器】发送的分支事务执行情况,依次向所有【资源管理器】发出提交或回滚指令,如果分支事务收到的是提交指令,那只把回滚日志删除即可;如果收到回滚命令,那就执行回滚日志并删除日志,如果发现当前数据和回滚日志中的修改后数据有差异,那就保留回滚日志等人工处理。
  6. 分布式事务结束。

由流程中可见:

  1. 在分支事务执行阶段就把sql提交,减少数据库连接占用时长,释放了本地事务的资源,提高了事务执行效率。
  2. 在分支事务执行阶段就保留回滚日志,使开发人员不用关心事务失败回滚的逻辑,事务失败,框架直接执行对应分支事务在对应回滚日志表中的记录进行事务回滚。

回滚日志具体内容

{
	// 分支id
	"branchId": 641789253,
	"undoItems": [{
		// 修改后信息
		"afterImage": {
			"rows": [{
				"fields": [{
					"name": "id",
					"type": 4,
					"value": 1
				}, {
					"name": "name",
					"type": 12,
					"value": "GTS"
				}, {
					"name": "since",
					"type": 12,
					"value": "2014"
				}]
			}],
			"tableName": "product"
		},
		// 修改前信息
		"beforeImage": {
			"rows": [{
				"fields": [{
					"name": "id",
					"type": 4,
					"value": 1
				}, {
					"name": "name",
					"type": 12,
					"value": "TXC"
				}, {
					"name": "since",
					"type": 12,
					"value": "2014"
				}]
			}],
			// 数据库表名
			"tableName": "product"
		},
		// sql执行类型
		"sqlType": "UPDATE"
	}],
	// 全局事务id
	"xid": "xid:xxx"
}

回滚日志有对修改前和修改后数据的保存,用于回滚操作的自动进行,在执行回滚操作时,会根据分支事务id和全局事务id共同确定回滚日志,并且将回滚日志中的修改后信息与数据库当前数据信息进行对比,如果有差异,说明数据被修改过了,那就回滚失败,保留回滚日志,等候人工处理。

写隔离

AT模式为了防止产生“脏写”问题,加入了【写隔离】机制。

原理

  1. 两个不同线程(线程A、线程B)进入同一个分支事务执行相同业务,这时就要保证这两个线程不要出现脏写现象,这两个线程属于同一个全局事务。
  2. 线程A先拿到本地事务锁,执行业务,但是不进行事务提交。
  3. 线程A注册分支事务,拿到全局事务锁,提交本地事务。
  4. 线程A释放本地锁,但是还有其他业务未执行,全局事务锁依然没有释放。
  5. 本地事务锁释放后,线程B就可执行业务,执行完后不进行事务提交。
  6. 线程B注册分支事务,但是全局锁还被线程A占有,就会抛出异常,开启重试机制,如果重试超时依然没有拿到全局事务锁,那线程B就执行回滚操作,线程B整体事务结束。
  7. 线程A执行完全部业务,并全部提交成功,释放全局锁。
  8. 线程B重试获取全局锁成功,提交已执行的业务,等执行完全部业务,并全部提交成功,释放全局锁。
  1. 使用本地事务锁,才能减少对数据库连接的资源占用。
    如果没有本地事务锁,那就会有多个线程占用数据库连接资源,数量达到阈值可能出现OOM。
  2. 使用全局事务锁,才能保证对相同业务不被同时操作,才能保证整个分布式事务的提交和回滚操作正常进行。
    如果没有全局事务锁,那就会出现线程A出错回滚事务时发现欲回滚数据已被线程B修改过的现象,导致无法正常回滚事务的后果。

读隔离

AT模式为了防止产生“脏读”问题,加入了【读隔离】机制。

原理

Seata代理了select for update,当要执行select for update语句时,会有以下流程:

  1. 获取本地事务锁,执行select for update。
  2. 如果方法被@GlobalTransactional注解修饰,那就检查全局事务锁是否能拿到。
  3. 如果全局事务锁被占用,那就回滚本地事务,重试回去本地事务锁和全局锁,直到全局锁释放并被拿到为止。

由原理可以看出,使用select for update进行查询的时候,会像写隔离机制一样也会等到全局事务锁的释放后才能进行查询,这种方式也就避免了脏读的情况。

官网语录解析

官网说过:【AT模式 基于 支持本地 ACID 事务的 关系型数据库】,那原因是什么呢?这个我想了很久,网上的和官网也都有做过简单描述,但是描述的都一笔带过,让原本就不懂的人还是不懂,我是这样认为的:

  1. AT模式是基于支持本地事务的关系型数据库。
  2. AT模式要实现【读隔离】和【写隔离】,就一定需要支持不能自动提交执行sql的数据库,因为,这两种隔离原理就是通过拿到全局事务锁才能进行事务提交来实现的,但是不支持本地事务的数据库执行完sql就自动提交了,无法适用于这两种隔离,也就无法解决【脏读】和【脏写】问题。

所以,根据以上两点,我认为使用AT模式的分布式事务数据库必须在本地支持事务。

代码演示

创建数据库及表信息

-- ------------------seata-server数据库及表信息 start------------------------------------
CREATE DATABASE seata-server;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for branch_table
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table`  (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `resource_group_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `branch_type` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `status` tinyint(4) NULL DEFAULT NULL,
  `client_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime(6) NULL DEFAULT NULL,
  `gmt_modified` datetime(6) NULL DEFAULT NULL,
  PRIMARY KEY (`branch_id`) USING BTREE,
  INDEX `idx_xid`(`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for global_table
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table`  (
  `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_service_group` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `timeout` int(11) NULL DEFAULT NULL,
  `begin_time` bigint(20) NULL DEFAULT NULL,
  `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime(0) NULL DEFAULT NULL,
  `gmt_modified` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`xid`) USING BTREE,
  INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE,
  INDEX `idx_transaction_id`(`transaction_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for lock_table
-- ----------------------------
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table`  (
  `row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `branch_id` bigint(20) NOT NULL,
  `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime(0) NULL DEFAULT NULL,
  `gmt_modified` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`row_key`) USING BTREE,
  INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
-- ------------------seata-server数据库及表信息 end--------------------------------------
-- ------------------seata-tm数据库及表信息 start--------------------------------------
CREATE DATABASE seata-tm;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for tm
-- ----------------------------
DROP TABLE IF EXISTS `tm`;
CREATE TABLE `tm`  (
  `id` int(16) NOT NULL AUTO_INCREMENT,
  `name` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 40 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime(0) NOT NULL,
  `log_modified` datetime(0) NOT NULL,
  `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 47 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
-- ------------------seata-tm数据库及表信息 end--------------------------------------
-- ------------------seata-rm-one数据库及表信息 start--------------------------------------
CREATE DATABASE seata-rm-one;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for rm_one
-- ----------------------------
DROP TABLE IF EXISTS `rm_one`;
CREATE TABLE `rm_one`  (
  `id` int(16) NOT NULL AUTO_INCREMENT,
  `name` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 35 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime(0) NOT NULL,
  `log_modified` datetime(0) NOT NULL,
  `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 55 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
-- ------------------seata-rm-one数据库及表信息 end--------------------------------------
-- ------------------seata-rm-two数据库及表信息 start--------------------------------------
CREATE DATABASE seata-rm-two;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for rm_two
-- ----------------------------
DROP TABLE IF EXISTS `rm_two`;
CREATE TABLE `rm_two`  (
  `id` int(16) NOT NULL AUTO_INCREMENT,
  `name` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 31 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime(0) NOT NULL,
  `log_modified` datetime(0) NOT NULL,
  `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 41 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
-- ------------------seata-rm-two数据库及表信息 end--------------------------------------

新建eureka-server工程

eureka作为我们此次实战Seate的基础,TC、TM、RM都要注册进入eureka-server。
对于eureka的工程搭建,可以看【Spring Cloud完整组件搭建之路-Eureka】这一篇博客,里面有详细的讲解。

配置TC事务协调者

说白了,事务协调者就是Seata框架提供的一个服务程序,像RabbitMQ一样的一个管理平台。

下载seata-server

选择最新版本即可

http://seata.io/zh-cn/blog/download.html

配置seata-server

下载完毕并解压,打开以下目录:

在这里插入图片描述

打开registry.conf文件修改配置,文件最终修改为:
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # 修改
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  # 修改
  eureka {
    # eureka-server的地址
    serviceUrl = "http://euk-server-one.com:7900/eureka/"
    # 注册进入eureka-server的服务名称
	application = "seata-server"
	# 权重,非必须
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

# 配置中心
# 如果type=file,则从本地file.conf中获取配置参数,config.type=file时,才加载file.conf中的配置参数.
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  # 查看name属性值文件中的内容
  file {
    name = "file.conf"
  }
}
打开file.conf文件修改配置,文件最终修改为:
service {
  #transaction service group mapping
  #修改点1
  vgroup_mapping.my_tx_group = "seata-server"
  
  disableGlobalTransaction = true
}

## transaction log store, only used in seata-server
store {
  ## store mode: file、db
  # 修改
  mode = "db"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
  }

  ## database store property
  #修改
  db {	
    datasource = "druid"
    db-type = "mysql"
	# 有关数据库驱动需要使用一个新的
    driver-class-name = "com.mysql.cj.jdbc.Driver"
    # 配置数据库链接信息
    url = "jdbc:mysql://127.0.0.1:3306/seata-server?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai"
    user = "root"
    password = "wk3515134"
  }
}
数据库驱动存放地址:

在这里插入图片描述
在博客后面我会附上代码仓库地址,与截图位置相同。

启动seata-server

执行以下程序即可:
在这里插入图片描述

创建TM事务管理者(seata-tm)

创建一个空的Spring Boot工程

添加依赖

<!-- 实体类Data注解所需依赖 -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!-- 允许web访问所需依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- eureka客户端所需依赖 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- mysql:MyBatis相关依赖 -->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- mysql:mysql驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- mysql:阿里巴巴数据库连接池 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.12</version>
</dependency>
<!-- seata所需依赖 -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>

注意在TM引依赖时我当时出现了spring boot版本问题,总是无法连接上TC,大家可以先自己加上这些依赖配置一下,如果不行就看我最后附上的Seata实战源码吧。

修改配置文件(application.yml)

配置文件中主要四步:

  1. 配置启动端口
  2. 配置事务分组信息,与seata-server的file.conf配置文件的vgroup_mapping.my_tx_group属性值相同。
  3. 配置数据库连接信息
  4. 配置欲注册的eureka-server地址
  5. 配置工程的mapper扫描路径
server:
  port: 1001
spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: my_tx_group
  application:
    name: seata-tm
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata-tm?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
    username: root
    password: wk3515134
    dbcp2:
      initial-size: 5
      min-idle: 5
      max-total: 5
      max-wait-millis: 200
      validation-query: SELECT 1
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
mybatis:
  mapper-locations:
  - classpath:mapper/*.xml
eureka:
  client:
    service-url:
      defaultZone: http://euk-server-one.com:7900/eureka/

启动类添加Bean

@Bean
@LoadBalanced
public RestTemplate restTemplate() {
    return new RestTemplate();
}

创建测试Controller类(TmController)

这个类中的方法是整个分布式事务的起点。
需要注意@GlobalTransactional注解,结合我以上讲解的AT模式的理论部分。

@RestController
public class TmController {

    @Autowired
    TmService tmService;

    @GetMapping("/tm")
    @GlobalTransactional(rollbackFor = Exception.class)
    public String tm() throws InterruptedException {
        tmService.tm();
//        TimeUnit.MINUTES.sleep(1);
//        System.out.println(1/0);
        return "success";
    }
}

创建测试Service类(TmService)

在service类中添加使用restTemplate调用分布式系统的逻辑。
其中的Mapper类和实体类内容我就不详解了,大家按照也无所需自行编写即可。

/**
 * @author yueyi2019
 */
@Service
public class TmService {
	@Autowired
	TmDao mapper;
	public String tm() {
		Tm o = new Tm();
		o.setId(1);
		o.setName("tm");
		mapper.insertSelective(o);

		rm1();
		rm2();

		return "";
	}
	@Autowired
	private RestTemplate restTemplate;
	private void rm1() {
		restTemplate.getForEntity("http://seata-rm-one/rm1", null);
	}
	private void rm2() {
		restTemplate.getForEntity("http://seata-rm-two/rm2", null);
	}
}

添加conf文件

registry.conf文件其实就是配置TC时候的registry.conf文件,file.conf文件中有对undo日志、tm重试次数、rm的重试信息,如果不添加这两个文件,TM就在TC注册失败直至超时。
在代码的resources路径下添加以下文件:

添加registry.conf
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://euk-server-one.com:7900/eureka/"
    # 修改点
    application = "seata-server"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}
添加filey.conf
service {
  vgroup_mapping.my_tx_group="seata-server"
  disableGlobalTransaction = false
}

client {
  rm {
    async.commit.buffer.limit = 10000
    lock {
      retry.internal = 10
      retry.times = 30
      retry.policy.branch-rollback-on-conflict = true
    }
    report.retry.count = 5
    table.meta.check.enable = false
    report.success.enable = true
  }
  tm {
    commit.retry.count = 5
    rollback.retry.count = 5
  }
  undo {
    data.validation = true
    log.serialization = "jackson"
    log.table = "undo_log"
  }
  log {
    exceptionRate = 100
  }
  support {
    # auto proxy the DataSource bean
    spring.datasource.autoproxy = false
  }
}

创建RM资源管理者(rm-one、rm-two)

rm-one、rm-two工程创建过程是相同的,配置文件端口不能相同,代码逻辑也可以按需编写。

创建一个空的Spring Boot工程

添加依赖

RM也需要seata、jdbc、eureka-client等依赖,内容与添加的TM依赖相同。

修改配置文件

除了端口信息和数据库连接信息,其他内容可以使用TM的。

启动类添加Bean

与TM配置相同

创建测试Controller类(RmOneController、RmTwoController)

就按照平时写项目时的创建过程就可以。作为一个AT模式的RM,没有特殊要求。

添加conf文件

文件可直接使用TM中配置的registry.conf文件和file.conf文件

代码执行测试

分布式事务执行成功

在这里插入图片描述
在这里插入图片描述

分布式事务执行失败

在任意一个业务中添加异常。
最后会发现,报错后,表的信息没有变化。
在这里插入图片描述
在这里插入图片描述

测试数据库中其他表的作用

在Debug模式打一下断点。
由此可见,截图中从上往下分别是:

  1. rm-one回滚日志表。
  2. rm-two回滚日志表(因为没有执行到rm-two,所以没有日志)。
  3. seata-server的全局事务表。
  4. seata-server的分支事务锁表,有两把分支事务锁分别被rm-one和tm占用。
  5. tm回滚日志表。
  6. seata-server的分支事务表,表明全局事务下有两个分支事务分别执行tm和rm-one任务。
    在这里插入图片描述

TCC模式(Try Cancel Confirm)

TCC模式主要流程与AT模式相似,在流程中就可见TCC模式不如AT模式方便。

TCC模式工作流程:

  1. 【事务管理器】发起事务,向【事务协调者】开启全局事务,并向【事务协调者】注册一个分支事务。
  2. 【事务管理器】调用【资源管理器A】,【资源管理器A】向【事务协调者】注册一个新的分支事务,【资源管理器A】调用try方法执行本地事务并提交事务,释放本地占用的资源,返回执行结果给【事务管理器】。
  3. 【事务管理器】调用【资源管理器B】,【资源管理器B】向【事务协调者】注册一个新的分支事务,【资源管理器B】调用try方法执行本地事务并提交事务,释放本地占用的资源,返回执行结果给【事务管理器】。
  4. 【事务管理器】收到所有【资源管理器】执行本地事务完成返回的结果之后,根据结果来给【事务协调者】发送回滚或者提交请求,如果是回滚请求就调用各【资源管理器】的cancel方法,提交请求就调用confirm方法。

在流程中如果执行sql的时候没有保留回滚日志,所以TCC模式的try、cancel、confirm三个方法的逻辑必须开发人员自己实现,框架的TCC模式只是给我们提供了分布式事务回调cancel和confirm的机制,具体回调之后的逻辑是我们自己实现的。

TCC模式异常问题

空回滚问题

这种问题的出现场景就是,在try方法执行超时情况下,TC已经发送了回滚指令,即,try方法还没执行,cancel就开始执行了。如果出现这种问题,那就会造成数据的不一致性。

情景模拟

情景模拟:

  1. try方法将数字a减1。
  2. cancel方法说明try失败,那就把数字a加1。
  3. confirm方法说明try成功,执行成功后逻辑(一般什么都不做)。

试想一下,如果try还没执行完就执行了cancel,后果是什么?a直接加了1,没有进行减法操作,如果这种情况出现在转账业务上,可见这个问题的重要性。

解决方案

添加一个事务控制表,执行try的时候新增一条记录,执行cancel或者confirm之后都更新执行状态,保存每个全局事务下分支事务当前的执行状态,可以用1,2,3分别表示try,cancel,confirm,这样在执行cancel之前,根据当前的全局事务id和分支事务id查询出当前的状态即可,只有try状态才同意执行cancel方法,否则,不执行cancel方法体内的逻辑。

我对这个表字段进行简单设计:

  1. global_tx_id(全局事务id)
  2. branch_tx_id(分支事务id)
  3. state(事务执行状态,标识位:数字,枚举类型都可以)

幂等问题

这种问题的出现场景就是,因为Seata框架有重试机制,会发生多次执行cancel和confirm的情景。

幂等解释

【幂等】就是说,我执行一次try方法,相对应的就要执行一次cancel或者confirm方法,这样有来有回,一次操作对应一次其他操作,这就是【幂等】,如果一次try操作对应了其他cancel操作多次,那就违反了幂等性了。

解决方案

与【空回滚问题】解决方案相同,一个存有当前事务状态的【事务控制表】,就能解决问题。

悬挂问题

这种问题的出现场景就是,try方法有非常复杂的业务逻辑,一直是阻塞状态,这个时候已经达到了TM的超时阈值,那就会执行回滚命令,回滚完之后,try方法阻塞结束,并继续执行try方法后续逻辑,即,cancel先于try执行了。

解决方案

基于【空回滚问题】和【幂等问题】的解决方案,实在执行try的时候才会在【事务控制表】新增一条数据,但是【悬挂问题】出现执行cancel的时候,【事务控制表】是没有当前事务的执行状态的,所以我们在执行cancel方法时,如果发现表中没有存储当前事务,那就说明发生了【悬挂问题】,那就不执行cancel方法中的业务逻辑,并最后新增一条状态为cancel状态的数据进入【事务控制表】。

TCC模式异常总结

出现这三种异常情况可根据一张【事务控制表】完美解决,接下来,我配上一张自己画的异常处理逻辑图:
在这里插入图片描述

官网语录解析

TCC 模式,不依赖于底层数据资源的事务支持

  1. 因为框架的TCC模式不支持自动回滚和提交,所有业务逻辑由开发人员实现,所以,支持任何分布式事务。
  2. TCC模式是没有读隔离和写隔离机制的,官方也是聪明,没有咱就不写,也不说没有,因为TCC支持不支持事务的数据库,比如Mysql的MyIsam引擎,这种在根本上就不能控制事务的提交,也就不能实现隔离。
  3. 因为try、confirm、cancel都是由开发人员自己编写的逻辑,所以开发人员写成什么样就支持什么样,对于就简单面向业务,而不操作数据库的场景,以及不支持本地事务的场景,TCC模式都是首选。

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中

对应上一条语录的第三条,可自定义分布式事务,就是TCC模式的优点,也同样是TCC模式的缺点。

代码演示

以AT模式代码为基础,编写TCC模式实战代码

TM事务管理者(seata-tm)

Controller测试类新增TCC测试方法(TmController)

@Autowired
    private TmInterface tmInterface;
    
    @GetMapping("/tm-tcc")
    @GlobalTransactional(rollbackFor = Exception.class)
    public String tmTcc() throws InterruptedException {
        tmInterface.tm(null);
        return "success";
    }

注意方法的全局事务注解

新增TCC测试Service接口类(TmInterface)

@LocalTCC
public interface TmInterface {

    @TwoPhaseBusinessAction(name = "tmTccAction" , 
	    commitMethod = "tmCommit" ,
	    rollbackMethod = "tmRollback")
    public String tm(BusinessActionContext businessActionContext);
    public boolean tmCommit(BusinessActionContext businessActionContext);
    public boolean tmRollback(BusinessActionContext businessActionContext);
}
  1. 注意类的@LocalTCC注解。
  2. tm方法作为try方法,在tm()上加入TwoPhaseBusinessAction注解,其中的name属性值可自己按需填写,没有要求;commitMethod属性值是TC发送confirm指令后要调用的方法名;rollbackMethod属性值是TC发送cancel指令后要调用的方法名。

新增TCC测试Service实现类(TmInterface)

@Component
public class TmInterfaceImpl implements TmInterface {

    @Override
    @Transactional
    public String tm(BusinessActionContext businessActionContext) {
        // 查询是事务记录表,xxxx
        System.out.println("tm try");
        rm1();
        rm2();
        return null;
    }
    @Override
    @Transactional
    public boolean tmCommit(BusinessActionContext businessActionContext) {
        System.out.println("tm confirm");
        return true;
    }
    @Override
    @Transactional
    public boolean tmRollback(BusinessActionContext businessActionContext) {
        System.out.println("tm rollback");
        return true;
    }
    @Autowired
    private RestTemplate restTemplate;
    private void rm1() {
        restTemplate.getForEntity("http://seata-rm-one/rm1-tcc", null);
    }
    private void rm2() {
        restTemplate.getForEntity("http://seata-rm-two/rm2-tcc", null);
    }
}

使用RestTemplate调用其他服务的方法。

RM资源管理者(rm-one、rm-two)

Controller测试类添加方法

@Autowired
    private RmOneInterface rmOneInterface;
    @GetMapping("/rm1-tcc")
    @GlobalTransactional(rollbackFor = Exception.class)
    public String oneTcc(){
        rmOneInterface.rm1(null);
        return "success";
    }

注意@GlobalTransactional注解

新增TCC模式测试Service接口类

@LocalTCC
public interface RmOneInterface {

    @TwoPhaseBusinessAction(name = "tmTccAction" , commitMethod = "tmCommit" ,rollbackMethod = "tmRollback")
    public String rm1(BusinessActionContext businessActionContext);

    public boolean tmCommit(BusinessActionContext businessActionContext);

    public boolean tmRollback(BusinessActionContext businessActionContext);
}

与TM的接口类配置相同

新增TCC模式测试Service实现类

@Component
public class RmOneInterfaceImpl implements RmOneInterface {
    @Override
    @Transactional
    public String rm1(BusinessActionContext businessActionContext) {
        System.out.println("rm1 try");
        return null;
    }
    @Override
    @Transactional
    public boolean tmCommit(BusinessActionContext businessActionContext) {
        System.out.println("rm1 confirm");
        return true;
    }
    @Override
    @Transactional
    public boolean tmRollback(BusinessActionContext businessActionContext) {
        System.out.println("rm1 rollback");
        return true;
    }
}

与TM的实现类配置相同

代码执行测试

分布式事务执行成功

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

分布式事务执行失败

将rm-two系统关闭。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

标签:事务,SET,Seata,utf8,seata,NULL,id,分布式
来源: https://blog.csdn.net/m0_63164811/article/details/122582122

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有