ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Python RabbitMQ基础知识

2021-09-19 10:00:05  阅读:222  来源: 互联网

标签:pika exchange Python RabbitMQ 基础知识 queue 队列 connection channel


rabbitmq

  1. 概念

    消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,在很多生产环境中需要控制并发量的场景下用到。消息队列可为这些分布式应用程序提供通信和协调。当前使用较多的消息队列有RabbitMQ、RocketMQ、ActivateMQ、Kafka等。

    • Broker:简单来说就是消息队列服务器实体
    • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
    • Queue:消息队列载体,每个消息都会被投入到一个或多个队列
    • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
    • Routing Key:路由关键字,exchange根据这个关键字进行消息投递
    • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
    • producer:消息生产者,就是投递消息的程序
    • consumer:消息消费者,就是接受消息的程序
    • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

image

image

  1. 应用场景

    • 应用解耦:多用用间通过消息队列对同一个消息处理,避免调用接口失败导致整个过程失败。
    • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
    • 限流削峰:双十一,618等抢购活动。
    • 消息驱动系统:业务体量不断扩大,采用微服务设计思想,分布式的部署方式。

    成本:

    • 应用复杂度:需要对消息队列进行管理
    • 暂时不一致性

    使用消息队列满足条件:

    • 生产者不需要立刻从消费者出获得反馈
    • 容许短暂的不一致性
    • 起到解耦,提速,广播,削峰等作用

    消息队列的使用场景是怎样的?

Docker安装

rabbitmq是在portain平台上安装rabbitmq-management。

  1. 访问dockerhub,搜索rabbitmq,点击进去rabbitmq获取rabbitmq-management的dockerfile链接。

image


FROM rabbitmq:3.9

RUN set eux; \
	rabbitmq-plugins enable --offline rabbitmq_management; \
# make sure the metrics collector is re-enabled (disabled in the base image for Prometheus-style metrics by default)
	rm -f /etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf; \
# grab "rabbitmqadmin" from inside the "rabbitmq_management-X.Y.Z" plugin folder
# see https://github.com/docker-library/rabbitmq/issues/207
	cp /plugins/rabbitmq_management-*/priv/www/cli/rabbitmqadmin /usr/local/bin/rabbitmqadmin; \
	[ -s /usr/local/bin/rabbitmqadmin ]; \
	chmod +x /usr/local/bin/rabbitmqadmin; \
	apt-get update; \
	apt-get install -y --no-install-recommends python3; \
	rm -rf /var/lib/apt/lists/*; \
	rabbitmqadmin --version

EXPOSE 15671 15672

  1. 在Protainer上创建镜像。

image

image

  1. 运行镜像,主要是将容器的15672(management端口)和5672(amqp端口映射出来)。

  2. 最后访问http://[服务器ip]:15672即可到rabbitmq管理界面,输入默认账号密码guest/guest即可访问。

  3. 关于rabbitmq管理界面
    image

    image

    image

    image

点击任意一个Exchange:

image

image

点击任意一个queue:

image

用一个邮局的例子来说明各自的作用。首先邮局表示一个队列,邮筒就是一个channel。channel的作用是建立会话任务。每个地方建立一个邮局很“贵”类似每次建立TCP/IP链接非常“贵”且耗时,用户也无需每次跑到邮局,只需要把信放在邮筒即可。邮局收到用户的信后,根据信封上的地址(exchange)投递给收信方。

python实现

  1. 简单消费者生产者模式

import pika
import json

credentials = pika.PlainCredentials(user, user)  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = host,port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'python-test')

for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
    print('send:'+message)
connection.close()


credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.3.130',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'python-test', durable = False)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print('receive:'+body.decode())

# 告诉rabbitmq,用callback来接收消息
channel.basic_consume('python-test',callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

把消费的代码注释掉,我们在rabbitmq management看看


import pika
import json

credentials = pika.PlainCredentials(user, user)  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = host,port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'python-test')

for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
    print('send:'+message)
connection.close()


# credentials = pika.PlainCredentials('guest', 'guest')
# connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.3.130',port = 5672,virtual_host = '/',credentials = credentials))
# channel = connection.channel()
# # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
# channel.queue_declare(queue = 'python-test', durable = False)
# # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
# def callback(ch, method, properties, body):
#     ch.basic_ack(delivery_tag = method.delivery_tag)
#     print('receive:'+body.decode())

# # 告诉rabbitmq,用callback来接收消息
# channel.basic_consume('python-test',callback)
# # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
# channel.start_consuming()

image

image

Ack mode选择Automatic Ack模式后,消费消息自动确认

image

  1. 工作模式

image

一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。有两种消息分发机制,轮询分发和公平分发:轮询分发的特点是将消息轮流发送给每个消费者,在实际情况中,多个消费者,难免有的处理得快,有的处理得慢,如果都要等到一个消费者处理完,才把消息发送给下一个消费者,效率就大大降低了。而公平分发的特点是,只要有消费者处理完,就会把消息发送给目前空闲的消费者,这样就提高消费效率了。


# producer

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

# consumer

import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 公平分发
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

image

image

  1. Publish/Subscribe模式

image

publish.py


import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

Subscribe.py


import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

  1. 路由模式

image

producer.py


import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

consumer.py


import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

  1. Topic模式

image

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
  1. 路由模式

image


import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()


import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

参考

RabbbitMq官网

消息队列rabbitmq

python实现rabbitmq六种模式

pika-Api文档

标签:pika,exchange,Python,RabbitMQ,基础知识,queue,队列,connection,channel
来源: https://blog.csdn.net/u012655441/article/details/120373963

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有