ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

分布式事务与Seate框架(2)——Seata实践

2021-11-25 20:30:20  阅读:140  来源: 互联网

标签:Seate Seata 8091 16 seata --- 169.254 分布式 6.29


一、实践准备工作

1、框架介绍

实践主要是以“订单-库存-账户”系统演示,主要的框架图如下,图中各个部分充当的分布式事务角色已标明。

    

  具体流程:

  1)用户登录XXX商品购物系统(假设已有账户),

  2)点击购买某个商品,发起创建订单请求;

  3)检查购买商品的库存量,如果不够则创建订单失败提示库存不足;否则锁定该商品---->减少库存--->创建订单;

  4)订单创建成功后点击付款(或直接付款无需点击,实际上整个Demo中下单之后模拟立马支付,并不会点击付款);

  5)如果购买成功则对账户进行余额进行判断,余额足够则进行减扣,余额不够则进行提示说明

  6)返回购买成功失败提示说明。

2、项目结构

项目结构如下:

mvn package打包运行seata服务,即运行TC服务器(这里只展示单机)

初始化Seata库,导入sql脚本

二、代码实践

这里只展示关键代码,全部代码已提交gituhb:,有需要的小伙伴可以自行获取

1、“订单-库存-账户”服务

订单服务:

    TM(microService):seata-order-service

    RM(DB Resources):jdbc:mysql://127.0.0.1:3306/order

OrderService:


@GlobalTransactional // TM开启全局事务 @Transactional(rollbackFor = Exception.class) public void createOrder(Long productId, BigDecimal price){ // 这里模拟获取的是用户的账户ID // 通过上下文获取userId再获取accountId(单个账户) Long accountId = 1L; // 假设已经获取到了账户ID // 1.rpc调用库存微服务检查库存并减库存操作 Boolean deductStorageSuccess = storageFeignClient.deduct(productId); if (!deductStorageSuccess) { throw new RuntimeException("storage deduct failed!"); } // 2.创建订单 ProductOrder order = ProductOrder.builder() .productId(productId) .accountId(accountId) .payAmount(price) .build(); log.info("create order : {}", order); // 这里为了模拟回滚,所以先对价格的判断放到了创建订单之后,抛出runtime exception if (price.compareTo(BigDecimal.ZERO) < 0) { throw new NumberFormatException("product price must greater than zero!"); } orderMapper.insertSelective(order); // 3.rpc调用账户微服务对余额检查并扣款操作 Boolean deductAccountSuccess = accountFeignClient.deduct(accountId, price); if (!deductAccountSuccess) { throw new RuntimeException("account deduct failed!"); } // 4. 反馈结果 }

OrderController:


/** * 模拟创建订单 * @param productId * @param price * @return */ @PostMapping("/create") public String create(Long productId, BigDecimal price){ try { orderService.createOrder(productId, price); } catch (Exception e) { log.error("order failed: ", e); return "order failed"; } return "order success"; }

调用的Feign:


@FeignClient(name="seata-account-service") public interface AccountFeignClient { @PostMapping("/account/deduct") Boolean deduct(@RequestParam("accountId") Long accountId, @RequestParam("payAmount") BigDecimal payAmount); } @FeignClient(name="seata-storage-service") public interface StorageFeignClient { @PostMapping("/storage/deduct") Boolean deduct(@RequestParam("productId") Long productId); }

库存服务:

    microService:seata-storage-service

    RM(DB Resources):jdbc:mysql://127.0.0.1:3306/storage

StorageService


public Boolean deduct(Long productId){ // 这里先检查有没有库存了, 生产环境下这里是需要for update数据库锁,或者分布式锁 Repo repoFromDB = repoMapper.selectByPrimaryKey(productId); if (repoFromDB == null) { throw new RuntimeException("product not exist!"); } // 对库存减一 int afterCount = repoFromDB.getAmount()-1; // 没有库存剩余了 if (afterCount < 0) { throw new RuntimeException("product storage is no remaining!"); } Repo repo = Repo.builder() .id(productId) .amount(afterCount) .build(); repoMapper.updateAmount(repo); log.info("deduct product[{}] storage, current amount is {}", productId, afterCount); return true; }

StorageController:


/** * 模拟对商品库存减一 * @param productId * @return */ @PostMapping("/deduct") public Boolean deduct(Long productId){ try { storageService.deduct(productId); } catch (Exception e) { return false; } return true; }

账户服务

    microService:seata-account-service

    RM(DB Resources):jdbc:mysql:127.0.0.1/account

AccountService:


public void deduct(Long accountId, BigDecimal payAmount){ // 这里先检查有没有账户存在, 生产环境下这里是需要for update数据库锁,或者分布式锁 UserAccount userAccountFromDB = userAccountMapper.selectByPrimaryKey(accountId); if (userAccountFromDB == null) { throw new RuntimeException("account not exist!"); } // 检查余额是否足够 BigDecimal afterBalance = userAccountFromDB.getBalance().subtract(payAmount); if (afterBalance.compareTo(BigDecimal.ZERO) < 0) { throw new RuntimeException("the balance is not enough!"); } UserAccount userAccount = UserAccount.builder() .id(accountId) .balance(afterBalance) .build(); log.info("deduct account[{}] , current balance is {}", accountId, afterBalance); userAccountMapper.updateBalance(userAccount); }

AccountController:


/** * 模拟账户扣款 * @param accountId * @param payAmount * @return */ @PostMapping("/deduct") public Boolean deduct(Long accountId, BigDecimal payAmount){ try { accountService.deduct(accountId, payAmount); } catch (Exception e) { return false; } return true; }

2、Seata服务器,即TC角色

  首先初始化seata的sql脚本(sql脚本参考官方wiki),并开启seata库,之后开启Seata Server(具体的配置与启动前nacos配置,事务分组等相关概念请参考官方wiki)

    

3、检查Nacos服务与配置列表

  微服务模块启动后快速注册到dev命名空间下的SEATA_GROUP分组,此时TM、RM、TC都已经具备 

       

启动微服务模块后可以看到日志输出,说明启动成功并且已经成功注册


RM will register :jdbc:mysql://127.0.0.1:3306/account
nacos registry, SEATA_GROUP seata-account-service 192.168.99.1:6009 register finished Started SeataAccountApplication in 30.115 seconds (JVM running for 33.158) ....... NettyPool create channel to transactionRole:TMROLE,address:169.254.6.29:8091,msg:< RegisterTMRequest{applicationId='seata-account-service', transactionServiceGroup='my_test_tx_group'} > register TM success. client version:1.4.0, server version:1.4.0,channel:[id: 0xa77dc065, L:/169.254.6.29:52794 - R:/169.254.6.29:8091] register success, cost 4 ms, version:1.4.0,role:TMROLE,channel:[id: 0xa77dc065, L:/169.254.6.29:52794 - R:/169.254.6.29:8091]

三、运行测试

1、模拟购买支付成功情况

运行启动所有的微服务后,在TC Serve的日志可以看到所有的TM、RM都已经注册了

此时productId=1库存还剩998

accountId=1的用户余额还剩1000元

 接下来就是模拟用户购买商品环节,调用http://localhost:6008/order/create,表示用户想买商品ID=1,价格为12.25的商品

清空日志,并发起请求查看日志:


16:10:45.167 INFO --- [rverHandlerThread_1_4_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: seata-order-service,transactionServiceGroup: my_test_tx_group, transactionName: createOrder(java.lang.Long, java.math.BigDecimal),timeout:60000,xid:169.254.6.29:8091:136139747123908608 16:10:45.964 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=169.254.6.29:8091:136139747123908608,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/storage,lockKey=repo:1 ,clientIp:169.254.6.29,vgroup:my_test_tx_group 16:10:46.086 INFO --- [rverHandlerThread_1_5_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 169.254.6.29:8091:136139747123908608, branchId = 136139750928142336, resourceId = jdbc:mysql://127.0.0.1:3306/storage ,lockKeys = repo:1 16:10:46.788 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=169.254.6.29:8091:136139747123908608,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/account,lockKey=user_account:1 ,clientIp:169.254.6.29,vgroup:my_test_tx_group 16:10:46.918 INFO --- [rverHandlerThread_1_6_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 169.254.6.29:8091:136139747123908608, branchId = 136139754342305793, resourceId = jdbc:mysql://127.0.0.1:3306/account ,lockKeys = user_account:1 16:10:47.015 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : xid=169.254.6.29:8091:136139747123908608,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/order,lockKey=product_order:6,clientIp:169.254.6.29,vgroup:my_test_tx_group 16:10:47.073 INFO --- [rverHandlerThread_1_7_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 169.254.6.29:8091:136139747123908608, branchId = 136139755294412801, resourceId = jdbc:mysql://127.0.0.1:3306/order ,lockKeys = product_order:6 16:10:47.184 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : xid=169.254.6.29:8091:136139747123908608,extraData=null,clientIp:169.254.6.29,vgroup:my_test_tx_group 16:10:48.084 INFO --- [ AsyncCommitting_1_1] io.seata.server.coordinator.DefaultCore : Committing global transaction is successfully done, xid = 169.254.6.29:8091:136139747123908608. 16:10:53.908 INFO --- [ TxTimeoutCheck_1_1] i.s.s.coordinator.DefaultCoordinator : Global transaction[169.254.6.29:8091:136139530647490560] is timeout and will be rollback. 16:10:54.947 INFO --- [ RetryRollbacking_1_1] io.seata.server.coordinator.DefaultCore : Rollback global transaction successfully, xid = 169.254.6.29:8091:136139530647490560.

从日志中我们可以看到:

1)全局事务XID已经生成,各个分支注册成功,

2)branchId也已经生成并在全局事务XID下,资源已被锁住

3)全局事务提交成功

查看此时的库存与余额,都已经进行了减扣

2、模拟库存不足情况

修改productId=1的商品库存为0:

再次发起请求,查看TC Server日志,可以查出明显发生了全局事务的回滚


16:20:24.258 INFO --- [verHandlerThread_1_12_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: seata-order-service,transactionServiceGroup: my_test_tx_group, transactionName: createOrder(java.lang.Long, java.math.BigDecimal),timeout:60000,xid:169.254.6.29:8091:136142176250875904 16:20:24.279 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : xid=169.254.6.29:8091:136142176250875904,extraData=null,clientIp:169.254.6.29,vgroup:my_test_tx_group 16:20:24.420 INFO --- [verHandlerThread_1_13_500] io.seata.server.coordinator.DefaultCore : Rollback global transaction successfully, xid = 169.254.6.29:8091:136142176250875904.

查看库存与余额情况,库存仍然是0,余额仍然是987.75

3、模拟余额不足情况

修改accountId=1的账户余额小于12.25

再次发起请求,查看日志


16:27:41.811 INFO --- [verHandlerThread_1_14_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: seata-order-service,transactionServiceGroup: my_test_tx_group, transactionName: createOrder(java.lang.Long, java.math.BigDecimal),timeout:60000,xid:169.254.6.29:8091:136144011456008192 16:27:41.836 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=169.254.6.29:8091:136144011456008192,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/storage,lockKey=repo:1 ,clientIp:169.254.6.29,vgroup:my_test_tx_group 16:27:41.889 INFO --- [verHandlerThread_1_15_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 169.254.6.29:8091:136144011456008192, branchId = 136144011762192385, resourceId = jdbc:mysql://127.0.0.1:3306/storage ,lockKeys = repo:1 16:27:42.088 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : xid=169.254.6.29:8091:136144011456008192,extraData=null,clientIp:169.254.6.29,vgroup:my_test_tx_group 16:27:42.632 INFO --- [verHandlerThread_1_16_500] io.seata.server.coordinator.DefaultCore : Rollback branch transaction successfully, xid = 169.254.6.29:8091:136144011456008192 branchId = 136144011762192385 16:27:42.754 INFO --- [verHandlerThread_1_16_500] io.seata.server.coordinator.DefaultCore : Rollback global transaction successfully, xid = 169.254.6.29:8091:136144011456008192.

不同于库存不足的情况的是,这里库存服务分支事务是先注册TC Server的,因为有异常的并不是库存服务,需要注意的是因为我模拟的是下单之后立马支付,支付失败的话订单也是不会存在,实际生活中应该是订单显示“支付失败”。

查看库存与余额情况,库存仍然是997,余额仍然是10.75

标签:Seate,Seata,8091,16,seata,---,169.254,分布式,6.29
来源: https://blog.csdn.net/weixin_46217160/article/details/121546462

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有