ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

07_ByteTCC源码分析 0.4.x版本

2021-06-14 20:59:23  阅读:218  来源: 互联网

标签:事务 服务 07 业务 0.4 try 源码 请求 ByteTCC


07_ByteTCC源码分析 0.4.x版本

一. 说明

这篇博客仅供我自己复习使用,不保证内容100%正确,使用的是ByteTCC 0.4.x版本,基于Spring Cloud。
github地址: https://github.com/liuyangming/ByteTCC/wiki

反正是写给我自己的,就不去写如何使用了,以后若是想用,直接参考官网提供的Demo即可。

二. ByteTCC启动时做了哪些事情?

首先,随着项目启动,Spring会读取在启动类上使用的@ImportResource注解,将bytetcc-supports-springcloud.xml中配置的所有Bean创建、导入至Spring容器。ByteTCC自定义了数量众多的类,值得注意的类有:

  1. CompensableCoordinatorController
    提供了prepare、commit、forget、recover、rollback等接口,这些接口不是用来给我们调用的,而是留给ByteTCC的TM来调用,实现对分布式事务的控制。
  2. CompensableAnnotationValidator
    用于项目启动后,扫描和验证当前项目中有哪些类上使用了@Compensable注解。
  3. CompensableFeignBeanPostProcessor
    为FeignInvocationHandler做了一个动态代理。还记得Feign为远程调用的服务分别做了一个匿名的动态代理类,所有对远程服务的调用请求都会被FeignInvocationHandler拦截下来,而ByteTCC在FeignInvocationHandler的基础上,又做了一层拦截,把对Feign的所有操作都给拦截下来,对应的拦截类是CompensableFeignHandler,那么想都不用想,对远程服务的调用肯定是先走CompensableFeignHandler,再走FeignInvocationHandler,最后由FeignLoadBalancer根据负载均衡算法,通过Ribbon提供的ServerOperation把请求发给远程服务。

接着,除了创建对象外,ByteTCC对数据源也进行了封装,其实就是对原生的Datasource做了一个动态代理,所有对原生Datasource的请求会先走LocalXADatasource(由ByteTCC提供),底层再走原生Datasource的方法。(当然了,我们自己也可以做DruidDatasource,套在原生Datasource上,外面再套一层LocalXADatasource) 。

最后,伴随着项目的启动,ByteTCC还创建了一堆后台线程,比较重要的线程有:

  1. CompensableWork
    用于系统启动后尝试恢复事务,以及运行期间不断的尝试恢复中断的事务。
  2. CleanupWork
    做一些清理工作,比如RecoveredResource的forget()方法,也就是delete from bytejta where xid = ?删除bytejta表中已经执行完毕的事务对应的记录。
  3. SamleTransactionLogger
    记录分布式事务执行时的日志

三. 接收到外部请求后,主业务服务是如何向从业务服务发起try请求的?

接收到外部请求后,主业务服务主要经历了以下几个步骤:

3.1 ByteTCC框架提供的CompensableHandlerInterceptor preHandle( )

首次接收到请求,这个拦截器并没有什么用,它主要是用来解析header中存放的分布式事务的信息,由于现在是首次接到请求,所以请求中根本就没有分布式事务的信息。

3.2 ByteTCC框架提供的CompensableMethodInterceptor invoke()

ByteTCC为被@Compensable修饰的类做了动态代理,现在外部请求想要调用这个类的方法,那么请求当然会被拦截。这里其实就是在开启一个分布式事务,创建分布式事务的id,以及分布式事务的上下文(TransactionContext)等等。

3.3 spring-tx提供的TransactionInterceptor invoke( )

被调用的方法肯定会被@Transactional修饰,所以spring-tx肯定会做一个动态代理,用于实现对这个方法的事务管理。TransactionInterceptor 的invoke()方法,老生常谈了,不就是先创建事务,然后执行业务逻辑,有报错执行回滚,没报错事务提交么。这里就是创建了一个本地事务,接着执行业务逻辑。想想看,主业务服务的本地业务逻辑被执行,这个方法内会去调用从业务服务的方法,只要涉及到调用远程服务的方法,必然要走Feign的动态代理,走Ribbon,所以接下来请求一定会被CompensableFeignHandler给拦截下来。

4. ByteTCC框架提供的CompensableFeignHandler invoke( )

这个方法内代码一大坨,比如重构了负载均衡器,核心代码就一句: this.delegate.invoke(proxy, method, args);delegate就是HardCodedTarget,这玩意不就是Feign底层用来发送http请求的么,请求发送的过程学了很多遍,不赘述了。

但值得注意的是,在真正的发送请求之前,会走CompensableInterceptorImpl的beforeSendRequest( )方法,这里面有一句非常重要的代码

boolean participantEnlisted = transaction.enlistResource(descriptor);

这个方法会创建一个分布式事务的子事务(XAResourceArchive),每一个子事务对应着一个从业务服务的请求调用,最后放入resourceList内。本次主业务服务的方法内,所有对从业务服务的请求调用,都会生成一个子事务,存入resourceList。

这个resourceList非常重要,并且需要注意的是,此时try请求还没有发送出去!

四. 从业务服务try请求执行报错了怎么办?

4.1 主业务服务如何处理从业务服务的响应结果?

还是要回到CompensableFeignHandlerde的invoke( )。这里面有两个变量: participantEnlistFlag和participantDelistFlag。虽然我没有细看Feign底层发起http请求至从业务服务,解析响应结果,并填入CompensableTransactionImpl的participantEnlistFlag和participantDelistFlag内的具体过程,但是通过打断点,可以大概猜测出含义:
1.participantEnlistFlag
这个东西就是用于标识,本次分布式子事务是否需要加入分布式事务,它是一个布尔值,第一次调用从业务服务的方法时,显然就是true了。加入的地方就是resourceList。
2. participantDelistFlag
这个东西就是用于标识,是否需要从分布式子事务的列表中,移除当前分布式子事务。

我猜测ByteTCC就是用participantDelistFlag这个变量来控制下方的代码,实现是否需要在resourceList中,移除当前分布式子事务。如果从业务服务的try执行失败了,那么participantDelistFlag就等于true,那么就执行remove方法。

这里就是利用participantDelistFlag来移除分布式子事务。比如说,本次请求从业务服务的try方法执行失败了,显然就需要把这个从业务服务对应的分布式子事务从resourceList给移除掉。(写的思路有点儿跳跃,这个地方移除子事务是有用意的,毕竟resourceList剩余的子事务肯定是try执行成功的,现在有一个try执行失败了,那显然剩下来的子事务就可以用来发送cancel请求了)

下方的代码来自CompensableTransactionImpl的delistResource()方法,XAResource.TMSUCCESS代码代表分布式子事务执行成功,XAResource.TMFAIL代表执行失败。

public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
	省略.. 
	if (RemoteResourceDescriptor.class.isInstance(xaRes)) {
		省略.. 
		XAResourceArchive archive = this.resourceMap.get(identifier);
		if (flag == XAResource.TMFAIL) {
			this.resourceMap.remove(identifier);
			if (archive != null) {
				this.resourceList.remove(archive);
			} // end-if (archive != null)

			compensableLogger.updateTransaction(this.getTransactionArchive());
		} else {
			if (archive != null) {
				this.applicationMap.put(resource.getApplication(), archive);
			}
		}
	}
	return true;
}

4.2 如果发现从业务服务的try执行出错,主业务服务做了哪些操作?

如果发现某个从业务服务的try执行出错,主业务服务会执行CompensableTransactionImpl内的方法,无非就是两个步骤:

  1. 回滚主业务服务的本地事务 对应fireNativeParticipantCancel( )
  2. 回滚从业务服务的事务 对应fireRemoteParticipantCancel( )

代码逻辑都很简单,值得注意的就一点,那就是需要回滚哪些从业务服务的事务呢?

这里就需要借助之前一直强调的resourceList了,首先,被调用了try接口的服务都会放到resourceList中,接着会把那些执行报错的从业务服务给移除掉,那么resourceList剩下的从业务服务,一定都是try执行成功的,我们需要回滚的就是这些从业务服务执行的try逻辑。

此时,主业务服务会循环遍历resourceList,取出每一个从业务服务,借助SpringCloudCoordinator(这个东西也是ByteTCC自己写的),通过RestTemplate等组件发送cancel请求至从业务服务。

问:为什么不需要回滚try执行报错的从业务服务的事务呢?

答:由于从业务服务自己的try方法上一定会加@Transactional,所以从业务服务借助JTA自己就可以回滚本地事务了,根本就不需要主业务服务来帮忙。

4.3 从业务服务接收到cancel请求后,是如何处理的?

从业务服务接收请求的controller是ByteTCC提供的,叫做CompensableCoordinatorController,它的执行逻辑也很简单,首先解析请求头,反序列化出分布式事务的具体信息,接着根据启动时扫描获取的@Compensable注解信息,找到cancellableKey属性的值,然后在spring容器内找到对应的类,最后调用对应的方法,完成cancel逻辑。

五. 如果某个从业务服务的try执行失败,主业务服务如何发起cancel请求?

六. 所有从业务服务try执行成功后,主业务服务如何调用confirm的?

七. 如果confirm或cancel执行失败,主业务服务是如何不断重试的?

八. 如果tcc分布式事务执行到一半,系统宕机了,重启后是如何恢复事务的?

九. 链式调用是如何实现的?

还没写完…

标签:事务,服务,07,业务,0.4,try,源码,请求,ByteTCC
来源: https://blog.csdn.net/miaomiao19971215/article/details/117911483

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有