ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMq 入门实例详解+实例代码

2022-07-03 12:01:37  阅读:141  来源: 互联网

标签:connectionFactory return springframework 实例 RabbitMq import org public 详解


RabbitMq 入门实例详解+实例代码 最近有项目又有使用RabbitMQ,使用过程中看到有使用 “AmqpAdmin” 后进行详细研究为什么会用 创建 Queue、Exchange 还用AmqpAdmin.delcareQueue,经过深入的查资料和思考发现 其实没必要 在@Configuration 类中 Return new Queue()中使用; 经过这次遇到的问题也正好对RabbitMq整体梳理,下面是干货满满的说明和实例代码。 RabbitMQ 按装请自已查找,这里不在赘述。   1、 实例中有生产者:
confirmpusher:发送方
confirmreceive:接收方
实例confirmpusher 详细搭建
1)Pom 文件节点:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.liyanbomq</groupId>
    <artifactId>confirmpusher</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>confirmpusher</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2)yml文件

spring:
  rabbitmq:
    host: 192.168.88.129
    username: liyanbo
    password: liyanbo
    #开启发送确认机制,将来消息到达交换机以后有一个回调
    publisher-confirm-type: correlated
    #消息到达消息队列回调(如果消息没有成功到达队列,会触发回调方法)
    publisher-returns: true
    template:
      retry:
        enabled: true  # 开启重发机制
        initial-interval: 1000ms #间隔 1秒
        max-attempts: 6    #最多发6次
        multiplier: 1.2 #每次间隔 时间*1.2
        max-interval: 10000ms  #每次最大间隔时间
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual


server:
  port: 7004

3)读取RabbitMQ相关配置后声明 队列、交换机,进行交换机、队列绑定

@Configuration

RabbitConfigurationSure
package com.liyanbomq.confirmpusher.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;


/**
 * 用于mq消息确认
 *
 * 总结
 *   一、确认消息发送成功到交换机和消息队列
 *     1、调用回调方法
 *       利用 yml文件中参数进行设置发送失败消息进行重新发送
 *          template:
 *              retry:
 *                  enabled: true  # 开启重发机制
 *                  initial-interval: 1000ms #间隔 1秒
 *                  max-attempts: 6    #最多发6次
 *                  multiplier: 1.2 #每次间隔 时间*1.2
 *                  max-interval: 10000ms  #每次最大间隔时间
 *
 *
 *      二、 确认消息发送成功到交换机和消息队列 调用回调方法
 *         1、在回调方法中 记录发送失败的数据记录 (如用mysql存储)
 *         2、制作定时任务处理未成功发送的消息
 *
 */
@Configuration
public class RabbitConfigurationSure implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    public  static final  String CONFIRM_QUEUE_NAME="confirm_queue";
    public  static final String EXCHANGE_NAME="confirm_exchange";


   public final Logger logger= LoggerFactory.getLogger(RabbitConfigurationSure.class);

   @Autowired
   RabbitTemplate rabbitTemplate;

    /**
     * 这样设置 setReturnsCallback、setConfirmCallback才会起作用
     */
   @PostConstruct
   public  void  initRabbitMq(){
       rabbitTemplate.setReturnsCallback(this);
       rabbitTemplate.setConfirmCallback(this);
   }

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue getConfirmQueue(){
        /**
         * parameter 1 队列名称
         * durable:持久化
         * exclusive: 排他其它连接也可操作
         * autodelete:没有消费连接时不会自动删除
         */
        return new Queue(CONFIRM_QUEUE_NAME,true,false,false);
    }

    /**
     * 交换机
     * @return
     */
    @Bean
    public DirectExchange getConirmExchange(){
        return  new DirectExchange(EXCHANGE_NAME,true,false);
    }


    /**
     * 绑定消息队列到交换机上
     * @return
     */
    @Bean
    public Binding  getConfirmBinding(){
        return BindingBuilder.bind(getConfirmQueue()).to(getConirmExchange()).with(CONFIRM_QUEUE_NAME);
    }


    /**
     * 消息成功到达交换机会触发回调该方法
     * @param correlationData
     * @param ack 是否成功到达交换机
     * @param cause 如果未成功到达,原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if(ack){
            System.out.println("成功到达交换机");
            logger.info("{}成功到达交换机",correlationData.getId());
        }else{
            System.out.println("未成功到达交换机,原因:"+cause);
            logger.info("{}未成功到达交换机,原因:{}",correlationData.getId(),cause);
        }

    }

    /**
     * 消息未成功到达该队列会触发回调该方法
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("消息成功到达队列");
        logger.info("{}消息成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());

    }


}

  4) 发起调用 往消息队列写信息

package com.liyanbomq.confirmpusher.controller;

import com.liyanbomq.confirmpusher.configuration.RabbitConfigurationSure;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
public class pushmqController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("send")
    public String sendmqInfo(@RequestParam("msg") String msg){

//        try {
            rabbitTemplate.convertAndSend(RabbitConfigurationSure.EXCHANGE_NAME,RabbitConfigurationSure.CONFIRM_QUEUE_NAME,msg,new CorrelationData(UUID.randomUUID().toString()));
//        } catch (Exception e) {
//            e.printStackTrace();
//        }

        return "it is msg:"+msg;
    }


    @RequestMapping("sendfanout")
    public String sendmqInfofanout(@RequestParam("msg") String msg){

//        try {
        rabbitTemplate.convertAndSend(RabbitConfigurationSure.EXCHANGE_NAME+"fanout",RabbitConfigurationSure.CONFIRM_QUEUE_NAME+"fanout",msg,new CorrelationData(UUID.randomUUID().toString()));
//        } catch (Exception e) {
//            e.printStackTrace();
//        }

        return "it is msg:"+msg;
    }
}

  4)Springboot 启动项,运行

package com.liyanbomq.confirmpusher;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConfirmpusherApplication {

	public static void main(String[] args) {
		SpringApplication.run(ConfirmpusherApplication.class, args);
	}

}

  

5)通过Controller发起调用

http://localhost:7004/send?msg=1234

 

2、接收都 代码搭建 1)pom文件、2)yml文件、3)配置信息读取与 发送者搭建一样

1、

2、

3、

4、接收者代码

package com.liyanbomq.confirmreceive.receive;

import com.liyanbomq.confirmreceive.configuration.RabbitConfigurationSure;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class ReceiverService {

    @RabbitListener(queues = RabbitConfigurationSure.CONFIRM_QUEUE_NAME)
    public void  receiveMsg(Message message, Channel channel){
        // 消息标识
        long deliveryTag= message.getMessageProperties().getDeliveryTag();
        try {
            byte [] bytes=message.getBody();
            String mess=new String(bytes);
            System.out.println("mess = " + mess);
//            int l=2/2/0;
            //  手动确认签收 第一个参数是消息标记 第二个参数fasle 只确认当前消息,true表示之前所有的消息都确认成功
            channel.basicAck(deliveryTag,false);


        } catch (Exception e) {
            try {
                // 标示签收失败,再次放入队列中  第三个参烽 requeue 再次放入队列
                channel.basicNack(deliveryTag,false,true);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
            e.printStackTrace();
        }


    }
}

  

二、关于自定义配置RabbitMq 和AmqpAdmin.declare 说明

一、关于 rabbitMq配置是否需要显式写代码
/**
*
*组装配置 rabbitMq - connectionFactory
* 1、如果默认配置节点设置 不用该组装connectionFactory
* spring:
* rabbitmq:
* username: liyanbo
* password: liyanbo
* host: 192.168.88.129
* port: 5672
*2、如果 自定义配置像下面 自定义pos:就需要 组装配置 connectionFactory
* spring:
* * rabbitmq:
* pos:
* * username: liyanbo
* * password: liyanbo
* * host: 192.168.88.129
* * port: 5672
*
* @return
*/
@Bean(name = "connectionFactory")
public CachingConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("192.168.88.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("liyanbo");
connectionFactory.setPassword("liyanbo");
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}

@Bean(name = "rabbitTemplate")
public RabbitTemplate rabbitTemplate(@Qualifier("connectionFactory") CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}

@Bean(name = "amqpAdmin")
public AmqpAdmin amqpAdmin(@Qualifier("connectionFactory") CachingConnectionFactory connectionFactory){
AmqpAdmin amqpAdmin=new RabbitAdmin(connectionFactory);
return amqpAdmin;
}

二、 AmqpAdmin amqpAdmin 是否需要
1、使用了下面 @Configuration 就不需要使用
@Configuration
public class RabbitConfigurationSure {
2、在下面过程中 会自动生成声明 Queue(源码本有逻辑), 不需要使用amqpAdmin.declareQueue(queue); 只有使用测试或普通类不使用 @Configuration 才需要amqpAdmin.declareQueue(queue)
@Bean
public Queue supplyRabbitQueue() {
return new Queue("SupplyQueue");
}


ps:AmqpAdmin用于创建/删除 Exchange、Queue、Binding 和初始化RabbitMQ

如有疑问或问题可以沟通讨论qq:626382542

标签:connectionFactory,return,springframework,实例,RabbitMq,import,org,public,详解
来源: https://www.cnblogs.com/liyanbofly/p/16439531.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有