ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMq 的理论及应用示例(一)

2021-07-26 18:04:08  阅读:212  来源: 互联网

标签:示例 exchange RabbitMq rabbitmqctl springframework 应用 org import 消息


什么是RabbitMq

RabbitMQ是一个基于AMQP协议的开源的消息代理和队列服务器。
优点:

  • 采用Erlang语言进行开发作为底层语言实现:Erlang有着和原生Socket一样的延迟,所以性能非常高
  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式(confirm)、返回模式(return)
  • 与SpringAMQP完美整合,API丰富
  • 集群模式比较丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性,可用性

AMOP 专有名词:

  • Server:又称为 Broker
  • Connection:连接,应用程序和broker之间的网络连接
  • Channel:网络信道,一个网络会话的任务
  • Message:消息
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtual host里面可以有若干个Exchage和Queue,但同一个Virtual host里面不能有相同名称的Exchage和Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定队列
  • Binding:Exchage和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则
  • Queue:保存具体的消息的容器

RabbitMQ 消息的流转过程:
在这里插入图片描述

应用场景/作用

  • 异步缓冲
    有些业务可以进行异步的,只要做到最终一致性,不用强一致性,即可用MQ

  • 服务解耦

    • 强依赖:使用 dubbo 或 springCloud 进行服务的调用和连接都是强依赖。【比如注册、发现都需要依赖其他服务】

    • 弱依赖:MQ 中间件

      • 不代表弱依赖就可以失败
      • 如果不能失败就要保证上游的消息发布端数据投递的可靠性

      场景举例:用户下单后,订单需要更新库存

      强依赖下会出现的问题:
      1)假如库存系统无法访问,则订单减库存失败,从而导致订单生成失败
      2)订单模块和库存模块是强耦合的
      3)如果启用一个线程做离线操作,只是做了异步访问,访问只是提升速度,是否正 常调用成功是无法保证的

      通过弱依赖来解决以上问题:
      1)订单生产成功写入消息到消息队列(保证消息的可靠投递)
      2)库存系统通过订阅消息获取下单信息,库存系统根据下单信息进行库存操作
      3)如果库存系统出现异常,库存消费消息失败的情况下消息就重回队列了,等待下次发送

  • 削峰和填谷

    • 当我们下游服务处理不过来的时候,就可以将这些消息缓存在一个地方,逐步处理
    • 将短暂一段时间的业务积压在后面缓慢执行就是削峰和填谷的过程

思考

  1. 生产端的可靠性投递;
    • 如果消息和钱有关,这个消息一定不能丢失
    • 需要做到生产端100%投递,就需要和业务数据保证原子性
  2. 消费端的幂等;
    • 生产端如果要做到可靠性投递,可能会有重复投递
    • 消费端消费了两次或多次这个数据可能会不一致
    • 所以消费端一定要做到同一个请求消费多次得到的结果一样
  3. MQ 本身需要考虑
    • HA:高可用
    • 低延迟
    • 可靠性:确保数据是完整的
    • 堆积能力:这是MQ能扛下你的业务量级的保证
    • 扩展性:是否能够天然支持横向扩展无感知扩容

RabbitMq 集群架构原理解析

1)主备模式
master-slave结构,可以理解为热备份,master负责读写,master宕机后切换到slave

2)镜像模式
业界主流使用比较多的模式;
RabbitMQ集群非常经典的就是镜像模式,保证数据100%不丢失;
高可用、数据同步低延迟、奇数个节点。

缺点:
镜像队列集群的缺陷是无法进行很好的横向扩容,因为每个节点都是一个完整的互相复制的节点,并且镜像节点过多也会增加MQ的负担,一个数据写入就要复制到多个节点,吞吐量也会降低
在这里插入图片描述

RabbitMq 的安装和使用

RabbitMq-3.8.19安装详解 前面我已经介绍了一篇,再此省略。

修改用户登录和连接心跳

  • 将 loopback_users.guest = false,前面的注释去掉
  • 将 {heartbeat, 60} 修改为 {heartbeat, 10}

查看MQ端口是否启用:yum -y install lsof

  • lsof -i:5672

启动插件:

  • rabbitmq-plugins enable rabbitmq_management

查看管理后台是否启动

  • lsof -i:15672

常用命令

# 启动服务
systemctl start rabbitmq-server
# 或者
rabbitmq-server -detached
# 开启web管理界面插件
rabbitmq-plugins enable rabbitmq_management

# 关闭应用
rabbitmqctl stop_app

# 启动应用
rabbitmqctl start_app

# 节点状态
rabbitmqctl status

# 添加用户密码
rabbitmqctl add_user username password

# 修改用户密码
rabbitmqctl change_password username password

# 列出所有用户
rabbitmqctl list_users

# 删除用户
rabbitmqctl delete_user username

# 列出用户权限
rabbitmqctl list_user_permissions username

# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username

# 设置用户权限
# 三个*对应:configure write read
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"

# 列出所有虚拟主机
rabbitmqctl list_vhosts

# 创建虚拟主机
rabbitmqctl add_vhost vhostpath

# 列出虚拟主机的权限
rabbitmqctl list_permissions -p vhostpath

# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath

# 查看所有队列
rabbitmqctl list_queues

# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue queueName

# 清除所有数据
rabbitmqctl reset # 这个动作最好在MQ服务停掉后操作

springboot 整合 rabbitmq

发送

搭建一个 SpringBoot 工程及准备工作,我在另一篇博客写了,这里不再重复。SpringBoot全局异常处理、集成Swagger和参数必填校验 ,我们现在需要的准备工作,都在这篇准备了。

新增 application.yaml

server:
  port: 8088

spring:
  rabbitmq:
    host: 192.168.150.130
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000

消息实体

package cn.com.springboot.vo;

import lombok.Data;

import java.io.Serializable;

@Data
public class OrderInfo implements Serializable {

    private String id;

    private String orderName;

    private String messageId;
}

发送类

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String ORDER_EXCHANGE = "order_exchange";

    private static final String ORDER_ROUTING_KEY = "order_r_key";

    public void sendOrder(OrderInfo orderInfo){
        //correlationData:消息唯一id
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(orderInfo.getMessageId());

        //String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData
        rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, orderInfo, correlationData);
    }
}

rabbitmq 准备工作

创建 exchange
在这里插入图片描述
Exchange的相关属性说明

  • Name:exchange的名称

  • Type类型说明

    • direct:exchange在和queue进行binding时会设置routingkey,只有routingkey完全相同,exchange才会将消息转发到对应的queue上,相当于点对点

    • fanout:直接将消息路由到所有绑定的队列中,无需对消息的routingkey进行匹配操作,因为不绑定routingkey,所有也是消息转发最快的(广播方式)

    • topic:此类型的exchange和direct差不多,但direct类型要求routingkey完全相同,而topic可以使用通配符:‘*’,‘#’

      其中‘*’表示匹配一个单词,‘#’则表示匹配没有或者多个单词

    • header:路由规则是根据header来判断

    • 总结:一般direct和topic用来具体的路由信息,如果用广播就使用fanout,header类型用的比较少

  • Durability:Durable是持久化到磁盘的意思

  • Auto Delete:如果设置为yes,那么最后一个绑定到exchange上的队列被删除后,exchange也会自动删除

  • Internal:如果为yes则表明该exchange是rabbitmq内部使用,不提供给外部系统应用,一般是在自己编写erlang语言做定制化扩展时使用

  • Arguments这个是扩展AMQP协议时自定义使用的内容

创建Queue
在这里插入图片描述
Exchange和Queue通过Binding关联,由routingkey进行路由
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试发送

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import cn.com.springboot.vo.ResultVo;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@Api("发送消息")
@RestController
public class RabbitMqController {
    @Autowired
    private OrderSender orderSender;

    @PostMapping("/sendOrder")
    public ResultVo sender(@RequestBody OrderInfo orderInfo){
        orderSender.sendOrder(orderInfo);
        return ResultVo.success();
    }
}

用 swagger 测试
在这里插入图片描述
消息成功发送到队列
在这里插入图片描述
注意:

  • 一个exchange可以绑定多个queue,只要routingkey一样,一个消息就会发送到多个queue上
  • exchange绑定一个queue,无论binding多少个routingkey,只要符合这个routingkey规则的消息都会发送到这个队列中,接收的时候无论从哪个routingkey过来的消息,连接这个队列的消费端都会消费掉,相当于多个消息规则对应一个队列

消息接收

创建另一个工程:consumer-and-producer,搭建步骤和上面的基本一样。
application.yaml

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.150.130
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000
    listener:
      simple:
        concurrency: 5              # 初始化并发数
        max-concurrency: 10         # 最大并发数
        auto-startup: true          # 自动开启监听
        prefetch: 1                 # 每个连接同一时间最多处理几个消息,限流设置
        acknowledge-mode: manual    # 签收模式为手动签收

添加消费类

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Log4j2
@Component
public class OrderReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order_queue", durable = "true"),
            exchange = @Exchange(value = "order_exchange", type = "topic"),//durable默认是true
            key = "order_r_key"//我的routingKey是order_r_key
    ))
    @RabbitHandler
    public void receiveOrderInfo(@Payload OrderInfo orderInfo,
                                 @Headers Map<String, Object> headers,
                                 Channel channel) throws IOException {
        log.info("开始消费");
        log.info("orderName:{}, messageId:{}", orderInfo.getOrderName(), orderInfo.getMessageId());

        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);

        channel.basicAck(deliveryTag, false);
    }
}

启动测试
在这里插入图片描述

暂时分享到这,欢迎指正!

标签:示例,exchange,RabbitMq,rabbitmqctl,springframework,应用,org,import,消息
来源: https://blog.csdn.net/u011731053/article/details/118930159

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有