ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java 实现RabbitMq延时队列和死信队列

2021-04-26 18:02:00  阅读:187  来源: 互联网

标签:Java 队列 死信 交换机 延时 import public


延时队列:实际是不存在直接可用的延时队列,可通过死信消息和死信队列来实现延时队列的功能。

死信交换机: DLX 全称(Dead-Letter-Exchange)。其实它也是一个普通的交换机,但它是设置在队列上某个参数的值对应的交换机。

死信队列:如果某个队列上存在参数:x-dead-letter-exchange, 当这个队列里的消息变成死信消息(dead message)后会被重新Pushlish到 x-dead-letter-exchange 所对应参数值的交换机上,跟这个交换机所绑定的队列就是死信队列。

死信消息

  1. 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  2. 消息TTL过期
  3. 队列达到了最大的长度时

过期消息:RabbitMq 有两种设置消息过期的方式:

  1. 创建队列时通过 x-message-ttl 参数指定该队列消息的过期时间,这种队列里的消息过期时间全部相同。
  2. 生产者Pushlish消息时,通过设置消息的 expiration 参数指定过期时间,每个消息的过期时间都不一样。

  如果两者同时使用,过期时间按照小的一方为准,两种方式设置的时间都是 毫秒。

 

应用场景:延时队列的应用场景很多,在我的项目开发中也涉及到很多,例如:订单五分钟未支付自动取消、订单准备超时30分钟推送提醒给门店、订单完成后两小时推送评价邀请给用户等等,这些间隔指定时间后的操作都可以使用延时队列。

上一篇文章:Java 简单操作 RabbitMq 介绍了RabbitMq的基本操作,要引入的包和配置可以参考上一篇文章。这里就利用RabbitMq的死信队列直接来实现延时队列的功能。

 

首先创建一个自动加载类利用Bean在项目启动时,自动创建延时和死信交换机/延时和死信队列,并将创建好的队列绑定在对应的交换机上。如果交换机和队列存在的情况下,则不会创建或更新。 这一步可减少手动或忘记创建队列带来的麻烦:

package com.demo.www.rabbitmq.config;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * RabbitMq 延时队列实现
 * @author AnYuan
 */

@Slf4j
@Configuration
public class DelayQueueConfig {

    /**
     * 延迟队列
     */
    public static final String DELAY_EXCHANGE = "delay.queue.business.exchange";
    public static final String DELAY_QUEUE = "delay.queue.business.queue";
    public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.business.queue.routingKey";

    /**
     * 死信队列
     */
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadLetter.exchange";
    public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.deadLetter.delay_10s.routingKey";
    public static final String DEAD_LETTER_QUEUE = "delay.queue.deadLetter.queue";

    /**
     * 声明 死信交换机
     * @return deadLetterExchange
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明 死信队列 用于接收死信消息
     * @return deadLetterQueueA
     */
    @Bean
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /**
     *  将 死信队列 绑定到死信交换机上
     * @return deadLetterBindingA
     */
    @Bean
    public Binding deadLetterBindingA() {
        return BindingBuilder
                .bind(deadLetterQueueA())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_QUEUE_ROUTING_KEY);
    }

    /**
     * 声明 延时交换机
     * @return delayExchange
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    /**
     * 将 延时队列 绑定参数
     * @return Queue
     */
    @Bean
    public Queue delayQueueA() {
        Map<String, Object> maps = Maps.newHashMapWithExpectedSize(3);
        // 队列绑定DLX参数(关键一步)
        maps.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 队列绑定 死信RoutingKey参数
        maps.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
        // 消息过期采用第一种设置队列的 ttl 时间,消息过期时间全部相同。 单位:毫秒,这里设置为8秒
        maps.put("x-message-ttl", 8000);
        return QueueBuilder.durable(DELAY_QUEUE).withArguments(maps).build();
    }

    /**
     * 将 延时队列 绑定到延时交换机上面
     * @return delayBindingA
     */
    @Bean
    public Binding delayBindingA() {
        return BindingBuilder
                .bind(delayQueueA())
                .to(directExchange())
                .with(DELAY_QUEUE_ROUTING_KEY);
    }
}

 这里我们定义一个RabbitMq服务接口:

package com.demo.www.service;

/**
 * rabbiMq服务
 * @author AnYuan
 */

public interface RabbitMqService {

    /**
     * 统一发送mq
     *
     * @param exchange   交换机
     * @param routingKey 路由key
     * @param msg       消息
     * @param ttl       过期时间
     */
    void send(String exchange, String routingKey, String msg, Integer ttl);
}

服务接口的实现类:

package com.demo.www.service.impl;

import com.demo.www.service.RabbitMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * rabbitmq服务
 * @author AnYuan
 */

@Service
@Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(String exchange, String routingKey, String msg, Integer ttl) {
        MessageProperties messageProperties = new MessageProperties();
        // 第二种方式设置消息过期时间
        messageProperties.setExpiration(ttl.toString());
        // 构建一个消息对象
        Message message = new Message(msg.getBytes(), messageProperties);
        // 发送RabbitMq消息
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

接着创建一个测试类进行接口测试:

package com.demo.www.service.impl;

import com.google.common.collect.Maps;
import com.demo.www.rabbitmq.config.DelayQueueConfig;
import com.demo.www.service.RabbitMqService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;
import java.util.Map;
@Slf4j
@SpringBootTest class RabbitMqServiceImplTest { @Autowired private RabbitMqService rabbitMqService; @Test public void sendTest() { 
      // 手动指定消息过期时间
        int ttl = 10000;

        Map<String, Object> msgMap = Maps.newHashMapWithExpectedSize(3);
        msgMap.put("msg", "Hello RabbitMq");
        msgMap.put("time", LocalDateTime.now());
        msgMap.put("ttl", ttl);

        // 注意这里发送的交换机是 延时交换机
        rabbitMqService.send(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY, JSONObject.toJSONString(msgMap), ttl);
     log.info("消息发送成功:{}", JSONObject.toJSONString(msgMap)); } }

 以上准备就绪后,延时队列其实已经实现了,来看一下项目启动后的情况

在RabbitMq的管理后台,可以看到自动创建的交换机

 

自动创建的队列,在延时队列的Features栏可以看到有: TTl、DLX、DLK。它们分别代表:(x-message-ttl):设置队列中的所有消息的生存周期,也就是过期时间;(x-dead-letter-exchange)绑定了死信交换机,死信消息会重新推送到指定交换机上而不是丢掉;(x-dead-letter-routing-key):死信消息推送到交换机上指定路由键的队列中,也就是说绑定了RoutingKey;

当运行测试类后会显示发送成功:

首先会看到延时队列里面产生了一条数据:

 

8秒后消息变成死信消息,同时会推送到死信队列里面:

这样就实现了延时队列。最后只需要创建一个消费者,消费死信队列里面的消息,注意是消费死信队列!

package com.demo.www.rabbitmq.consumers;

import com.alibaba.fastjson.JSONObject;
import com.demo.www.rabbitmq.config.DelayQueueConfig;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 延时队列消息消费者
 *
 * @author AnYuan
 * @date 2020-06-23
 */

@Component
@Slf4j
public class DelayMsgConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE),
            exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE)))
    public void queueAConsumer(Message message) {

        Msg msg = JSONObject.parseObject(new String(message.getBody()), Msg.class);
        LocalDateTime now = LocalDateTime.now();
        Duration duration = Duration.between(msg.getTime(), now);

        log.info("DelayMsgConsumer死信队列消费---->Msg:{}, 发送时间:{}, 当前时间:{},  相差时间:{}秒,消息设置的ttl:{}",
                JSONObject.toJSONString(msg),
                localDateTimeToString(msg.getTime()),
                localDateTimeToString(now),
                duration.getSeconds(),
                msg.getTtl());
    }

    @Data
    public static class Msg {
        private String ttl;
        private String msg;
        private LocalDateTime time;
    }

    private String localDateTimeToString(LocalDateTime localDateTime){
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return dateTimeFormatter.format(localDateTime);
    }
}

消费者创建好后,项目启动即可看到消费的Mq消息,对比time里面的值确认为同一条消息:

 

最后有一个细节:发送消息时设置的ttl为10秒,但是消息过了8秒后就变成死信消息被消费掉了,这里就是上面说的:当设置过期消息时同时使用两种方式,过期时间按照小的一方计算。

以上就是利用死信消息和死信队列实现了RabbitMq的延时队列功能,实现了间隔指定时间后做指定的逻辑,既保证了消息及时性又能将功能代码进行解耦,开发过程中可以好好利用。

标签:Java,队列,死信,交换机,延时,import,public
来源: https://www.cnblogs.com/anyuan/p/14704713.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有