标签:String 队列 RabbitMq 交换机 消息 public channel
MQ介绍和安装
基于AMQA协议 ,erlang语言开发,和Spring整合很好,数据一致性 (消息的丢失,错误处理)处理的很好,
生产者将消息发送到服务的的虚拟主机内的交换机 交换机将消息通过特定规则放入特定的消息队列 消息队列再将消息发送给消费者 (Kafka是消费者去消息队列去读取消息)
安装的话我自己就直接用docker安装 启动
geust/geust登录
rabbitmqctl 介绍
查看用户命令 查看插件 6中模型
“Hello Word 模型”
新建模块 新建一个虚拟主机 新建一个用户 新建的用户绑定虚拟主机
生产者生产消息
发送消息代码
package com.luyi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 卢意
* @create 2020-11-25 18:57
*/
public class Provider {
//生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 创建连接mq的连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
// 设置链接rabbitmq主机
connectionFactory.setHost("192.168.216.138");
// 设置主机端口号
connectionFactory.setPort(5672);
// 设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
// 获取连接对象
Connection connection = connectionFactory.newConnection();
// 通过连接获取连接中的通道对象
Channel channel = connection.createChannel();
// 通道绑定对应的消息队列
// 参数 队列名称(不存在的时候自动创建)
// 用来定义队列特征是要持久化
// 是否独占队列(true 就只能被当前连接使用)
// 是否在消费完成后自动删除队列
// 附加参数
channel.queueDeclare("hello",false,false,false,null);
//发布消息
// 参数 交换名称
// 对接名称
// 传递参数的额外设置
// 消息的具体内容
channel.basicPublish("","hello" , null,"hello rabbitmq".getBytes());
channel.close();
}
}
有消息进来
消费者消费消息
消费者类
package com.luyi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 卢意
* @create 2020-11-25 19:15
*/
public class Consumer {
// 消费消息
@Test
public void testGetMessage() throws IOException, TimeoutException {
// 创建连接mq的连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
// 设置链接rabbitmq主机
connectionFactory.setHost("192.168.216.138");
// 设置主机端口号
connectionFactory.setPort(5672);
// 设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
// 获取连接对象
Connection connection = connectionFactory.newConnection();
// 通过连接获取连接中的通道对象
Channel channel = connection.createChannel();
// 通道绑定对应的消息队列
// 参数 队列名称(不存在的时候自动创建)
// 用来定义队列特征是要持久化
// 是否独占队列(true 就只能被当前连接使用)
// 是否在消费完成后自动删除队列
// 附加参数
channel.queueDeclare("hello",false,false,false,null);
// 消费消息
// 参数 消费哪个对列的消息 队列名称
// 开启消息的自动确认机制
// 消费时的回调接口
DefaultConsumer defaultConsumer = new DefaultConsumer(channel);
channel.basicConsume("hello", true,new DefaultConsumer(channel));
channel.close();
connection.close();
}
}
消费了 consumer里不关闭通道和连接的话 会一直消费
RabbitMQ简介工具类封装
package com.luyi.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author 卢意
* @create 2020-11-25 19:56
*/
public class RabbitMQUtil {
private static ConnectionFactory factory;
static {
factory = new ConnectionFactory();
factory.setHost("192.168.216.138");
factory.setPort(5672);
factory.setVirtualHost("/ems");
factory.setUsername("ems");
factory.setPassword("ems");
}
/**
* 获取RabbitMQ的连接
* @return 连接对象
*/
public static Connection getConnection(){
try {
Connection connection = factory.newConnection();
return connection;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 关闭连接 注意:一般消费者不用关闭连接 只用于生产者关闭连接
* @param channel
* @param connection
*/
public static void closeConnection(Channel channel, Connection connection){
try {
if (channel != null){
channel.close();
}
if (connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
RabbitMQ的API细节参数
不持久化时则在rabbitmq重启的时候就会删除队列
设置为true这里会变成D 但是只是队列持久化 消息还是会被删除 设置队列的消息也持久化 当然消息持久化了 队列必须设置为持久化 设置队列自动删除 消费者不在占用队列时 队列自动删除 (消费者和生成者队列的配置要一样)
"Worker"模型
平均消费消息
生产者
package com.luyi.workerqueue;
import com.luyi.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.cache.annotation.Cacheable;
import java.io.IOException;
/**
* @author 卢意
* @create 2020-11-25 19:53
*/
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("work",true,false,false,null);
for (int i = 0; i < 10; i++){
// 生成消息
channel.basicPublish("", "work", null, (i+" hello Worker").getBytes());
}
// 关闭资源
RabbitMQUtil.closeConnection(channel, connection);
}
}
运行发送 消费者1
package com.luyi.workerqueue;
import com.luyi.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.springframework.http.StreamingHttpOutputMessage;
import java.io.IOException;
/**
* @author 卢意
* @create 2020-11-25 20:08
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, IOException {
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//一次只给一个消息处理 防止 消息丢失
channel.queueDeclare("work", true, false, false, null);
//第二个参数 向rabbitmq自动确认
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-1: " + new String(body));
//手动确认 发送ack 代表当前消息消费结束 可以去消费下一个消息
//参数二 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
消费者2
package com.luyi.workerqueue;
import com.luyi.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author 卢意
* @create 2020-11-25 20:08
*/
public class Consumer2 {
public static void main(String[] args) throws IOException, IOException {
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-2: " + new String(body));
}
});
}
}
1 和 2 都run起来 再发布消息 发现是轮循
消息确认机制和能者多劳实现
在平均消费模式中,消费者只要从队列中拿到消息,就立刻发送确认机制,有可能在处理消息的时候就突然宕机了或者出现意外了,这样消息还没来得及消费就遗失了,就造成业务数据的丢失。
另外,也有可能两个消费者处理消息的效率不一样,就有可能造成一个消费者已经消费完消息然后闲着,而另外一个消费者拿到了消息,却一直处于处理消息的状态,造成资源的浪费。
当前是自动确认消息接收 收到消息就当做消费完成 但是业务不一定结束了 还没执行完所有消息的业务就出事宕机了 就会导致消息丢失 所以将当前两个消费者的该参数设为false 且设置 消费者一次只能消费一个消息 模拟消费者1消费时间比较长 启动 1 2 和生产者
FanOut(扇出 广播 )模型的使用
生产者
package com.luyi.fanout;
import com.luyi.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author 卢意
* @create 2020-11-25 20:54
*/
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 将通道声明指定交换机 参数1: 交换机的名称 参数2 交换机的类型
channel.exchangeDeclare("register", "fanout");
// 发送消息
channel.basicPublish("register", "", null, "fanout type message".getBytes());
// 关闭资源
RabbitMQUtil.closeConnection(channel, connection);
}
}
启动查看 自动帮忙创建 交换机 消费者1
package com.luyi.fanout;
import com.luyi.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author 卢意
* @create 2020-11-25 21:08
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
// 连接对象
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("register","fanout");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue, "register", "");// 参数三 路由key 暂时在fanout模式没有作业
// 消费消息
channel.basicConsume(queue, true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: "+new String(body));
}
});
}
}
消费者 2 3 和1差不多 启动 1 2 3 生产者 都消费到了这条消息
"Direct"模型(路由)
生成者
package com.luyi.direct; import com.luyi.util.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; /** * @author 卢意 * @create 2020-11-25 21:31 */ public class Provider { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 将通道声明指定交换机 参数1: 交换机的名称 参数2 交换机的类型 channel.exchangeDeclare("logs_direct", "direct"); // 发送消息 String routingKey = "info"; // 路由key channel.basicPublish("logs_direct", routingKey, null, ("direct type message routingKey="+routingKey).getBytes()); // 关闭资源 RabbitMQUtil.closeConnection(channel, connection); } }
consumer1 值接收路由key为error的消息
package com.luyi.direct; import com.luyi.util.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; /** * @author 卢意 * @create 2020-11-25 21:08 */ public class Consumer1 { public static void main(String[] args) throws IOException { // 连接对象 Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("logs_direct","direct"); // 临时队列 String queue = channel.queueDeclare().getQueue(); // 通过router key 绑定交换机和队列 channel.queueBind(queue, "logs_direct", "error");// 参数三 路由key // 消费消息 channel.basicConsume(queue, true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1: "+new String(body)); } }); } }
consumer2 接收多个路由key的消息
package com.luyi.direct; import com.luyi.util.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; /** * @author 卢意 * @create 2020-11-25 21:08 */ public class Consumer2 { public static void main(String[] args) throws IOException { // 连接对象 Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("logs_direct","direct"); // 临时队列 String queue = channel.queueDeclare().getQueue(); // 通过router key 绑定交换机和队列 channel.queueBind(queue, "logs_direct", "info");// 参数三 路由key channel.queueBind(queue, "logs_direct", "error");// 参数三 路由key channel.queueBind(queue, "logs_direct", "warning");// 参数三 路由key // 消费消息 channel.basicConsume(queue, true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2: "+new String(body)); } }); } }
生产者先发送路由key为 info的消息 由2接收 生产者先发送路由key为 error的消息 1和2都可以接收
"topic"模型(可以叫做动态路由)
如果有大量的路由key 就会要写一堆绑定代码 不够灵活 生产者
package com.luyi.topic; import com.luyi.util.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; /** * @author 卢意 * @create 2020-11-25 21:59 */ public class Provider { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 将通道声明指定交换机 参数1: 交换机的名称 参数2 交换机的类型 channel.exchangeDeclare("logs_direct", "topic"); // 发送消息 String routingKey = "user.save"; // 路由key channel.basicPublish("topics", routingKey, null, ("topic type message routingKey="+routingKey).getBytes()); // 关闭资源 RabbitMQUtil.closeConnection(channel, connection); } }
消费者
package com.luyi.topic; import com.luyi.util.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; /** * @author 卢意 * @create 2020-11-25 21:08 */ public class Consumer1 { public static void main(String[] args) throws IOException { // 连接对象 Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("topics","topic"); // 临时队列 String queue = channel.queueDeclare().getQueue(); // 通过动态通配符router key 绑定交换机和队列 channel.queueBind(queue, "topics", "user.*");// 参数三 路由key // 消费消息 channel.basicConsume(queue, true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1: "+new String(body)); } }); } }
这样是没问题的
将生产者的路由key变为user.save,findAll 就无法消费到了 修改 消费者的通配符 router key 这就可以消费到了 user也可以消费到
1.概念
1.1名词.
1.2准备
以下皆为springboot版代码,安装过程不做详解
1.3Pom文件
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
1.4application.yml
spring: rabbitmq: host: 192.168.140.211 port: 5672 username: admin password: admin
2.各大模式下的生产者和消费者
1.简单模式
1.1队列声明
@Bean public Queue helloWorld(){ //非持久化 return new Queue("HelloWorld",false); }
PS:固定的队列最好现在RabbitMq Management创建,如果没有在客户端创建,springboot自动配置下也会自动创建队列并且创建对应的实例对象,但是有概率启动异常或者发生些迷之错误导致消息收不到,原因不明
1.2生产者
@Component public class Produdcer { @Autowired private AmqpTemplate amqpTemplate; //向指定队列发送 public void send(){ amqpTemplate.convertAndSend("HelloWorld","我真牛"); amqpTemplate.convertAndSend("HelloWorld",222); } }
这个生产者使用的是交换机为MQ的默认交换机 convertAndSend表示自动将发送的消息转为byte数组并发送 参数1为要发送的队列名,表示将消息发送到HelloWorld队列 参数2是要发送的消息,如果想发送对象,最好提前转化为json字符串或xml形式 字符串
1.3消费者
@RabbitListener(queues = "HelloWorld") @Component public class Listen { @RabbitHandler public void consumer(String msg){ System.out.println("打印字符:"+msg); } @RabbitHandler public void consumer1(Integer num){ System.out.println("打印数字:"+num); } }
@RabbitListener 表示该类为消费者 queues 指定监听的队列名 @RabbitHandler 指定消费接受到的消息的方式,根据形参的不同可以监听不同类型的消息,在上面生产者发送的两个消息的情况下,consumer会收到"我真牛",而consumer1会收到222的消息
1.4消费者的其他写法
@Component public class SimpleReceiver { @RabbitListener(queues = "helloworld") public void receive(String msg) { System.out.println("收到: "+msg); } }
或者
@RabbitListener(queuesToDeclare = @Queue(name = "helloworld",durable = "false"))
2.工作模式
工作模式是指向多个互相竞争的消费者发送消息的模式,它包含一个生产者、两个消费者和一个队列。两个消费者同时绑定到一个队列上去,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。
2.1队列声明
队列持久化是指当没有消费者监听该队列时,保持该队列
@Bean public Queue helloWorld(){ //持久化队列 return new Queue("task_queue",true); }
2.2生产者
@Component public class Produdcer { @Autowired private AmqpTemplate amqpTemplate; //向指定队列发送 public void send(){ while (true){ System.out.println("输入消息"); String msg=new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("task_queue", msg); } } }
生产者发送消息到task_queue队列,且消息为持久化 pring boot封装的 rabbitmq api 中, 发送的消息默认是持久化消息. 如果希望发送非持久化消息, 需要在发送消息时做以下设置: 使用 MessagePostProcessor 前置处理器参数 从消息中获取消息的属性对象 在属性中把 DeliveryMode 设置为非持久化
//如果需要设置消息为非持久化,可以取得消息的属性对象,修改它的deliveryMode属性 t.convertAndSend("task_queue", (Object) s, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties props = message.getMessageProperties(); props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; } });
2.3消费者
@Component public class Listen { @RabbitListener(queues = "task_queue") public void consumer(String msg) throws InterruptedException { System.out.println("消费者1,打印字符:"+msg); for (int i=0;i<msg.length();i++){ if('.'==(msg.charAt(i))){ Thread.sleep(1000); } } } @RabbitListener(queues = "task_queue") public void consumer2(String msg) throws InterruptedException { System.out.println("消费者2,打印字符:"+msg); for (int i=0;i<msg.length();i++){ if('.'==(msg.charAt(i))){ Thread.sleep(1000); } } } }
默认情况下,发送到task_queue队列的消息是以轮询机制的,二者不会同时消费
3 ack模式
在 spring boot 中提供了三种确认模式: NONE - 使用rabbitmq的自动确认 AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认) MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作 默认的 AUTO 模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送. 以下是在2.2上的修改,未提及则表示未修改
3.1 application.yml
spring: rabbitmq: listener: simple: # acknowledgeMode: NONE # rabbitmq的自动确认 # acknowledgeMode: AUTO # rabbitmq的手动确认, springboot会自动发送确认回执 (默认) acknowledgeMode: MANUAL # rabbitmq的手动确认, springboot不发送回执, 必须自己编码发送回执
3.2 消费者
@RabbitListener(queues="task_queue") public void receive1(String s, Channel c, @Header(name=AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { System.out.println("receiver1 - 收到: "+s); // 手动发送确认回执 c.basicAck(tag, false); }
其中: deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel c.basicAck(tag, false);其中的false表示只确认单条消息,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 //拒绝消息channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);
3.3抓取数量
工作模式中, 为了合理地分发数据, 需要将 qos 设置成 1, 每次只接收一条消息, 处理完成后才接收下一条消息. spring boot 中是通过 prefetch 属性进行设置, 该属性的默认值是 250.
spring: rabbitmq: listener: simple: prefetch: 1 # qos=1, 默认250
4 发布和订阅模式
发布/订阅模式是指同时向多个消费者发送消息的模式(类似广播的形式),它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费消息。
4.1 Bean声明(一)
通过@Configuration单独文件声明
@Configuration public class FanoutRabbitConfig { //创建一个fanout类型的交换机 @Bean public FanoutExchange fanout() { return new FanoutExchange("exchange.fanout"); } //消费队列一 @Bean public Queue fanoutQueue1() { return new AnonymousQueue(); } //消费队列二 @Bean public Queue fanoutQueue2() { return new AnonymousQueue(); } //绑定队列和交换机 @Bean public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) { return BindingBuilder.bind(fanoutQueue1).to(fanout); } @Bean public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) { return BindingBuilder.bind(fanoutQueue2).to(fanout); } //消费者声明 @Bean public FanoutReceiver fanoutReceiver() { return new FanoutReceiver(); } //生产者声明 @Bean public FanoutSender fanoutSender() { return new FanoutSender(); } }
4.2 生产者(一)
public class FanoutSender { private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class); @Autowired private RabbitTemplate template; private static final String exchangeName = "exchange.fanout"; public void send(int index) { StringBuilder builder = new StringBuilder("Hello"); ... String message = builder.toString(); template.convertAndSend(exchangeName, "", message); LOGGER.info(" [x] Sent '{}'", message); } }
4.3 消费者(一)
public class FanoutReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class); @RabbitListener(queues = "#{fanoutQueue1.name}") public void receive1(String in) { receive(in, 1); } @RabbitListener(queues = "#{fanoutQueue2.name}") public void receive2(String in) { receive(in, 2); } private void receive(String in, int receiver) { ... } }
4.4 Bean声明(二)
个人更习惯的写法:
@Bean public FanoutExchange logsExchange(){ return new FanoutExchange("logs",false,false); }
4.5 生产者(二)
@Component public class Producer { @Autowired private RabbitTemplate rabbitTemplate; //向指定队列发送 public void send(){ while (true){ System.out.println("输入消息"); String msg=new Scanner(System.in).nextLine(); //群发无路由键 rabbitTemplate.convertAndSend("logs","", msg); } } }
其中的空串为路由键,fanout模式下无效
4.6 消费者(二)
@Component public class Listen { @RabbitListener(bindings = @QueueBinding( //服务器自动创建随机队列,非持久,独占,自动删除 value = @Queue, //由main定义交换机,所以declare 为false exchange = @Exchange(name = "logs",declare = "false")) ) public void consumer(String msg) throws InterruptedException { System.out.println("消费者1,打印字符:"+msg); } @RabbitListener(bindings = @QueueBinding( //服务器自动创建随机队列,非持久,独占,自动删除 value = @Queue, //由main定义交换机 exchange = @Exchange(name = "logs",declare = "false")) ) public void consumer2(String msg) throws InterruptedException { System.out.println("消费者2,打印字符:"+msg); } }
@Queue ,服务器自动创建随机队列,非持久,独占,自动删除 非持久:服务器宕机后队列会消失 独占:其他消费者不能连接该队列 自动删除:无消费者时队列自动删除
5路由模式
路由模式是可以根据路由键选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键转发到不同队列,队列绑定的消费者接收并消费消息。 PS:以(二)中的写法为主 与发布和订阅模式代码类似, 只是做以下三点调整: 1.使用 direct 交换机 2.队列和交换机绑定时, 设置绑定键 3.发送消息时, 指定路由键
5.1Bean声明
@Bean public DirectExchange directExchange(){ return new DirectExchange("direct_logs",false,false); }
5.2生产者
@Component public class Producer { @Autowired private RabbitTemplate rabbitTemplate; //向指定队列发送 public void send(){ while (true){ System.out.println("输入消息"); String msg=new Scanner(System.in).nextLine(); System.out.print("输入路由键:"); String key = new Scanner(System.in).nextLine(); rabbitTemplate.convertAndSend("direct_logs",key, msg); } } }
5.3消费者
@Component public class Listen { @RabbitListener(bindings = @QueueBinding( //服务器自动创建随机队列,非持久,独占,自动删除 value = @Queue, //由main定义交换机 exchange = @Exchange( name = "direct_logs" ,declare = "false" ), //路由键 key = {"error"})) public void consumer(String msg) throws InterruptedException { System.out.println("消费者1,打印字符:"+msg); } @RabbitListener(bindings = @QueueBinding( //服务器自动创建随机队列,非持久,独占,自动删除 value = @Queue, //由main定义交换机 exchange = @Exchange(name = "direct_logs",declare = "false"), //路由键 key = {"error","info","warning"}) ) public void consumer2(String msg) throws InterruptedException { System.out.println("消费者2,打印字符:"+msg); } }
6主题模式/通配符模式
通配符模式是可以根据路由键匹配规则选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键匹配规则绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键匹配规则转发到不同队列,队列绑定的消费者接收并消费消息。 PS: 主题模式不过是具有特殊规则的路由模式, 代码与路由模式基本相同, 只做如下调整: 1.使用 topic 交换机 2.使用特殊的绑定键和路由键规则
6.1Bean声明
@Bean public TopicExchange topicExchange(){ return new TopicExchange("topic_logs",false,false); }
6.2 生产者
@Component public class Producer { @Autowired private RabbitTemplate rabbitTemplate; //向指定队列发送 public void send(){ while (true){ System.out.println("输入消息"); String msg=new Scanner(System.in).nextLine(); System.out.println("输入路由键:"); String key = new Scanner(System.in).nextLine(); //topic需要路由键 rabbitTemplate.convertAndSend("topic_logs",key, msg); } } }
6.3消费者
@Component public class Listen { @RabbitListener(bindings = @QueueBinding( //服务器自动创建随机队列,非持久,独占,自动删除 value = @Queue, //由main定义交换机 exchange = @Exchange( name = "topic_logs" ,declare = "false" ), //路由键 key = {"*.orange.*"})) public void consumer(String msg) throws InterruptedException { System.out.println("消费者1,打印字符:"+msg); } @RabbitListener(bindings = @QueueBinding( //服务器自动创建随机队列,非持久,独占,自动删除 value = @Queue, //由main定义交换机 exchange = @Exchange(name = "topic_logs",declare = "false"), //路由键 key = {"*.*.rabbit","lazy.#"}) ) public void consumer2(String msg) throws InterruptedException { System.out.println("消费者2,打印字符:"+msg); } }
7Rpc模式
基于RabbitMq的RPC异步调用,由于楼主基本不用,贴下两个链接,看起来还行,未验证: https://blog.csdn.net/ws_kfxd/article/details/93753048 https://blog.csdn.net/weixin_38305440/article/details/104807062
8消息丢失的解决办法
一般情况下一旦抛异常直接try-catch重发即可,但是依旧偶尔不行,所以一下链接可解决,本文不做介绍 https://blog.csdn.net/ws_kfxd/article/details/93753048 https://blog.csdn.net/weixin_38380858/article/details/93227652
9延迟消息写法
rabbitMq有两者实现发送延迟消息的方式,多用于取消订单,验证码等用redis的过期时间设置即可
9.1 基于死信队列的延迟消息
以写法(一)为主,便于理解
9.1.1application.yml
rabbitmq: host: localhost # rabbitmq的连接地址 port: 5672 # rabbitmq的连接端口号 virtual-host: /my # rabbitmq的虚拟host username: guest # rabbitmq的用户名 password: guest # rabbitmq的密码 publisher-confirms: true #如果对异步消息需要回调必须设置为true
9.1.2消息队列枚举类
用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称
@Getter public enum QueueEnum { /** * 消息通知队列 */ QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"), /** * 消息通知ttl队列 */ QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl"); /** * 交换名称 */ private String exchange; /** * 队列名称 */ private String name; /** * 路由键 */ private String routeKey; QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
编号A : “mall.order.direct” : 实际接收消息的交换机 编号B : “mall.order.cancel” : 实际接收消息的队列名 编号C : “mall.order.cancel” : 实际接收消息的路由键 编号D : “mall.order.direct.ttl” : 开始发送消息的交换机 编号E : “mall.order.cancel.ttl” : 开始发送消息的队列 编号F : “mall.order.cancel.ttl” : 开始发送消息的路由键 消息路径如下: 消息发送到 D交换机,交换机根据E路由键发送到F死信队列,当消息过期后,会发送到A交换机,再次根据路由键C发送到实际消费队列B从而取消订单
9.1.3Bean声明
@Configuration public class RabbitMqConfig { /** * 订单消息实际消费队列所绑定的交换机 */ @Bean DirectExchange orderDirect(){ return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) //持久化,当这个交换机上没有注册队列时,这个交换机是否删除 .durable(true) .build(); } /** * 订单延迟队列队列所绑定的交换机 */ @Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 订单实际消费队列 */ @Bean public Queue orderQueue(){ return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 订单延迟队列(死信队列) */ @Bean public Queue orderTtlQueue(){ return QueueBuilder //设置为持久化,并给定队列名 .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) //到期后转发的交换机 .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) //到期后转发的路由键 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()) .build(); } /** * 将订单队列绑定到交换机 */ @Bean Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /** * 将订单延迟队列绑定到交换机 */ @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } }
9.1.4 生产者
@Component public class CancelOrderSender { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(Long orderId,final long delayTimes){ //给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); LOGGER.info("send delay message orderId:{}",orderId); } }
9.1.5 消费者
@Component @RabbitListener( queues = "mall.order.cancel" ) @Slf4j public class CancelOrderReceiver { @RabbitHandler public void handle(Long orderId){ log.info("receive delay message orderId:{}",orderId); } }
9.2基于插件实现的延迟消息
9.2.1 application.yml
rabbitmq: host: localhost # rabbitmq的连接地址 port: 5672 # rabbitmq的连接端口号 virtual-host: /my # rabbitmq的虚拟host username: guest # rabbitmq的用户名 password: guest # rabbitmq的密码 publisher-confirms: true #如果对异步消息需要回调必须设置为true
9.2.2 队列枚举类
@Getter public enum QueueEnum { /** * 消息通知队列 */ QUEUE_ORDER_PLUGIN_CANCEL("mall.order.plugin.direct","mall.order.plugin.cancel","mall.order.plugin.cancel"); /** * 交换名称 */ private String exchange; /** * 队列名 */ private String name; /** * 路由键 */ private String routeKey; QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
9.2.3 Bean声明
@Configuration public class RabbitMqConfig { /** * 订单延迟插件消息队列所绑定的交换机 */ @Bean CustomExchange orderPluginDirect() { //创建一个自定义交换机,可以发送延迟消息 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args); } /** * 订单延迟插件队列 */ @Bean public Queue orderPluginQueue() { return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName()); } /** * 将订单延迟插件队列绑定到交换机 */ @Bean public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) { return BindingBuilder .bind(orderPluginQueue) .to(orderPluginDirect) .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()) .noargs(); } }
9.2.4生产者
@Component @Slf4j public class CancelOrderSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(Long orderId,final long delayTime){ //给延迟队列发送消息 rabbitTemplate.convertAndSend( QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange() , QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey() , orderId , new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setHeader("x-delay",delayTime); return message; } } ); log.info("send delay message : {}",orderId); } }
PS: 方式一中是message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); 方式二中是 message.getMessageProperties().setHeader(“x-delay”,delayTime); 容易混淆
9.2.5 消费者
@Component @RabbitListener( queues = "mall.order.plugin.cancel" ) @Slf4j public class CancelOrderReceiver { @RabbitHandler public void handle(Long orderId){ log.info("receive delay message orderId:{}",orderId); } } 1234567891011
消费者无差别
标签:String,队列,RabbitMq,交换机,消息,public,channel 来源: https://www.cnblogs.com/xmlkk/p/14455751.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。