ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

消息队列 64式 : 1、rabbitmq qos原理分析

2019-07-20 09:08:48  阅读:546  来源: 互联网

标签:qos rabbit self driver rabbitmq 64 conf ._


python 64式: 第32式、rabbitmq性能调优

1 rabbit qos基础
rabbit qos可以设置一个整数值N,表示的意思就是一个消费者最多只能一次拉取N条消息,
一旦N条消息没有处理完,就不会从队列中获取新的消息,直到有消息被ack。
设置qos的作用就是放置消费者从队列中一下拉取所有消息,从而导致
击穿服务,导致服务崩溃或异常。
 

2 rabbit qos在openstack组件中的应用
可以在组件的配置文件中设置类似如下内容,其中数字100可以根据实际需要修改。
[oslo_messaging_rabbit]
rabbit_qos_prefetch_count = 100

3 oslo_messaging中rabbitmq qos源码分析
查看oslo_messaging
newton版本:
oslo.messaging===5.10.2
代码:
https://github.com/openstack/oslo.messaging
https://github.com/openstack/oslo.messaging/tree/5.10.2
下载:
cd /home/machao/myproject/task/385_rabbitmq_qos原理/oslo.messaging-5.10.2

3.1 查找相关代码
grep -r "rabbit_qos_prefetch_count" ./*
对应输出内容如下:
[root@localhost oslo.messaging-5.10.2]# grep -r "rabbit_qos_prefetch_count" ./*
./oslo_messaging/_drivers/impl_rabbit.py:    cfg.IntOpt('rabbit_qos_prefetch_count',
./oslo_messaging/_drivers/impl_rabbit.py:        self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
./oslo_messaging/_drivers/impl_rabbit.py:        if self.rabbit_qos_prefetch_count > 0:
./oslo_messaging/_drivers/impl_rabbit.py:                              self.rabbit_qos_prefetch_count,
./oslo_messaging/_drivers/impl_rabbit.py:            conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
./oslo_messaging/tests/drivers/test_impl_rabbit.py:        self.config(rabbit_qos_prefetch_count=prefetch,
./oslo_messaging/tests/test_config_opts_proxy.py:                    rabbit_qos_prefetch_count=0,
./oslo_messaging/tests/test_config_opts_proxy.py:                       "?rabbit_qos_prefetch_count=2"
./oslo_messaging/tests/test_config_opts_proxy.py:                         conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)


3.2 总入口在
oslo_messaging/_drivers/impl_rabbit.py
中的Connection类中,具体代码如下:

class Connection(object):
    """Connection object."""

    pools = {}

    def __init__(self, conf, url, purpose):
        # NOTE(viktors): Parse config options
        driver_conf = conf.oslo_messaging_rabbit

        self.max_retries = driver_conf.rabbit_max_retries
        self.interval_start = driver_conf.rabbit_retry_interval
        self.interval_stepping = driver_conf.rabbit_retry_backoff
        self.interval_max = driver_conf.rabbit_interval_max

        self.login_method = driver_conf.rabbit_login_method
        self.fake_rabbit = driver_conf.fake_rabbit
        self.virtual_host = driver_conf.rabbit_virtual_host
        self.rabbit_hosts = driver_conf.rabbit_hosts
        self.rabbit_port = driver_conf.rabbit_port
        self.rabbit_userid = driver_conf.rabbit_userid
        self.rabbit_password = driver_conf.rabbit_password
        self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
        self.rabbit_transient_queues_ttl = \
            driver_conf.rabbit_transient_queues_ttl
        self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
      ......


    def _set_qos(self, channel):
        """Set QoS prefetch count on the channel"""
        if self.rabbit_qos_prefetch_count > 0:
            channel.basic_qos(0,
                              self.rabbit_qos_prefetch_count,
                              False)

分析:
3.2.1) 其中_set_qos方法被如下方法调用
    def _set_current_channel(self, new_channel):
        """Change the channel to use.

        NOTE(sileht): Must be called within the connection lock
        """
        if new_channel == self.channel:
            return

        if self.channel is not None:
            self._declared_queues.clear()
            self._declared_exchanges.clear()
            self.connection.maybe_close_channel(self.channel)

        self.channel = new_channel

        if new_channel is not None:
            if self.purpose == rpc_common.PURPOSE_LISTEN:
                self._set_qos(new_channel)
            self._producer = kombu.messaging.Producer(new_channel)
            for consumer in self._consumers:
                consumer.declare(self)
3.2.2) 分析_set_current_channel方法
其中_set_current_channel方法被oslo_messaging/_drivers/impl_rabbit.py
文件中的Connection类的ensure_connection方法调用
    def ensure_connection(self):
        # NOTE(sileht): we reset the channel and ensure
        # the kombu underlying connection works
        self._set_current_channel(None)
        self.ensure(method=lambda: self.connection.connection)
        self.set_transport_socket_timeout()


3.2.3)分析ensure_connection方法
该ensure_connection方法被oslo_messaging/_drivers/impl_rabbit.py
文件中的Connection类的__init__方法调用,具体代码如下
class Connection(object):
    """Connection object."""

    pools = {}

    def __init__(self, conf, url, purpose):
        # NOTE(viktors): Parse config options
        driver_conf = conf.oslo_messaging_rabbit

        self.max_retries = driver_conf.rabbit_max_retries
        self.interval_start = driver_conf.rabbit_retry_interval
        self.interval_stepping = driver_conf.rabbit_retry_backoff
        self.interval_max = driver_conf.rabbit_interval_max

        self.login_method = driver_conf.rabbit_login_method
        self.fake_rabbit = driver_conf.fake_rabbit
        self.virtual_host = driver_conf.rabbit_virtual_host
        self.rabbit_hosts = driver_conf.rabbit_hosts
        self.rabbit_port = driver_conf.rabbit_port
        self.rabbit_userid = driver_conf.rabbit_userid
        self.rabbit_password = driver_conf.rabbit_password
        self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
        self.rabbit_transient_queues_ttl = \
            driver_conf.rabbit_transient_queues_ttl
        self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
        self.heartbeat_timeout_threshold = \
            driver_conf.heartbeat_timeout_threshold
        self.heartbeat_rate = driver_conf.heartbeat_rate
        self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay
        self.amqp_durable_queues = driver_conf.amqp_durable_queues
        self.amqp_auto_delete = driver_conf.amqp_auto_delete
        self.rabbit_use_ssl = driver_conf.rabbit_use_ssl
        self.kombu_missing_consumer_retry_timeout = \
            driver_conf.kombu_missing_consumer_retry_timeout
        self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
        self.kombu_compression = driver_conf.kombu_compression

        if self.rabbit_use_ssl:
            self.kombu_ssl_version = driver_conf.kombu_ssl_version
            self.kombu_ssl_keyfile = driver_conf.kombu_ssl_keyfile
            self.kombu_ssl_certfile = driver_conf.kombu_ssl_certfile
            self.kombu_ssl_ca_certs = driver_conf.kombu_ssl_ca_certs

        # Try forever?
        if self.max_retries <= 0:
            self.max_retries = None

        if url.virtual_host is not None:
            virtual_host = url.virtual_host
        else:
            virtual_host = self.virtual_host

        self._url = ''
        if self.fake_rabbit:
            LOG.warning(_LW("Deprecated: fake_rabbit option is deprecated, "
                            "set rpc_backend to kombu+memory or use the fake "
                            "driver instead."))
            self._url = 'memory://%s/' % virtual_host
        elif url.hosts:
            if url.transport.startswith('kombu+'):
                LOG.warning(_LW('Selecting the kombu transport through the '
                                'transport url (%s) is a experimental feature '
                                'and this is not yet supported.'),
                            url.transport)
            if len(url.hosts) > 1:
                random.shuffle(url.hosts)
            for host in url.hosts:
                transport = url.transport.replace('kombu+', '')
                transport = transport.replace('rabbit', 'amqp')
                self._url += '%s%s://%s:%s@%s:%s/%s' % (
                    ";" if self._url else '',
                    transport,
                    parse.quote(host.username or ''),
                    parse.quote(host.password or ''),
                    self._parse_url_hostname(host.hostname) or '',
                    str(host.port or 5672),
                    virtual_host)
        elif url.transport.startswith('kombu+'):
            # NOTE(sileht): url have a + but no hosts
            # (like kombu+memory:///), pass it to kombu as-is
            transport = url.transport.replace('kombu+', '')
            self._url = "%s://%s" % (transport, virtual_host)
        else:
            if len(self.rabbit_hosts) > 1:
                random.shuffle(self.rabbit_hosts)
            for adr in self.rabbit_hosts:
                hostname, port = netutils.parse_host_port(
                    adr, default_port=self.rabbit_port)
                self._url += '%samqp://%s:%s@%s:%s/%s' % (
                    ";" if self._url else '',
                    parse.quote(self.rabbit_userid, ''),
                    parse.quote(self.rabbit_password, ''),
                    self._parse_url_hostname(hostname), port,
                    virtual_host)

        self._initial_pid = os.getpid()

        self._consumers = {}
        self._producer = None
        self._new_tags = set()
        self._active_tags = {}
        self._tags = itertools.count(1)

        # Set of exchanges and queues declared on the channel to avoid
        # unnecessary redeclaration. This set is resetted each time
        # the connection is resetted in Connection._set_current_channel
        self._declared_exchanges = set()
        self._declared_queues = set()

        self._consume_loop_stopped = False
        self.channel = None
        self.purpose = purpose

        # NOTE(sileht): if purpose is PURPOSE_LISTEN
        # we don't need the lock because we don't
        # have a heartbeat thread
        if purpose == rpc_common.PURPOSE_SEND:
            self._connection_lock = ConnectionLock()
        else:
            self._connection_lock = DummyConnectionLock()

        self.connection_id = str(uuid.uuid4())
        self.name = "%s:%d:%s" % (os.path.basename(sys.argv[0]),
                                  os.getpid(),
                                  self.connection_id)
        self.connection = kombu.connection.Connection(
            self._url, ssl=self._fetch_ssl_params(),
            login_method=self.login_method,
            heartbeat=self.heartbeat_timeout_threshold,
            failover_strategy=self.kombu_failover_strategy,
            transport_options={
                'confirm_publish': True,
                'client_properties': {
                    'capabilities': {
                        'authentication_failure_close': True,
                        'connection.blocked': True,
                        'consumer_cancel_notify': True
                    },
                    'connection_name': self.name},
                'on_blocked': self._on_connection_blocked,
                'on_unblocked': self._on_connection_unblocked,
            },
        )

        LOG.debug('[%(connection_id)s] Connecting to AMQP server on'
                  ' %(hostname)s:%(port)s',
                  self._get_connection_info())

        # NOTE(sileht): kombu recommend to run heartbeat_check every
        # seconds, but we use a lock around the kombu connection
        # so, to not lock to much this lock to most of the time do nothing
        # expected waiting the events drain, we start heartbeat_check and
        # retrieve the server heartbeat packet only two times more than
        # the minimum required for the heartbeat works
        # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
        # heartbeat_rate is 2)
        self._heartbeat_wait_timeout = (
            float(self.heartbeat_timeout_threshold) /
            float(self.heartbeat_rate) / 2.0)
        self._heartbeat_support_log_emitted = False

        # NOTE(sileht): just ensure the connection is setuped at startup
        self.ensure_connection()
      ......


总结:
oslo.messaging库中设置rabbitmq的rabbit_qos_prefetch_count整体调用路径是:
1 RabbitDriver的__init__方法中使用
        connection_pool = pool.ConnectionPool(
            conf, max_size, min_size, ttl,
            url, Connection)
2 Connection的__init__方法中调用ensure_connection方法
3 ensure_connection方法中调用_set_current_channel方法
4 _set_current_channel方法中_set_qos方法
5 _set_qos方法中
    def _set_qos(self, channel):
        """Set QoS prefetch count on the channel"""
        if self.rabbit_qos_prefetch_count > 0:
            channel.basic_qos(0,
                              self.rabbit_qos_prefetch_count,
                              False)
也就是说oslo.messaging在使用rabbitmq作为后端实现时,设置的
rabbit_qos_prefetch_count配置项实际上就是rabbitmq中的prefetchCount字段,即
限制一个消费者最多只接收指定个数的消息。实际就是一个消息限流的处理,避免同时处理大量消息
导致的问题。


发现:
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:0 
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别

4 总结
oslo_messaging中设置的rabbit_qos_prefetch_count配置项实际上就是rabbitmq中的prefetchCount字段,即
限制一个消费者最多只接收指定个数的消息。

参考
https://www.rabbitmq.com/consumer-prefetch.html
https://www.itsvse.com/thread-4667-1-1.html

标签:qos,rabbit,self,driver,rabbitmq,64,conf,._
来源: https://blog.csdn.net/qingyuanluofeng/article/details/96270140

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有