标签:String rabbitmq client 入门篇 五种 RabbitMQ import com channel
目录
rabbitmq的官网中介绍的工作模式有七种,这里我们只介绍五种
我们这里简单介绍下前面五种:
导入依赖:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
工具类:
package com.cjian;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqUtils {
public static Connection getConnection() {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置参数
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vhost_cjian");
connectionFactory.setUsername("cjian");
connectionFactory.setPassword("111111");
//创建连接
Connection connection = null;
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
1.简单队列
一个生产者一个消费者
package com.cjian.rabbitmq.simple;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,如果没有则会去创建该队列
* durable:是否持久化,当mq重启后,数据还在
* exclusive:①是否独占,只能有一个消费者监听这个队列②当connection关闭时,是否删除队列 一般设置为false
* autoDelete:当没有消费者时是否自动删除
* arguments:参数信息
*/
channel.queueDeclare("simpleQueueTest", true, false, false, null);
//发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* exchange:交换机名称,简单模式下会使用默认的交换机
* routingKey:路由名称,如果使用默认的交换机,则路由名称应该和队列名称一样
* props:配置名信息
* body:发送的消息的字节数组
*/
String msg = "Helle RabbitMq~";
channel.basicPublish("", "simpleQueueTest", null, msg.getBytes());
//释放资源
connection.close();
channel.close();
}
}
启动后管理台:
消费者:
package com.cjian.rabbitmq.simple;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("simpleQueueTest", true, false, false, null);
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("simpleQueueTest",true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
输出:
consumerTag:amq.ctag-89i7XZ4piHud6uco4aCg9A
Exchange:
RoutingKey:simpleQueueTest
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:Helle RabbitMq~
2.工作队列
一个生产者,多个消费者
package com.cjian.rabbitmq.workqueue;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,如果没有则会去创建该队列
* durable:是否持久化,当mq重启后,数据还在
* exclusive:①是否独占,只能有一个消费者监听这个队列②当connection关闭时,是否删除队列 一般设置为false
* autoDelete:当没有消费者时是否自动删除
* arguments:参数信息
*/
channel.queueDeclare("WorkQueueTest", true, false, false, null);
//发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* exchange:交换机名称,简单模式下会使用默认的交换机
* routingKey:路由名称,如果使用默认的交换机,则路由名称应该和队列名称一样
* props:配置名信息
* body:发送的消息的字节数组
*/
for (int i = 0; i < 10; i++) {
String msg = "Helle RabbitMq~ "+i;
channel.basicPublish("", "WorkQueueTest", null, msg.getBytes());
}
//释放资源
connection.close();
channel.close();
}
}
生产者启动后:
消费者1:
package com.cjian.rabbitmq.workqueue;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueueTest", true, false, false, null);
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("WorkQueueTest",true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者2:
package com.cjian.rabbitmq.workqueue;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueueTest", true, false, false, null);
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
System.out.println("消费者2睡眠1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("WorkQueueTest",true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
启动两个消费者:
输出:
由此可见,多个消费者消费消息的数目是一样的
3.交换机-fanout(pub/sub)模式
每个消费者都能获取到同样的消息
package com.cjian.rabbitmq.pubsub;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机的类型
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
* TOPIC("topic"),通配符
* HEADERS("headers");参数匹配,用得少
*
* durable:是否持久化
* autoDelete:自动删除
* internal:内部使用 一般false
* arguments:参数
*/
String exchangeName = "exchange_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//创建队列
String queueName1 = "exchange_fanout_queue1";
String queueName2 = "exchange_fanout_queue2";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange交换机名称
* routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
*/
channel.queueBind(queueName1,exchangeName,"");
channel.queueBind(queueName2,exchangeName,"");
//发送消息
String msg = "Hello rabbitmq~";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
启动生产者后:
消费者1:
package com.cjian.rabbitmq.pubsub;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName1 = "exchange_fanout_queue1";
channel.basicConsume(queueName1,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者2:
package com.cjian.rabbitmq.pubsub;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName2 = "exchange_fanout_queue2";
channel.basicConsume(queueName2,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
输出:
4.交换机-direct模式
每个队列只能消费指定格式(routingkey)的消息
生产者发送一条 路由键为 info的消息
package com.cjian.rabbitmq.direct;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机的类型
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
* TOPIC("topic"),通配符
* HEADERS("headers");参数匹配,用得少
*
* durable:是否持久化
* autoDelete:自动删除
* internal:内部使用 一般false
* arguments:参数
*/
String exchangeName = "exchange_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//创建队列
String queueName1 = "exchange_direct_queue1";
String queueName2 = "exchange_direct_queue2";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange交换机名称
* routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
*/
//消费路由键为error 的消息
channel.queueBind(queueName1,exchangeName,"error");
//消费路由键为info、error、waring 的消息
channel.queueBind(queueName2,exchangeName,"info");
channel.queueBind(queueName2,exchangeName,"error");
channel.queueBind(queueName2,exchangeName,"waring");
//发送消息
String msg = "Hello rabbitmq-info";
channel.basicPublish(exchangeName, "info", null, msg.getBytes());
channel.close();
connection.close();
}
}
启动生产者后:
消费者1:
package com.cjian.rabbitmq.direct;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName1 = "exchange_direct_queue1";
channel.basicConsume(queueName1,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者2:
package com.cjian.rabbitmq.direct;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName2 = "exchange_direct_queue2";
channel.basicConsume(queueName2,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
控制台输出:
发送一条error的:
控制台输出:
5.交换机-topic模式
类似于direct,匹配功能更加强大,具体细节可查看代码中的注释
消费者
package com.cjian.rabbitmq.topic;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机的类型
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
* TOPIC("topic"),通配符
* HEADERS("headers");参数匹配,用得少
*
* durable:是否持久化
* autoDelete:自动删除
* internal:内部使用 一般false
* arguments:参数
*/
String exchangeName = "exchange_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//创建队列
String queueName1 = "exchange_topic_queue1";
String queueName2 = "exchange_topic_queue2";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange交换机名称
* routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
*/
//#:匹配0或多个单词 ,*:匹配一个单词
channel.queueBind(queueName1,exchangeName,"#.error");
channel.queueBind(queueName2,exchangeName,"order.*");
channel.queueBind(queueName2,exchangeName,"*.*");
//发送消息
String msg = "Hello rabbitmq-order.error1";
channel.basicPublish(exchangeName, "order.error1", null, msg.getBytes());
channel.close();
connection.close();
}
}
启动后:
消费者1: 消费以error结尾的消息
package com.cjian.rabbitmq.topic;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName1 = "exchange_topic_queue1";
channel.basicConsume(queueName1,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者2: 消费order开头的或者任意两个单词的
package com.cjian.rabbitmq.topic;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName2 = "exchange_topic_queue2";
channel.basicConsume(queueName2,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
粘过来好看点:
控制台:
后面的就不一一验证了,重点也不在这,后面分析rabbitmq的一些高级特性
标签:String,rabbitmq,client,入门篇,五种,RabbitMQ,import,com,channel 来源: https://blog.csdn.net/cj_eryue/article/details/112850634
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。