ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ(1)消息队列中间件使用

2022-07-02 15:34:14  阅读:177  来源: 互联网

标签:队列 中间件 factory rabbitmq RabbitMQ docker erlang


RabbitMQ消息队列中间件使用

RabbitMQ 是部署最广泛的开源消息代理。RabbitMQ拥有数以万计的用户,是最受欢迎的开源消息代理之一。从T-MobileRuntastic,RabbitMQ在全球范围内用于小型初创企业和大型企业。

RabbitMQ是轻量级的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。

RabbitMQ的介绍

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 - RabbitMQ 代表使用者保留的消息缓冲区。

(P) -> [|||] -> (C)

  • 生产者:只负责发送消息到队列缓存中。
  • 队列:队列是位于RabbitMQ内的邮筒的名称。RabbitMQ 和应用程序之间的消息都存储在队列中。队列仅受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。生产者可以发送多个消息到同一个队列中,同样多个消费者可以从同一个队列中获取数据。
  • 消费者:只负责获取队列中的数据。

生产者、使用者和代理不必驻留在同一台主机上。事实上,在大多数应用程序中,它们都没有在一台主机上。因此应用程序也可以既是生产者又是消费者。

RabbitMQ的特点

Docker安装RabbitMQ

一、Docker安装erlang

由于RabbitMQ是erlang语言开发的,所以我们在安装RabbitMQ前先安装erlang,以便运行RabbitMQ,我们本次使用Docker安装因此只需要直接拉取容器即可。通过Docker搜索容器命令docker search --limit 5 erlang搜索到前5条erlang相关容器信息。

root@iZ059o7jp1sn1wZ:~# docker search --limit 5 erlang
NAME                                    DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
erlang                                  Erlang is a programming language used to bui…   330       [OK]       
erlangsolutions/wombatoam               WombatOAM image without a license key           2                    
circleci/erlang                         CircleCI images for Erlang                      0                    
erlang/ubuntu-build                     Ubuntu based build images                       0                    
erlangsolutions/node-with-build-tools   https://github.com/esl/node-with-build-tools    0   

我直接拉取最新容器:docker pull erlang,如需要拉取特定版本的erlang和RabbitMQ则需要版本对应RabbitMQ Erlang 版本对应要求 — RabbitMQ

验证erlang是否安装完成,在拉取容器后:docker run correl/erlang echo "hello word",如果回馈"hello word"则完成拉取。

root@iZ059o7jp1sn1wZ:~# docker run erlang echo "hello word"
hello word

本次运行erlang就不配置映射文件了,直接运行docker run -it --name 别名 端口号,需要设置配置文件挂载等请看另一篇博客Docker使用相关指令

二、Docker安装RabbitMQ

安装完erlang后开始安装RabbitMQ,docker search --limit 5 rabbitmq查询你需要的版本,这边直接拉取最新镜像docker pull rabbitmq

root@iZ059o7jp1sn1wZ:~# docker search --limit 5 rabbitmq
NAME                        DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
rabbitmq                    RabbitMQ is an open source multi-protocol me…   4349      [OK]       
bitnami/rabbitmq            Bitnami Docker Image for RabbitMQ               86                   [OK]
bitnami/rabbitmq-exporter                                                   1                    
circleci/rabbitmq-delayed   https://github.com/circleci/rabbitmq-delayed…   1                    
circleci/rabbitmq           This image is for internal use                  0  

挂载安装RabbitMQ:docker run -d --name=rabbitmq -p 5672:5672 -p 15672:15672 -v /docker/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq

  • RABBITMQ_DEFAULT_USER:用户账号
  • RABBITMQ_DEFAULT_PASS:用户密码

Web可视化端使用此用户密码登录,如果没设置用户账号和密码,默认的账号和密码为guest/guest。使用docker ps查看是否正常运行RabbitMQ和erlang。

注意: guest用户只能本地(localhost、127.0.0.1)访问Web可视化管理界面,其他IP访问需另创用户并授权

root@iZ059o7jp1sn1wZ:~# docker ps
CONTAINER ID   IMAGE                       COMMAND                  CREATED         STATUS         PORTS                                                                                                                                      NAMES
40d6fe910b2e   rabbitmq                    "docker-entrypoint.s…"   4 minutes ago   Up 4 minutes   4369/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 5671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
6a499e7c356c   0b68d51c5f30                "erl"                    3 hours ago     Up 3 hours                                                                                                                                                erlang

安装Web可视化插件

进入RabbitMQ容器docker exec -it 容器ID /bin/bash,安装可视化插件rabbitmq-plugins enable rabbitmq_management

root@iZ059o7jp1sn1wZ:~# docker exec -it 40d6fe910b2e /bin/bash
root@40d6fe910b2e:/# rabbitmq-plugins enable rabbitmq_management

Enabling plugins on node rabbit@40d6fe910b2e:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@40d6fe910b2e...
The following plugins have been enabled:
  rabbitmq_management

started 1 plugins.

web进入可视化界面IP:15672,密码为上面设定RabbitMQ的账号密码。

image-20220621113758887

登录成功后的页面

image-20220621113819614

RabbitMQ初の体验(Hello Word)

首先安装相关包,我们可以在RabbitMQ官网找到相关依赖包,RabbitMQ连接需要SLF4J依赖,本次简单的RabbitMQ程序SLF4J Simple已经够用,但你应该在生产中使用一个完整的日志记录库,如Logback

image-20220623153417303

发送

(P) -> [|||]

发布者将连接到 RabbitMQ,发送一条消息,然后退出。

代码部分

package Demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        // 1、创建链接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("114.55.34.91");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        // 2、链接
        try (Connection connection = factory.newConnection();
             // 3、获取通道
             Channel channel = connection.createChannel()) {
            /**
             * 1、name:    队列名称
             * 2、durable: 是否持久化
             * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
             * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
             * 5、arguments:参数
             * */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

arguments参数:

  • Message TTL | Auto expire | Overflow behaviour | Single active consumer
    Dead letter exchange | Dead letter routing key
    Max length | Max length bytes
    Maximum priority | Lazy mode | Master locator

运行结果

Connected to the target VM, address: '127.0.0.1:60528', transport: 'socket'
 [x] Sent 'Hello World!'
Disconnected from the target VM, address: '127.0.0.1:60528', transport: 'socket'

Process finished with exit code 0

用Debug运行到连接的时候,我们可以在可视化页面看见连接的用户IP和用户名字

image-20220623153923360

接收

[|||]-> (C)

这就是我们的出版商。我们的消费者监听来自 RabbitMQ 的消息,因此与发布单个消息的发布者不同,我们将让消费者运行以监听消息并将其打印出来。

我们将使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。

设置与发送者相同,我们都需要打开同一个队列,需要和发布者发布的队列一样。

public class Consumer {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

请注意,我们在此处也声明队列。由于我们可能会在发布者之前启动使用者,因此我们希望在尝试使用队列中的消息之前确保队列存在。

我们为什么不使用try-with-resource语句来自动关闭通道和连接?因为我们希望当消费者异步侦听消息到达时,进程保持活动状态。

我们要从服务器队列中拿取消息。由于它将异步推送消息,因此我们以对象的形式提供回调,该回调将缓冲消息,直到我们要使用它们。这就是DeliverCallback子类的作用。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

代码部分

package Demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("114.55.34.91");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // DeliverCallback缓冲服务器推送给我们的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

RabbitMQ工作队列

在上面的教程中,我们学会了接受和发送消息,本章节对MQ的接收者消息进行处理。

img

轮询机制

当多个消费者对MQ存储消息进行接收,每个消费者都分配一条,到消息全部被消费。使用任务队列的优点之一是能够轻松并行化工作。如果我们正在积累积压的工作,我们可以添加更多的消费者来进行扩展。

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮循机制。将上文发送代码改为,输入来进行轮询测试。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 输入参数
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }

创建2个线程来进行消费,我们可以直接在idea内创建2个线程来进行处理。

image-20220702135152222

image-20220702135400029

创建好2个消费者线程后,启动消费者线程进行消费监听,然后启动发送者发送消息来处理。

发送者:

image-20220702135528889

消费者1:

image-20220702135555780

消费者2:

image-20220702135635700

结论:消息发送线程,发送消息时工作线程会轮询得到消息发送线程发送的消息,这是一种公平的策略,但是这种方式效率较低,在实际工作中一般采用不公平的策略。

标签:队列,中间件,factory,rabbitmq,RabbitMQ,docker,erlang
来源: https://www.cnblogs.com/HeiDaotu/p/16437652.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有