ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RocketMQ分布式消息队列

2022-07-17 01:32:43  阅读:214  来源: 互联网

标签:RocketMQ 队列 java1234 broker 发送 消息 rocketmq 分布式


RocketMQ分布式消息队列


来源作者:java1234_小锋

1RocketMQ简介


RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

官方文档:https://rocketmq.apache.org/docs/quick-start/

github中文主页:https://github.com/apache/rocketmq/tree/master/docs/cn

核心概念

  • Topic:消息主题,一级消息类型,生产者向其发送消息。
  • Message:生产者向Topic发送并最终传送给消费者的数据消息的载体。
  • 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
  • Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。
  • Message ID:消息的全局唯一标识,由消息队列RocketMQ系统自动生成,唯一标识某条消息。
  • Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
  • Producer:也称为消息发布者,负责生产并发送消息至Topic。
  • Consumer:也称为消息订阅者,负责从Topic接收并消费消息。
  • 分区:即Topic Partition,物理上的概念。每个Topic包含一个或多个分区。
  • 消费位点:每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset;分区的起始位置对应的位置叫做起始位点MinOffset。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
  • Group ID:Group的标识。
  • 队列:个Topic下会由一到多个队列来存储消息。
  • Exactly-Once投递语义:Exactly-Once投递语义是指发送到消息系统的消息只能被Consumer处理且仅处理一次,即使Producer重试消息发送导致某消息重复投递,该消息在Consumer也只被消费一次。
  • 集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。
  • 广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
  • 定时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
  • 延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
  • 事务消息:RocketMQ提供类似X/Open XA的分布事务功能,通过消息队列RocketMQ的事务消息能达到分布式事务的最终一致。
  • 顺序消息:RocketMQ提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序消息。
  • 全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
  • 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。
  • 消息堆积:Producer已经将消息发送到消息队列RocketMQ的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ的服务端保存着未被消费的消息,该状态即消息堆积。
  • 消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ的服务端完成。
  • 消息轨迹:在一条消息从Producer发出到Consumer消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从Producer发出,经由消息队列RocketMQ服务端,投递给Consumer的完整链路,方便定位排查问题。
  • 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ服务端的消息。
  • 死信队列:死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中。
    消息队列RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

应用场景

  • 削峰填谷:诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题。
  • 异步解耦:交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性。
  • 顺序收发:细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ提供的顺序消息即保证消息FIFO。
  • 分布式事务一致性:交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
  • 大数据分析:数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
  • 分布式缓存同步:天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化。

架构设计

1 技术架构

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
    1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

2 RocketMQ Server安装


RocketMQ依赖Java环境,要求有JDK 1.8以上版本;

支持Windows和Linux平台;支持源码方式安装和使用已经编译好的安装包安装;

我们用windows平台安装RocketMQ Server编译好的安装包,来讲解RocketMQ;

下载地址:https://rocketmq.apache.org/dowloading/releases/

我们用最新版本:4.9.0 release:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip

解压后的目录:

benchmark:里面是测试Demo;

bin:可执行脚本;

conf:配置文件;

lib:依赖的jar包;

我们把rocketmq解压包放到D盘根目录,重命名rocketmq

第一步:系统环境变量加两个配置

ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"
image-20220717000442782

第二步:启动Name Server

进入命令行执行:

.\bin\mqnamesrv.cmd

第三步:启动Broker

进入命令行执行:

.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

第四步:发送和接收消息测试

进入命令行消息发送执行:

.\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Producer

消息发送成功;

进入命令行消息接收执行:

.\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Consumer

消息接收成功;

第五步:关闭服务

windows下直接关闭命令行窗口即可;

3 RocketMQ可视化控制台安装与使用


RocketMQ提供了一些扩展项目支持,地址:https://github.com/apache/rocketmq-externals

其中一个rocketmq-connect-console项目,就是我们需要的可视化控制台;

我们把整个项目下载下来,打开rocketmq-console项目;项目是SpringBoot开发;

打开application.properties配置文件,我们至少需要修改两个配置项;

server.port=8080,这个是可视化项目启动端口,我们改成8888;

rocketmq.config.namesrvAddr=,这个是指定nameServer地址和端口,我们暂时先搞成localhost:9876,等后面搞集群的话,要再修改;

修改后保存,进入命令行,执行:

mvn clean package -Dmaven.test.skip=true

打包执行完后,在target目录,会生成一个可运行jar rocketmq-console-ng-2.0.0.jar

image-20220717003607234

我们运行这个jar,进入命令行执行:

java -jar rocketmq-console-ng-2.0.0.jar

启动成功后,浏览器输入:http://localhost:8888/

说明一切OK;

4 SpringBoot整合RocketMQ实现消息发送和接收


我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷;

最终项目结构如下:

具体步骤如下:

第一步:我们新建一个父项目rocketmq-test,pom类型,主要是依赖管理,包括版本的管理,以及管理module子项目

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>rocketmq-provider</module>
        <module>rocketmq-consumer</module>
    </modules>
    <groupId>com.java1234</groupId>
    <artifactId>rocketmq-test2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <springboot.version>2.3.2.RELEASE</springboot.version>
        <rocketmq.version>2.2.0</rocketmq.version>
    </properties>

    <dependencyManagement>

        <dependencies>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${springboot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>


            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>${rocketmq.version}</version>
            </dependency>
        </dependencies>

    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

第二步:新建消息生产者rocketmq-provider子项目

pom.xml加下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq-test2</artifactId>
        <groupId>com.java1234</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq-provider</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

    </dependencies>

</project>

新建项目配置文件application.yml,指定name-server,以及producer-group组

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer-demo1

新建消息生产者Service类ProducerService

package com.java1234.rocketmq;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 消息生产者
 * @author java1234_小锋
 * @site www.java1234.com
 * @company 南通小锋网络科技有限公司
 * @create 2021-08-22 22:16
 */
@Component("producerService")
public class ProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送简单消息
     */
    public void sendMessage(){
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("java1234-rocketmq","rocketmq大爷,你好!"+i);
        }
    }
}

SpringBoot给我们提供了RocketMQTemplate模板类,我们利用这个类可以以多种形式发送消息;

另外这个类我们要加下@Component注解,让Spring来管理实例,方便其他地方获取bean来使用;

发送方法指定Topic主题java1234-rocketmq

启动类获取ProducerService实例,调用发送消息方法

package com.java1234;

import com.java1234.rocketmq.ProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class RocketmqTestApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(RocketmqTestApplication.class, args);
        ProducerService producerService = (ProducerService) run.getBean("producerService");
        producerService.sendMessage();
    }

}

我们获取ProducerService实例,调用sendMessage方法发送消息;

第三步:新建消息消费者rocketmq-consumer子项目

pom.xml加下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq-test2</artifactId>
        <groupId>com.java1234</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq-consumer</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

    </dependencies>

</project>

新建项目配置文件application.yml,指定name-server,以及consumer-group组

server:
  port: 8084
  servlet:
    context-path: /

rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: consumer-demo1

新建消息消费者Service类ConsumerService,监听消息,消费消息

package com.java1234.rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 消息消费者
 * @author java1234_小锋
 * @site www.java1234.com
 * @company 南通小锋网络科技有限公司
 * @create 2021-08-22 22:40
 */
@RocketMQMessageListener(topic = "java1234-rocketmq",consumerGroup ="${rocketmq.consumer.group}" )
@Component
public class ConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("收到消息内容:"+s);
    }
}

消费者类要实现RocketMQListener接口,以及动态指定消息类型String。

类上要加上@RocketMQMessageListener注解,指定topic主题java1234-rocketmq,以及消费者组${rocketmq.consumer.group}

同样这个类上也要加上@Component注解,让Spring来管理bean实例;

启动类:

package com.java1234;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author java1234_小锋
 * @site www.java1234.com
 * @company 南通小锋网络科技有限公司
 * @create 2021-09-05 11:59
 */
@SpringBootApplication
public class RocketmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketmqConsumerApplication.class, args);
    }
}

第四步:测试

先启动rocketmq-consumer项目,监听消息

再启动rockeqmq-provider项目,发送消息

消息消费者端收到消息:

测试OK,成功消费!

5 RocketMQ发送同步消息


发送同步消息是指producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果;

相对异步发送消息,同步会阻塞线程,性能相对差点,但是可靠性高,这种方式得到广泛使用,比如:短信通知,邮件通知,站内重要信息通知等。

RocketMQTemplate给我们提供了syncSend方法(有多个重载),来实现发送同步消息;

下面给一个实例:

/**
 * 发送同步消息
 */
public void sendSyncMessage(){
	for(int i=0;i<10;i++){
		SendResult sendResult = rocketMQTemplate.syncSend("java1234-rocketmq","rocketmq同步消息!"+i);
		System.out.println(sendResult);
	}
}

这里执行完发送同步消息返回执行结果SendResult

image-20210824081817314

运行测试OK;

6 RocketMQ发送异步消息


发送异步消息是指producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

相对发送同步消息,异步消息性能更高,可靠性略差。适合对响应时间要求高的业务场景。

RocketMQTemplate给我们提供了asyncSend方法(有多个重载),来实现发送异步消息;

下面给一个实例:

/**
 * 发送异步消息
 */
public void sendAsyncMessage(){
	for(int i=0;i<10;i++){
		rocketMQTemplate.asyncSend("java1234-rocketmq", "rocketmg异步消息!"+i, new SendCallback() {
			@Override
			public void onSuccess(SendResult sendResult) {
				System.out.println("发送成功!");
			}

			@Override
			public void onException(Throwable throwable) {
				System.out.println("发送失败!");
			}
		});
	}
}

类似发送同步消息,多了一个SendCallback回调接口参数,实现onSuccess和onException方法,分别表示异步发送成功和失败;

image-20210824082022593

运行测试OK!

7 RocketMQ发送单向消息


发送单向消息是指producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。

这种方式主要用在不特别关心发送结果的场景,举例:日志发送;

RocketMQTemplate给我们提供了sendOneWay方法(有多个重载),来实现发送单向消息;

下面给一个实例:

/**
 * 发送单向消息
 */
public void sendOneWayMessage(){
	for(int i=0;i<10;i++){
		rocketMQTemplate.sendOneWay("java1234-rocketmq", "rocketmg单向消息!"+i);
	}
}

image-20210824082205664

运行测试OK!

8 RocketMQ消费者广播模式和负载均衡模式


如上图,假如我们有多个消费者,消息生产者发送的消息,是每一个消费者都消费一次呢?还是通过一些机制,比如轮询机制,每个消息只被某一个消费者消费一次呢?

这里涉及到消费者的消费模式,一种是广播模式,还有一种是负载均衡模式;

广播模式是每个消费者,都会消费消息;

负载均衡模式是每一个消息只会被某一个消费者消费一次;

我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示;

我们可以通过@RocketMQMessageListenermessageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡模式;

我们先集群负载均衡测试,加上messageModel=MessageModel.CLUSTERING

我们启动两个实例,先启动一个RocketmqConsumer消费者实例,端口8084

编辑配置,

Single instance only的勾选去掉,然后点OK按钮;

然后修改代码,启动端口改成8085

消费者输出信息改下;

最后我们再启动一个消费者实例;

我们启动消息生产者测试:

启动后,两个消费者控制台分别输出:

image-20210826144930076

根据实验,我们发现消息被两个消费者负载均衡随机消费掉了。

我们再来测试下广播模式;修改messageModel=MessageModel.BROADCASTING

测试发现,两个消费者客户端把消息都各自消费了一遍。广播模式测试OK;

9 RocketMQ实现顺序消息


rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;

有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行;

RocketMQTemplate给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息;包括以下:

syncSendOrderly,发送同步顺序消息;

asyncSendOrderly,发送异步顺序消息;

sendOneWayOrderly,发送单向顺序消息;

一般我们用第一种发送同步顺序消息;

第三个参数hashKey,方法点进去:

因为broker会管理多个消息队列,这个hashKey参数,主要用来计算选择队列的,一般可以把订单ID,产品ID作为参数值;

发送到一个队列,这样方便搞顺序队列;

以及消费端接收的时候,默认是并发多线程去接收消息。RocketMQMessageListener有个属性consumeMode,默认是ConsumeMode.CONCURRENTLY ,我们要改成ConsumeMode.ORDERLY,单线程顺序接收消息;

下面给具体事例,模拟两个订单发送消息;

消息生产者端:

/**
 * 发送同步顺序消息
 */
public void sendOrderlyMessage(){
	// hashKey用来计算决定消息发送到哪个消息队列 一般是订单ID,产品ID等
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,创建", "98456231");
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,支付", "98456231");
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,完成", "98456231");

	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,创建", "98456232");
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,支付", "98456232");
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,完成", "98456232");
}

消费者端:

/**
 * 消息消费者
 * @author java1234_小锋
 * @site www.java1234.com
 * @company 南通小锋网络科技有限公司
 * @create 2021-08-22 22:40
 */
@RocketMQMessageListener(topic = "java1234-rocketmq-orderly",consumerGroup ="${rocketmq.consumer.group}",consumeMode =ConsumeMode.ORDERLY )
@Component
public class ConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("消费者:收到消息内容:"+s);
    }

}

运行测试:

没问题!

10 RocketMQ实现延迟消息


延迟消息
对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。

   延迟消息的使用场景很多,一种比较常见的场景就是在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式去处理了。

Rocket的延迟消息
RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

我们会发现,所有消息发送方法都有一个带int类型的delayLevel参数重载方法,这个就是设置延迟消息级别的参数。

同时注意,每个带delayLevel参数的方法,也同时带有long类型的timeout参数,这个是设置消息发送超时时间,默认是3秒,我们也可以自行设置;

同时还有 Message参数,如果发送这种类型的消息,可以携带具体的消息数据;

我们给下实例:

/**
 * 发送延迟消息
 */
public void sendDelayMessage(){
	rocketMQTemplate.syncSend("java1234-rocketmq",MessageBuilder.withPayload("rocketmq延迟1秒消息").build(),3000,1);
	rocketMQTemplate.syncSend("java1234-rocketmq",MessageBuilder.withPayload("rocketmq延迟5秒消息").build(),3000,2);
	rocketMQTemplate.syncSend("java1234-rocketmq",MessageBuilder.withPayload("rocketmq延迟10秒消息").build(),3000,3);
}

运行测试:

没问题!

11 RocketMQ实现事务消息


事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。

RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:

Half(Prepare) Message——半消息(预处理消息)

半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。

Message Status Check——消息状态回查

由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。

执行流程:

  1. 应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。
  2. prepare消息发送成功后,应用模块执行数据库事务(本地事务)。
  3. 根据数据库事务执行的结果,再返回Commit或Rollback给MQ。
  4. 如果是Commit,MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息。
  5. 第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息),处理结果同第4步。
  6. MQ消费的成功机制由MQ自己保证。

具体实例:

通过rocketMQTemplatesendMessageInTransaction方法发送事务消息

/**
 * 发送事务消息
 */
public void sendTransactionMessage(){
	// 构造消息
	Message msg = MessageBuilder.withPayload("rocketmq事务消息-01").build();
	rocketMQTemplate.sendMessageInTransaction("java1234-transaction-rocketmq",msg,null);
}

定义本地事务处理类,实现RocketMQLocalTransactionListener接口,以及加上@RocketMQTransactionListener注解,这个类似方法的调用是异步的;

executeLocalTransaction方法,当我们处理完业务后,可以根据业务处理情况,返回事务执行状态,有bollback, commit or unknown三种,分别是回滚事务,提交事务和未知;根据事务消息执行流程,如果返回bollback,则直接丢弃消息;如果是返回commit,则消费消息;如果是unknow,则继续等待,然后调用checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息;

checkLocalTransaction方法,是当MQ Server未得到MQ发送方应答,或者超时的情况,或者应答是unknown的情况,调用此方法进行检查确认,返回值和上面的方法一样;

@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		// ... local transaction process, return bollback, commit or unknown
		System.out.println("executeLocalTransaction");
		return RocketMQLocalTransactionState.UNKNOWN;
	}

	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		// ... check transaction status and return bollback, commit or unknown
		System.out.println("checkLocalTransaction");
		return RocketMQLocalTransactionState.COMMIT;
	}
}

运行:

生产者端两个方法都执行到了,

消费端也获取到了消息;

执行如下:

生产者端发送half消息到MQ-SERVER,然后异步执行executeLocalTransaction方法,返回unknown,MQ-SERVER接收到unknown后,继续等待,然后再执行checkLocalTransaction确认,返回commit,MQ-SERVER得到commit后,消费端才可以消费消息;

12 RocketMQ过滤消息


在消费端进行消息消费的时候,我们根据业务需求,可以对消息进行过滤,处理需要的消息;

尤其是广播模式下,消息过滤经常使用;

RocketMQ提供了TAG和SQL表达式两种消息过滤方式;

12.1 根据TAG方式过滤消息

消息发送端只能设置一个tag,消息接收端可以设置多个tag。

接收消息端通过 ‘||’ 设置多个tag,如下:tag1 || tag2 || tag3 || ...

上实例,生产端发送三个消息,TAG分别是TAG1,TAG2,TAG3

/**
 * 发送带Tag消息,测试根据Tag过滤消息
 */
public void sendMessageWithTag(){
	// 构造消息1
	Message msg1 = MessageBuilder.withPayload("rocketmq过滤消息测试-TAG01").build();
	// 构造消息2
	Message msg2 = MessageBuilder.withPayload("rocketmq过滤消息测试-TAG02").build();
	// 构造消息3
	Message msg3 = MessageBuilder.withPayload("rocketmq过滤消息测试-TAG03").build();

	rocketMQTemplate.convertAndSend("java1234-filter-rocketmq" + ":" + "TAG1", msg1);
	rocketMQTemplate.convertAndSend("java1234-filter-rocketmq" + ":" + "TAG2", msg2);
	rocketMQTemplate.convertAndSend("java1234-filter-rocketmq" + ":" + "TAG3", msg3);

}

消费端,通过selectorExpression = "TAG1 || TAG2",selectorType = SelectorType.TAG,指定需要消费的TAG

@RocketMQMessageListener(topic = "java1234-filter-rocketmq",consumerGroup ="${rocketmq.consumer.group}" ,selectorExpression = "TAG1 || TAG2",selectorType = SelectorType.TAG)
@Component
public class ConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("消费者:收到消息内容:"+s);
    }

}

运行测试:

发现只消费了TAG1和TAG2的消息,TAG3消息没有被消费;

12.2 根据SQL表达式过滤消息

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。

RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

数字比较,如>,>=,<,<=,BETWEEN,=;

字符比较,如:=,<>,IN;

IS NULL or IS NOT NULL;

逻辑运算符:AND, OR, NOT;

常量类型:

数值,如:123, 3.1415;

字符, 如:‘abc’, 必须使用单引号;

NULL,特殊常量

Boolean, TRUE or FALSE;

上实例,发送三个消息,分别带上不同的header头信息;

/**
 * 发送SQL表达式头信息消息,测试根据SQL表达式过滤消息
 */
public void sendMessageWithSQL(){
	// 构造消息1
	Message msg1 = MessageBuilder.withPayload("rocketmq过滤消息测试01").build();
	Map<String, Object> headers = new HashMap<>() ;
	headers.put("type", "pay") ;
	headers.put("a", 10) ;
	rocketMQTemplate.convertAndSend("java1234-filter-rocketmq", msg1, headers);

	// 构造消息2
	Message msg2 = MessageBuilder.withPayload("rocketmq过滤消息测试02").build();
	Map<String, Object> headers2 = new HashMap<>() ;
	headers2.put("type", "store") ;
	headers2.put("a", 4) ;
	rocketMQTemplate.convertAndSend("java1234-filter-rocketmq", msg2, headers2);

	// 构造消息3
	Message msg3 = MessageBuilder.withPayload("rocketmq过滤消息测试03").build();
	Map<String, Object> headers3 = new HashMap<>() ;
	headers3.put("type", "user") ;
	headers3.put("a", 7) ;
	rocketMQTemplate.convertAndSend("java1234-filter-rocketmq", msg3, headers3);

}

消费者端,selectorExpression = "type='user' or a <7",selectorType = SelectorType.SQL92 ,指定selectorType 以及设置表达式selectorExpression

@RocketMQMessageListener(topic = "java1234-filter-rocketmq",consumerGroup ="${rocketmq.consumer.group}" ,selectorExpression = "type='user' or a <7",selectorType = SelectorType.SQL92)
@Component
public class ConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("消费者:收到消息内容:"+s);
    }

}

默认不支持SQL表达式,启动报错:

The broker does not support consumer to filter message by SQL92

找到broker.conf配置文件

加下:

enablePropertyFilter=true

重新启动borker

.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true -c .\conf\broker.conf

运行测试:

过滤,收到2条消息,没问题;

13 RocketMQ集群搭建


13.1 RocketMQ集群介绍

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

13.2 RocketMQ集群模式介绍

单 master 模式

  • 也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用,适合个人学习使用。

多 master 模式

  • 多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。

  • 优点:所有模式中性能最高

  • 缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。

  • 注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某个节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。

多 master 多 slave 异步复制模式

  • 在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。
  • master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主从模式。
  • 优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
  • 缺点:使用异步复制的方式有可能会有消息丢失的问题。

多 master 多 slave 同步双写模式

  • 同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
  • 优点:同步双写的同步模式能保证数据不丢失。
  • 缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
  • 刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
  • 同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)
  • 注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。
13.3 RocketMQ双主双从集群模式工作流程介绍
  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
13.4 RocketMQ集群搭建
13.4.1 Centos单机搭建Rocketmq

先把rocketmq上传到/hom/data/目录下;

为了方便,我们统一用finalshell工具上传;

/home/下面再新建一个mq目录用来存放rocketmq安装文件;

进入data目录,解压rocketmq压缩包到mq目录

unzip rocketmq-all-4.9.0-bin-release.zip -d ../mq

假如unzip没安装,可以安装下

yum install -y unzip zip

为了操作方便我们把解压后的文件名改成rocketmq,工具里右击重命名即可;

rocketmq目录下,新建logs和store两个目录;

logs是存储rocketmq日志的目录;

store是存储rocketmq数据文件的目录;

在store目录再新建commitlogconsumequeueindex三个目录;

commitlog存储RocketMQ消息信息目录;

consumequeueindex存储索引文件数据目录;

我们先配置单节点,修改conf下的2m-2s-sync配置文件;

image-20220717010818989覆盖掉broker-a.properties配置文件内容:

brokerClusterName=rocketmq-cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=192.168.0.110:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/home/mq/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/home/mq/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/home/mq/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/home/mq/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/home/mq/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

这里有个 刷盘方式 具体讲下:

RocketMQ提供了两种刷盘策略同步刷盘、异步刷盘

同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。

复制方式 优点 缺点 适应场景
同步刷盘 保证了消息不丢失 吞吐率相对于异步刷盘要低 消息可靠性要求较高的场景
异步刷盘 系统的吞吐量提高 系统断电等异常时会有部分丢失 对应吞吐量要求较高的场景

进入conf目录下,替换掉所有xml中的${user.home},确保日志路径正确

执行:

sed -i 's#${user.home}#/home/mq/rocketmq#g' *.xml

路径全部替换了;

Rocketmq启动对内存要求比较高,一般至少1个G内存,否则会影响RocketMQ的性能;

默认配置内存比较高,虚拟机不方便演示,所以我们修改下bin目录下的runbroker.sh和runserver.sh文件;

runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

改成

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g"

runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

改成

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

先启动nameserver(后台运行方式启动):

nohup sh mqnamesrv &

再启动broker(后台运行方式启动)

 nohup sh mqbroker -c /home/mq/rocketmq/conf/2m-2s-sync/broker-a.properties &

可以用jps查看进程:

jps
83545 Jps
65852 NamesrvStartup
83310 BrokerStartup

备注:关闭命令

先关闭broker,再关闭nameserver

sh mqshutdown broker
sh mqshutdown namesrv

我们代码测试下:

把消费端,生产端的rocketmq.name-server值改成:虚拟机IP地址:9876

启动测试OK;

13.4.2 Centos集群一主一从同步搭建Rocketmq

基于前面的单机模式,我们VM里面克隆一下系统;

192.168.0.110 机器 作为主节点

192.168.0.103 机器作为从节点

配置项要点:

  • brokerClusterName集群名称一样;

  • brokerName同一组主从节点名称一样;

  • brokerId为0表示Master主节点,非0表示Slave从节点;

所以,从节点机器,我们修改conf下的2m-2s-sync配置文件broker-a-s.properties

从原来的broker-a.properties复制一份内容到broker-a-s.properties,然后修改三个地方:

#0 表示 Master,>0 表示 Slave
brokerId=1
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
namesrvAddr=192.168.0.110:9876;192.168.0.103:9876;

我们先把两台机器的nameserver启动起来:

192.168.0.110 主机器:

nohup sh mqnamesrv &

192.168.0.103 从机器

nohup sh mqnamesrv &

再把两台机器的broker启动起来:

192.168.0.110 主机器:

 nohup sh mqbroker -c /home/mq/rocketmq/conf/2m-2s-sync/broker-a.properties &

192.168.0.103 从机器

 nohup sh mqbroker -c /home/mq/rocketmq/conf/2m-2s-sync/broker-a-s.properties &

可视化控制台项目namesrvAddr配置改下:

启动控制台项目:

项目代码,生产端和消费端name-server都改下;

启动测试:

我们发现,消费消息都是从主节点broker消费;

我们模拟下,让主节点broker挂掉;

sh mqshutdown broker

启动项目生产端发送消息报错:

Exception in thread "main" org.springframework.messaging.MessagingException: No route info of this topic: java1234-rocketmq

消费端没有问题,可以继续订阅;

所有这种一主一从模式还是有问题;我们继续后面双主双从;

13.4.3 Centos集群双主双从同步搭建Rocketmq

我们企业级开发,一般采用的是双主双从同步,以及异步刷盘;

同步消息保证消息不丢失,异步刷盘提高吞吐量;

我们VM里再克隆两台机器;

192.168.0.110 机器 作为m1主节点

192.168.0.103 机器作为s1从节点

192.168.0.111 机器 作为m2主节点

192.168.0.112 机器作为s2从节点

首先,我们把namesrvAddr配置修改,每个broker都要注册到所有nameserver

namesrvAddr=192.168.0.110:9876;192.168.0.103:9876;192.168.0.111:9876;192.168.0.112:9876;

刷盘机制都改成异步:

#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

192.168.0.111 机器 作为m2主节点从m1主节点的broker-a.properties复制内容到broker-b.properties文件;

修改broker-b.properties配置文件;

修改内容如下:

brokerName=broker-b

192.168.0.112 机器 作为s2主节点从s1主节点的broker-a-s.properties复制内容到broker-b-s.properties文件;

修改broker-b-s.properties配置文件;

修改内容如下:

brokerName=broker-b

然后分别启动四个机器;

先启动nameserver:

nohup sh mqnamesrv &

再启动broker:

192.168.0.110 机器 作为m1主节点

nohup sh mqbroker -c /home/mq/rocketmq/conf/2m-2s-sync/broker-a.properties &

192.168.0.103 机器作为s1从节点

nohup sh mqbroker -c /home/mq/rocketmq/conf/2m-2s-sync/broker-a-s.properties &

192.168.0.111 机器 作为m2主节点

nohup sh mqbroker -c /home/mq/rocketmq/conf/2m-2s-sync/broker-b.properties &

192.168.0.112 机器作为s2从节点

nohup sh mqbroker -c /home/mq/rocketmq/conf/2m-2s-sync/broker-b-s.properties &

可视化控制台项目namesrvAddr配置改下:

启动控制台项目:

项目代码,生产端和消费端name-server都改下;

启动测试:

两个主节点一起分担消息处理;

我们模拟下,让a主节点broker挂掉;

sh mqshutdown broker

再运行代码测试,发现b主节点承担了所有消息接收和处理;实现了高可用;


标签:RocketMQ,队列,java1234,broker,发送,消息,rocketmq,分布式
来源: https://www.cnblogs.com/q1359720840/p/16485767.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有