ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

定时任务实现(RabbitMQ 延迟队列)

2022-06-08 15:32:51  阅读:286  来源: 互联网

标签:routing key exchange 队列 dead RabbitMQ letter 延迟


前言

其实rabbit 没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live(TTL)消息超时机制;2、Dead Letter Exchanges(DLX)死信队列。

 

先理解一个概念:

rabbit 中一个消息是有死亡状态的,它会被发送到一个指定的队列中,这个队列是一个普通的队列,根据他的功能,我们叫他死信队列。

当发生下面的情况时,消息会被发送到死信队列:

  1. 消息被消费者接收,并且标记了reject或者nack,拒绝或者未消费成功。
  2. 队列设定了消息存活时间,超过存活时间未被消费,会自动发送到死信队列。
  3. 队列满了,再被分发到队列的消息,会被发送到死信队列。

 

延迟队列原理

RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
RabbitMQ消息的过期时间有两种方法设置。

  • 通过队列(Queue)的属性设置,队列中所有的消息都有相同的过期时间。(本次延迟队列采用的方案)
  • 对消息单独设置,每条消息TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为死信(dead letter)

 

死信队列

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

  • x-dead-letter-exchange:出现死信(dead letter)之后将dead letter重新发送到指定exchange
  • x-dead-letter-routing-key:出现死信(dead letter)之后将dead letter重新按照指定的routing-key发送

队列中出现死信(dead letter)的情况有:

  • 消息或者队列的TTL过期。(延迟队列利用的特性)
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

综合上面两个特性,将队列设置TTL规则,队列TTL过期后消息会变成死信,然后利用DLX特性将其转发到另外的交换机和队列就可以被重新消费,达到延迟消费效果。

 

如图:

 

 

理解了概念就知道是使用rabbit 的死信队列 做定时任务了。具体实现如下:

 

生产者

import pika
import json
import time

credentials = pika.PlainCredentials('admin', 'admin')  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials))
channel = connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
queue_name = "delay_queue_a"
exchange = 'delay_exchange_a'
routing_key = 'delay_routing_key_a'
dead_letter_exchange = 'dead_exchange_a'  # 'amq.direct'#'dead_exchange_a'
dead_letter_routing_key = 'dead_letter_routing_key_a'  # 'dead_queue_a'#'dead_letter_routing_key_a'
arguments = {
    "x-message-ttl": 5000,
    'x-dead-letter-exchange': dead_letter_exchange,
    'x-dead-letter-routing-key': dead_letter_routing_key
}

channel.confirm_delivery()

channel.exchange_declare(exchange=exchange, durable=True, exchange_type='direct')
result = channel.queue_declare(queue=queue_name, durable=False, arguments=arguments)
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
for i in range(10):
    message = json.dumps({'OrderId': i})
    # 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
                          properties=pika.BasicProperties(delivery_mode=2))
    print(message)
    time.sleep(1.5)

connection.close()

 

消费者:

import pika
import json

credentials = pika.PlainCredentials('admin', 'admin')

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials))

channel = connection.channel()
# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
queue_name = "dead_queue_a"
# dead_letter_exchange = 'amq.direct'#'dead_exchange_a'
dead_letter_exchange = 'dead_exchange_a'
dead_letter_routing_key = 'dead_letter_routing_key_a'
# queue_name = dead_letter_routing_key
channel.exchange_declare(exchange=dead_letter_exchange, durable=False, exchange_type='direct')
result = channel.queue_declare(queue=queue_name, durable=False)

channel.queue_bind(exchange=dead_letter_exchange, queue=queue_name, routing_key=dead_letter_routing_key)


# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    data = json.loads(body.decode())
    print(data)
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 告诉rabbitmq,用callback来接收消息
channel.basic_consume(queue_name, callback,  
                      auto_ack=False)
print('开始监听')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
    connection.close()
    print('close')

看了上面还是模糊: 点击前往原著

 

标签:routing,key,exchange,队列,dead,RabbitMQ,letter,延迟
来源: https://www.cnblogs.com/TF511/p/16355811.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有