ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ

2022-02-10 22:00:54  阅读:104  来源: 互联网

标签:String 队列 rabbitmq 交换机 RabbitMQ import 消息


Rabbitmq基本概念

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。


 

搭建Rabbitmq服务器:

rabbitmq安装

1.使用docker环境,下载rabbitmq:management镜像

有压缩包的直接使用即可

docker pull rabbitmq:management

将压缩包放入root目录下并进行导入镜像:

docker load -i rabbit-image.gz   #导入rabbit镜像

docker images   #查看 

2.关闭防火墙

systemctl stop firewalld
systemctl disable firewalld
 
# 重启 docker 系统服务
systemctl restart docker

3.配置管理员用户名和密码

mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf

# 添加两行配置:
default_user = admin
default_pass = admin

4.启动Rabbitmq

docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
--restart=always \
rabbitmq:management

访问管理控制台 http://192.168.64.140:15672
用户名密码是 admin

 

 Rabbitmq 六中工作模式 在idea中应用

添加依赖

 <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>

简单模式:只有一个消费者

生产者发送消息:

package rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		//创建连接工厂,并设置连接信息
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.141");
		f.setPort(5672);//可选,5672是默认端口
		f.setUsername("admin");
		f.setPassword("admin");

		/*
		 * 与rabbitmq服务器建立连接,
		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
		 * 并开辟多个信道与客户端通信
		 * 以减轻服务器端建立连接的开销
		 */
		Connection c = f.newConnection();
		//建立信道
		Channel ch = c.createChannel();

		/*
		 * 声明队列,会在rabbitmq中创建一个队列
		 * 如果已经创建过该队列,就不能再使用其他参数来创建
		 * 
		 * 参数含义:
		 *   -queue: 队列名称
		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
		 *   -exclusive: 排他,true表示限制仅当前连接可用
		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
		 *   -arguments: 其他参数
		 */
		ch.queueDeclare("helloworld", false,false,false,null);

		/*
		 * 发布消息
		 * 这里把消息向默认交换机发送.
		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
		 * 
		 * 参数含义:
		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
		 * 	-props: 其他参数,例如头信息
		 * 	-body: 消息内容byte[]数组
		 */
		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());

		System.out.println("消息已发送");
		c.close();
	}
}

消费者接收消息:

 

package rabbitmq.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		//连接工厂
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.141");
		f.setUsername("admin");
		f.setPassword("admin");
		//建立连接
		Connection c = f.newConnection();
		//建立信道
		Channel ch = c.createChannel();
		//声明队列,如果该队列已经创建过,则不会重复创建
		ch.queueDeclare("helloworld",false,false,false,null);
		System.out.println("等待接收数据");
		
		//收到消息后用来处理消息的回调对象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);
			}
		};
		
		//消费者取消时的回调对象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		ch.basicConsume("helloworld", true, callback, cancel);
	}
}

FANOUT群发模式

生产者发送消息时会发送给每一个接收者

生产者:

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
//当你生产者有一个队列时,消费者也必须一个队列进行接收。
//通过交换机进行发送消息,可以定义消息是否持久。交换机不存储消息需要队列接收消息

        ConnectionFactory f = new ConnectionFactory();  //通过连接工厂进行连接
        f.setHost("192.168.64.141");    //进行连接对应服务的ip
        f.setPort(5672);                //访问的消息服务的端口号
        f.setUsername("admin");         //账号密码
        f.setPassword("admin");

        Connection connection = f.newConnection();
        Channel c = connection.createChannel();     //通信通道
        //创建Fanout交换机: logs为消息队列的名字   发布和订阅模式 群发机制
        c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

        while (true){
            System.out.println("输入传递的信息:");
            String s = new Scanner(System.in).nextLine();
            c.basicPublish("logs","",null,s.getBytes());
        }

    }
}

消费者:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory f = new ConnectionFactory();  //通过连接工厂进行连接
        f.setHost("192.168.64.141");    //进行连接对应服务的ip
        f.setPort(5672);                //访问的消息服务的端口号
        f.setUsername("admin");         //账号密码
        f.setPassword("admin");

        Connection connection = f.newConnection();
        Channel c = connection.createChannel();     //通信通道

        //1.创建队列 2.创建交换机 3.进行消息队列的绑定
        String queue = UUID.randomUUID().toString();//创建随机队列 保存得到的消息
                    //队列名     非持久     独占     自动删除
        c.queueDeclare(queue,false,true,true,null);
        //创建交换机  生产者是什么消息模式就创造什么消息模式  fanout交换机
        c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
        //进行绑定  对fanout交换机来说 第三个参数是无效的
        c.queueBind(queue,"logs","");

        //正常接收消息 创建回调对象
        DeliverCallback deliverCallback =(consumerTag,message) -> {

            //此处处理消息
            String s = new String(message.getBody());
            System.out.println("收到:"+s);
        };
        CancelCallback cancelCallback =consumerTag -> {
        };
        //开始接受消息,把消息传递给一个回调对象进行处理
        c.basicConsume(queue,true,deliverCallback,cancelCallback);

    }

}

 路由模式  Direct

路由模式,设定关键词,会根据生产者发送的关键词进行接收消息。如果关键词不匹配是不会接收消息的。

生产者:

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Producer1 {

    public static void main(String[] args) throws IOException, TimeoutException {
//当你生产者有一个队列时,消费者也必须一个队列进行接收。
//通过交换机进行发送消息,可以定义消息是否持久。交换机不存储消息需要队列接收消息

        ConnectionFactory f = new ConnectionFactory();  //通过连接工厂进行连接
        f.setHost("192.168.64.141");    //进行连接对应服务的ip
        f.setPort(5672);                //访问的消息服务的端口号
        f.setUsername("admin");         //账号密码
        f.setPassword("admin");

        Connection connection = f.newConnection();
        Channel c = connection.createChannel();     //通信通道

        //创建路由模式交换机 : Direct
        c.exchangeDeclare("direct_logs",BuiltinExchangeType.DIRECT);
        //向交换机发送消息,并携带路由关键词
        while (true){
            System.out.println("输入消息:");
            String s = new Scanner(System.in).nextLine();
            System.out.println("输入路由键:");
            String k = new Scanner(System.in).nextLine();
            //对默认交换机“”,会自动使用队列名作为路由键
            c.basicPublish("direct_logs",k,null,s.getBytes());
        }


    }
}

 消费者:


import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory f = new ConnectionFactory();  //通过连接工厂进行连接
        f.setHost("192.168.64.141");    //进行连接对应服务的ip
        f.setPort(5672);                //访问的消息服务的端口号
        f.setUsername("admin");         //账号密码
        f.setPassword("admin");

        Connection connection = f.newConnection();
        Channel c = connection.createChannel();     //通信通道

        //1.创建队列 2.创建交换机 3.进行消息队列的绑定
        String queue = UUID.randomUUID().toString();//创建随机队列 保存得到的消息
                    //队列名     非持久     独占     自动删除
        c.queueDeclare(queue,false,true,true,null);
        c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//创建路由交换机
        System.out.println("输入绑定的关键词,用空格隔开");
        String s = new Scanner(System.in).nextLine();
        String[] a = s.split("\\s+");  //  \s是空白格 + 指一到多个
        for (String k : a){
            c.queueBind(queue,"direct_logs",k);  //进行遍历绑定
        }
        //从队列接收消息
        DeliverCallback deliverCallback =(consumerTag, message) -> {

            //此处处理消息
            String msg = new String(message.getBody());
            String key = message.getEnvelope().getRoutingKey();//得到路由键
            System.out.println(key+"----"+msg);
        };
        CancelCallback cancelCallback =consumerTag -> {
        };
        //开始接受消息,把消息传递给一个回调对象进行处理
        c.basicConsume(queue,true,deliverCallback,cancelCallback);

    }
}

 主题模式:

发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

* 可以通配单个单词。
# 可以通配零个或多个单词

生产者:

public class Producer5 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //生产者创建交换机   消费者创建队列
        //连接

        //连接
        ConnectionFactory f = new ConnectionFactory();//连接工厂
        f.setHost("192.168.64.129"); //进行连接对应服务的ip
        f.setPort(5672);            //访问的消息服务端口号
        f.setUsername("admin");     //账号
        f.setPassword("admin");    //密码
        Connection con = f.newConnection();
        Channel c = con.createChannel();//通信通道

        //创建Topic交换机 :topic_logs    会自动使用队列作为关键词
        c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        //向交换机发送消息,并携带路由关键词
        while (true) {
            System.out.println("输入信息:");
            String s = new Scanner(System.in).nextLine();
            System.out.println("输入路由键:");
            String k = new Scanner(System.in).nextLine();
            //对默认交换机“”,会自动使用队列名作为路由键
            c.basicPublish("topic_logs", k, null, s.getBytes());

        }

    }
}

消费者:

public class Consumer5 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //连接
        ConnectionFactory f = new ConnectionFactory();//连接工厂
        f.setHost("192.168.64.141"); //进行连接对应服务的ip
        f.setPort(5672);            //访问的消息服务端口号
        f.setUsername("admin");     //账号
        f.setPassword("admin");    //密码
        Connection con = f.newConnection();
        Channel c = con.createChannel();//通信通道

        //1.创建随机队列  2.创建交换机 3.使用绑定建关键词绑定
        String queue = UUID.randomUUID().toString();
        //非持久,独占,自动删除
        c.queueDeclare(queue, false, true, true, null);
        c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);//创建交换机
        System.out.println("输入绑定建关键词,用空格隔开:");
        String s = new Scanner(System.in).nextLine();
        String[] a = s.split("\\s+");  //   \s是空白字符  + 指一到多个
        for (String k : a) {
            c.queueBind(queue, "topic_logs", k);  //进行循环遍历绑定
        }

        //正常接收消息
        //正常从队列接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //此处处理消息
            String msg = new String(message.getBody());
            String key = message.getEnvelope().getRoutingKey();//得到路由键
            System.out.println(key + "---" + msg);
        };
        CancelCallback cancelCallback = consumerTag -> {
        };
        c.basicConsume(queue, true, deliverCallback, cancelCallback);

    }
}

 队列的持久化,消息持久化

将创建的队列第二个false改成true即可变成持久操作。

已经创建的队列参数是不能修改的,因为已经上传到服务器中了,可以创建一个新的队列再进行修改后面的参数。生产者与消费者必须一直。

标签:String,队列,rabbitmq,交换机,RabbitMQ,import,消息
来源: https://blog.csdn.net/Java_Mr_Jin/article/details/122867486

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有