ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

消息中间件RabbitMq简单使用

2022-06-27 23:05:38  阅读:148  来源: 互联网

标签:rabbitmq 简单 boot 消息中间件 springframework RabbitMq import org public


测试截图

 

 

 

 

 

Unacked 消息,代表消息已经投递给消费者

Ready消息,代表消息已发送,消费者未接受

 每次都是想学个东西,找了一堆教程没一个能用,等自己写好教程了,结果搜到一堆能用的教程真是头大,坑爹啊

linux服务器安装rabbitmq

yum install socat

https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm

rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm

rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm

启用RabbitMQ的管理插件
rabbitmq-plugins enable rabbitmq_management
5、开启RabbitMQ
systemctl start rabbitmq-server
或者
rabbitmq-server
或者后台启动

rabbitmq-server -detached
6、添加用户
rabbitmqctl add_user root 123456
7、给用户添加权限
rabbitmqctl set_permissions root -p / ".*" ".*" ".*"
给root用户在虚拟主机"/"上的配置、写、读的权限
8、给用户设置标签
rabbitmqctl set_user_tags root administrator

 页面登录使用15672端口,地址:15672

用户名  root

密码  123456

 

 

构建生产者

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.java</groupId>
    <artifactId>rabbitmq-service</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>rabbitmq-consumer</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--引入junit单元测试依赖-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

  

server.port=15670

spring.rabbitmq.test.host=127.0.0.1
spring.rabbitmq.test.port=5672
spring.rabbitmq.test.username=xx
spring.rabbitmq.test.password=xx

  

package com.java;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author yourheart
 * @Description   消息的生产哲
 * @create 2022-05-24 14:34
 */
@SpringBootApplication
public class RabbitMqApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMqApplication.class,args);
    }
}


package com.java.config;

/**
 * @author yourheart
 * @Description
 * @create 2022-05-24 23:03
 */

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 */
@Configuration
public class RabbitMQConfig {
    //交换机名称
    public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
    //队列名称
    public static final String ITEM_QUEUE = "item_queue";

    //声明交换机
    @Bean("itemTopicExchange")
    public Exchange topicExchange(){
        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }

    //声明队列
    @Bean("itemQueue")
    public Queue itemQueue(){
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }

    //绑定队列和交换机
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }


    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        //创建工厂类
        CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
        //用户名
        cachingConnectionFactory.setUsername("xx");
        //密码
        cachingConnectionFactory.setPassword("xx");
        //rabbitMQ地址
        cachingConnectionFactory.setHost("127.0.0.1");
        //rabbitMQ端口
        cachingConnectionFactory.setPort(5672);
        cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        return  cachingConnectionFactory;

    }

    /**
     * 将配置好的信息放入
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return  factory;
    }


    /**
     * 自动声明队列
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return  rabbitAdmin;
    }

}

  使用自定义线程池制造数据

package com.java;

import com.java.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author yourheart
 * @Description
 * @create 2022-06-25 21:54
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqThread {

    private final static Logger log= LoggerFactory.getLogger(RabbitmqThread.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void  test001(){
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 16;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
        ThreadPoolExecutor threadPoolExecutor = null;
        try {
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.AbortPolicy());

            //循环提交任务
            for (int i = 0; i < 18; i++) {
                //提交任务的索引
                final int index = (i + 1);
                threadPoolExecutor.submit(() -> {
                    //线程打印输出
                    log.info("大家好,我是线程:{}",index);
                    String key="item.springboot-rabbitmq";
                    String msg="rabbitmq发送消息99999999";
                    log.info("【发送消息】msg:{}",msg);
                    rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE ,key ,msg);
                    try {
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }

}

  构建消费者

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbitmq-service</artifactId>
        <groupId>com.java</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rabbitmq-consumer</artifactId>


</project>

  

package com.java;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author yourheart
 * @Description
 * @create 2022-05-25 22:07
 */
@SpringBootApplication
public class RabbitMqConsumerApplication {
    public static void main(String[] args) {

        SpringApplication.run(RabbitMqConsumerApplication.class,args);

    }
}


package com.java.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author yourheart
 * @Description
 * @create 2022-05-25 22:09
 */
@Component
public class HelloController {
    private final static Logger log= LoggerFactory.getLogger(HelloController.class);

    @RabbitListener(queues = "item_queue")
    public void service(String messgae){
        try {
            Thread.sleep(0L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("【消息队列推送过来的数据】message:{}",messgae);

    }
}

  

server.port=15671

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=xx
spring.rabbitmq.password=xx

  

 

 

 

 

 

 

 

 

标签:rabbitmq,简单,boot,消息中间件,springframework,RabbitMq,import,org,public
来源: https://www.cnblogs.com/q202105271618/p/16290851.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有