ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

你好RabbitMQ消息队列

2022-03-30 22:31:25  阅读:186  来源: 互联网

标签:String 队列 消息 rabbitmq client RabbitMQ import com 你好


消息队列

RabbitMQ是一个消息队列,它能够接收和转发消息。这个过程就像寄快递一样,把物件打包给快递小哥,快递小哥会负责把物件派送到正确的地址。

生产者和消费者

生产者就是用来生产消息(发送消息)的:

img

消费者就是用来消费消息(接收消息)的:

img

在生产者和消费者之间的就是消息队列

img

它相当于消息缓冲区,最多能存储多少数据只受限于机器的内存和磁盘。多个生产者可以发送消息给同一个队列,多个消费者也可以从同一个队列接收消息。

(P) -> [|||] -> (C)

Windows安装RabbitMQ

参考mall商城学习教程的RabbitMQ部分内容:

http://www.macrozheng.com/#/architect/mall_arch_09?id=rabbitmq

原文中rabbitmq-server-3.7.14.exe下载地址失效了,改从这里下载:

https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.14

安装完成后,确认服务已开启:

image-20220326205428623

进入RabbitMQ安装目录下的sbin目录:

image-20220326204901185

在地址栏输入cmd并回车启动命令行,然后输入以下命令启动管理功能:

rabbitmq-plugins enable rabbitmq_management

RabbitMQ运行在本地机器上:

http://localhost:15672/

image-20220326182614819

默认用户名密码为guest / guest:

image-20220326205726347

Java客户端

amqp-client-5.7.1.jar是RabbitMQ官方提供的Java客户端:

https://www.rabbitmq.com/tutorials/tutorial-one-java.html

image-20220326142538038

既可以直接下载jar包,也可以在Maven中添加依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>

生产消息

img

导包:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

创建类Send,定义队列名为hello:

public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
      ...
  }
}

建立连接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

}
  • 代码中创建了一个Connection实例和一个Channel实例,它们都用try语句包裹了起来,这是因为Connection和Channel类都实现了java.io.Closeable,try语句会自动关闭连接。

声明消息队列,并发送Hello World!消息到队列中:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
  • 声明消息队列是个幂等操作,重复声明不会重复创建队列。
  • 消息体是字节数组(byte array)。

Send.java完整代码:

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费消息

[|||] -> (C)

消费消息的代码跟生产消息的代码类似,也需要导包,建立连接:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}
  • 消费者也声明了一个消息队列,因为有可能消费者比生产者先启动。这样能确保消费消息时,有队列存在。
  • 消费者没有用try语句,因为消费者一直在异步监听消息,如果把连接关闭了,它就没法消费了。

导包中有个DeliverCallback,通过它就能消费消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

因为发送消息和接收消息都是异步的,所以它叫做,callback,回调。

Recv.java完整代码:

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

运行代码

运行Send.java生产消息后,能看到RabbitMQ后台已经有1条消息:

image-20220326210013199

和1个消息队列:

image-20220326210233345

并且发送完成后就断开了连接。

运行Recv.java消费消息后,能看到队列中已经没有消息了:

image-20220326210451253

而消费者仍然保持着连接,持续监控新消息。如果把消费者停掉,连接就会断开。

从消息队列中能看到整个过程如下图所示:

image-20220326211204785

参考资料:

https://www.rabbitmq.com/tutorials/tutorial-one-java.html

http://www.macrozheng.com/#/architect/mall_arch_09?id=rabbitmq

标签:String,队列,消息,rabbitmq,client,RabbitMQ,import,com,你好
来源: https://www.cnblogs.com/df888/p/16060908.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有