ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

rabbitmq-发布订阅模式

2021-03-06 22:57:49  阅读:185  来源: 互联网

标签:订阅 String 队列 模式 FANOUT rabbitmq 交换机 channel 消息


【README】

本文po出 mq的发布订阅模式,及代码示例;

 

【1】intro

1) 角色: 有4个角色, 包括 生产者,消费者, 交换机 exchange(X), 队列;

2)交换机: 一方面,接收生产者的消息,另一方面,处理消息,如发送给队列,或丢弃;这取决于 exchange类型;
3)exchange类型有如下3种:
fanout 广播, 把消费转发给所有 绑定到该交换机的所有队列;
direct 定向, 把消息转发给符合 指定 routing key 路由键的队列;
topic 通配符, 把消息交给 routing pattern(路由模式)的队列;
4)exchange 交换机, 只负责转发消息, 不具备存储消息的能力; 因此如果没有任何队列与 exchange 绑定, 或者没有符合规则的队列, 那么消息会丢失;

5)发布订阅模式:
5.1-每个消费者监听自己的队列;
5.2-生产者把消息发送给 broker, 由交换机把消息转发到绑定此交换机的所有队列;

6)交换机需要与队列绑定, 绑定之后,一个消息可以被多个消费者收到;

【2】代码(生产者1个,交换机exchange1个,但对应到2个队列,即消息有2个replication)

生产者


/**
 * 发布订阅模式生产者
 * 本文发布订阅模式使用的交换机类型为广播 fanout 
 * @author tang rong 
 */
public class PSProduer {
	/** 交换机类型 */
	static String FANOUT_EXCHANGE = "fanout_exchange";
	/** 队列名称1 */
	static String FANOUT_QUEUE_1 = "fanout_queue_1";
	/** 队列名称1 */
	static String FANOUT_QUEUE_2 = "fanout_queue_2";
	
	public static void main(String[] args) throws Exception {
		Connection conn = RBConnectionUtil.getConn(); // 创建连接
		Channel channel = conn.createChannel();  // 创建频道
		/**
		 * 声明交换机
		 * 参数1-交换机名称 
		 * 参数2-交换机类型(fanout, topic, direct, headers)
		 */
		channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
		/**
		 * 创建队列
		 * @param1 队列名称
		 * @param2  是否持久化队列
		 * @param3 是否独占本次连接 
		 * @param4 是否在不使用的时候自动删除队列 
		 * @param5 队列其他参数  
		 */ 
		channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
		channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
		/**
		 * 队列绑定交换机 
		 */
		channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, 	"");
		channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, 	"");
		/**
		 * 发送消息 
		 */
		long temp = 1; 
		for (int i = 0; i < 1000; i++) { 
			String msg = "发布订阅模式消息,序号=" + (temp+i) + "时间=" + MyDateUtil.getNow();
			/**
			 * 参数1 交换机名称,没有指定则使用默认交换机 Default change 
			 * 参数2 路由key,简单模式可以传递队列名称 
			 * 参数3 消息其他属性 
			 * 参数4 消息内容 
			 */
			channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes("UTF-8")); 
			System.out.println("生产者发送消息" + msg);  
		}  
		System.out.println("=== 生产者消息发送完成");
		/* 关闭资源 */
		channel.close();
		conn.close(); 
	}
}

消费者1


/**
 * 发布订阅模式消费者1
 * @author tang rong 
 */
public class PSConsumer1 {
	/** 交换机类型 */
	static String FANOUT_EXCHANGE = "fanout_exchange";
	/** 队列名称1 */
	static String FANOUT_QUEUE_1 = "fanout_queue_1";
	
	public static void main(String[] args) throws Exception {
		Connection conn = RBConnectionUtil.getConn(); // 创建连接 
		Channel channel = conn.createChannel();  // 创建队列 
		channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机
		/**
		 * 创建队列 
		 * 参数1 队列名称 
		 * 参数2 是否持久化
		 * 参数3 是否独占本连接 
		 * 参数4 是否在不使用的时候自动删除队列
		 * 参数5 队列其他参数 
		 */
		channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
		/**
		 * 队列绑定交换机
		 */
		channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
		
		/* 创建消费者,设置消息处理逻辑 */
		Consumer consumer = new DefaultConsumer(channel) {
			/**
			 * @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   
			 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) 
			 * @param properties 基本属性
			 * @param body 消息字节数组  
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				System.out.println("=== 消费者1 start ===");
				
				System.out.println("路由key=" + envelope.getRoutingKey());
				System.out.println("交换机=" + envelope.getExchange());
				System.out.println("消息id=" + envelope.getDeliveryTag()); 
				String message = new String(body, "UTF-8");
				System.out.println(String.format("消费者收到的消息【%s】", message)); 
				System.out.println("=== 消费者1 end ===\n"); 
			} 
		};
		/**
		 * 监听消息
		 * 参数1 队列名称 
		 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; 
		 * 参数3 消息接收后的回调 
		 */
		channel.basicConsume(FANOUT_QUEUE_1, true, consumer); 
	}
	
}

消费者2

/**
 * 发布订阅模式消费者
 * @author tang rong 
 */
public class PSConsumer2 {
	/** 交换机类型 */
	static String FANOUT_EXCHANGE = "fanout_exchange";
	/** 队列名称1 */
	static String FANOUT_QUEUE_2 = "fanout_queue_2";
	
	public static void main(String[] args) throws Exception {
		Connection conn = RBConnectionUtil.getConn(); // 创建连接 
		Channel channel = conn.createChannel();  // 创建队列 
		channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机
		/**
		 * 创建队列 
		 * 参数1 队列名称 
		 * 参数2 是否持久化
		 * 参数3 是否独占本连接 
		 * 参数4 是否在不使用的时候自动删除队列
		 * 参数5 队列其他参数 
		 */
		channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
		/**
		 * 队列绑定交换机
		 */
		channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
		
		/* 创建消费者,设置消息处理逻辑 */
		Consumer consumer = new DefaultConsumer(channel) {
			/**
			 * @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   
			 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) 
			 * @param properties 基本属性
			 * @param body 消息字节数组  
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				System.out.println("=== 消费者2 start ===");
				
				System.out.println("路由key=" + envelope.getRoutingKey());
				System.out.println("交换机=" + envelope.getExchange());
				System.out.println("消息id=" + envelope.getDeliveryTag()); 
				String message = new String(body, "UTF-8");
				System.out.println(String.format("消费者收到的消息【%s】", message)); 
				System.out.println("=== 消费者2 end ===\n"); 
			} 
		};
		/**
		 * 监听消息
		 * 参数1 队列名称 
		 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; 
		 * 参数3 消息接收后的回调 
		 */
		channel.basicConsume(FANOUT_QUEUE_2, true, consumer); 
	}
	
}

 

【3】小结

1)发布订阅模式与工作模式的区别;
区别1)工作队列模式不需要定义交换机, 发布订阅模式需要;
区别2)工作队列模式的生产者向队列发送消息(底层使用默认交换机),  发布订阅模式的生产者向交换机发送消息;
区别3)工作队列模式的队列不需要与交换机绑定(底层与默认交换机绑定), 发布订阅模式中的队列需要与交换机绑定;

2)默认交换机

AMQP default

 

 

 

 

 

 

 

 

标签:订阅,String,队列,模式,FANOUT,rabbitmq,交换机,channel,消息
来源: https://blog.csdn.net/PacosonSWJTU/article/details/114461446

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有