ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ使用 prefetch_count优化队列的消费,使用死信队列和延迟队列实现消息的定时重试,golang版本

2022-01-07 23:34:11  阅读:173  来源: 互联网

标签:count false err exchange 队列 rabbitmq 死信 消息


RabbitMQ 的优化

channel

生产者,消费者和 RabbitMQ 都会建立连接。为了避免建立过多的 TCP 连接,减少资源额消耗。

AMQP 协议引入了信道(channel),多个 channel 使用同一个 TCP 连接,起到对 TCP 连接的复用。

不过 channel 的连接数是有上限的,过多的连接会导致复用的 TCP 拥堵。

const (
	maxChannelMax = (2 << 15) - 1
	defaultChannelMax = (2 << 10) - 1
)

通过http://github.com/streadway/amqp这个client来连接 RabbitMQ,这里面定义了最大值65535和默认最大值2047。

prefetch Count

什么是prefetch Count,先举个栗子:

假定 RabbitMQ 队列有 N 个消费队列,RabbitMQ 队列中的消息将以轮询的方式发送给消费者。

消息的数量是 M,那么每个消费者得到的数据就是 M%N。如果某一台的机器中的消费者,因为自身的原因,或者消息本身处理所需要的时间很久,消费的很慢,但是其他消费者分配的消息很快就消费完了,然后处于闲置状态,这就造成资源的浪费,消息队列的吞吐量也降低了。

这时候prefetch Count就登场了,通过引入prefetch Count来避免消费能力有限的消息队列分配过多的消息,而消息处理能力较好的消费者没有消息处理的情况。

RabbitM 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP/IP 中的"滑动窗口"。

所以消息不会被处理速度很慢的消费者过多霸占,能够很好的分配到其它处理速度较好的消费者中。通俗的说就是消费者最多从 RabbitMQ 中获取的未消费消息的数量。

prefetch Count数量设置为多少合适呢?大概就是30吧,具体可以参见Finding bottlenecks with RabbitMQ 3.3

谈到了prefetch Count,我们还要看了 global 这个参数,RabbitMQ 为了提升相关的性能,在 AMQPO-9-1 协议之上重新定义了 global 这个参数

global 参数 AMQPO-9-1 RabbitMQ
false 信道上所有的消费者都需要遵从 prefetchC unt 的限 信道上新的消费者需要遵从 prefetchCount 的限定值定值
true 当前通信链路(Connection) 上所有的消费者都要遵从 prefetchCount 的限定值,就是同一Connection上的消费者共享 信道上所有的消费者都需要遵从 prefetchCunt 的上限,就是同一信道上的消费者共享

prefetchSize:预读取的单条消息内容大小上限(包含),可以简单理解为消息有效载荷字节数组的最大长度限制,0表示无上限,单位为 B。

如果prefetch Count为 0 呢,表示预读取的消息数量没有上限。

举个错误使用的栗子:

之前一个队列的消费者消费速度过慢,prefetch Count为0,然后新写了一个消费者,prefetch Count设置为30,并且起了10个pod,来处理消息。老的消费者还没有下线也在处理消息。

但是发现消费速度还是很慢,有大量的消息处于 unacked 。如果明白prefetch Count的含义其实就已经可以猜到问题的原因了。

老的消费者prefetch Count为0,所以很多 unacked 消息都被它持有了,虽然新加了几个新的消费者,但是都处于空闲状态,最后停掉了prefetch Count为0的消费者,很快消费速度就正常了。

死信队列

什么是死信队列

一般消息满足下面几种情况就会消息变成死信

  • 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时 requeue 属性被设置为false;

  • 消息过期,消息在队列的存活时间超过设置的 TT L时间;

  • 队列达到最大长度,消息队列的消息数量已经超过最大队列长度。

当一个消息满足上面的几种条件变成死信(dead message)之后,会被重新推送到死信交换器(DLX ,全称为 Dead-Letter-Exchange)。绑定 DLX 的队列就是死信队列。

所以死信队列也并不是什么特殊的队列,只是绑定到了死信交换机中了,死信交换机也没有什么特殊,我们只是用这个来处理死信队列了,和别的交换机没有本质上的区别。

对于需要处理死信队列的业务,跟我们正常的业务处理一样,也是定义一个独有的路由key,并对应的配置一个死信队列进行监听,然后 key 绑定的死信交换机中。

使用场景

当消息的消费出现问题时,出问题的消息不被丢失,进行消息的暂存,方便后续的排查处理。

代码实现

死信队列的使用,可参看下文,配合延迟队列实现消息重试的机制。

延迟队列

什么是延迟队列

延迟队列就是用来存储进行延迟消费的消息。

什么是延迟消息?

就是不希望消费者马上消费的消息,等待指定的时间才进行消费的消息。

使用场景

1、关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭;

2、清理过期数据业务上。比如缓存中的对象,超过了空闲时间,需要从缓存中移出;

3、任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求;

4、下单之后如果三十分钟之内没有付款就自动取消订单;

5、订餐通知:下单成功后60s之后给用户发送短信通知;

6、当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存;

7、定期检查处于退款状态的订单是否已经退款成功;

8、新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信;

9、定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。

总结下来就是一些延迟处理的业务场景

实现延迟队列的方式

RabbitMQ 中本身并没有直接提供延迟队列的功能,可以通过死信队列和 TTL 。来实现延迟队的功能。

先来了解下过期时间 TTL,消息一旦超过设置的 TTL 值,就会变成死信。这里需要注意的是 TTL 的单位是毫秒。设置过期时间一般与两种方式

  • 1、通过队列属性设置,队列中的消息有相同的过期时间;

  • 2、通过消息本身单独设置,每条消息有自己的的过期时间。

如果两种一起设置,消息的 TTL 以两者之间较小的那个数值为准。

上面两种 TTL 过期时间,消息队列的处理是不同的。第一种,消息一旦过期就会从消息队列中删除,第二种,消息过期了不会马上进行删除操作,删除的操作,是在投递到消费者之前进行判断的。

第一种方式中相同过期时间的消息是在同一个队列中,所以过期的消息总是在头部,只要在头部进行扫描就好了。第二种方式,过期的时间不同,但是消息是在同一个消息队列中的,如果要清理掉所有过期的时间就需要遍历所有的消息,当然这也是不合理的,所以会在消息被消费的时候,进行过期的判断。这个处理思想和 redis 过期 key 的清理有点神似。

Queue TTL

通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。

	if _, err := channel.QueueDeclare("delay.3s.test",
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
			"x-expires":                 3000,
		},
	); err != nil {
		return err
	}
Message TTL

对于 Message TTL 设置有两种方式

  • Per-Queue Message TTL

通过在 queue.declare 中设置 x-message-ttl 参数,可以控制在当前队列中,消息的过期时间。不过同一个消息被投到多个队列中,设置x-message-ttl的队列,里面消息的过期,不会对其他队列中相同的消息有影响。不同队列处理消息的过期是隔离的。

	if _, err := channel.QueueDeclare("delay.3s.test",
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
			"x-message-ttl":             3000,
		},
	); err != nil {
		return err
	}
  • Per-Message TTL

通过 expiration 就可以设置每条消息的过期时间,需要注意的是 expiration 是字符串类型。

	delayQ := "delay.3s.test"
	if _, err := channel.QueueDeclare(delayQ,
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
		},
	); err != nil {
		return err
	}

	if err := channel.Publish("", delayQ, false, false, amqp.Publishing{
		Headers:      amqp.Table{"x-retry-count": retryCount + 1},
		Body:         d.Body,
		DeliveryMode: amqp.Persistent,
		Expiration:   "3000",
	}); err != nil {
		return err
	}

通过延迟队列来处理延迟消费的场景,可以借助于死信队列来处理

延迟队列通常的使用:消费者订阅死信队列 deadQueue,然后需要延迟处理的消息都发送到 delayNormal 中。然后 delayNormal 中的消息 TTL 过期时间到了,消息会被存储到死信队列 deadQueue。我们只需要正常消费,死信队列 deadQueue 中的数据就行了,这样就实现对数据延迟消费的逻辑了。

使用 Queue TTL 设置过期时间

举个线上处理消息重传的的栗子:

消费者处理队列中的消息,一个消息在处理的过程中,会出现错误,针对某些特性的错误,希望这些消息能够退回到队列中,过一段时间在进行消费。当然,如果不进行 Ack,或者 Ack 之后重推到队列中,消费者就能再次进行重试消费。但是这样会有一个问题,消费队列中消息消费很快,刚重推的消息马上就到了队列头部,消费者可能马上又拿到这个消息,然后一直处于重试的死循环,影响其他消息的消费。这时候延迟队列就登场了,我们可以借助于延迟队列,设置特定的延迟时间,让这些消息的重试,发生到之后某个时间点。并且重试一定次数之后,就可以选择丢弃这个消息了。

来看下流程图:

mq

具体的处理步骤:

1、生产者推送消息到 work-exchange 中,然后发送到 work-queue 队列;

2、消费者订阅 work-queue 队列,这是正常的业务消费;

3、对于需要进行延迟重试的消息,发送到延迟队列中;

4、延迟队列会绑定一个死信系列,死信队列的 exchange 和 routing-key,就是上面正常处理业务 work-queue 消息队里的 exchange 和 routing-key,这样过期的消息就能够重推到业务的队列中,每次重推到延迟队列的时候会记录消息重推的次数,如果达到我们设定的上限,就可以丢弃数据,落库或其他的操作了;

5、所以消费者只需要监听处理 work-queue 队列就可以了;

6、无用的延迟队列,到了删除的时间节点,会进行自动的删除。

上代码,文中 Demo 的地址

标签:count,false,err,exchange,队列,rabbitmq,死信,消息
来源: https://www.cnblogs.com/ricklz/p/15777193.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有