ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

springboot-rabbitmq

2021-05-03 19:02:23  阅读:216  来源: 互联网

标签:fanout springboot args rabbitmq rabbitmqctl 交换机 rabbit public


提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

springboot-rabbitmq


使用步骤

1.引入依赖

核心依赖(示例):

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置yml

代码如下(示例):

server:
  port: 9001
spring:
  rabbitmq:
    host: 192.168.249.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /

3.声明交换机,队列,并绑定关系

第一种:通过config(示例):

@Configuration
public class RabbitmqConfig {
    //1.声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout-order-exchange",true,false);
    }
    //2.声明队列 sms.fanout.queue email.fanout.queue duanxin.fanout.queue
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue emailQueue(){
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin.fanout.queue", true);
    }
    //3.完成绑定关系
    @Bean
    public Binding  smsBinding(){
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding  emailBinding(){
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding  duanxinBinding(){
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }
}

第二种:通过注解绑定关系
注意:这里只是绑定关系,并没有声明交换机,队列(推荐使用config)

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email.fanout.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "fanout-order-exchange",type = ExchangeTypes.FANOUT),
        key = ""
))
@Service
public class FanoutEmailConsumer {
    @RabbitHandler
    public void getMessage(String message){
        System.out.println("email fanout----接收到的订单信息是:"+message);
    }
}

4.业务类(provider=“发送消息”)

代码如下(示例):

@Service
public class OrderService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void makeOrder(){
        //1.根据商品id查询库存是否充足
        //2.保存订单
        String orderId=UUID.randomUUID().toString();
        System.out.println("订单生成成功:"+orderId);
        //3.通过mq来完成消息的分发
        //参数1.交换机 参数2.路由key/队列名 参数3.消息内容
        String exchangeName = "fanout-order-exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
}

5.业务类(consumer=“消费消息”)

代码如下(示例):

@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class FanoutEmailConsumer {
    @RabbitHandler
    public void getMessage(String message){
        System.out.println("email fanout----接收到的订单信息是:"+message);
    }
}

高级特性

1.TTL(设置过期时间)

x-message-ttl(示例):

	@Bean
    public Queue emailQueue(){
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl", 5000);//这里一定是一个int类型
        return new Queue("email.fanout.queue", true,false,false,args);
    }

设置单条消息过期时间(示例):

@Service
public class OrderService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void makeOrder(){
        //1.根据商品id查询库存是否充足
        //2.保存订单
        String orderId=UUID.randomUUID().toString();
        System.out.println("订单生成成功:"+orderId);
        //3.通过mq来完成消息的分发
        //参数1.交换机 参数2.路由key/队列名 参数3.消息内容
        String exchangeName = "fanout-order-exchange";
        String routingKey = "";
        MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
    }
}

TTL队列消息过期会被转移到死信队列中
而单独设置的过期消息则会被直接移除

2.DLX(死信队列)

x-dead-letter-exchange(死信交换机)
x-dead-letter-routing-key(死信routingkey)(示例):

    @Bean
    public Queue emailQueue(){
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl", 5000);//这里一定是一个int类型
        args.put("x-dead-letter-exchange", "dead-direct-exchange");//绑定死信交换机
        args.put("x-dead-letter-routing-key", "dead");//绑定死信交换机路由key(也就是DLK)
        return new Queue("email.fanout.queue", true,false,false,args);
    }

3.LIM(最大队列长度)

x-max-length(示例):

    @Bean
    public Queue emailQueue(){
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl", 5000);//这里一定是一个int类型
        args.put("x-max-length", 5);
        args.put("x-dead-letter-exchange", "dead-direct-exchange");//绑定死信交换机
        args.put("x-dead-letter-routing-key", "dead");//绑定死信交换机路由key(也就是DLK)
        return new Queue("email.fanout.queue", true,false,false,args);
    }

死信交换机其实也是一个普通的交换机,类型可以按需设置
当当前交换机消息过期时,会被转入死信交换机,并发送给死信队列

单机集群搭建

搭建集群(示例):

#启动第一个节点rabbit-1
sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
#启动第一个节点rabbit-2
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
#停止应用
sudo rabbitmqctl -n rabbit-1 stop_app
#目的是清楚节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-1 reset
#启动应用
sudo rabbitmqctl -n rabbit-1 start_app
#重复步骤
sudo rabbitmqctl -n rabbit-2 stop_app
sudo rabbitmqctl -n rabbit-2 reset
#将rabbit-2作为从节点连接rabbit-1	@后面为服务器的主机名[root@localhost ~]
sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@localhost
#启动rabbit-2
sudo rabbitmqctl -n rabbit-2 start_app
#查看集群信息
sudo rabbitmqctl cluster_status -n rabbit-1

开启web监控(示例):

#开启web查件
rabbitmq-plugins enable rabbitmq_management
#创建用户
rabbitmqctl -n rabbit-1 add_user admin admin
#赋予身份
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
#赋予权限
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
#重复rabbit-2(这里显示已经存在,可能集群user共通的吧)
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"

总结

无论发送消息还收消费消息,都需要连接rabbitmq
声明交换机,队列,绑定关系,在provider或者consumer都是可以的
这里的示例是fanout模式 FanoutExchange
direct模式则是 DirectExchange(绑定队列得加上路由key)
示例: BindingBuilder.bind(duanxinQueue()).to(directExchange()).with(“duanxin”);

标签:fanout,springboot,args,rabbitmq,rabbitmqctl,交换机,rabbit,public
来源: https://blog.csdn.net/qq_51138261/article/details/116354134

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有