ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Docker安装RabbitMQ Java实战详解

2022-07-06 01:32:54  阅读:168  来源: 互联网

标签:Java String 队列 RabbitMQ connection 消息 Docker public channel


下载 RabbitMQ 镜像

rabbitmq:3-management 默认安装并启用 rabbitmq_management

docker pull rabbitmq:3.10-management 

创建并运行 RabbitMQ 容器

docker run -d -p 5672:5672 -p 15672:15672 \
	-e RABBITMQ_DEFAULT_USER=admin \
	-e RABBITMQ_DEFAULT_PASS=admin \
	--hostname myRabbit \
	--name rabbitmq \
	rabbitmq:3.10-management 

参数说明:

  • -d:表示在后台运行容器;
  • -p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
    • 5672:RabbitMQ 提供给编程语言客户端链接的端口
    • 15672:RabbitMQ 管理界面的端口
    • 25672:RabbitMQ 集群的端口
  • -e:指定环境变量:
    • RABBITMQ_DEFAULT_USER:默认的用户名;
    • RABBITMQ_DEFAULT_PASS:默认的用户密码;
  • --hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
  • --name rabbitmq:设置容器名称;
  • rabbitmq:3.10-management :容器使用的镜像名称;

查看启动情况:

➜  bin docker ps -l
CONTAINER ID   IMAGE                      COMMAND                  CREATED              STATUS              PORTS                                                                                                                                                 NAMES
21dec23292a9   rabbitmq:3.10-management   "docker-entrypoint.s…"   About a minute ago   Up About a minute   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq

设置 docker 启动的时候自动启动(可选):

docker update rabbitmq --restart=always

访问 RabbitMQ 后台管理

  • 浏览器输入地址:http://ip:15672 即可访问后台管理页面,这里的 ip 为运行 RabbitMQ 所在的服务器的 IP 地址;

  • 账号密码是你创建容器时指定的账号密码

  • 如果访问失败,请尝试关闭防火墙

    image-20220612172434949

    image-20220612172459818

创建 RabbitMq 连接

  1. 创建虚拟主机目录
public class ConnectionUtils {

    /**
     * 获取 RabbitMq 连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception{
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("dev.lzscxb.cn");
        factory.setPort(5672);
        factory.setVirtualHost("/lzscxb");
        factory.setUsername("admin");
        factory.setPassword("admin");

        return factory.newConnection();

    }

    public static void main(String[] args) throws Exception {
        Connection connection = getConnection();
        System.out.println(connection);
        connection.close();
    }

}

输出结果:

amqp://admin@1.14.160.174:5672//lzscxb

RabbitMQ模式

简单模式

image-20220612184805825

消息发送者

/**
 * 发送者
 */
public class Sender {

    public static void main(String[] args) throws Exception {
        String msg = "hello 你好";
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 创建信道
        Channel channel = connection.createChannel();

        /**
         * 声明队列
         * 参数1:队列的名称
         * 参数2:队列中的数据是否持久化
         * 参数了:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
         * 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)
         * 参数5:队列参数(没有参数为nuLL)
         */
        channel.queueDeclare("queue1", false, false, false, null);
        /**
         * 向指定队列发送消息
         * 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
         * 参数2:目标队列的名称
         * 参数了:设置消息的属性(没有属性则为nuLL)
         * 参数4:消息的内容(只接收字节数组)
         */
        channel.basicPublish("", "queue1", null, msg.getBytes());

        // 释放资源
        channel.close();
        connection.close();
    }

}

运行结果:

image-20220612182622469

消息接收者

public class Receiver {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("queue1", false, false, false, null); // 声明队列
        // 从信道中获取信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * @param consumerTag
             * @param envelope
             * @param properties 协议
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("接收 = " + msg);
            }
        };
        // 监听队列 true 自动确认消息
        channel.basicConsume("queue1",true,consumer);
    }
}

运行结果:

image-20220612182656228

消息确认机制 ACK

通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除

RabbitMQ如何得知消息被消费者接收?

  • 如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得 知,这样消息就丢失了
  • 因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
  • ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种 传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200 就是告诉我们服务器执行成功
  • 整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执
  • 不过这种回执ACK分为两种情况:
    • 自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
    • 手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
  • 两种情况如何选择,需要看消息的重要性:
    • 如果消息不太重要,丢失也没有影响,自动ACK会比较方便
    • 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把 消息从队列中
// false:手动消息确认
channel.basicConsume("queue1", false, consumer);

完整代码:

public class Receiver {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 从信道中获取信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * @param consumerTag
             * @param envelope
             * @param properties 协议
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("接收 = " + msg);
                // 手动确认(收件人信息,是否同时确认多个消息)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列 false手动确定消息
        channel.basicConsume("queue1", false, consumer);
    }
}

工作队列模式

image-20220612184818887

  • 之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能 力有限,就会产生消息在队列中堆积(生活中的滞销)
  • 一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?
  • 多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共 享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被 一个人吃)

消费者

如果有两个员工,当所有 奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都 不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。 这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它 只是盲目地将每条第n个消息分派给第n个消费者。

为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉 RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并 确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的 worker。

/**
 * 消息接收者1
 */
public class Receiver2 {
    static Integer i = 0; // 统计吃掉羊肉串的数量

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue", false, false, false, null);
        // 可以理解为快递一个一个送,送完在送下一个
        channel.basicQos(1);
        // 从信道中获取信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag
             * @param envelope
             * @param properties 协议
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.printf("【顾客2】吃掉 = %s!总共吃[%d]串 \n", msg, ++i);
                // 模拟网络延迟
                try {
                    Thread.sleep(900);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // 手动确认(收件人信息,是否同时确认多个消息)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 手动确定消息
        channel.basicConsume("work_queue", false, consumer);
    }
}

必须使用 ACK 确认消息才能生效

生产者

/**
 * 消息发送者
 */
public class Sender {

    public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 创建信道
        Channel channel = connection.createChannel();

        /**
         * 创建队列
         * 参数1:队列的名称
         * 参数2:队列中的数据是否持久化
         * 参数了:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
         * 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)
         * 参数5:队列参数(没有参数为nuLL)
         */
        channel.queueDeclare("work_queue", false, false, false, null);
        // 发送 100 条数据
        for (int i = 0; i < 100; i++) {
            String msg = "羊肉串 --> " + i;
            /**
             * 向指定队列发送消息
             * 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
             * 参数2:目标队列的名称
             * 参数了:设置消息的属性(没有属性则为nuLL)
             * 参数4:消息的内容(只接收字节数组)
             */
            channel.basicPublish("", "work_queue", null, msg.getBytes());
            System.out.println("新鲜出炉:" + msg);
        }
        // 释放资源
        channel.close();
        connection.close();
    }

}

订阅模式

生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视 频通知

image-20220613093559688

  • 上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)

  • P生产者发送信息给X路由,X将信息转发给绑定X的队列

image-20220613093626635

X队列将信息通过信道发送给消费者,从而进行消费

整个过程,必须先创建路由

  • 路由在生产者程序中创建
  • 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没 有队列,路由并不知道将信息发送给谁

运行程序的顺序:

​ 1. MessageSender

  1. MessageReceiver1和MessageReceiver2

  2. MessageSender

生产者

/**
 * 消息发送者
 */
public class Sender {

    public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 创建信道
        Channel channel = connection.createChannel();

        // 声明路由 (路由名,路由类型)
        // fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上
        channel.exchangeDeclare("exchange_fanout", "fanout");


        String msg = "Hello Java!";
        /**
         * 向指定队列发送消息
         * 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
         * 参数2:目标队列的名称
         * 参数了:设置消息的属性(没有属性则为nuLL)
         * 参数4:消息的内容(只接收字节数组)
         */
        channel.basicPublish("exchange_fanout", "", null, msg.getBytes());
        System.out.println("生产者:" + msg);
        // 释放资源
        channel.close();
        connection.close();
    }

}

消费者

/**
 * 消息接收者1 (粉丝1)
 */
public class Receiver1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("test_exchange_fanout_queue_1", false, false, false, null);
        // 绑定队列(关注)
        channel.queueBind("test_exchange_fanout_queue_1", "exchange_fanout", "");
        // 从信道中获取信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag
             * @param envelope
             * @param properties 协议
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.printf("【消费者1】 %s \n", msg);
            }
        };
        // 监听队列 true 自动确认消息
        channel.basicConsume("test_exchange_fanout_queue_1", true, consumer);
    }
}

路由模式

路由会根据类型进行定向分发消息给不同的队列,如图所示

可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货

image-20220613095800902

生产者

public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明路由(路由名,路由类型)
        // direct:根据路由键进行定向分发消息
        channel.exchangeDeclare("test_exchange_direct", "direct");
        String msg = "用户注册,【userid=S101】";
        channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes());
        System.out.println("[用户系统]:" + msg);
        channel.close();
        connection.close();
    }
}

消费者1

public class Receiver1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("test_exchange_direct_queue_1", false, false, false, null);
        // 绑定路由(如果路由键的类型是 添加,删除,修改 的话,绑定到这个队列1上)
        //        2.3.4.3 消费者2
        //        1. 记住运行程序的顺序,先运行一次sender(创建路由器),
        //        2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定
        //        3. 再次运行sender,发出消息
        //        2.3.5 通配符模式
        channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "insert");
        channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "update");
        channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "delete");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者1】 = " + s);
            }
        };
        // 4.监听队列 true:自动消息确认
        channel.basicConsume("test_exchange_direct_queue_1", true, consumer);
    }
}

消费者2


/**
 * 消息接收者2
 */
public class Receiver2 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("test_exchange_direct_queue_2", false, false, false, null);
        // 绑定路由(如果路由键的类型是 查询 的话,绑定到这个队列2上)
        channel.queueBind("test_exchange_direct_queue_2", "test_exchange_direct", "select");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者2】 = " + s);
            }
        };
        // 4.监听队列 true:自动消息确认
        channel.basicConsume("test_exchange_direct_queue_2", true, consumer);
    }
}
  1. 记住运行程序的顺序,先运行一次sender(创建路由器),
  2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定
  3. 再次运行sender,发出消息

通配符模式 topic

image-20220613100829225

  • 和路由模式90%是一样的。

  • 唯独的区别就是路由键支持模糊匹配

  • 匹配符号

    • *:只能匹配一个词(正好一个词,多一个不行,少一个也不行) #:匹配0个或更多个词 看一下官网案例: Q1绑定了路由键 .orange. Q2绑定了路由键 ..rabbit 和 lazy.#

持久化

  • 消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?
    • 消费者的ACK确认机制,可以防止消费者丢失消息
    • 万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失
  • 想要将消息持久化,那么 路由和队列都要持久化 才可以

生产者

public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明路由(路由名,路由类型,持久化)
        channel.exchangeDeclare("test_exchange_topic", "topic", true);
        String msg = "商品降价";
        // 发送消息(第三个参数作用是让消息持久化)
        channel.basicPublish("test_exchange_topic", "product.price", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
        System.out.println("[用户系统]:" + msg);
        channel.close();
        connection.close();
    }
}

消费者

public class Receiver {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列( 第二个参数为true:支持持久化)
        channel.queueDeclare("test_exchange_topic_queue_1", true, false, false, null);
        channel.queueBind("test_exchange_topic_queue_1", "test_exchange_topic", "user.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("【消费者1】 = " + s);
            }
        };
        channel.basicConsume("test_exchange_topic_queue_1", true, consumer);
    }
}

标签:Java,String,队列,RabbitMQ,connection,消息,Docker,public,channel
来源: https://www.cnblogs.com/LzsCxb/p/16449220.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有