ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

springboot整合RabbitMQ入门

2022-05-31 01:33:43  阅读:113  来源: 互联网

标签:springboot 队列 springframework public RabbitMQ org import annotation 入门


一、配置文件

1.1 pom文件

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>

1.2 yaml配置文件

spring:
  rabbitmq:
    host: 192.168.200.128
    port: 5672
    username: admin
    password: admin
    virtual-host: /

1.3 RabbitMQTemplateConfig文件

package springboot.rabbitmq.config;



import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/9 10:57
 */
@Configuration
@Slf4j
public class RabbitMQTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
    @Bean
    public MessageConverter jackson2JsonMessageConverter(){
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        return jackson2JsonMessageConverter;
    }
}

1.4 消息实体

package springboot.rabbitmq.po;

import lombok.Data;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:08
 */
@Data
public class RabbitMqMessage {
    /**
     * rabbitmq工作模式
     */
    String type;
    /**
     * 消息内容
     */
    String content;
}

二、Rabbitmq实例

2.1 简单模式

2.1.1 声明队列

package springboot.rabbitmq.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:04
 */
@Configuration
@Slf4j
@Getter
public class SimpleQueueConfig {
    /**
     * 声明简单队列
     * @return 队列
     */
    @Bean
    public Queue getSimpleQueue()
    {
        // 声明队列参数列表:new Queue(String name, boolean durable, boolean exclusive, boolean autoDelete);
        // name 队列名称
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("simple.queue",true);
    }

}

2.1.2 生产者实例

package springboot.rabbitmq.producer.simple;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:06
 */
@Component
@Slf4j
public class SimpleProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public String sendSimple() {
        RabbitMqMessage message = new RabbitMqMessage();
        message.setContent("hello 我是简单队列消息内容");
        message.setType("简单队列(Simple)");
        rabbitTemplate.convertAndSend( "simple.queue",message);
        return "简单队列发送成功";
    }
}

2.1.3 消费者实例

package springboot.rabbitmq.consumer.simple;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:25
 */
@Component
@Slf4j
public class SimpleConsumer {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(queuesToDeclare = @Queue(value = "simple.queue"))
    public void receive(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("队列模式:"+rabbitMqMessage.getType());
        log.error("消息内容:"+rabbitMqMessage.getContent());
    }
}

2.1.4 测试结果

  • 编写控制类请求接口进行推送消息

    package springboot.rabbitmq.ctl;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import springboot.rabbitmq.producer.simple.SimpleProducer;
    
    /**
     * @author jbjia
     * @version V1.0
     * @title
     * @description TODO
     * @date 2022/5/30 16:11
     */
    
    @Slf4j
    @RestController
    @RequestMapping("/rabbitmq")
    public class RabbitMQProducerCtl {
        @Autowired
        SimpleProducer simpleProducer;
        @GetMapping("/send/simple")
        public String sendSimple(){
         return simpleProducer.sendSimple();
        }
    }
    
  • 接口请求结果

  • Mq监控界面

  • 消费结果

2.2 工作模式

2.2.1 声明队列

package springboot.rabbitmq.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:04
 */
@Configuration
@Slf4j
@Getter
public class WorkQueueConfig {
    /**
     * 声明简单队列
     * @return 队列
     */
    @Bean
    public Queue getWorkQueue()
    {
        // 声明队列参数列表:new Queue(String name, boolean durable, boolean exclusive, boolean autoDelete);
        // name 队列名称
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("work.queue",true);
    }

}

2.2.2 生产者实例

package springboot.rabbitmq.producer.work;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:06
 */
@Component
@Slf4j
public class WorkProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public String sendWork() {

        for (int i = 0; i <10 ; i++) {
            RabbitMqMessage message = new RabbitMqMessage();
            message.setContent("hello 我是第"+i+"条工作队列消息内容");
            message.setType("工作队列(Work)");
            rabbitTemplate.convertAndSend( "work.queue",message);
        }
        return "工作队列发送成功";
    }
}

2.2.3 消费者实例

package springboot.rabbitmq.consumer.work;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:25
 */
@Component
@Slf4j
public class WorkConsumer {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(queuesToDeclare = @Queue(value = "work.queue"))
    public void receive(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("message1队列模式:"+rabbitMqMessage.getType());
        log.error("message1消息内容:"+rabbitMqMessage.getContent());
    }


    @RabbitListener(queuesToDeclare = @Queue(value = "work.queue"))
    public void receive1(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("message2队列模式:"+rabbitMqMessage.getType());
        log.error("message2消息内容:"+rabbitMqMessage.getContent());
    }
}

2.2.4 测试结果

  • 编写控制类请求接口进行推送消息

    package springboot.rabbitmq.ctl;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import springboot.rabbitmq.producer.simple.SimpleProducer;
    import springboot.rabbitmq.producer.work.WorkProducer;
    
    /**
     * @author jbjia
     * @version V1.0
     * @title
     * @description TODO
     * @date 2022/5/30 16:11
     */
    
    @Slf4j
    @RestController
    @RequestMapping("/rabbitmq")
    public class RabbitMQProducerCtl {
        @Autowired
        WorkProducer workProducer;
        @GetMapping("/send/work")
        public String sendWork(){
            return workProducer.sendWork();
        }
    }
    
    
  • 接口请求结果

  • MQ监控界面

  • 消费结果

2.3 订阅模式

2.3.1 声明队列

package springboot.rabbitmq.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:04
 */
@Configuration
@Slf4j
@Getter
public class FanoutQueueConfig {
    /**
     * 声明简单队列
     * @return 队列
     */
    @Bean
    public Queue getFanOutQueue1()
    {
        // 声明队列参数列表:new Queue(String name, boolean durable, boolean exclusive, boolean autoDelete);
        // name 队列名称
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("fanout1.queue",true);
    }


    /**
     * 声明简单队列
     * @return 队列
     */
    @Bean
    public Queue getFanOutQueue2()
    {
        // 声明队列参数列表:new Queue(String name, boolean durable, boolean exclusive, boolean autoDelete);
        // name 队列名称
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("fanout2.queue",true);
    }

    /**
     * FanoutExchange,持久化、非自动删除
     *
     * @return
     */
    @Bean
    public FanoutExchange getFanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }
    @Bean
    public Binding firstFanoutBinding() {
        return BindingBuilder.bind(getFanOutQueue1()).to(getFanoutExchange());
    }

    @Bean
    public Binding secondFanoutBinding() {
        return BindingBuilder.bind(getFanOutQueue2()).to(getFanoutExchange());
    }
}

2.3.2 生产者实例

package springboot.rabbitmq.producer.fanout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:06
 */
@Component
@Slf4j
public class FanoutProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public String sendFanout() {
        RabbitMqMessage message = new RabbitMqMessage();
        message.setContent("hello 我是订阅队列消息内容");
        message.setType("订阅队列(fanout广播模式)");
        rabbitTemplate.convertAndSend( "fanout.exchange","",message);
        return "订阅模式发送成功";
    }
}

2.3.3 消费者实例

package springboot.rabbitmq.consumer.work;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:25
 */
@Component
@Slf4j
public class WorkConsumer {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(queuesToDeclare = @Queue(value = "work.queue"))
    public void receive(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("message1队列模式:"+rabbitMqMessage.getType());
        log.error("message1消息内容:"+rabbitMqMessage.getContent());
    }


    @RabbitListener(queuesToDeclare = @Queue(value = "work.queue"))
    public void receive1(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("message2队列模式:"+rabbitMqMessage.getType());
        log.error("message2消息内容:"+rabbitMqMessage.getContent());
    }
}

2.3.4 测试结果

  • 编写控制类请求接口进行推送消息

    package springboot.rabbitmq.ctl;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import springboot.rabbitmq.producer.simple.SimpleProducer;
    import springboot.rabbitmq.producer.work.WorkProducer;
    
    /**
     * @author jbjia
     * @version V1.0
     * @title
     * @description TODO
     * @date 2022/5/30 16:11
     */
    
    @Slf4j
    @RestController
    @RequestMapping("/rabbitmq")
    public class RabbitMQProducerCtl {
        @Autowired
        WorkProducer workProducer;
        @GetMapping("/send/work")
        public String sendWork(){
            return workProducer.sendWork();
        }
    }
    
    
  • 接口请求结果

  • MQ监控界面

  • 消费结果

2.4 路由模式

2.4.1 声明队列

package springboot.rabbitmq.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:04
 */
@Configuration
@Slf4j
@Getter
public class DirectQueueConfig {
    /**
     * 声明简单队列
     * @return 队列
     */
    @Bean
    public Queue getDirectErrorQueue()
    {
        // 声明队列参数列表:new Queue(String name, boolean durable, boolean exclusive, boolean autoDelete);
        // name 队列名称
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("direct.error.queue",true);
    }


    /**
     * 声明简单队列
     * @return 队列
     */
    @Bean
    public Queue getDirectInfoQueue()
    {
        // 声明队列参数列表:new Queue(String name, boolean durable, boolean exclusive, boolean autoDelete);
        // name 队列名称
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("direct.info.queue",true);
    }

    /**
     * FanoutExchange,持久化、非自动删除
     *
     * @return
     */
    @Bean
    public DirectExchange getDirectExchange() {
        return new DirectExchange("direct.exchange");
    }

    /**
     * 绑定交换机与通配符及队列的关系
     * @return
     */
    @Bean
    public Binding firstDirectBinding() {
        return BindingBuilder.bind(getDirectErrorQueue()).to(getDirectExchange()).with("error_route");
    }

    /**
     * 绑定交换机与通配符及队列的关系
     * @return
     */
    @Bean
    public Binding secondDirectBinding() {
        return BindingBuilder.bind(getDirectInfoQueue()).to(getDirectExchange()).with("info_route");
    }
}

2.4.2 生产者实例

package springboot.rabbitmq.producer.direct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:06
 */
@Component
@Slf4j
public class DirectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public String sendDirect() {


        RabbitMqMessage message = new RabbitMqMessage();
        message.setContent("hello 我是路由模式队列消息内容:error日志");
        message.setType("路由队列(Route)");
        rabbitTemplate.convertAndSend("direct.exchange","error_route", message);

        RabbitMqMessage infoMessage = new RabbitMqMessage();
        infoMessage.setContent("hello 我是路由模式队列消息内容:info日志");
        infoMessage.setType("路由队列(Route)");
        rabbitTemplate.convertAndSend("direct.exchange","info_route", infoMessage);

        RabbitMqMessage warnMessage = new RabbitMqMessage();
        warnMessage.setContent("hello 我是路由模式队列消息内容:warn日志");
        warnMessage.setType("路由队列(Route)");
        rabbitTemplate.convertAndSend("direct.exchange","warn_route", warnMessage);
        return "工作队列发送成功";
    }
}

2.4.3 消费者实例

package springboot.rabbitmq.consumer.direct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:25
 */
@Component
@Slf4j
public class DirectConsumer {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "direct.error.queue"), //创建临时队列
                    exchange = @Exchange(value = "direct.exchange", type = "direct"), //绑定交换机
                    key = "error_route"
            )
    })
    public void receive(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("路由键error队列模式:"+rabbitMqMessage.getType());
        log.error("路由键error消息内容:"+rabbitMqMessage.getContent());
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "direct.info.queue"), //创建临时队列
                    exchange = @Exchange(value = "direct.exchange", type = "direct"), //绑定交换机
                    key = "info_route"
            )
    })
    public void receive1(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("路由键info队列模式:"+rabbitMqMessage.getType());
        log.error("路由键info消息内容:"+rabbitMqMessage.getContent());
    }
}

2.4.4 测试结果

  • 编写控制类请求接口进行推送消息

    package springboot.rabbitmq.ctl;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import springboot.rabbitmq.producer.direct.DirectProducer;
    import springboot.rabbitmq.producer.fanout.FanoutProducer;
    import springboot.rabbitmq.producer.simple.SimpleProducer;
    import springboot.rabbitmq.producer.work.WorkProducer;
    
    /**
     * @author jbjia
     * @version V1.0
     * @title
     * @description TODO
     * @date 2022/5/30 16:11
     */
    
    @Slf4j
    @RestController
    @RequestMapping("/rabbitmq")
    public class RabbitMQProducerCtl {
        @Autowired
        DirectProducer directProducer;
        @GetMapping("/send/direct")
        public String sendDirect(){
            return directProducer.sendDirect();
        }
    }
    
    
  • 接口请求结果

  • MQ监控界面

  • 消费结果

2.5 主题模式

2.5.1 声明队列

package springboot.rabbitmq.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:04
 */
@Configuration
@Slf4j
@Getter
public class TopicQueueConfig {
    /**
     * 声明简单队列
     * @return 队列
     */
    @Bean
    public Queue getTopicErrorQueue()
    {
        // 声明队列参数列表:new Queue(String name, boolean durable, boolean exclusive, boolean autoDelete);
        // name 队列名称
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("topic.error.queue",true);
    }


    /**
     * 声明简单队列
     * @return 队列
     */
    @Bean
    public Queue getTopicInfoQueue()
    {
        // 声明队列参数列表:new Queue(String name, boolean durable, boolean exclusive, boolean autoDelete);
        // name 队列名称
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("topic.info.queue",true);
    }

    /**
     * FanoutExchange,持久化、非自动删除
     *
     * @return
     */
    @Bean
    public TopicExchange getTopicExchange() {
        return new TopicExchange("topic.exchange");
    }

    /**
     * 绑定交换机与通配符及队列的关系
     * @return
     */
    @Bean
    public Binding firstTopicBinding() {
        return BindingBuilder.bind(getTopicInfoQueue()).to(getTopicExchange()).with("topic.*.route");
    }

    /**
     * 绑定交换机与通配符及队列的关系
     * @return
     */
    @Bean
    public Binding secondTopicBinding() {
        return BindingBuilder.bind(getTopicErrorQueue()).to(getTopicExchange()).with("topic.error.*");
    }


}

2.5.2 生产者实例

package springboot.rabbitmq.producer.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:06
 */
@Component
@Slf4j
public class TopicProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public String sendTopic() {


        RabbitMqMessage message = new RabbitMqMessage();
        message.setContent("hello 我是主体模式队列消息内容:topic.error.route");
        message.setType("主体模式(Topic)");
        rabbitTemplate.convertAndSend("topic.exchange","topic.error.route", message);

        RabbitMqMessage infoMessage = new RabbitMqMessage();
        infoMessage.setContent("hello 我是主体模式队列消息内容:topic.info.route");
        infoMessage.setType("主体模式(Topic)");
        rabbitTemplate.convertAndSend("topic.exchange","topic.info.route", infoMessage);

        RabbitMqMessage warnMessage = new RabbitMqMessage();
        warnMessage.setContent("hello 我是主体模式队列消息内容:topic.warn.route");
        warnMessage.setType("主体模式(Topic)");
        rabbitTemplate.convertAndSend("topic.exchange","topic.warn.route", warnMessage);
        return "主体模式发送成功";
    }
}

2.5.3 消费者实例

package springboot.rabbitmq.consumer.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import springboot.rabbitmq.po.RabbitMqMessage;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:25
 */
@Component
@Slf4j
public class TopicConsumer {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "topic.error.queue"), //创建临时队列
                    exchange = @Exchange(value = "topic.exchange", type = "topic"), //绑定交换机
                    key = "topic.error.*"
            )
    })
    public void receive(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("topic.error.*队列模式:"+rabbitMqMessage.getType()+"/n"+"topic.error.*消息内容:"+rabbitMqMessage.getContent());
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "topic.info.queue"), //创建临时队列
                    exchange = @Exchange(value = "topic.exchange", type = "topic"), //绑定交换机
                    key = "topic.*.route"
            )
    })
    public void receive1(Message message) {
        RabbitMqMessage rabbitMqMessage =(RabbitMqMessage) rabbitTemplate.getMessageConverter().fromMessage(message);
        log.error("topic.*.route队列模式:"+rabbitMqMessage.getType()+"/n"+"topic.*.route消息内容:"+rabbitMqMessage.getContent());
    }
}

2.5.4 测试结果

  • 编写控制类请求接口进行推送消息
package springboot.rabbitmq.ctl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import springboot.rabbitmq.producer.direct.DirectProducer;
import springboot.rabbitmq.producer.fanout.FanoutProducer;
import springboot.rabbitmq.producer.simple.SimpleProducer;
import springboot.rabbitmq.producer.topic.TopicProducer;
import springboot.rabbitmq.producer.work.WorkProducer;

/**
 * @author jbjia
 * @version V1.0
 * @title
 * @description TODO
 * @date 2022/5/30 16:11
 */

@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQProducerCtl {
 
    @Autowired
    TopicProducer topicProducer;

  

    @GetMapping("/send/topic")
    public String sendTopic(){
        return topicProducer.sendTopic();
    }
}

  • 接口请求结果

  • MQ监控界面

  • 消费结果

标签:springboot,队列,springframework,public,RabbitMQ,org,import,annotation,入门
来源: https://www.cnblogs.com/jbjia/p/16328965.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有