ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ的高级特性--TTL、死信队列、延迟队列

2022-02-04 10:02:42  阅读:195  来源: 互联网

标签:exchange 队列 -- 死信 消息 TTL new


目录

1.TTL机制

       1.1 实现方案

       1.2 原生API实现

       1.3 SpringBoot实现

2.死信队列

        2.1 原生API实现

        2.2 SpringBoot实现

3.延迟队列

        3.1 延时队列的使用


1.TTL机制

       1.1 实现方案

         目前的电商业务中订单创建成功,等待支付一般都会给一定的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。

         如何实现这个功能?

         定时轮询(数据库等)

                用户下单成功,将订单数据放入数据库,同时将支付状态放入数据库,用户付款更
改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。

                优点:设计实现简单。

                缺点: 需要对数据库进行大量的IO操作,效率低下。

         Timer

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
    @Override
    public void run() {
        System.out.println("用户没有付款,交易取消:" +
        simpleDateFormat.format(new
        Date(System.currentTimeMillis())));
        timer.cancel();
    }
};
System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
// 10秒后执行timerTask
timer.schedule(timerTask, 10 * 1000);

        

                优点: 没有想到

                缺点: 没有持久化机制。

                            不灵活 (只可以设置开始时间和重复间隔, 对于其它业务不太合适)。

                            不能利用线程池,一个timer一个线程。

                            没有真正的管理计划。

         Scheduler及其它定时器

// 线程工厂
ThreadFactory factory = Executors.defaultThreadFactory();
// 使用线程池
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
service.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("用户未付款,交易取消:" +
        format.format(new Date()));
    }// 等待10s 单位秒
}, 10, TimeUnit.SECONDS);

                优点:可多线程执行,一定程度上避免任务互相影响,单个任务异常不影响其它任务

                缺点:高并发下不建议使用定时任务,较浪费服务器性能

         RabbitMQ 的 TTL

                通过对 消息 和 队列 两个维度来设置 TTL

                优点:无需消费过多的服务器性能即可实现定时的功能

                缺点:任何中间件的容量和堆积能力都是有限的,且消息中间件的引入会增加许多问题

        

        由于消息中间件的容量和堆积能力都是有限的,如果有些消息总是无法消费掉,就需要有一种东西进行兜底。

        目前有两种方法设置消息的TTL,两种方法同时使用,则消息的TTL已两者之间 较小数值为准。

                1. 通过 Queue 属性设置,队列中所有消息都有相同的过期时间。

                2. 对消息自身进行单独设置,每条消息的TTL可以不同。

       1.2 原生API实现

try(Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) {
    // 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
    // 设置队列属性
    Map<String, Object> arguments = new HashMap<>();
    // 设置队列的TTL
    arguments.put("x-message-ttl", 30000);
    // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
    arguments.put("x-expires", 10000);

    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

    channel.basicPublish("", 
                         QUEUE_NAME, 
                         //设置消息本身的过期时间
                         new AMQP.BasicProperties().builder().expiration("30000").build(),
                         message.getBytes())

} catch (TimeoutException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

 此外,还可以通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

默认规则:

        1.如果不设置 x-message-ttl, 则表示消息不会过期。

        2.如果 x-message-ttl 设置为0, 则表示除非消息可以直接将消息投递到消费者,否则消息会被立即丢弃 。

       1.3 SpringBoot实现

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queueTTLWaiting() {
        Map<String, Object> props = new HashMap<>();
        // 对于该队列中的消息,设置都等待10s
        props.put("x-message-ttl", 10000);
        // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
        props.put("x-expires", 1000);
        Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
        return queue;
    }
}
@RestController
public class controller{
    
   @RequestMapping("/pay/msgttl")
   public String sendTTLMessage() throws UnsupportedEncodingException {
       MessageProperties properties = new MessageProperties();
       //设置消息本身过期时间
       properties.setExpiration("5000");
       Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
       rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
       return "msg-ttl-ok";
    }     
}

2.死信队列

       DLX,全称为 Dead-Letter-Excahnge, 死信交换机。

        在各个外卖系统中,用户下单调用订单服务, 然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统 采用 MQ异步通讯。

        用户在下单之后,外卖员接单之后再取消接单应该如何处理,用户下单长时间未被接单如何处理?

        这种场景下我们可以考虑定义一个死信交换机,并绑定一个死信队列。当消息变成死信时,该消息就会被发送到该死信队列上,这样方便我们查看消息失败的原因,从而j进行下一步的处理,是取消订单还是安排其他的外卖员。

        消息在到达队列之后,被重新发送到一个特殊的交换机(DLX)中, 同时,绑定DLX的队列就称为“死信队列”。

        一下几种情况导致消息变成死信:

                1. 消息被拒绝 (Basic.Reject / Basic.Nack), 并且设置 requeue 参数为 false。

                2. 消息过期。

                3. 队列达到最大长度。

        同时我们也可以在处理异常的时候,如果消息不能被消费者正常消费,可以放置到死信队列,后续进行分析程序中的异常情况,进而改善和优化系统。

        2.1 原生API实现

try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

    // 定义一个死信交换器(也是一个普通的交换器)
    channel.exchangeDeclare("exchange.dlx", "direct", true);

    // 定义一个正常业务的交换器
    channel.exchangeDeclare("exchange.biz", "fanout", true);

    Map<String, Object> arguments = new HashMap<>();
    // 设置队列TTL
    arguments.put("x-message-ttl", 10000);
    // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
    arguments.put("x-dead-letter-exchange", "exchange.dlx");
    // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的 routingKey
    arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");

    //定义交换机队列及绑定关系
    channel.queueDeclare("queue.biz", true, false, false, arguments);
    channel.queueBind("queue.biz", "exchange.biz", "");
    channel.queueDeclare("queue.dlx", true, false, false, null);
    // 死信队列和死信交换器
    channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");

    channel.basicPublish("exchange.biz", 
                         "",
                         MessageProperties.PERSISTENT_TEXT_PLAIN,
                         "dlx.test".getBytes());
} catch (Exception e) {
    e.printStackTrace();
}

        2.2 SpringBoot实现

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        Map<String, Object> props = new HashMap<>();
        // 消息在队列中的生存时间 10s
        props.put("x-message-ttl", 10000);
        // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
        props.put("x-dead-letter-exchange", "ex.go.dlx");
        // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
        props.put("x-dead-letter-routing-key", "go.dlx");
        Queue queue = new Queue("q.go", true, false, false, props);
        return queue;
    }
}

一般来说,死信队列都会配合RabbitMQ的TTL来使用,在消息超时之后会被自动路由到死信队列中。 

3.延迟队列

        在一定的业务场景中,我们发送到消息队列中的消息不一定想立即被消费,就比如:

        在12306中购买火车票,选中一个座位中订单未支付的时间段中系统会将这个座位锁定,如果超过时间还没有付款的话系统会自动把座位释放掉,怎么实现类似的功能呢?

        1.可以用定时任务每分钟扫描一次,发现有超过15分钟的就释放掉,但这样非常浪费系统资源。

        2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟之后锁会自动释放,但是这样会长期占用系统的资源。

        3. 可以通过TTL配合死信队列来实现。但是TTL扫描是从队列的头依此往后扫描,加入第一个消息没有超时,后续的消息是不会被扫描的,如果队列中的消息过期时间不是固定的,就会导致后面消息过期了,但是仍然没有被处理。

        因此可以使用延迟队列,锁座成功之后会发送一条延迟消息到延时交换机,延时交换机轮询所有的消息,消息到指定的时间后会被消费,消费的过程就是检查这个座位是否是“已付款”状态;

        RabbitMQ本身并没有提供延时队列的功能,可以用 rabbitmq_delayed_message_exchange 插件来实现,它与TTL最大的不同就是 TTL 存放消息在死信队列中,而它则是存放消息在延时交换机中。

        

1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
2. 延时交换机(exchange)存储消息持续扫描所有的消息,等待消息到期根据路由键(routekey)找到      绑定自己的队列(queue)并把消息给它
3. 队列(queue)再把消息发送给监听它的消费者(customer)

        3.1 延时队列的使用

                下载插件

                下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

                安装插件

、            将插件拷贝到rabbitmq-server的安装路径

# 启用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

#重启rabbitmq-server
systemctl restart rabbitmq-server
@Configuration
public class RabbitConfig {
    @Bean
    public Exchange exchange() {
        Map<String, Object> props = new HashMap<>();
        props.put("x-delayed-type", ExchangeTypes.FANOUT);
        Exchange exchange = new CustomExchange("ex.delayed", 
                                               "xdelayed-message", 
                                               true,
                                               false, 
                                               props);
        return exchange;
}
public class PublishController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/prepare/{seconds}")
    public String toMeeting(@PathVariable Integer seconds) throws UnsupportedEncodingException {
    // RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
    // 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
    // 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
    // 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
    MessageProperties properties = new MessageProperties();
    properties.setHeader("x-delay", (seconds - 10) * 1000);
    Message message = new Message((seconds + "秒后召开销售部门会议。").getBytes("utf-8"),properties);
    rabbitTemplate.convertAndSend("ex.delayed", "key.delayed",message);
    return "已经定好闹钟了,到时提前告诉大家";
}
}

标签:exchange,队列,--,死信,消息,TTL,new
来源: https://blog.csdn.net/qq_42029989/article/details/122778975

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有