ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ实现延时队列

2021-10-27 23:06:36  阅读:148  来源: 互联网

标签:return exchange 队列 RabbitMQ order 死信 延时 public


什么是延时队列

指消息进入队列后不会立即被消费,可以被延迟一定的时间,再进行消费.RabbitMQ没有提供延迟队列功能,但是可以使用TTL+DLX来实现延迟队列效果

使用场景

电商平台下单后,30分钟未支付,取消订单回滚库存;新用户注册成功一周后,发送问候短信等等.

延时队列实现

模拟电商平台下单后,30分钟后未支付,取消订单回滚库存
在这里插入图片描述

创建配置类

@Configuration
public class DelayConfig {

    /**
     * 创建一个正常的队列
     *
     * @return
     */
    @Bean
    public Queue createNormalQueue() {
        return QueueBuilder.durable("order_queue").build();
    }

    /**
     * 创建一个死信队列
     *
     * @return
     */
    @Bean
    public Queue createDeadQueue() {
        return QueueBuilder.durable("order_dead_queue")
                .withArgument("x-dead-letter-exchange", "order_dead_exchange") //设置死信交换机
                .withArgument("x-dead-letter-routing-key", "order_dead")//设置死信路由key
                .withArgument("x-message-ttl", 30000)// 队列中消息30秒过期
                .build();
    }

    /**
     * 创建一个正常的交换机
     *
     * @return
     */
    @Bean
    public DirectExchange createNormalExchange() {
        return new DirectExchange("order_exchange");
    }

    /**
     * 创建死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange createDeadExchange() {
        return new DirectExchange("order_dead_exchange");
    }

    /**
     * 创建绑定:将正常队列绑定到死信交换机上面
     *
     * @return
     */
    @Bean
    public Binding createDeadBinding(@Qualifier(value = "createNormalQueue") Queue queue,
                                     @Qualifier(value = "createDeadExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order_dead");
    }

    /**
     * 创建绑定:将死信队列绑定到正常的交换机上面
     *
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "createDeadQueue")Queue queue,
                           @Qualifier(value = "createNormalExchange")DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order");
    }
}

创建监听类

@Component
public class DelayListener {


    @RabbitListener(queues = "order_queue")
    public void listener(Message message, Channel channel, String msg) throws IOException {
        // 模拟业务代码执行
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date()) + "收到消息:" + msg);
        System.out.println("检查订单是否付款操作开始::没有支付就取消订单,回滚库存");
        // 签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

创建controller用于测试

@RestController
public class DelayController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/send")
    public void send(){
        // 模拟业务代码执行
        String orderId = UUID.randomUUID().toString().replace("-","");
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date())+"创建订单:"+orderId);
        // 通过正常的交换机和routingKey把orderId发送到死信队列
        rabbitTemplate.convertAndSend("order_exchange","order",orderId);
    }
}

注意

  • 为了方便测试,我在配置类中的死信队列消息过期时间设置的是30秒,再真实的场景中根据自己的需求来就好了.
  • 发送消息要发送给order_dead_queue(死信队列),监听要监听order_queue(正常队列)

测试

http://localhost:18081/send:再发出信息后,延迟了30秒后,消费到了信息
在这里插入图片描述

标签:return,exchange,队列,RabbitMQ,order,死信,延时,public
来源: https://blog.csdn.net/qq_43135259/article/details/120991966

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有