标签:SpringBoot springframework annotation P2P org import ActiveMQ public
OK,上一篇博客已经讲述了 ActiveMQ 安装到 Windows 下,接下来,整合 SpringBoot,并实现 P2P 通讯。
我们使用 IDEA 编辑器新建一个 SpringBoot 项目,取名:ActiveMQTest,整体项目结构如下:
第一步,在 pom.xml 文件增加依赖,完整代码如下(我这里使用的 SpringBoot 是 2.0.8 版本,目前都使用 2.0+ 版本了):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>ActiveMQTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--Spring boot 集成包-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.SR4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
bootstrap.yml 配置如下:
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
# ActiveMQ 的队列名
queueName: myQueueName
server:
port: 8080
# 将SpringBoot项目作为单实例部署调试时,不需要注册到注册中心
eureka:
client:
fetch-registry: false
register-with-eureka: false
说明一下,我们配置 ActiveMQ 的 broker-url 的端口是 61616,tcp 协议,不可以使用其它端口。上一篇博客有说明这个端口。
我们接下来编写配置文件 QueueConfig 的代码,如下:
package com.test.config;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Queue;
/**
* @author biandan
* @signature 让天下没有难写的代码
* @create 2021-03-19 下午 11:23
*/
@Configuration
public class QueueConfig {
/**
* 获取配置文件的队列名
*/
@Value("${queueName}")
private String queueName;
/**
* 注册 jms的队列到 Spring 容器
* @return
*/
@Bean
public Queue registerQueue(){
return new ActiveMQQueue(queueName);
}
}
接下来,我们创建消息生产者的类 P2PProducer,代码如下:
package com.test.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author biandan
* @description 点对点消息生产者
* @signature 让天下没有难写的代码
* @create 2021-03-21 下午 10:55
*/
@EnableScheduling
@Component
public class P2PProducer {
private final static SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
@Autowired
private JmsMessagingTemplate template;
@Autowired
private Queue queue;
//每2秒执行一次方法,产生一条消息
@Scheduled(fixedDelay = 1000 * 2)
public void sendMsg() {
String msg = "【发送方】产生一条消息:" + SDF.format(new Date());
System.out.println(msg);
template.convertAndSend(queue, msg);
}
}
说明:
这里自动装配的 Queue,就是 QueueConfig 配置类里加入到 Spring 容器管理的 Queue 队列。使用的是 Spring IOC 原理。
然后,我们创建 SpringBoot 启动类:ProducerApplication,代码如下:
package com;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @author biandan
* @signature 让天下没有难写的代码
* @create 2021-03-19 上午 12:07
*/
@SpringBootApplication
@EnableScheduling //启用任务调度
@EnableEurekaClient
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
接下来,我们直接启动程序,看控制台输出:每隔2秒,生产者就生产一条消息,推送到 ActiveMQ 服务器。
然后,我们打开 ActiveMQ 服务器看下,输入链接:http://127.0.0.1:8161/,用户名和密码都是默认的:admin
看到如下信息:
OK,生产者已经生产了很多消息了,接下来,我们开发一个消费者,把这些消息都消费掉。如代码结构图,我们创建类:P2PConsumer,代码如下:
package com.test.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* @author biandan
* @description 点对点消息消费者
* @signature 让天下没有难写的代码
* @create 2021-03-21 下午 11:08
*/
@Component
public class P2PConsumer {
//使用 Jms 监听器,监听生产者的消息
@JmsListener(destination = "${queueName}")
public void receiveMsg(String msg){
System.out.println("***接收方***接收到的消息:"+msg);
}
}
OK,我们重新启动程序,看到控制台如下输出:消费者把之前的消息都消费了,然后生产者一旦产生新消息,就被监听器监听到,消费者就马上消费掉消息了。
我们再去 ActiveMQ 管理台看下:等待消费的消息【Number Of Pending Messages】已经没有了,一产生就被消费掉了。进入队列的消息【Messages Enqueued】一直在递增。
OK,ActiveMQ 的 点对点通讯我们学习完毕。
参考代码地址在百度网盘:链接: https://pan.baidu.com/s/1NcKRRgJpg9VLa3ZPnEnkiA 提取码:3pnq
标签:SpringBoot,springframework,annotation,P2P,org,import,ActiveMQ,public 来源: https://blog.csdn.net/BiandanLoveyou/article/details/114994170
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。