ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

分布式事务——最终一致性的保证

2021-04-01 22:05:55  阅读:208  来源: 互联网

标签:事务 队列 springframework org 一致性 import 分布式 com stock


分布式事务可以使用seata实现,但是,对于高并发的场景,使用seata会感觉稍慢,尤其是对一致性要求不那么高的业务完全可以不需要使用seata,这时候,我们可以考虑最终一致性的方案。通过消息队列机制来保证最终一致性,即可。

思想:在MQ中新建两个队列,一个死信队列,一个普通队列,让同一个交换机绑定这两个队列,死信队列不处理任何消息,只是用于存放过期的消息,当指定的时间到了,自动路由到普通队列,在某个服务中单独监听这个队列的消息并处理该消息。

实现步骤:

1. 准备工作

1)引入MQ

<!-- 引入mq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)mq相关配置文件

# 配置MQ基本信息
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3)mq配置类(指定序列化机制、死信队列等的相关绑定关系)

package com.bjc.gulimall.ware.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

/**
 * @描述:rabbitMQ配置类
 * @创建时间: 2021/3/15
 */
@Configuration
public class MyRabbitConfig {

    /* 使用JSON序列化对象 */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @RabbitListener(queues = {"stock.release.stock.queue"})
    public void handle(Message msg){
        System.out.println(Thread.currentThread().getName() + "\t" + "" + msg);
    }

    /*
    * 配置绑定关系等信息
    * */
    // 1. 库存服务默认交换机(topic类型)
    @Bean
    public Exchange stockEventExchange(){
        // public TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
        return topicExchange;
    }

    // 2. 普通队列
    @Bean
    public Queue stockReleaseStockQueue(){
        // public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        return new Queue("stock.release.stock.queue",true,false,false);
    }

    // 3. 创建延时队列
    @Bean
    public Queue stockDeadStockQueue(){
        // public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange","stock-event-exchange");     // 指定死信路由
        arguments.put("x-dead-letter-routing-key","stock.release");         // 指定死信路由键(消息死了,要通过哪个路由键交出去)
        arguments.put("x-message-ttl",120000);                               // 指定消息过期时间 单位毫秒
        return new Queue("stock.delay.queue",true,false,false,arguments);
    }

    // 4. 创建2个绑定关系
    //      4.1 绑定死信队列与交换机的关系
    @Bean
    public Binding deadBind(){
        // Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
        return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);
    }

    //      4.2 绑定普通队列与交换机的关系
    @Bean
    public Binding releaseBind(){
        // Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
        return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);
    }

}

4)开启rabbitMQ

启动类上添加如下注解

2. 实现逻辑

这里只是简单的罗列下,记录一下使用方法,具体逻辑具体分析即可

3. 测试

访问测试页面(略),查看MQ控制台,如图:

消息成功的到达指定的死信队列,数据库中也有记录对应的记录,如图;

等会的回退操作,就是根据这两张表来的。

4. 解锁操作

解锁涉及到了消息的消费,以及远程接口的调用,如果此次调用失败,需要重新执行,这时候,消息队列的消息已经被消费了,所以,会导致,重新解锁的时候没有消息了,因此,mq的ACK确认机制需要配置成手动确认。

在application.properties中添加如下配置:

# 手动ack模式开启
spring.rabbitmq.listener.simple.acknowledge-mode=manual

然后,新建一个监听,用于监听MQ的指定队列,代码如下:

package com.bjc.gulimall.ware.listener;

import com.alibaba.fastjson.TypeReference;
import com.bjc.common.to.mq.OrderTo;
import com.bjc.common.to.mq.StockDetailTo;
import com.bjc.common.to.mq.StockLockTo;
import com.bjc.common.utils.R;
import com.bjc.gulimall.ware.dao.WareSkuDao;
import com.bjc.gulimall.ware.entity.WareOrderTaskDetailEntity;
import com.bjc.gulimall.ware.entity.WareOrderTaskEntity;
import com.bjc.gulimall.ware.feign.OrderFeignService;
import com.bjc.gulimall.ware.service.WareOrderTaskDetailService;
import com.bjc.gulimall.ware.service.WareOrderTaskService;
import com.bjc.gulimall.ware.service.WareSkuService;
import com.bjc.gulimall.ware.vo.OrderVo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @描述:库存释放RabbitMQ监听器
 * @创建时间: 2021/3/30
 */
@RabbitListener(queues = {"stock.release.stock.queue"}) // stock.release.stock.queue
@Component
public class StockReleaseListener {

    @Autowired
    private WareSkuService WareSkuService;

    /*
     * 添加解锁库存的功能
     * 库存解锁的场景:
     *   1)下单成功,但是订单过期未支付,被系统自动(用户主动)取消了
     *   2)下单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存需要解锁(使用seata分布式事务太慢了,我们希望可以自动解锁)
     *
     *   注意:只要解锁库存的消息失败,一定要告诉mq服务器,此次解锁失败,消息不要删除,因此,需要设置消息的ACK为手动确认方式
     * */
    @RabbitHandler
    public void handleLockStockRelease(Message msg, StockLockTo stockLockTo, Channel channel) throws IOException {
        System.out.println("收到解锁库存的信息:");
        try{
            WareSkuService.unLockStock(stockLockTo);
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            channel.basicReject(msg.getMessageProperties().getDeliveryTag(),true);
        }
    }

    @RabbitHandler
    public void handlerOrderCloseRelease(Message msg, OrderTo orderTo, Channel channel) throws IOException {
        System.out.println("收到订单取消消息:" + orderTo);
        try{
            WareSkuService.unLockStock(orderTo);
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            channel.basicReject(msg.getMessageProperties().getDeliveryTag(),true);
        }
    }
}

注意:最终一致性的保证可以使用rabbitMQ的延时队列来完成,详情可以参考springBoot高级篇——消息中间件RabbitMQ的延时队列的用法

标签:事务,队列,springframework,org,一致性,import,分布式,com,stock
来源: https://blog.csdn.net/weixin_43318134/article/details/115312860

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有