ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ入门篇

2021-11-12 20:34:37  阅读:178  来源: 互联网

标签:connectionFactory false 队列 RabbitMQ 入门篇 connection new channel


1 rabbitMQ入门教程-哔站-百知教育

在此感谢哔哩哔哩的up主:编程不良人,视频地址:https://www.bilibili.com/video/BV1dE411K7MG ,于2019-12-30学习后用Typora0.9.98整理的观后感。

1.1 MQ引言

1.1.1 MQ介绍

  • MQ(message quene):消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断向队列中获取消息。因为消息的生产者和消费者都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入轻松实现系统间解耦。

  • MQ种类:

    • ActiveMQ
    • Kafka
    • RocketMQ
    • RabbitMQ
  • RabbitMQ:基于AMQP协议,erlang语言开发的,是部署最广泛的开源消息中间件。

1.2 rabbitmq安装

1.3 管理命令行及管理界面初识

1.4 消息发布模式

1.4.1 第一种模型:直连

  1. 引入依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    
  2. 开发生产者

    //创建连接mq的连接工厂对象
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置连接rabbitmq的主机
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    //设置连接哪个虚拟主机
    connectionFactory.setVirtualHost("/virHost1");
    //设置访问虚拟主机的用户名及密码
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    
    //获取连接对象
    Connection connection = connectionFactory.newConnection();
    
    //获取连接中的通道
    Channel channel = connection.createChannel();
    
    //通道绑定对应的消息队列 消费者与生产者的特性要一致
    // argument1:队列名称(不存在会自动创建)
    // argument2:定义队列是否要持久化,仅仅是队列持久化,若想消息持久化还得额外设置(发布消息时) true-持久化
    // argument3:druable 是否独占队列,即通道独占 true-独占
    // argument4:autoDelete 是否在消费完成后自动删除队列 true-自动删除
    // argument5:额外附加参数
    channel.queueDeclare("hello", false, false, false, null);
    
    //发布消息
    // argument1:交换机名称
    // argument2:队列名称
    // argument3:传递消息的额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN:消息持久化
    // argument4:消息的具体内容)
    channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello message".getBytes());
    
    channel.close();
    connection.close();
    
  3. 开发消费者

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/virHost1");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    
    Connection connection = connectionFactory.newConnection();
    
    Channel channel = connection.createChannel();
    
    channel.queueDeclare("hello", false, false, false, null);
    
    //消费消息 (消费的队列名称, 开始消息的自动确认机制, 消费时的回调接口)
    channel.basicConsume("hello", true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
            System.out.println( new String(body) );
        }
    });
    
    /* 不关闭则可以一直监听队列
    channel.close();
    connection.close();
    */
    
  4. 总结:消费者不能用@Test注解和关闭通道连接,否则看不到执行结果,因为多线程问题未等到处理便被关闭了

1.4.2 第二种模型:work queue 任务模型

WorkQueue任务模型,当消息比较耗时,生产消息的速度大于消费者速度,就会导致堆积越来越多无法处理。此时就可以使用该模型:让多个消费者绑定到一个队列,共同消费队列中的消息。

1.4.2.1 work模型之平均消费消息-轮询

  1. rabbitmq中连接工具类封装
package messagequeue.rabbitmq.baizhiedu;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitmqUtils {
    private static ConnectionFactory connectionFactory;
    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/virHost1");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
    }

    //定义提供连接对象的方法
    public static Connection getConnection() {
        try{
            return connectionFactory.newConnection();
        }catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道和关闭连接
    public static void closeConnectionCannel(Channel channel, Connection connection) {
        try {
            if (channel!=null) channel.close();
            if (connection!=null) connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
  1. 开发生产者
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for(int i=0; i<100; i++) {
    channel.basicPublish("", "work", null, ("workMessage"+i).getBytes());
}
RabbitmqUtils.closeConnectionCannel(channel, connection);
  1. 开发两个消费者
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者1:"+ new String(body));
    }
});
  1. 执行结果:如图示消费者之间采用轮询方式消费,

image-20210127223251649

1.4.2.2 消息确定机制和能者多劳实现

  1. 修改上述中消费者1将其降速
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者1:"+ new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
  1. 执行结果如图所示:默认情况下RabbitMQ每个消费者会收到相同数量的消息,消费慢的会慢慢消费

image-20210127224420920

  1. 消息自动确认机制:

channel.basicConsume方法中自动确认autoAck=true时,不关心业务到底是否处理完(宕机即丢失),拿到即告诉队列消费完,队列即删出这些队列,

Connection connection = RabbitmqUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);                                                //通道每次只能消费一个消息
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", false, new DefaultConsumer(channel){   //消费消息时关闭自动确认
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者2:"+ new String(body));
        channel.basicAck(envelope.getDeliveryTag(), false);          //argument1:手动确认消息标识,argument2:false为每次确认一个
    }
});

image-20210127232534832

标签:connectionFactory,false,队列,RabbitMQ,入门篇,connection,new,channel
来源: https://www.cnblogs.com/TQH-520/p/15546261.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有