ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

使用Topics 通配符模式实现消息的订阅和发布

2022-02-21 12:34:48  阅读:190  来源: 互联网

标签:订阅 exchange Topics 通配符 fanout user import message public


package com.itheima.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.itheima.domain.User;

@Service
public class RabbitMQService {

    /**
     * Publish/Subscribe工作模式接收,处理邮件业务
     * 
     * @param message
     */
    @RabbitListener(queues = "fanout_queue_email")
    public void psubConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("邮件业务接收到消息: " + s);
    }

    /**
     * Publish/Subscribe工作模式接收,处理短信业务
     * 
     * @param message
     */
    @RabbitListener(queues = "fanout_queue_sms")
    public void psubConsumerSms(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("短信业务接收到消息: " + s);
    }

    /**
     * Publish/Subscribe工作模式接收,处理邮件业务
     * 
     * @param message
     */
    @RabbitListener(queues = "my_fanout_queue_email")
    public void my_psubConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("邮件业务接收到消息: " + s);
    }

    /**
     * Publish/Subscribe工作模式接收,处理短信业务
     * 
     * @param message
     */
    @RabbitListener(queues = "my_fanout_queue_sms")
    public void my_psubConsumerSms(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("短信业务接收到消息: " + s);
    }

    /**
     * **使用基于注解的方式实现消息服务 1.1、Publish/Subscribe工作模式接收,处理邮件业务
     * 
     * @param user
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("my2_fanout_queue_email"), exchange = @Exchange(value = "my2_fanout_exchange", type = "fanout")))
    public void psubConsumerEmailAno(User user) {
        System.out.println("邮件业务接收到消息: " + user);
    }

    /**
     * 1.2、Publish/Subscribe工作模式接收,处理短信业务
     * 
     * @param user
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("my2_fanout_queue_sms"), exchange = @Exchange(value = "my2_fanout_exchange", type = "fanout")))
    public void psubConsumerSmsAno(User user) {
        System.out.println("短信业务接收到消息: " + user);
    }

    /**
     * 2.1、路由模式消息接收,处理error级别日志信息
     * 
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_error"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = "error_routing_key"))
    public void routingConsumerError(String message) {
        System.out.println("接收到error级别日志消息: " + message);
    }

    /**
     * 2.2、路由模式消息接收,处理info、error、warning级别日志信息
     * 
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_all"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = {
            "error_routing_key", "info_routing_key", "warning_routing_key" }))
    public void routingConsumerAll(String message) {
        System.out.println("接收到info、error、warning等级别日志消息: " + message);
    }

    /**
     * 3.1、通配符模式消息接收,进行邮件业务订阅处理
     * 
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_email"), exchange = @Exchange(value = "topic_exchange", type = "topic"), key = "info.#.email.#"))
    public void topicConsumerEmail(String message) {
        System.out.println("接收到邮件订阅需求处理消息: " + message);
    }

    /**
     * 3.2、通配符模式消息接收,进行短信业务订阅处理
     * 
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_sms"), exchange = @Exchange(value = "topic_exchange", type = "topic"), key = "info.#.sms.#"))
    public void topicConsumerSms(String message) {
        System.out.println("接收到短信订阅需求处理消息: " + message);
    }
}
package com.itheima;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.itheima.domain.User;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ApplicationTest {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 使用AmqpAdmin管理员API定制消息组件
     */
    @Test
    public void amqpAdmin() {
        // 1、定义fanout类型的交换器
        amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
        // 2、定义两个默认持久化队列,分别处理email和sms
        amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
        amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
        // 3、将队列分别与交换器进行绑定
        amqpAdmin.declareBinding(
                new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
        amqpAdmin.declareBinding(
                new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
    }

    /**
     * 1、Publish/Subscribe工作模式消息发送端
     */
    @Test
    public void psubPublisher() {
        User user = new User();
        user.setId(1);
        user.setUsername("石头");
        rabbitTemplate.convertAndSend("fanout_exchange", "", user);
    }

    /**
     * 1、Publish/Subscribe工作模式消息发送端
     */
    @Test
    public void my_psubPublisher() {
        User user = new User();
        user.setId(1);
        user.setUsername("天生自然");
        rabbitTemplate.convertAndSend("my_fanout_exchange", "", user);
    }

    @Test
    public void my2_psubPublisher() {
        User user = new User();
        user.setId(1);
        user.setUsername("天生自然--天生自然");
        rabbitTemplate.convertAndSend("my2_fanout_exchange", "", user);
    }

    /**
     * 2、Routing工作模式消息发送端
     */
    @Test
    public void routingPublisher() {
        rabbitTemplate.convertAndSend("routing_exchange", "error_routing_key", "routing send  error message");
    }

    @Test
    public void info_routingPublisher() {
        rabbitTemplate.convertAndSend("routing_exchange", "info_routing_key", "routing send  我的info message");
    }

    /**
     * 3、Topcis工作模式消息发送端
     */
    @Test
    public void topicPublisher() {
        // 1、只发送邮件订阅用户消息
        rabbitTemplate.convertAndSend("topic_exchange", "info.email", "topics send  email message");
        // 2、只发送短信订阅用户消息
        rabbitTemplate.convertAndSend("topic_exchange", "info.sms", "topics send  sms message");
        // 3、发送同时订阅邮件和短信的用户消息
        rabbitTemplate.convertAndSend("topic_exchange", "info.email.sms", "topics send  email and sms message");
    }
}

 

 

 

 

 

标签:订阅,exchange,Topics,通配符,fanout,user,import,message,public
来源: https://www.cnblogs.com/tszr/p/15918320.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有