ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ——入门

2022-01-18 11:06:49  阅读:100  来源: 互联网

标签:String 队列 RabbitMQ QUEUE 消息 message channel 入门


RabbitMQ工作原理

四大核心概念

  • 生产者:产生数据发送消息的程序是生产者
  • 交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
  • 队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
  • 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

在这里插入图片描述

Hello World

在本节中将学习使用RabbitMQ创建一个最简单的例子

引入依赖

<!--rabbitmq 依赖客户端-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.6</version>
</dependency>

编写生产者代码

public class Producer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.130");
        factory.setUsername("admin");
        factory.setPassword("admin");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列中的消息是否支持持久化,false:消息存在内存中,true:消息存在硬盘中
         * 3.该队列是否只供一个消费者消费,false:只提供一个消费者消费
         * 4.最后一个消费者断开连接之后该队列是否自动删除,true为自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message = "hello world";
        /**
         * 发送一个消息
         * 1.发送消息到哪个交换机,简单模式所以不需要交换机,填空字符串即可
         * 2.路由的key是哪个
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
    }
}

当消息发送完毕后,我们可以查看客户端,发现队列中多了一个名为hello的队列,接下来我们继续编写消费者代码

在这里插入图片描述

编写消费者代码

public class Consumer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.130");
        factory.setUsername("admin");
        factory.setPassword("admin");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };
        
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.队列名称
         * 2.消费成功后是否自动应答,true为自动应答,false不自动
         * 3.消费者成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true, deliverCallback,cancelCallback);
    }
}

当我们执行消费者代码时,会输出hello world,并且客户端中的消息以及被消费掉了

在这里插入图片描述

Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。我们通过一张图,来理解一下。

在这里插入图片描述

轮流分发消息

在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程 是如何工作的。

编写工具类

从helloworld案例中我们发现了创建工厂、获取信道一系列的工作都是重复的,所以将他整合为一个工具类,方便日后的使用

public class RabbitMqUtils {
    public static Channel getChannel(){
        Channel channel = null;
        try {
            //创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.200.130");
            factory.setUsername("admin");
            factory.setPassword("admin");
            //创建连接
            Connection connection = factory.newConnection();
            //获取信道
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return channel;
    }
}

编写工作线程

public class Work01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            System.out.println("接收到的消息为:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        System.out.println("工作线程B正在等待消息...");
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

idea是支持同一个main方法多次运行的,需要进行一下设置,可以参考这位博主的博客 点击跳转

完成之后我们启动两次,一次为工作线程A,另一次为工作线程B

编写消费者

因为我们需要发送多次消息,可以通过控制台的输入来发送消息

public class Task01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //生成队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("请求发送成功:" + message);
        }
    }
}

测试

我们输入AA,BB,CC,DD来看一下工作线程获取的结果,如果是依次获取,就证明了我们一开始的说法

在这里插入图片描述

消息应答

作用

举例说明:还是有两个工作线程来处理消息,比如工作线程A处理消息的时间较长,而在处理消息的过程中宕机了,如果不做任何处理,就会出现消息丢失的情况。所以我们需要一个消息应答,告诉RabbitMQ消息已经处理完毕之后,MQ才可以将消息删除,否则回到队列中。

在这里插入图片描述

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。

消息应答的方法

  • channel.basicAsk()

    用于肯定应答,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

  • channel.basicNack()

    用于否定应答

  • Channel.basicReject()

    用于否定应答,与channel.basicNack()相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

编写生产者代码

public class Task02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //生成队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("请求发送成功:" + message);
        }
    }
}

编写两个工作线程代码

这里我们让一个工作线程完成速度为1秒,另外一个为30秒,模拟工作任务时长的效果

public class Work02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("工作线程A正在等待消息,处理较快...");
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            SleepUtils.sleep(1);
            System.out.println("接收到的消息为:" + new String(message.getBody()));
            /**
             * 手动应答
             * 1.消息标记 tag消息标记 tag
             * 2.是否批量应答消息,建议不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

public class Work03 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("工作线程B正在等待消息,处理较慢...");
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            SleepUtils.sleep(30);
            System.out.println("接收到的消息为:" + new String(message.getBody()));
            /**
             * 手动应答
             * 1.消息标记 tag消息标记 tag
             * 2.是否批量应答消息,建议不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

测试

启动三个方法,正常的我们输入aa,bb,cc分别交由线程A和线程B处理,当我们再次输入dd时,是交给线程B处理的,这时候手动关闭进程,模拟宕机状况,我们发现线程A接替他处理了dd消息,这样就做到了消息不丢失的效果。

在这里插入图片描述

RabbitMQ持久化操作

队列持久化

之前我们创建的队列都是非持久化的,rabbitmq如果重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable参数设置为持久化

public class Task02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //生成队列
        boolean durable = true; //设置队列持久化
        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("请求发送成功:" + message);
        }
    }
}

更改完成重启之后我们发现,该队列的状态被D标志,即持久化

在这里插入图片描述

消息持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添 加这个属性。

public class Task02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //生成队列
        boolean durable = true; //设置队列持久化
        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            // MessageProperties.PERSISTENT_TEXT_PLAIN 设置消息持久化
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            System.out.println("请求发送成功:" + message);
        }
    }
}

不公平分发

在之前的课程中我们说过了,MQ在分发消息时是绝对公平的,从上述例子中我们也可以发现,无论B执行的多慢,所有消息都是A一条,B一条,这个其实照我来说确实不公平的。应该是能者多劳的形式,A做完了应该可以继续做。我们需要修改工作线程代码,如下

public class Work03 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("工作线程B正在等待消息,处理较慢...");
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            SleepUtils.sleep(30);
            System.out.println("接收到的消息为:" + new String(message.getBody()));
            /**
             * 手动应答
             * 1.消息标记 tag消息标记 tag
             * 2.是否批量应答消息,建议不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        boolean autoAck = false;
        //设置不公平分发,1为不公平分发
        channel.basicQos(1);
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

通过测试可以发现,因为B要工作要沉睡30秒,A完成之后可以获取接下来的工作,而不是继续分发给B

在这里插入图片描述

预取值

预取值的概念相当于我作为领导,我已经分配好了,一共7个任务,我给B分配5个任务,给A分配2个任务,通过代码如下设置

public class Work02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("工作线程A正在等待消息,处理较快...");
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            SleepUtils.sleep(1);
            System.out.println("接收到的消息为:" + new String(message.getBody()));
            /**
             * 手动应答
             * 1.消息标记 tag消息标记 tag
             * 2.是否批量应答消息,建议不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        boolean autoAck = false;
        //设置不公平分发,1为不公平分发,大于1代表预取值
        channel.basicQos(2);
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

这里我们需要通过一张图片说一下这个预取值的概念

在这里插入图片描述

标签:String,队列,RabbitMQ,QUEUE,消息,message,channel,入门
来源: https://blog.csdn.net/Yellow_Star___/article/details/122555299

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有