ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ

2021-08-26 03:00:21  阅读:194  来源: 互联网

标签:pika exchange 队列 RabbitMQ queue 消息 channel


RabbitMQ

安装

docker

# 下载镜像
docker pull rabbitmq
# 运行镜像
docker run -d --name my-rabbitmq -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 25672:25672 rabbitmq

windows

  1. 安装Erlang

    1. http://www.erlang.org/downloads 下载otp_winxx_{版本}.exe

    2. 安装并配置环境变量

      1. %ERLANG_HOME%\bin

  2. 安装RabbitMq

    1. https://www.rabbitmq.com/install-windows.html官网下载rabbitmq-server-{版本}.exe

关键词

  1. Producer:生产者

  2. Consumer:消费者

  3. Channel: 信道是生产者,消费者和 RabbitMQ 通信的渠道,是建立在 TCP 连接上的虚拟连接。一个 TCP 连接上可以建立成百上千个信道,通过这种方式,可以减少系统开销,提高性能。

  4. Broker: 接收客户端连接,实现 AMQP 协议的消息队列和路由功能的进程。

  5. Virtual Host: 虚拟主机的概念,类似权限控制组,一个 Virtual Host 里可以有多个 Exchange 和 Queue,权限控制的最小粒度是 Virtual Host。

  6. Exchange: 交换机,接收生产者发送的消息,并根据 Routing Key 将消息路由到服务器中的队列 Queue。

    1. ExchangeType: 交换机类型决定了路由消息的行为

      1. direct(路由模式)

      2. fanout(发布/订阅模式)

      3. topic(匹配模式)

  7. Message Queue: 消息队列,用于存储还未被消费者消费的消息,由 Headerbody 组成。

    1. Header:由生产者添加的各种属性的集合,包括 Message 是否被持久化、优先级是多少、由哪个 Message Queue 接收等

    2. body:真正需要发送的数据内容。

  8. BindingKey: 绑定关键字,将一个特定的 Exchange 和一个特定的 Queue 绑定起来。

注意:生产者、消费者、消息代理可能不在同一主机上

Hello World

  1. pip install pika

  2. 生产者发送消息至消息队列,消费者从消息队列中接收消息

  1. 生产者代码

    1. 与RabbitMQ建立连接

    2. 声明使用的消息队列

    3. 发布消息给Broker

    4. 关闭连接

       1 # producer.py
       2 import pika
       3 
       4 # 建立连接
       5 hostname = 'xxx.xxx.xxx.xxx'
       6 credentials = pika.PlainCredentials('username','password')
       7 parameters = pika.ConnectionParameters(hostname,credentials=credentials)
       8 connection = pika.BlockingConnection(parameters)
       9 channel = connection.channel()  # 声明接口
      10 
      11 # 创建队列
      12 channel.queue_declare(queue="hello")
      13 
      14 # 消息内容
      15 content = 'Hello World'
      16 
      17 # 发布消息
      18 channel.basic_publish(exchange='', routing_key='hello', body=content)
      19 print("[x] Sent Hello World")
      20 
      21 connection.close()
  1. 消费者代码

    1. 与RabbitMQ建立连接

    2. 声明使用的消息队列

    3. 定义接收到消息之后触发的方法

    4. 从Broker中获取消息并消费(接收消息)

       1 # consumer.py
       2 import pika
       3 ​
       4 hostname = 'xxx.xxx.xxx.xxx'
       5 credentials = pika.PlainCredentials('username','password')
       6 parameters = pika.ConnectionParameters(hostname,credentials=credentials)
       7 connection = pika.BlockingConnection(parameters)
       8 channel = connection.channel()
       9 ​
      10 # 声明使用的队列
      11 channel.queue_declare(queue="hello")
      12 ​
      13 ​
      14 # 接受到消息后触发的方法
      15 def callback(ch, method, properties, body):
      16     print("[x] Received {}".format(body))
      17 ​
      18 ​
      19 # 消费消息
      20 channel.basic_consume('hello', callback, auto_ack=True)
      21 print('[*] Waiting for messages. To exit press Ctrl+C')
      22 channel.start_consuming()
    5. 运行结果

      1. 先运行consumer.py再运行producer.py

        python consumer.py
        # => [*] Waiting for messages. To exit press CTRL+C
        # => [x] Received 'Hello World!'
        python producer.py
        # => [x] Sent 'Hello World!'

工作队列

  1. 主要思想:生产者将耗时的任务分发给多个消费者(一个生产者,一个消息队列,多个消费者)

    1. 处理资源密集型任务,并且还要等它完成

    2. 将工作封装为一个消息队列,工作在多个消费者(进程)共享

  2. 两种消息分发机制

    1. 轮询分发:将消息轮流发送给每个消费者,要等到一个消费者处理完,才把消息发送给下一个消费者(效率低)

    2. 公平分发:只要有消费者处理完,就会把消息发送给目前空闲的消费者(效率高)

      # 告诉RabbitMQ一次向消费者只发送1个消息,直到消费者发送消息确认后再发送下一个,消息确认前将下一个消息发送给空闲的消费者
      channel.basic_qos(prefetch_count=1)
  3. 消息确认

    1. 确保消费者死亡,任务也不会丢失,将任务交付给下一个消费者

    2. 在回调函数中添加消息确认,告诉生产者已经接收到消息

      def callback(ch, method, properties, body):
          print("[x] Received %r" % body)
          time.sleep(body.count(b'.'))
          print("[x] Done"
      ​
          # 告诉生产者,消费者已收到消息,一般在处理结尾发送确认 
          ch.basic_ack(delivery_tag=method.delivery_tag)
  4. 消息持久化

    1. 确保即使RabbitMQ服务器崩溃,消息也不会丢失

    2. 确保队列是永久的

      channel.queue_declare(queue='task_queue', durable=True)
    3. 确保消息是永久的

      channel.basic_publish(exchange='',
                            routing_key='task_queue',
                            body=message,
                            properties=pika.BasicProperties(
                                delivery_mode=2,  # 确保消息是持久的
                            ))
  5. 生产者代码

    1. 与RabbitMQ建立连接

    2. 声明使用的持久化消息队列

    3. 发布持久化消息给Broker

    4. 关闭连接

       1 import pika
       2 import sys
       3 ​
       4 hostname = '127.0.0.1'
       5 parameters = pika.ConnectionParameters(hostname)
       6 connection = pika.BlockingConnection(parameters)
       7 channel = connection.channel()
       8 ​
       9 # durable = True 代表消息队列持久化存储,False 非持久化存储
      10 channel.queue_declare(queue='work_queue',durable=True)
      11 ​
      12 message = ''.join(sys.argv[1:]) or 'Hello World'
      13 ​
      14 # delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
      15 channel.basic_publish(exchange='',
      16                       routing_key='work_queue',
      17                       body=message,
      18                       properties=pika.BasicProperties(delivery_mode=2,))
      19 ​
      20 print("[x] Sent %r" % message)
      21 connection.close()
  6. 消费者代码

    1. 与RabbitMQ建立连接

    2. 声明需要使用的持久化消息队列

    3. 定义接收到消息之后触发的方法

      1. 方法结尾发送确认信息

    4. 设置消息公平分发(接收一条,处理一条,处理结束之后再接收)

    5. 从Broker中获取消息并消费(接收消息)

       1 import pika
       2 import time
       3 ​
       4 hostname = '127.0.0.1'
       5 parameters = pika.ConnectionParameters(hostname)
       6 connection = pika.BlockingConnection(parameters)
       7 channel = connection.channel()
       8 ​
       9 # durable = True 代表消息队列持久化存储,False 非持久化存储
      10 channel.queue_declare(queue='work_queue', durable=True)
      11 ​
      12 ​
      13 def callback(ch, method, properties, body):
      14     print("[x] Received %r" % body)
      15     time.sleep(body.count(b'.'))
      16     print("[x] Done")
      17      # 告诉生产者,消费者已收到消息,一般在处理结尾发送确认
      18     ch.basic_ack(delivery_tag=method.delivery_tag)
      19 ​
      20 # 直到消费者发送消息确认后再发送下一个,消息确认前将下一个消息发送给空闲的消费者
      21 channel.basic_consume('work_queue', callback)   # auto_ack=False,自动发送确认消息默认是False
      22 ​
      23 # 告诉RabbitMQ一次向消费者只发送1个消息
      24 channel.basic_qos(prefetch_count=1)
      25 ​
      26 print("[*] Waiting for messages. To exist press Ctrl+C")
      27 channel.start_consuming()

交换机

  1. 通过声明交换机的交换类型,确定消息队列的工作模式

    1. 交换类型有三种:

      1. direct:路由模式

      2. topic:匹配模式

      3. fanout:发布订阅模式

      # 声明交换机名称、类型
      channel.exchange_declare(exchange='exchange_name',
                               exchange_type='fanout') # direct or topic or fanout

发布/订阅模式

  1. 主要思想:消息经过交换机之后,交换机将收到的所有消息广播所有队列(一个生产者,一个交换机,多个消息队列)

  2. 临时队列(因为是广播,无所谓什么队列,所以可以使用临时队列)

    # 使用result记录创建的临时队列
    result = channel.queue_declare(exclusive=True) # exclusive=True,一旦消费者关闭连接,删除临时队列
  3. 绑定(交换机需要和队列绑定)

    channel.queue_bind(exchange='exchange_name',
                      queue=result.method.queue)

    (一个生产者,一个交换机,多个消息队列,一个消息队列对应一个消费者?)

  4. 思考:一个消息队列能对应多个消费者吗?

  5. 生产者代码

    1. 与RabbitMQ建立连接

    2. 声明使用的交换机,类型为fanout

    3. 发布消息给交换机

    4. 关闭连接

       1 # producer.py
       2 import pika
       3 import sys
       4 ​
       5 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
       6 channel = connection.channel()
       7 ​
       8 # 声明交换机是广播类型的
       9 channel.exchange_declare(exchange='logs', exchange_type='fanout')
      10 ​
      11 message = ' '.join(sys.argv[1:]) or 'info: Hello World'
      12 ​
      13 channel.basic_publish(exchange='logs', routing_key='', body=message)
      14 ​
      15 print('[x] Sent %r ' % message)
      16 connection.close()
  6. 消费者代码

    1. 与RabbitMQ建立连接

    2. 声明使用的交换机,类型为fanout

    3. 声明创建临时队列

    4. 获取临时队列名

    5. 绑定临时队列

    6. 定义接收到消息之后触发的方法

    7. 从Broker中获取消息并消费

       1 # consumer.py
       2 import pika
       3 import sys
       4 ​
       5 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
       6 channel = connection.channel()
       7 ​
       8 # 声明交换机
       9 channel.exchange_declare(exchange='logs', exchange_type='fanout')
      10 # 声明只允许当前连接的队列
      11 result = channel.queue_declare('',exclusive=True)
      12 # 获取临时队列名
      13 queue_name = result.method.queue
      14 channel.queue_bind(exchange='logs', queue=queue_name)
      15 print('[*] Waiting for logs. To exit press CTRL+C')
      16 ​
      17 ​
      18 def callback(ch, method, properties, body):
      19     print('[x] %r ' % body)
      20 ​
      21     
      22 channel.basic_consume('', callback)
      23 channel.start_consuming()

路由模式

  1. 主要思想:为了让生产者发布的消息定向的精确发送到指定的队列(严格过滤,完全匹配)

  2. 在绑定的时候使用routing_key参数,即绑定键

    channel.queue_bind(exchange=exchange_name,
                      queue=queue_name,
                      routing_key='black')
  3. 也可以使用多个绑定实现类广播效果(当然之后的匹配模式就是如此)

    例如日志消息

  4. 生产者代码

    1. 与RabbitMQ建立连接

    2. 声明使用的交换机,类型为direct

    3. 通过命令行第2个参数确定交换机的routing_key

    4. 发布消息给交换机

    5. 关闭连接

       1 import sys
       2 import pika
       3 ​
       4 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
       5 channel = connection.channel()
       6 ​
       7 channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
       8 ​
       9 print(sys.argv[1:])
      10 # 根据命令行输入的第2个参数确定routing_key(error,warning,info)
      11 # 将不同类型的日志消息保存到不同的队列
      12 severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
      13 message = ' '.join(sys.argv[2:]) or 'Hello World'
      14 ​
      15 channel.basic_publish(exchange='logs_direct', routing_key=severity, body=message)
      16 ​
      17 print("[x] Sent %r:%r " % (severity, message))
      18 ​
      19 connection.close()
  5. 消费者代码

    1. 与RabbitMQ建立连接

    2. 声明使用的交换机,类型为direct

    3. 声明创建临时队列

    4. 获取临时队列名

    5. 绑定临时队列,通过命令行的参数确定交换机的routing_key

    6. 定义接收到消息之后触发的方法

    7. 从Broker中获取消息并消费

      import pika
      import sys
      ​
      connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
      channel = connection.channel()
      ​
      # 设置路由模式
      channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
      ​
      # 声明临时队列
      result = channel.queue_declare('', exclusive=True)
      queue_name = result.method.queue
      ​
      # 设置队列和交换机绑定的键,通过参数确定接收什么类型的日志消息(即从指定的队列中获取消息)
      severties = sys.argv[1:]
      for severity in severties:
          channel.queue_bind(queue=queue_name, exchange='logs_direct', routing_key=severity)
      ​
      print("[*] Waiting for logs. To exit press CTRL+C")
      ​
      ​
      def callback(ch, method, properties, body):
          print("[x] %r:%r" % (method.routing_key, body))
      ​
      ​
      channel.basic_consume(queue_name, callback)
      ​
      channel.start_consuming()
    8. 运行

      # 如果只想保存'warning'和'error'(而不是'info')将消息记录到文件中,只需打开一个控制台并输入:
      python receive_logs_direct.py warning error > logs_from_rabbit.log
      ​
      # 如果您希望在屏幕上看到所有日志消息,请打开一个新终端并执行以下操作:
      python receive_logs_direct.py info warning error
      ​
      # 例如,要输出error日志消息,只需输入:
      python emit_log_direct.py error "Run. Run. Or it will explode."

匹配模式

  1. 主要思想:路由模式是精确严格的筛选,而匹配模式根据匹配的条件过滤(模糊匹配)

  2. 规范

    1. 必须是由”.“连接单词列表

    2. 最多255个字节

    3. ”*“ 可以代替一个单词

    4. ”#“可以代替0个或者多个单词

  3. 交换机类型为topic

  4. 生产者代码

     1 # emit_log_topic.py
     2 import pika
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
     5 channel = connection.channel()
     6 
     7 # 交换机类型为topic
     8 channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
     9 
    10 print(sys.argv[1:])
    11 severity = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
    12 message = ' '.join(sys.argv[2:]) or 'Hello World'
    13 
    14 channel.basic_publish(exchange='logs_topic', routing_key=severity, body=message)
    15 
    16 print("[x] Sent %r:%r " % (severity, message))
    17 
    18 connection.close()
  5. 消费者代码

     1 # receive_logs_topic.py
     2 import pika
     3 import sys
     4 
     5 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
     6 channel = connection.channel()
     7 
     8 # 交换机类型为topic
     9 channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
    10 
    11 result = channel.queue_declare('', exclusive=True)
    12 queue_name = result.method.queue
    13 
    14 # 设置队列和交换机绑定的键
    15 severties = sys.argv[1:]
    16 for severity in severties:
    17     channel.queue_bind(queue=queue_name, exchange='logs_topic', routing_key=severity)
    18 
    19 print("[*] Waiting for logs. To exit press CTRL+C")
    20 
    21 
    22 def callback(ch, method, properties, body):
    23     print("[x] %r:%r" % (method.routing_key, body))
    24 
    25 
    26 channel.basic_consume(queue_name, callback)
    27 
    28 channel.start_consuming()

     

  6. 运行

    #要接收所有日志运行:
    python receive_logs_topic.py "#"
    ​
    #要从设施“ kern ” 接收所有日志:
    python receive_logs_topic.py "kern.*"
    ​
    #或者,如果您只想听到关于“ critical ”日志的信息:
    python receive_logs_topic.py "*.critical"
    ​
    #您可以创建多个绑定:
    python receive_logs_topic.py "kern." ".critical"
    ​
    #发布带有路由键“ kern.critical ”类型的日志:
    python emit_log_topic.py "kern.critical" "A critical kernel error"

远程过程调用(RPC)

  1. 主要思想:客户端与服务器之间是完全解耦的,即两端既是消息的发送者也是接受者。

  2. 服务器代码(消费者)

    1. 建立连接

    2. 声明队列

    3. 定义接收请求的回调函数(函数处理之后,向Client发送响应)

      1. properties.reply_to:客户端发来的绑定键

      2. properties.correlation_id:客户端发来的相关ID

    4. 设置公平分发

    5. 接收消息,触发请求

       1 # server.py
       2 import pika
       3 ​
       4 hostname = '192.168.253.129'
       5 credentials = pika.PlainCredentials('admin', 'admin')
       6 parameters = pika.ConnectionParameters(hostname, credentials=credentials)
       7 connecion = pika.BlockingConnection(parameters=parameters)
       8 channel = connecion.channel()
       9 ​
      10 channel.queue_declare(queue='rpc_queue')
      11 ​
      12 ​
      13 def fib(n):
      14     """
      15     返回第n个斐波那契数
      16     :param n:返回数量
      17     :return: 返回第n个斐波那契数
      18     """
      19     if n == 0:
      20         return 0
      21     elif n == 1:
      22         return 1
      23     else:
      24         return fib(n - 1) + fib(n - 2)
      25 ​
      26 ​
      27 def on_request(ch, method, properties, body):
      28     """
      29     接收client发来的请求,处理之后,再向client发送响应信息
      30     :param ch:
      31     :param method:
      32     :param properties:client请求的属性
      33     :param body:响应信息的内容
      34     :return:
      35     """
      36 ​
      37     # 收到请求后的处理过程
      38     n = int(body)
      39     print(f'[.] fib({n})')
      40     response = fib(n)
      41 ​
      42     # 将处理结果发送回client
      43     channel.basic_publish(exchange='',
      44                           routing_key=properties.reply_to,  # 响应的信息是要发回给请求的队列,而不是rpc_queue
      45                           properties=pika.BasicProperties(
      46                               correlation_id=properties.correlation_id  # 请求的唯一值属性
      47                           ),
      48                           body=str(response))
      49     ch.basic_ack(delivery_tag=method.delivery_tag)  # 消息确认
      50 ​
      51 ​
      52 channel.basic_qos(prefetch_count=1)  # 处理过程中只接受一条消息
      53 channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)    # 接受Client请求
      54 print(" [x] Awaiting RPC requests")
      55 channel.start_consuming()
    
    
  3. 客户端代码(生产者)

    1. 初始化

      1. 建立连接

      2. 回调处理响应消息

    2. 定义处理响应消息的回调函数

    3. 定义发送请求的函数

    4. 创建对象

    5. 发送请求

    6. 关闭连接

       1 # client.py
       2 import pika
       3 import uuid
       4 ​
       5 ​
       6 class FibRPCClient:
       7     '''
       8     定义一个客户端类,发送信息的Producer
       9     初始化的时候建立连接和渠道
      10     随机生成一个队列作为回调队列
      11     从回调队列中接收server响应的消息
      12     '''
      13 ​
      14     def __init__(self):
      15         hostname = '192.168.253.129'
      16         credentials = pika.PlainCredentials('admin', 'admin')
      17         parameters = pika.ConnectionParameters(hostname, credentials=credentials)
      18         self.connecion = pika.BlockingConnection(parameters=parameters)
      19         self.channel = self.connecion.channel()
      20         result = self.channel.queue_declare(queue='',exclusive=True) # 声明临时的队列
      21         self.callback_queue = result.method.queue  # 记录临时队列的队列号
      22         self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response)  # 接受响应信息
      23         self.response = ''
      24         self.correlation_id = ''
      25 ​
      26     def on_response(self, ch, method, properties, body):
      27         """
      28         处理响应信息
      29         :param ch:
      30         :param method:
      31         :param properties:从属性中获取correlation_id
      32         :param body: 获取响应信息的内容
      33         :return:
      34         """
      35         # 当请求的唯一值相同则获取响应信息
      36         if self.correlation_id == properties.correlation_id:
      37             self.response = body
      38 ​
      39     def call(self, n):
      40         """
      41         Fib客户端发送请求
      42         :param n:
      43         :return:
      44         """
      45         self.response = None
      46         self.correlation_id = str(uuid.uuid4())    # 唯一的ID
      47         # 发送请求到server端
      48         self.channel.basic_publish(exchange='',
      49                                    routing_key='rpc_queue',
      50                                    properties=pika.BasicProperties(
      51                                        reply_to=self.callback_queue, # 指定回调队列
      52                                        correlation_id=self.correlation_id  # 使请求唯一
      53                                    ),
      54                                    body=str(n))
      55         # 等待server的响应
      56         while self.response is None:
      57             self.connecion.process_data_events()
      58 ​
      59         return int(self.response)
      60 ​
      61 ​
      62 fib_rpc = FibRPCClient()
      63 print('[x] Requesting fib(30)')
      64 response = fib_rpc.call(30)
      65 print(f'[.] Got %s ' % response)
      66 fib_rpc.connecion.close()

标签:pika,exchange,队列,RabbitMQ,queue,消息,channel
来源: https://www.cnblogs.com/totopian/p/15187706.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有