标签:127.0 6379 队列 Redis 0.1 mq 消息
一、消息队列
消息队列的基本需求
- 消息保序:消息的产生与消费有先后顺序,消息队列需保证消费的有序性。
- 重复消息处理:生产者可能会发送重复消息,需保证消费的幂等性。
- 消息可靠性保证:消息不能丢失,消费失败后,需保证有机制可以重新消费。
消息队列可靠性的保证
- 生产者丢数据
- 消费者丢数据
- 消息队列自身丢数据
二、基于list的消息队列
Redis 的sub/pub机制,可以实现部分的消息队列机制,但遇到网络连接中断,数据库宕机等情况,数据易丢失无法重复消费。基于list的消费队列可以简单的实现消费队列的部分功能。
实现指令
- LPUSH:队列插入一条消息;
- RPOP:队列消费一条消息;
- BRPOP:阻塞式的消费一条消息;
- BRPOPLPUSH:阻塞式的消费一条消息的同时将此消息插入另一个消费队列(用于消费失败时的可重复消费);
使用示例
# 消费队列mq中插入一条消息序列为00001,消息值为msg:5的数据
127.0.0.1:6379> lpush mq 00001:msg:5
(integer) 1
# 消费一条消息
127.0.0.1:6379> rpop mq
"00001:msg:5"
127.0.0.1:6379> lpush mq 00002:msg:7 00003:msg:4
(integer) 2
# 阻塞式(3s 超时)消费一条消息
127.0.0.1:6379> brpop mq 3
1) "mq"
2) "00002:msg:7"
(2.51s)
127.0.0.1:6379> brpop mq 3
1) "mq"
2) "00003:msg:4"
127.0.0.1:6379> lpush mq 00004:msg:9
(integer) 1
# 阻塞式(3s 超时)消费一条消息,并将消费的数据放入mq2队列,预防消费失败时数据丢失。
127.0.0.1:6379> brpoplpush mq mq2 3
"00003:msg:9"
# 消费成功后再删除掉mq2中的消息
127.0.0.1:6379> rpop mq2
"00003:msg:9"
存在问题
- 业务需自行保证序列ID唯一,避免重复消费。
- BRPOPLPUSH为了防止消费失败后可重新消费,会多产生的一条消费队列,占用内存。
- 并不支持消费组概念,当消费速度较慢时,会导致消息队列数据积压。
- 消息队列自身基于redis,本身存在数据丢失风险。
三、基于streams的消息队列
Redis5.0之后新支持了streams数据结构用于支持完备功能的消息队列,新增消费组消费,ACK确认等完备机制。
实现指令
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
- XREAD:用于读取消息,可以按 ID 读取数据;
- XREADGROUP:按消费组形式读取消息;
- XPENDING:用来查询每个消费组内所有消费者已读取但尚未确认的消息;
- XACK:用于向消息队列确认消息处理已完成;
使用示例
- xadd key ID field string [field string …]
- ID :可自己指定唯一序列号,一般用* 指定系统自动生成,生成ID序列号:[毫秒时间戳-0开头序列],如"1633939083516-0"
127.0.0.1:6379> xadd mq * money -5
"1633939083516-0"
127.0.0.1:6379> xadd mq * money -5 money +10
"1633953837863-0"
- xrange key start end [COUNT count]
127.0.0.1:6379> xrange mq - +
1) 1) "1633939083516-0"
2) 1) "money"
2) "-5"
2) 1) "1633953837863-0"
2) 1) "money"
2) "-5"
3) "money"
4) "+10"
- xread [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
- count :读取数目
- block :阻塞毫秒数,不设置则代表非阻塞式读取
- key :消息队列名称
- id :消息序列号,0-0 代表最小开始,$ 代表从现在开始。
127.0.0.1:6379> xread count 2 block 5000 streams mq 0-0
1) 1) "mq"
2) 1) 1) "1633939083516-0"
2) 1) "money"
2) "-5"
2) 1) "1633953837863-0"
2) 1) "money"
2) "-5"
3) "money"
4) "+10"
- xgroup [CREATE key groupname id-or-$ ] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
# 创建消费mq队列的group1组,从ID 0-0开始消费。
127.0.0.1:6379> xgroup create mq group1 0-0
OK
- xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
# group1组的consumer1开始从0-0消费mq队列
127.0.0.1:6379> xreadgroup GROUP group1 consumer1 STREAMS mq 0-0
1) 1) "mq"
2) 1) 1) "1633939083516-0"
2) 1) "money"
2) "-5"
2) 1) "1633953837863-0"
2) 1) "money"
2) "-5"
3) "money"
4) "+10"
- xpending key group [start end count] [consumer]
# 查询当前读取但为确认消费的消息
127.0.0.1:6379> xpending mq group1 - + 2 consumer1
1) 1) "1633939083516-0"
2) "consumer1"
3) (integer) 672137
4) (integer) 1
2) 1) "1633953837863-0"
2) "consumer1"
3) (integer) 672137
4) (integer) 1
- xack key group ID [ID …]
# 确认1633939083516-0消息
127.0.0.1:6379> xack mq group1 1633939083516-0
(integer) 1
127.0.0.1:6379> xpending mq group1 - + 2 consumer1
1) 1) "1633953837863-0"
2) "consumer1"
3) (integer) 840477
4) (integer) 1
存在问题
- Redis5.0之后版本开始支持。
- 消息队列自身基于redis,依然存在数据丢失风险。
- 轻量级消息队列,适用于丢失数据不敏感业务(发短信,发通知)。
标签:127.0,6379,队列,Redis,0.1,mq,消息 来源: https://blog.csdn.net/YL_max/article/details/120722045
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。