ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

阿里云RocketMQ定时/延迟消息队列实现

2021-08-04 22:33:17  阅读:175  来源: 互联网

标签:return String 队列 private msg import public RocketMQ 延迟


新的阅读体验:http://www.zhouhong.icu/post/157

一、业务需求

  需要实现一个提前二十分钟通知用户去做某件事的一个业务,拿到这个业务首先想到的最简单得方法就是使用Redis监控Key值:在排计划时候计算当前时间与提前二十分钟这个时间差,然后使用一个唯一的业务Key压入Redis中并设定好过期时间,然后只需要让Redis监控这个Key值即可,当这个Key过期后就可以直接拿到这个Key的值然后实现发消息等业务。

  关于Redis实现该业务的具体实现在之前我已经记过一篇笔记,有兴趣的可以直接去瞅瞅,但是现在感觉有好多不足之处。

       Redis实现定时: http://www.zhouhong.icu/post/144

二、Redis实现定时推送等功能的不足之处

  由于Redis不止你一个使用,其他业务也会使用Redis,那么最容易想到的一个缺点就是:1、如果在提醒的那一刻有大量的其他业务的Key也过期了,那么就会很长时间都轮不到你的这个Key,就会出现消息推送延迟等缺点;2、还有一个缺点就是像阿里云他们的Redis根本就不支持对 Redis 的 Key值得监控(我也是因为公司使用阿里云的Redis没法对Key监控才从之前使用Redis监控转移到使用RocketMQ的延时消息推送的。。。)

三、阿里云RocketMQ定时/延迟消息队列实现

  其实在实现上非常简单

1、首先去阿里云控制台创建所需消息队列资源,包括消息队列 RocketMQ 的实例、Topic、Group ID (GID),以及鉴权需要的 AccessKey(AK),一般公司都有现成的可以直接使用。
2、在springboot项目pom.xml添加需要的依赖。
<!--阿里云MQ TCP-->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.7.1.Final</version>
</dependency>
3、在对应环境的application.properties文件配置参数
console:
  rocketmq:
    tcp:
      accessKey: XXXXXXXX使用自己的
      secretKey: XXXXXXXXXXXXX使用自己的
      nameSrvAddr: XXXXXXXXXXXXXXXX使用自己的
      topic: XXXXXXX使用自己的
      groupId: XXXXXXX使用自己的
      tag: XXXXXXXXX使用自己的
4、封装MQ配置类
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.Properties;
/**
* @Description: MQ配置类
* @Author: zhouhong
* @Date: 2021/8/4
*/
@Configuration
@EnableConfigurationProperties({PatrolMqConfig.class})
@ConfigurationProperties(prefix = "console.rocketmq.tcp")
@Primary
public class PatrolMqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String topic;
    private String groupId;
    private String tag;
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;

    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return properties;
    }
    public String getAccessKey() {
        return accessKey;
    }
    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }
    public String getSecretKey() {
        return secretKey;
    }
    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }
    public String getNameSrvAddr() {
        return nameSrvAddr;
    }
    public void setNameSrvAddr(String nameSrvAddr) {
        this.nameSrvAddr = nameSrvAddr;
    }
    public String getTopic() {
        return topic;
    }
    public void setTopic(String topic) {
        this.topic = topic;
    }
    public String getGroupId() {
        return groupId;
    }
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }
    public String getTag() {
        return tag;
    }
    public void setTag(String tag) {
        this.tag = tag;
    }
    public String getOrderTopic() {
        return orderTopic;
    }
    public void setOrderTopic(String orderTopic) {
        this.orderTopic = orderTopic;
    }
    public String getOrderGroupId() {
        return orderGroupId;
    }
    public void setOrderGroupId(String orderGroupId) {
        this.orderGroupId = orderGroupId;
    }
    public String getOrderTag() {
        return orderTag;
    }
    public void setOrderTag(String orderTag) {
        this.orderTag = orderTag;
    }
}
5、配置生产者
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PatrolProducerClient {

    @Autowired
    private PatrolMqConfig mqConfig;
    @Bean(name = "ConsoleProducer", initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        return producer;
    }
}
6、消费者订阅
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
@Configuration
@Slf4j
public class PatrolConsumerClient {

    @Autowired
    private PatrolMqConfig mqConfig;

    @Autowired
    private MqTimeMessageListener messageListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getTopic());
        subscription.set);
        subscriptionTable.put(subscription, messageListener);
        //订阅多个topic如上面设置
        consumerBean.setSubscriptionTable(subscriptionTable);
        System.err.println("订阅成功!");
        return consumerBean;
    }
}
7、定时延时MQ消息监听消费
/**
 * @Description: 定时/延时MQ消息监听消费
 * @Author: zhouhong
 * @Create: 2021-08-03 09:16
 **/
@Component
public class MqTimeMessageListener implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.err.println("收到消息啦!!");
        logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody()));
        try {
            String msgTag = message.getTag(); // 消息类型
            String msgKey = message.getKey(); // 业务唯一id
            switch (msgTag) {
                case "XXXX":
                    // TODO 具体业务实现,比如发消息等操作
                    System.err.println("推送成功!!!!");
                    break;
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            logger.error("消费MQ消息失败! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
            //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
            return Action.ReconsumeLater;
        }
    }
}
8、封装一个发延时/定时消息的工具类
/**
 * @Description: MQ发送消息助手
 * @Author: zhouhong
 * @Create: 2021-08-03 09:06
 **/
@Component
public class ProducerUtil {
    private Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
    @Autowired
    private PatrolMqConfig config;
    @Resource(name = "ConsoleProducer")
    ProducerBean producerBean;
    public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {
        Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
        msg.setStartDeliverTime(delayTime);
        return this.send(msg,Boolean.FALSE);
    }
    /**
     * 普通消息发送发放
     * @param msg 消息
     * @param isOneWay 是否单向发送
     */
    private SendResult send(Message msg,Boolean isOneWay) {
        try {
            if(isOneWay) {
                //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
                //若数据不可丢,建议选用同步或异步发送方式。
                producerBean.sendOneway(msg);
                success(msg, "单向消息MsgId不返回");
                return null;
            }else {
                //可靠同步发送
                SendResult sendResult = producerBean.send(msg);
                //获取发送结果,不抛异常即发送成功
                if (sendResult != null) {
                    success(msg, sendResult.getMessageId());
                    return sendResult;
                }else {
                    error(msg,null);
                    return null;
                }
            }
        } catch (Exception e) {
            error(msg,e);
            return null;
        }
    }
    private ExecutorService threads = Executors.newFixedThreadPool(3);
    private void error(Message msg,Exception e) {
        logger.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}"
                ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
        logger.error("errorMsg --- {}",e.getMessage());
    }
    private void success(Message msg,String messageId) {
        logger.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
                ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));
    }
}
9、接口测试(10000表示延迟10秒,可以根据自己的业务计算出)
// 测试MQ延时
    @Autowired
    ProducerUtil producerUtil;
    @PostMapping("/patrolTaskTemp/mqtest")
    public void mqTime(){
        producerUtil.sendTimeMsg(
                "SMARTPATROL",
                "你好鸭!!!".getBytes(),
                "红红火火恍恍惚惚!!",
                System.currentTimeMillis() + 10000
        );
    }
10、结果
2021-08-04 22:07:12.677  INFO 17548 --- [nio-8498-exec-2] c.h.i.i.s.m.common.util.ProducerUtil     : 发送MQ消息成功 -- Topic:TID_COMMON ,msgId:C0A80168448C2F0E140B14322CB30000 , Key:红红火火恍恍惚惚!!, tag:SMARTPATROL, body:你好鸭!!!
收到消息啦!!
推送成功!!!!
2021-08-04 22:07:22.179  INFO 17548 --- [MessageThread_1] c.h.i.i.s.m.m.t.n.MqTimeMessageListener  : 接收到MQ消息 -- Topic:TID_COMMON, tag:SMARTPATROL,msgId:0b17f2e71ebd1b054c2c156f6d1d1655 , Key:红红火火恍恍惚惚!!, body:你好鸭!!!

标签:return,String,队列,private,msg,import,public,RocketMQ,延迟
来源: https://www.cnblogs.com/Tom-shushu/p/15101109.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有