ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ消息确认高级

2021-11-08 10:01:26  阅读:152  来源: 互联网

标签:确认 高级 springframework annotation 交换机 RabbitMQ org import public


当交换机宕机或路由不可达时,为了保证消息不丢失,需要通知到发送者。由此引出rabbitmq的消息回退机制。声明一个组件,继承内部接口,去实现rabbitmq宕机时,消息返回给发送者,消息不会丢失。

package com.zhaoye.springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author feng
 * @Date 2021/11/8 8:10
 * 发布确认:交换机回调确认方法
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 仅仅实现了接口中的内部接口,并未存在在RabbitTemplate中,
     * 需要进行注入到该接口的内部接口中。
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this::confirm);
        rabbitTemplate.setReturnsCallback(this::returnedMessage);
    }

    /**
     * @param correlationData 保存回调消息的ID和相关信息
     * @param ack             是否接收boolean
     * @param cause           失败的原因
     *                        交换机确认回调方法
     *                        1 成功,true
     *                        2 交换机确认失败,false,失败的原因,数据
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : null;
        if (ack) {
            log.info("交换机已经收到id为:{}的消息", id);
        } else {
            log.info("交换机还未收到消息,id为:{},失败原因:{}", id, cause);
        }



    }
    // 路由不可达,消息的回退,返回给生产者
    // 只有路由不可达才回退

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
      log.error("消息{},被交换机{}退回,退回的原因:{},路由key是:{}",new String(returnedMessage.getMessage().getBody()),
                returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
     }


}

生产者:

package com.zhaoye.springbootrabbitmq.controller;

import com.zhaoye.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author feng
 * @Date 2021/11/7 21:40
 * 发布确认高级
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData1 = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFRIM_ROUTING_KEY, message, correlationData1);
        log.info("发送消息内容:{}", message + "key1");

        //测试队列宕机,消息确认接口回调
        CorrelationData correlationData2 = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFRIM_ROUTING_KEY + "2", message, correlationData2);
        log.info("发送消息内容:{}", message + "key2");

    }
}

消费者:

package com.zhaoye.springbootrabbitmq.consumer;

import com.zhaoye.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author feng
 * @Date 2021/11/7 21:46
 * 接收消息
 */
@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("接收到的消息:{}",msg);
    }

}

组件声明:

package com.zhaoye.springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author feng
 * @Date 2021/11/7 21:29
 *  发布确认高级
 */
@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    public static final String CONFIRM_QUEUE_NAME = "confrim_queue";
    public static final String CONFRIM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
     return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue,
                                        @Qualifier("confirmExchange")DirectExchange confirmExchange ){
       return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFRIM_ROUTING_KEY);
    }

}

properties配置

#开启发布确认模式,交换机回调确认
spring.rabbitmq.publisher-confirm-type=correlated
#路由不可达回退
spring.rabbitmq.publisher-returns=true

测试结果:

: 发送消息内容:hellokey1
2021-11-08 09:46:21.966  INFO 23928 --- [nio-8080-exec-1] c.z.s.controller.ProducerController      : 发送消息内容:hellokey2
2021-11-08 09:46:21.978 ERROR 23928 --- [nectionFactory1] c.z.s.config.MyCallBack                  : 消息hello,被交换机confirm_exchange退回,退回的原因:NO_ROUTE,路由key是:key12
2021-11-08 09:46:21.978  INFO 23928 --- [nectionFactory1] c.z.s.config.MyCallBack                  : 交换机已经收到id为:1的消息
2021-11-08 09:46:21.981  INFO 23928 --- [ntContainer#0-1] c.z.s.consumer.Consumer                  : 接收到的消息:hello
2021-11-08 09:46:21.981  INFO 23928 --- [nectionFactory1] c.z.s.config.MyCallBack                  : 交换机已经收到id为:1的消息

标签:确认,高级,springframework,annotation,交换机,RabbitMQ,org,import,public
来源: https://blog.csdn.net/qq_57036017/article/details/121202025

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有