ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ-消息队列的收发模式(二)

2022-07-04 00:00:29  阅读:183  来源: 互联网

标签:String 队列 RabbitMQ 收发 IOException import public channel NAME


为了测试方便代码复用这里封装了一个简单的连接mq的工具类

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName RabbitMqUtil
 * @Author ZC
 * @Date 2022/7/2 21:21
 * @Version 1.0
 * @Description
 */
public class RabbitMqUtil {
    private final String host = "192.168.232.119";
    private final int port = 5672;
    private final String username = "test";
    private final String password = "test12";
    private final String virtualHost = "test_mq";

    /**
     * 连接RabbitMq
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public Connection getConnection() throws IOException, TimeoutException {
        //连接Mq服务器、主机、端口、用户名、密码
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        //设置虚拟主机
        factory.setVirtualHost(virtualHost);
        Connection connection = factory.newConnection();
        return connection;
    }
}

1、简单队列模式

默认 direct 类型交换机,队列名充当路由键。

默认模式-------------消费者间交替接收消息

  • 弊端:

    无法做到性能好的消费者多消费、性能差点的少消费(资源的浪费)

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName SimpleProvider
 * @Author ZC
 * @Date 2022/7/2 21:33
 * @Version 1.0
 * @Description  消息提供者
 */
public class SimpleProvider {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqUtil rabbitMqUtil = new RabbitMqUtil();
        //1、连接mq服务
        Connection connection = rabbitMqUtil.getConnection();
        //2、通过连接对象获得一个连接通道
        Channel channel = connection.createChannel();
        //3、声明队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                  Map<String, Object> arguments)
         * queue: 队列名称
         * durable: 是否持久化(即服务重启时是否还存在)
         * exclusive: 是否独占(即当前队列是否只被这一个队列消费)
         * autoDelete: 是否自动删除(即当该队列没有被连接使用后是否删除)
         * arguments: 队列其他参数的设置
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4、发送信息到mq
        String message  = "hello rabbitMq";
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机
         * routingKey: 如果交换机是默认交换机那么这个相当于队列名称
         * props:消息相关参数信息
         * body: 具体消息
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("send: message<"+message+">发送成功");
    }
}

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName SimpleConsumer
 * @Author ZC
 * @Date 2022/7/2 22:02
 * @Version 1.0
 * @Description
 */
public class SimpleConsumer {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、获取连接
        Connection connection = new RabbitMqUtil().getConnection();
        //2、获取一个连接通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //3、获取队列中的消息
        //3。1、处理消息-回调之后使用
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //监听队列
        /**
         * basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
         *
         * queue: 队列名称
         * autoAck:
         * deliverCallback: 消息传递时回调
         * cancelCallback: 消费者取消时回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

        //非lambada表达式接收消息
//        Consumer consumer = new DefaultConsumer(channel){
//            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                String message = new String(body,"utf-8");
//                System.out.println(" [x] Received '" + message + "'");
//            }
//        };
//        //监听队列
//        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

2、work-模式

同一队列不同消费者的消息分发机制。

解决简单队列模式中无法实现能者多劳的问题

简单队列和work 模式的不同:

  • 简单队列只要消息从队列中获取,无论消费者获取到消息后是否成功消费,比如遇到状况:断电,都认为是消息已经成功消费;

  • work模式消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者反馈,如果消费这一直没有反馈,则该消息一直处于不可用状态。

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName WorkProvider
 * @Author ZC
 * @Date 2022/7/3 10:11
 * @Version 1.0
 * @Description
 */
public class WorkProvider {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送消息
        for (int i=0;i<30;i++){
            String message = "hellwork"+i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            Thread.sleep(1000);
            System.out.println("send ["+(i+1)+"]:"+message);
        }
    }
}

消费者

消费者1,接收一次消息休息1s

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.sql.SQLOutput;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName WorkConsumer_1
 * @Author ZC
 * @Date 2022/7/3 10:24
 * @Version 1.0
 * @Description
 */
public class WorkConsumer_1 {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //工作模式的话,设置一直只接收一条,接收后手动回复
        channel.basicQos(1);
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                String message = new String(body,"utf-8");
                System.out.println("消费者(1)接收到的消息:"+message);
                //接收消息完成后需要手动回复
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //监听消息队列-手动回复时候关闭自动回复
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

消费者2,接收消息后休息2s

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName WorkConsumer_2
 * @Author ZC
 * @Date 2022/7/3 10:38
 * @Version 1.0
 * @Description
 */
public class WorkConsumer_2 {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);//跟简单模式的区别
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                String message = new String(body,"utf-8");
                System.out.println("消费者(2)接收到的消息:"+message);
                //手动回复
                channel.basicAck(envelope.getDeliveryTag(),false);//跟简单模式的区别
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,false,consumer); //跟简单模式的区别
     }
}

消费者1比消费者2性能高,所以消费者处理消息快

3、发布订阅模式[*]

同一个交换机绑定多个队列,实现广播消息。

交换机类型为:fanout

一个消息同时发送给多个消费者消费,

消息发送到交换机(变换机中绑定了多个队列)服务消费者绑定了队列,

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Provider
 * @Author ZC
 * @Date 2022/7/3 16:17
 * @Version 1.0
 * @Description 发布订阅模式下的
 */
public class Provider {
    private final static String EXCHANGE_NAME="test_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        //发布订阅模式,消费者的消息发送到交换机上,routingkey为空
        //声明交换机,RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。
        /**
         * 路由模式使用: direct
         * 通配符模式使用: topic
         */
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //发送消息
        for(int i=0;i<5;i++){
            String message = "测试订阅/发布模式"+i;
            System.out.println("send [x]"+message);
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        }
    }
}

消费者

消费者1

import com.rabbitmq.client.*;
import com.sun.javaws.jnl.RContentDesc;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer1
 * @Author ZC
 * @Date 2022/7/3 16:35
 * @Version 1.0
 * @Description
 */
public class Consumer1 {
    private final static String QUEUE_NAME = "ex_queue_1";
    private final static String EXCHANGE_NAME="test_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        //创建连接通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //队列绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者1接收到消息:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

消费者2

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer2
 * @Author ZC
 * @Date 2022/7/3 16:45
 * @Version 1.0
 * @Description
 */
public class Consumer2 {
    private final static String QUEUE_NAME="ex_queue_2";
    private final static String EXCHANGE_NAME = "test_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者2接收到消息:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

4、路由模式

交换机根据分发规则将消息分发到不同的队列。

交换机类型为:direct

发布订阅模式下:提供者发送消息时的routingkey为“ ”,消费者队列绑定交换机时也没有指定routingkey,那么绑定在交换机上的所有队列都将消费消息

ps: 路由模式下,指定routingkey ,消费者消费绑定的队列与发布时绑定的routingkey对应时才能消费消息

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import jdk.nashorn.internal.ir.CallNode;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName RoutingProvider
 * @Author ZC
 * @Date 2022/7/3 17:31
 * @Version 1.0
 * @Description
 */
public class RoutingProvider {
    private final static String EXCHANGE_NAME = "test_routing";
    private final static String ROUTING_KEY = "rk";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //发送消息
        for(int i = 0;i<5;i++){
            String message = "测试routing模式"+i;
            channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,message.getBytes());
            System.out.println("send [x]:"+message);
        }
    }
}

消费者

消费者1 routingkey = rk,根据routingkey指定绑定交换机中的队列

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Routingconsumer1
 * @Author ZC
 * @Date 2022/7/3 17:32
 * @Version 1.0
 * @Description
 */
public class Routingconsumer1 {
    private final static String QUEUE_NAME = "rk";
    private final static String EXCHANGE_NAME = "test_routing";
    private final static String ROUTING_KEY = "rk";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者1接收到消息:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

消费者2 routingkey=rk2,

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Routingconsumer2
 * @Author ZC
 * @Date 2022/7/3 17:32
 * @Version 1.0
 * @Description
 */
public class Routingconsumer2 {
    private final static String QUEUE_NAME = "rk1";
    private final static String EXCHANGE_NAME = "test_routing";
    private final static String ROUTING_KEY = "rk1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //队列绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者2接收到的消息:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

5、通配符模式[主题模式]

交换机可以根据模糊的分发规则将消息分发到不同的队列。

交换机类型为: topic

消费者routingKey中使用*或#,实现交换机跟队列中的模糊匹配

实例:test.add、test.del、test.other.other。

  • *单层匹配

    • test.*,只能匹配到test.add、test.del
  • 多成匹配

    • test.# 能匹配所有以test开头的

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName provider
 * @Author ZC
 * @Date 2022/7/3 21:02
 * @Version 1.0
 * @Description
 */
public class Provider {
    private final static String EXCHANGE_NAME="ex_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String message = "测试通配符模式";
        channel.basicPublish(EXCHANGE_NAME,"test.topic",null,message.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"test.routing",null,message.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"test.other.other",null,message.getBytes());
        System.out.println("send[x]:"+message);
    }
}

消费者

消费者1,*匹配

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer1
 * @Author ZC
 * @Date 2022/7/3 21:13
 * @Version 1.0
 * @Description
 */
public class Consumer1{
    private final static String EXCHANGE_NAME="ex_topic";
    private final static String QUEUE_NAME="queue_topic_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"test.*"); //*匹配
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者1:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

消费者2,#匹配

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer2
 * @Author ZC
 * @Date 2022/7/3 21:13
 * @Version 1.0
 * @Description
 */
public class Consumer2 {
    private final static String EXCHANGE_NAME="ex_topic";
    private final static String QUEUE_NAME="queue_topic_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"test.#"); //#匹配
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者2:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

6、远程调用模式

客户端发送请求到请求队列,并设置临时的响应队列,服务端订阅请求队列,并发送响应到临时的响应队列。

7、发布确认模式

消息可靠发送。

总结

交换机类型

  • fanout 直连交换机:交换机忽略路由键,直接将消息分发到队列中。
  • direct 路由交换机:交换机通过路由键,将消息分发到不同的队列中。(默认交互机使用队列名作为路由键)
  • topic 主题交换机:交换机可以和队列绑定模糊的分发规则,以匹配不同的路由键。
  • headers 头部交换机:不依赖于路由键,而是绑定时指定一组键值对,并提取消息的键值对是否完全匹配绑定的键值对,如果完全匹配则会把消息路由到该队列中。

标签:String,队列,RabbitMQ,收发,IOException,import,public,channel,NAME
来源: https://www.cnblogs.com/zceng/p/16441364.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有