ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

彻底解决分布式环境下Redisson消息队列监听重复执行问题

2022-03-03 11:34:17  阅读:213  来源: 互联网

标签:redisson idLock 监听 Redisson org import com idBucket 分布式


问题现象:测试环境单台部署,没有问题,生产环境多台部署订单都是2条重复数据。

问题描述:我们把每个服务都部署了2台,订单产生后,有redisson的mq发布,如果MQListener监听到就会执行后面的业务逻辑。现实的问题是2台MQListener都会监听到,会重复处理我们的逻辑,插入数据库或修改数据库或写入ES等都会执行2遍。

本文的DEMO中使用的是redisson的mq来测试的,同时RabbitMQ,ActiveMQ,RocketMQ也会有同样的问题,处理逻辑大家可以参照,应该都是大同小异。

解决方法:redisson公平锁【谁先抢到谁先锁,其余需要等待】加锁处理,只需要一台来处理。

------------------------------------------------核心代码----------------------------------------------------------

1、pom依赖及redisson插件

<!-- 插件地址https://gitee.com/ztp/redisson-spring-boot-starter -->
<dependency>
  <groupId>com.zengtengpeng</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>1.0.8</version>
</dependency>

2、application.yml配置文件

// 集群配置,先进先出原则
redisson:
  multiple-server-config:
    node-addresses[0]: 192.168.1.57:7000
    node-addresses[1]: 192.168.1.57:7001
    node-addresses[2]: 192.168.1.117:7000
    node-addresses[3]: 192.168.1.57:7002
    node-addresses[4]: 192.168.1.117:7001
    node-addresses[5]: 192.168.1.117:7002
    loadBalancer: org.redisson.connection.balancer.RoundRobinLoadBalancer
    readMode: MASTER
    subscriptionMode: MASTER
  password: redismima123
  model: CLUSTER




// 单节点配置
redisson:
  singleServerConfig:
    address: 192.168.1.119:6380
    database: 3
  password: redisMiMa123
  model: SINGLE

3、springboot启动程序

package com;

import com.zengtengpeng.annotation.EnableMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;


@Slf4j
// 这里的注解要加上,不然MQ不生效
@EnableMQ
public class OrderJobApplication {

	public static void main(String[] args) {
		SpringApplication.run(OrderJobApplication.class, args);
		log.info("OrderJob项目启动成功!!!");
	}

}

4、mq生产者

package com.services.impl;

/**
 *
 * @author: renkai721@163.com
 * @date: 2021年09月16日 20:41:54
 * @description:
 */

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.pagehelper.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.*;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {


    @Resource
    private RedissonClient redissonClient;
   

    @Override
    public void createOrder(OrderParamReqVo vo)  {
        // 虽然order的服务也部署了2台,但是从用户点击创建订单到后台网关,
        // 所有的nginx都只会转发到一台机器上,所以生产者不需要单独处理。
        Long orderId = 0L;
        // 查询ID
        RLock idLock = redissonClient.getFairLock("orderIdLock");
        try{
            idLock.lock();
            RBucket<Long> idBucket = redissonClient.getBucket("orderId");
            synchronized (idBucket){
                Long redisId = idBucket.get();
                if(redisId != null){
                    redisId+=1;
                }else{
                    redisId = 1L;
                }
                orderId = redisId;
                idBucket.set(redisId);
                log.info("生成的的orderId="+orderId);
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            idLock.unlock();
        }
        // topic名字和监听中的名字要一致,写法也有很多,大家按照自己喜欢的方式去写
        RTopic orderMq = redissonClient.getTopic("orderTopic");
        OrderParamRespVo obj = OrderParamRespVo.builder()
                .id(orderId)
                .userId(vo.getUserId())
                .status(0)
                .createTime(new Date())
                .build();
        orderMq.publish(obj);
        log.info("order订单的MQ生成了,快快接收处理吧,obj={}",obj);
    }

  

}

5、消费者

package com.mq.listener;

import com.alibaba.druid.util.StringUtils;
import com.zengtengpeng.annotation.MQListener;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.TimeUnit;

/**
 *
 * @Description:
 * @author: renkai721@163.com
 * @date: 2021年09月19日 11:28 上午
 */

@Component
@Slf4j
public class OrderMqListener {

    @Resource
    private RedissonClient redissonClient;


    @MQListener(name = "orderTopic")
    public void orderTopicSave(CharSequence charSequence, OrderParamRespVo vo, Object object){
        // 这里的MQListener大家要注意,部署了3台,那么3台都会监听,
        // 如果有数据下发,3台会同时触发。
        String value = getProcessId();
        log.info("value={}",value);
        RBucket<String> idBucket = redissonClient.getBucket("mqOrderId"+vo.getOrderId);
        // redisson公平锁,谁先锁住谁使用
        RLock idLock = redissonClient.getFairLock("mqLockOrderId"+vo.getOrderId);
        // 锁2秒,其余的处于等待,2秒后锁会自动解锁,也就是finally不需要单独处理
        idLock.lock(2,TimeUnit.SECONDS);
        try {
            if(idLock.isLocked()){
                synchronized (idBucket) {
                    if(StringUtils.isEmpty(idBucket.get())){
                        // 这里的逻辑是使用了机器的进程ID+机器名来判断唯一标识的
                        // 如果最简单的就是idBucket.set("1");
                        // 只要idBucket有值就说明有人已经锁住在处理了。
                        idBucket.set(value,5, TimeUnit.MINUTES);
                    }
                    log.info("idBucket.get()={}",idBucket.get());
                }
            }
            if(value.equals(idBucket.get())){
                log.info("让我来处理吧,其它小伙伴休息一下吧!");
                // 自己的写库或写redis逻辑处理
                OrderDao.save(vo);
                log.info("orderJob MQ收到消息, 处理完毕。");
                idBucket.delete();
            }else {
                log.info("已经有人处理啦");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(idLock.isLocked() && idLock.isHeldByCurrentThread()){
                idLock.unlock();
            }
        }

    }

    public static final String getProcessId() {
        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
        return runtimeMXBean.getName();
    }

}

标签:redisson,idLock,监听,Redisson,org,import,com,idBucket,分布式
来源: https://blog.csdn.net/renkai721/article/details/123247198

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有