ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spring Cloud Alibaba技术栈【下】

2021-11-29 09:04:49  阅读:177  来源: 互联网

标签:dubbo stream spring Alibaba rocketmq Cloud Spring msg cloud


Spring Cloud Alibaba Dubbo

一、项目简介

Dubbo Spring Cloud 基于 Dubbo Spring Boot 2.7.1 和 Spring Cloud 2.x 开发,无论开发人员是 Dubbo 用户还是 Spring Cloud 用户,都能轻松地驾驭,并以接近“零”成本的代价使应用向上迁移。Dubbo Spring Cloud 致力于简化 Cloud Native 开发成本,提高研发效能以及提升应用性能等目的。

Dubbo Spring Cloud 首个 Preview Release,随同Spring Cloud Alibaba 0.2.2.RELEASE0.9.0.RELEASE 一同发布,分别对应 Spring Cloud Finchley 与 Greenwich(下文分别简称为 “F” 版 和 “G” 版)

二、功能的完成度

由于 Dubbo Spring Cloud 构建在原生的 Spring Cloud 之上,其服务治理方面的能力可认为是 Spring Cloud Plus,不仅完全覆盖 Spring Cloud 原生特性,而且提供更为稳定和成熟的实现,特性比对如下表所示:
在这里插入图片描述
在这里插入图片描述

三、框架的搭建

我们将搭建如图所示的项目框架
在这里插入图片描述

3.1 搭建 spring-cloud-dubbo-examples

spring-cloud-dubbo-exmaples 是一个父项目,用来给子项目控制版本和去除公共的依赖。

3.1.1 创建项目

使用 IDEA 创建一个模块:
在这里插入图片描述
选择 Maven:
在这里插入图片描述
点击 Next,进行下一步操作:
在这里插入图片描述
Parent:必须选择之前我们创建的 spring-cloud-alibaba-examples。
Name:spring-cloud-dubbo-examples 项目的名称
点击 Finish,完成项目的创建。
在这里插入图片描述
至此,spring-cloud-dubbo-examples 项目已经完成创建了。

3.1.2 添加依赖

打开该项目的 pom.xml,添加以下内容:

<dependencies>
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-dubbo</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
	</dependency>
</dependencies>

3.1.3 修改项目的打包方式

<packaging>pom</packaging>

3.1.4 完整的 pom.xml 文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-alibaba-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<packaging>pom</packaging>
	<artifactId>spring-cloud-dubbo-examples</artifactId>
	<dependencies>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-dubbo</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
		</dependency>
	</dependencies>
</project>

3.2 搭建 dubbo-api

dubbo-api 里面将存放用于发表服务的接口。

3.2.1 创建 dubbo-api 项目

使用 IDEA 创建一个子模块。
在这里插入图片描述
选择 Maven 项目:
在这里插入图片描述
点击 Next 进行下一步操作:
在这里插入图片描述
Parent:选择 spring-cloud-dubbo-examples
Name:名称为 dubbo-api
点击 Finish 完成项目的创建:
在这里插入图片描述

3.2.2 完整的 pom.xml 文件如下

dubbo-api 的 pom.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-dubbo-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>dubbo-api</artifactId>
</project>

3.3 搭建 dubbo-provider

3.3.1 创建 dubbo-provider 项目

搭建 dubbo-provider 用来做一个服务的提供者。
使用 IDEA 创建一个模块:
在这里插入图片描述
选择 Maven 项目:
在这里插入图片描述
点击 Next,进行下一步操作:
在这里插入图片描述
Parent:选择 spring-cloud-alibaba-examples
Name:dubbo-provider
点击 Finish,完成项目的创建。

3.3.2 修改 Maven 的打包方式

Maven 项目默认会被 target 目录下的 class 文件打包在一个 jar 里面,该 jar 并不能直接运行,我们需要修改它的打包方式为 spring-boot 的打包,这样,打包后的项目将能直接被运行。
修改 pom.xml ,添加如下的内容:

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
		</plugin>
	</plugins>
</build>

这样,该项目将最终被打包成为一个 jar,能直接通过 java -jar来运行

3.3.3 完整的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-dubbo-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>dubbo-provider</artifactId>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

3.4 搭建 dubbo-consumer

3.4.1 创建 dubbo-provider-consumer 项目

搭建 dubbo-provider 用来做一个服务的提供者。
使用 IDEA 创建一个模块:

在这里插入图片描述
选择 Maven 项目:
在这里插入图片描述
点击 Next,进行下一步操作:
在这里插入图片描述
Parent:选择 spring-cloud-alibaba-examples
Name:dubbo-consumer
点击 Finish,完成项目的创建。

3.4.2 修改 Maven 的打包方式

Maven 项目默认会被 target 目录下的 class 文件打包在一个 jar 里面,该 jar 并不能直接运行,我们需要修改它的打包方式为 spring-boot 的打包,这样,打包后的项目将能直接被运行。
修改 pom.xml ,添加如下的内容:

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
		</plugin>
	</plugins>
</build>

这样,该项目将最终被打包成为一个 jar,能直接通过 java -jar 来运行

3.4.3 完整的 pom.xml 文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-dubbo-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>dubbo-consumer</artifactId>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

3.4.4 完整的项目结构

在这里插入图片描述

四、代码的完善

4.1 dubbo-api 代码的完善

4.1.1 定义 Dubbo 服务接口

Dubbo 服务接口是服务提供方与消费方的远程通讯契约,通常由普通的 Java 接口(interface)来声明。

在这里插入图片描述
代码如下:

public interface EchoService {
	String echo(String message);
}

4.1.2 项目的打包

Api 项目主要是为了把 rpc 中定义的接口发布出去。
我们可以使用 Maven 的普通打包方式把编译后的 class 文件打包为jar。
在这里插入图片描述
打包成功后,项目的 jar 位于:
在这里插入图片描述

4.2 dubbo-provider 代码的完善

4.2.1 添加依赖

在 dubbo-provider 的 pom.xml 的 dependencies 添加以下的依赖。

<dependencies>
	<dependency>
		<groupId>com.bjsxt</groupId>
		<artifactId>dubbo-api</artifactId>
		<version>1.0</version>
	</dependency>
</dependencies>

4.2.2 实现 dubbo-api 里面定义的接口

在这里插入图片描述
代码内容如下:

@Service
public class EchoServiceImpl implements EchoService {

	@Override
	public String echo(String message) {
		return "[echo] Hello, " + message;
	}
}

4.2.3 添加配置文件

在这里插入图片描述
内容如下:

dubbo:
	scan:
		# dubbo 服务扫描基准包
		base-packages: com.bjsxt.service.impl
	cloud:
		subscribed-services: dubbo-provider
	protocol:
		# dubbo 协议
		name: dubbo
		# dubbo 协议端口( -1 表示自增端口,从 20880 开始)
		port: -1
	registry:
		# 挂载到 Spring Cloud 注册中心
		address: spring-cloud://localhost
spring:
	application:
		# Dubbo 应用名称
		name: dubbo-provider
	main:
		# Spring Boot 2.1 需要设定
		allow-bean-definition-overriding: true
	cloud:
		nacos:
			# Nacos 服务发现与注册配置
			discovery:
				server-addr: localhost:8848

4.2.4 启动类

在这里插入图片描述
代码如下:

@SpringBootApplication
@EnableDiscoveryClient
public class ProviderServiceApplication {
	public static void main(String[] args) {
		SpringApplication.run(ProviderServiceApplication.class, args) ;
	}
}

4.3 dubbo-consumer 代码的完善

4.3.1 添加依赖

在 dubbo-consumer 的 pom.xml 的 dependencies 添加以下依赖:

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>com.bjsxt</groupId>
		<artifactId>dubbo-api</artifactId>
		<version>1.0</version>
	</dependency>
	<!-- Dubbo Spring Cloud Starter -->
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-dubbo</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
	</dependency>
</dependencies>

4.3.2 添加配置文件

在这里插入图片描述
内容如下:

dubbo:
	registry:
		# 挂载到 Spring Cloud 注册中心
		address: nacos://127.0.0.1:8848
	cloud:
		subscribed-services: dubbo-provider
server:
	port: 8080
spring:
	application:
	# Dubbo 应用名称
	name: dubbo-consumer
	main:
		# Spring Boot 2.1 需要设定
		allow-bean-definition-overriding: true
	cloud:
		nacos:
			# Nacos 服务发现与注册配置
			discovery:
				server-addr: 127.0.0.1:8848

4.3.3 启动类

在这里插入图片描述
代码如下:

@EnableDiscoveryClient
@SpringBootApplication
@RestController
public class ConsumerServiceApplication {

	@Reference
	private EchoService echoService ;
	
	public static void main(String[] args) {
		SpringApplication.run(ConsumerServiceApplication.class,args) ;
	}
	
	@GetMapping("/rpc")
	public ResponseEntity<String> rpc(){
		return ResponseEntity.ok(String.format("调用结果
	为%s",echoService.echo("info")));
	}
}

4.4 远程调用测试

  • 启动 Nacos-Server
  • 启动 dubbo-provider
  • 启动 dubbo-consumer
    查看 Nacos 控制台:
    http://localhost:8848/nacos/
    在这里插入图片描述
    浏览器访问:
    在这里插入图片描述
    调用已经成功;

五、负载均衡调用测试

5.1 启动多个服务的提供者

修改服务提供者里面实现类的代码:
在这里插入图片描述
启动多个:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
再次使用 Clt+D 复制一个:
在这里插入图片描述
启动这 2 个:
在这里插入图片描述
现在,共有 3 台同时运行:
在这里插入图片描述
查看 Nacos:
在这里插入图片描述

5.2 使用消费者负载均衡调用测试

访问:
http://localhost:8080/rpc
在这里插入图片描述
在这里插入图片描述
负载均衡测试成功。

Spring Cloud Alibaba RocketMQ

一、RocketMQ 介绍

在这里插入图片描述
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能

二、RocketMQ 基本使用

2.1 下载 RocketMQ

使用浏览器打开:
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
这里我们选择 4.4.0 版本的原因在于,我们 spring cloud alibaba 版本为:2.2.0.RELEASE,它里面控制的 rocketMQ 的版是 4.4.0。
在这里插入图片描述

2.2 RocketMQ 目录分析

将该压缩包复制到软件目录里面,使用压缩软件进行解压。
在这里插入图片描述
Benchmark:包含一些性能测试的脚本;
Bin:可执行文件目录;
Conf:配置文件目录;
Lib:第三方依赖;
LICENSE:授权信息;
NOTICE:版本公告;

2.3 配置环境变量

找到配置环境变量的对话框:
在这里插入图片描述
点击新建创建一个环境变量:
在这里插入图片描述

  • 变量名:ROCKETMQ_HOME
  • 变量值:D:\devtools\rocketMQ\rocketmq-all-4.4.0-bin-release

2.4 RocketMQ 的启动

我们进入到${rocketMQ}/bin,在此目录里面启动和停止命令。

2.4.1 启动 NameServe

在这里插入图片描述
注意:弹出的黑窗口不要关闭。

2.4.2 启动 Broker

./mqbroker.cmd -n localhost:9876

其中:
-n localhost:9876是为了指定 nameserver 的地址
在这里插入图片描述

2.5 RocketMQ 的停止

直接把弹出的黑框关闭,即可停止 RocketMQ 的 namesrv 和 broker。

2.6 RocketMQ 控制台的安装

Rocketmq 控制台可以可视化 MQ 的消息发送!

2.6.1 下载 RocketMQ 控制台

2.6.2 复制到软件目录里面

在这里插入图片描述

2.6.3 运行该 jar

java -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876

在这里插入图片描述
其中:
在这里插入图片描述
运行成功后:
访问:
http://localhost:8080/#/
在这里插入图片描述

三、Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。Spring Cloud Stream 内部有两个概念:Binder 和 Binding:

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的Binder 实现。

    举例说明:
    Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。

  • Binding: 包括 Input Binding 和Output Binding。
    Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

四、测试框架搭建

我们将创建 spring-cloud-bus-rocketmq-example 项目,用来测试它的所有功能。
在这里插入图片描述

4.1 搭建 spring-cloud-bus-rocketmq-example

spring-cloud-bus-rocketmq-example 将去除子模块的公共依赖部分。

4.1.1 使用 IDEA 创建一个 Maven 项目

在这里插入图片描述
选择 Maven 项目:
在这里插入图片描述
点击 Next ,填写以下的内容:
在这里插入图片描述
Parent:我们选择 spring-cloud-alibaba-examples
Name:spring-cloud-bus-rocketmq-example
其他的项保持不变。
点击 Finish 完成创建。

4.1.2 添加依赖

打开项目的 pom.xml 文件,我们添加以下的内容:

<dependencies>
	<dependency>
		<groupId>com.alibaba.cloud</groupId>
		<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-actuator</artifactId>
	</dependency>
</dependencies>

4.1.3 完整的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-alibaba-examples</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
	<packaging>pom</packaging>
	<modules>
		<module>rocketmq-produce-example</module>
	</modules>
	<dependencies>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

4.2 搭建 rocketmq-produce-example

produce 代表服务的生产者,用来发送消息。

4.2.1 使用 IDEA 创建一个 Maven 项目

在这里插入图片描述
选择 Maven:
在这里插入图片描述
点击 Next 添加以下的内容:
在这里插入图片描述
Parent:spring-cloud-bus-rocketmq-example
Name:rocketmq-produce-example
点击 Finish 完成项目的创建

4.2.2 修改 Maven 的打包方式

此项目我们以后可能需要使用 jar 发布,在此,我们添加 spring-boot 的打包插件:

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
		</plugin>
	</plugins>
</build>

4.2.3 完整的 pom.xml 文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>rocketmq-produce-example</artifactId>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

4.3 搭建 rocketmq-produce-example

4.3.1 使用 IDEA 创建一个 Maven 项

在这里插入图片描述
选择 Maven:
在这里插入图片描述
点击 Next 添加以下的内容:
在这里插入图片描述
Parent:spring-cloud-bus-rocketmq-example
Name:rocketmq-consumer-example
点击 Finish 完成项目的创建

4.3.2 修改 Maven 的打包方式

为了以后打包为一个 jar 发布,我们添加一个打包插件:

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
		</plugin>
	</plugins>
</build>

4.3.3 完整的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>spring-cloud-bus-rocketmq-example</artifactId>
		<groupId>com.bjsxt</groupId>
		<version>1.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<artifactId>rocketmq-consume-example</artifactId>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

4.4 项目的完整结构如下

在这里插入图片描述

五、完善 rocketmq-produce-example 项目

5.1 添加一个配置文件

在这里插入图片描述
配置信息如下:

logging.level.com.alibaba.cloud.stream.binder.rocketmq=DEBUG
# rocketmq 服务器nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

# stream->binding->output(input)
# output1
# 发送消息的目的地址
spring.cloud.stream.bindings.output1.destination=test-topic
# 消息的默认类型
spring.cloud.stream.bindings.output1.content-type=application/json
# 生产者组
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
# 消息的同步发送
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true

# output2 主要样式事务消息的发送
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
# 发送的是事务消息
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup

# output3 用它样式消息的手动拉取
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group

spring.application.name=rocketmq-produce-example

server.port=28081

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

5.2 添加一个启类

在这里插入图片描述

@SpringBootApplication
public class RocketMQProduceApplication {
	public static void main(String[] args) {
		SpringApplication.run(RocketMQProduceApplication.class, args);
	}
}

5.3 添加 MQSource

在这里插入图片描述
在 Source 里面定义输出:

/**
* 读取我们配置文件里的output
*/
public interface MQSource {
	@Output("output1")
	MessageChannel output1() ;
	
	@Output("output2")
	MessageChannel output2() ;//演示事务消息的发送
	
	@Output("output1")
	MessageChannel output3() ;
}

4.5 添加配置类

在这里插入图片描述
代码如下:

@Configuration
@EnableBinding({MQSource.class})
public class MQConfig {
}

5.4 添加发送消息的类

在这里插入图片描述

@Service
public class SendService {

	@Autowired
	private MQSource source;
	
	/**
	* 发送简单的测试消息
	* @param msg
	* @throws Exception
	*/
	public void send(String msg) throws Exception {
	source.output1().send(MessageBuilder.withPayload(msg).build());
	}
	
	/**
	* 发消息时添加标签
	* @param msg
	* @param tag
	* @param <T>
	* @throws Exception
	*/
	public <T> void sendWithTags(T msg, String tag) throws Exception {
		Message message = MessageBuilder.createMessage(msg,
			new MessageHeaders(Stream.of(tag).collect(Collectors
				.toMap(str -> MessageConst.PROPERTY_TAGS,String::toString))));
		source.output1().send(message);
	}
	/**
	* 发送一个对象消息
	* @param msg
	* @param tag
	* @param <T>
	* @throws Exception
	*/
	public <T> void sendObject(T msg, String tag) throws Exception {
	Message message = MessageBuilder.withPayload(msg)
		.setHeader(MessageConst.PROPERTY_TAGS, tag)
		.setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.APPLICATION_JSON)
		.build();
	source.output1().send(message);
	}
	/**
	* 发送事务的消息
	* @param msg
	* @param num
	* @param <T>
	* @throws Exception
	*/
	public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
		MessageBuilder builder = MessageBuilder.withPayload(msg)
			.setHeader(MessageHeaders.CONTENT_TYPE,MimeTypeUtils.APPLICATION_JSON);
		builder.setHeader("test", String.valueOf(num));
		Message message = builder.build();
		source.output2().send(message);
	}
		public void sendMassiveMessage(String msg) {
			source.output3().send(MessageBuilder.withPayload(msg).build());
		}
}

5.6 事务消息往往需要我们监听回查

新建一个类:
在这里插入图片描述
在这里插入图片描述
代码如下:

/**
* TransactionStatus.CommitTransaction:消息提交,当消息状态为 CommitTransaction,表示允许消费者允许消费当前消息
* TransactionStatus.RollbackTransaction:消息回滚,表示 MQ 服务端将会删除当前半消息,不允许消费者消费。
* TransactionStatus.Unknown:中间状态,表示 MQ 服务需要发起回查操作,检测当前发送方本地事务的执行状态。
*/

@RocketMQTransactionListener(
	txProducerGroup = "myTxProducerGroup",
	corePoolSize = 5,
	maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
	/**
	*消息生产者需要在 executeLocalTransaction 中执行本地事务,当事务半消息提交成功,执行完
	毕后需要返回事务状态码。
	* @param msg
	* @param o
	* @return
	*/
	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object o)
	{
		Object num = msg.getHeaders().get("test");
		if ("1".equals(num)) {
			System.out.println(
				"executer: " + new String((byte[]) msg.getPayload()) + " unknown");
			return RocketMQLocalTransactionState.UNKNOWN; // 将会导致再次查询本地事务
		}
		else if ("2".equals(num)) {
			System.out.println(
				"executer: " + new String((byte[]) msg.getPayload()) + " rollback");
			return RocketMQLocalTransactionState.ROLLBACK; // 半消息将会被 mq 服务器删除,并且消费者不会消费到该消息
		}
		System.out.println(
			"executer: " + new String((byte[]) msg.getPayload()) + " commit");
		return RocketMQLocalTransactionState.COMMIT; // 半消息提交,消费者会消费到该消息。
	}

	/**
	* 实现 checkLocalTransaction 方法,该方法用于进行本地事务执行情况回查,并回应事务状态给
	MQ 的 broker,
	* 执行完成之后需要返回对应的事务状态码
	* @param message
	* @return
	*/
	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
		System.out.println("check: " + new String((byte[]) message.getPayload()));
		return RocketMQLocalTransactionState.COMMIT;
	}
}

5.7 构建一个简单的模型

在这里插入图片描述
代码如下:
在这里插入图片描述

5.8 测试消息的发送

在这里插入图片描述

@RestController
public class SendMessageController {
	
	@Autowired
	private SendService sendService ;
	
	/**
	* 发送一个简单的消息
	* @param msg
	* @return
	* @throws Exception
	*/
	@GetMapping("/send/simple")
	private ResponseEntity<String> sendSimpleMessage( @RequestParam(required = true)
	String msg) throws Exception {
		sendService.send(msg);
		return ResponseEntity.ok("发送成功") ;
	}
	/**
	* 发送消息并且带上标签
	* @param msg 消息
	* @param tags 消息的标签
	* @return
	* @throws Exception
	*/
	@GetMapping("/send/tags")
		private ResponseEntity<String> sendMessageWithTag( @RequestParam(required = true)
	String msg,@RequestParam(required = true)String tags) throws Exception {
		sendService.sendWithTags(msg,tags);
		return ResponseEntity.ok("发送成功") ;
	}
	/**
	* 发送对象消息
	* @param user
	* @param tags
	* @return
	* @throws Exception
	*/
	@GetMapping("/send/object")
	public ResponseEntity<String> sendObjectMessage(User user,String tags) throws
	Exception {
		sendService.sendObject(user,tags);
		return ResponseEntity.ok("发送成功") ;
	}
	
	/**
	* 发送一个事务消息,也就是 half 消息
	* @param msg
	* @param num 类型
	* @return
	* @throws Exception
	*/
	@GetMapping("/send/transaction")
	public ResponseEntity<String> sendTransactionMessage(String msg ,int num) throws
	Exception {
		sendService.sendTransactionalMsg(msg,num);
		return ResponseEntity.ok("发送成功") ;
	}
/**
* 发送很多消息
* @param msg
* @return
* @throws Exception
*/
@GetMapping("/send/poll")
public ResponseEntity<String> sendMassiveMessage(String msg) throws Exception {
	sendService.sendMassiveMessage(msg);
	return ResponseEntity.ok("发送成功") ;
	}
}

5.9 启动类

在这里插入图片描述
代码如下:

@SpringBootApplication
public class RocketMQProduceApplication {
	public static void main(String[] args) {
		SpringApplication.run(RocketMQProduceApplication.class, args);
	}
}

六、完善 rocketmq-consumer-example 项目

6.1 添加配置文件

在这里插入图片描述
内容如下:

# rocketmq nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

# stream->bindings->input
# input1
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true

# input2
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1

# input3
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20

# input4
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=5

# input5 手动消息的拉取
spring.cloud.stream.bindings.input5.destination=pull-topic
spring.cloud.stream.bindings.input5.content-type=text/plain
spring.cloud.stream.bindings.input5.group=pull-topic-group

spring.application.name=rocketmq-consume-example

server.port=28082

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

6.2 添加一个 Sink

在这里插入图片描述
在 Sink 里面添加输入:

public interface Sink {
	@Input("input1")
	SubscribableChannel input1();
	
	@Input("input2")
	SubscribableChannel input2();
	
	@Input("input3")
	SubscribableChannel input3();
	
	@Input("input4")
	SubscribableChannel input4();
	
	@Input("input5")
	PollableMessageSource input5();
}

6.3 创建消息的监听器

在这里插入图片描述

/**
* receive
*/
@Service
public class ReceiveService {

	@StreamListener("input1")
	public void receiveInput1(String receiveMsg) {
		System.out.println("input1 receive: " + receiveMsg);
	}
	
	@StreamListener("input2")
	public void receiveInput2(String receiveMsg) {
		System.out.println("input2 receive: " + receiveMsg);
	}
	
	@StreamListener("input3")
	public void receiveInput3(@Payload User user) {
		System.out.println("input3 receive: " + user);
	}
	
	@StreamListener("input4")
	public void receiveTransactionalMsg(String transactionMsg) {
		System.out.println("input4 receive transaction msg: " + transactionMsg);
	}
}

6.4 主动去 mq 服务器拉取消息

在这里插入图片描述
使用定时任务,主动去服务器拉取消息:

@Service
public class PullMessageTask {
	
	@Autowired
	private Sink sink ;
	
	@Scheduled(fixedRate = 5*1000)
	public void pullMessage(){
		sink.input5().poll((message) -> {
			String payload = (String) message.getPayload();
			System.out.println("pull msg: " + payload);
		}, new ParameterizedTypeReference<String>() {
		});
	}
}

6.5 模型类

直接从 produce 里面复制过来:
在这里插入图片描述

6.6 配置类

新建 MQConfig:
在这里插入图片描述
代码如下:

@Configuration
@EnableBinding({Sink.class})
public class MQConfig {
}

6.7 启动类

@SpringBootApplication
@EnableScheduling
public class RocketMQConsumerApplication {
	public static void main(String[] args) {
		SpringApplication.run(RocketMQConsumerApplication.class ,args);
	}
}

七、测试案例测试

7.1 启动服务

启动 2 个服务:

  • rocketmq-produce-example
  • rocketmq-consumer-example
    在这里插入图片描述

7.2 发送消息测试

7.2.1 发送简单的字符串

http://localhost:28081/send/simple?msg=RocketMQ
在这里插入图片描述
在这里插入图片描述

7.2.2 发送带标签的消息

7.2.3 发送对象消息

http://localhost:28081/send/object?id=1&userName=bjsxt&password=123456&tags=xxx

在这里插入图片描述

7.2.4 发送事务消息

http://localhost:28081/send/transaction?msg=order&num=1

在这里插入图片描述

http://localhost:28081/send/transaction?msg=order&num=2

在这里插入图片描述

http://localhost:28081/send/transaction?msg=order&num=3

在这里插入图片描述

7.2.4 手动拉取消息

http://localhost:28081/send/poll?msg=order

在这里插入图片描述

标签:dubbo,stream,spring,Alibaba,rocketmq,Cloud,Spring,msg,cloud
来源: https://blog.csdn.net/qq_42588990/article/details/121572310

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有