ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ActiveMq----基础篇

2021-09-03 19:32:16  阅读:152  来源: 互联网

标签:Session 队列 基础 ---- connection session 消息 ActiveMq public


ActiveMQ 消息中间件

1.消息中间件的种类

KafKa

RabbitMQ

RocketMQ

ActiveMQ

ActiveMQ

API发送和接受

MQ的高可用性

MQ的集群和容错配置

MQ的持久化

延时发送/定 时投递

签收机制

Spring整合

下载地址

https://activemq.apache.org/components/classic/download/

linux下安装

1.先解压这个压缩包

2.进入bin目录

3.给启动文件权限 chmod 777 activemq

4.普通启动 ./activemq start

5.activemq的默认端口是61616

6.关闭./activemq stop

7.启动生成指定log ./activemq start > /usr/local/src/apache-activemq-5.15.14/runlog.log

8.通过window访问activemq图形化页面 http://159.75.49.61:8161/ 默认密码是 admin/admin

队列

JMS开发基本步骤

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8oJtf6ZG-1630668014212)(C:\Users\16202\AppData\Roaming\Typora\typora-user-images\image-20210425162128674.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JF2SfUu9-1630668014213)(C:\Users\16202\AppData\Roaming\Typora\typora-user-images\image-20210425162346746.png)]

消息生产者编码

1.先创建一个SpringBoot项目

2.添加pom依赖

   <!--activemq-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>

3.创建一个主启动类

	  //MQ的地址
	  public static final String ACTIVEMQ_URL="tcp://159.75.49.61:61616";
	  //创建的队列名
	  public static final String QUEUE_NAME="queue01";
	  public static void main(String[] args) throws JMSException {
//	  	    1.创建连接工厂 指定URL
			ActiveMQConnectionFactory activemqFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//			2.连接工厂获取连接Connection
			Connection connection = activemqFactory.createConnection();
			connection.start();
//			3.创建会话Session   分别有两个参数 第一个为事务,第二个叫签收
			Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//			4.创建目的地(分两个一个是队列一个是主体topic)
			Queue queue = session.createQueue(QUEUE_NAME);
//			5.创建消息生产者
			MessageProducer msgproducer = session.createProducer(queue);

			for (int i = 1; i <4 ; i++) {
//				  创建消息
				  TextMessage textMessage = session.createTextMessage("这是第" + i + "条消息");
//				  生产者推送消息  发送给MQ
				  msgproducer.send(textMessage);
			}
//			6.释放资源
			msgproducer.close();
			session.close();
			connection.close();
			System.out.println("消息发送完成奥利给!");
	  }

消息消费者编码

  //MQ的地址
	  public static final String ACTIVEMQ_URL="tcp://159.75.49.61:61616";
	  //创建的队列名
	  public static final String QUEUE_NAME="queue01";
	  public static void main(String[] args) throws JMSException {
//	  	    1.创建连接工厂 指定URL
			ActiveMQConnectionFactory activemqFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//			2.连接工厂获取连接Connection
			Connection connection = activemqFactory.createConnection();
			connection.start();
//			3.创建会话Session   分别有两个参数 第一个为事务,第二个叫签收
			Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//			4.创建目的地(分两个一个是队列一个是主体topic)
			Queue queue = session.createQueue(QUEUE_NAME);

//			5.创建消费者
			MessageConsumer msgconsumer = session.createConsumer(queue);

			while (true){
//				  接受消息 receive不带参数表示一直等一直等 带参数表示等多久过时不候
				TextMessage textMessage=(TextMessage) msgconsumer.receive(4000L);
				if(textMessage!=null){
					  System.out.println("消费者收到消息:"+textMessage.getText());
				}else{
					  break;
				}
			}
//			6.释放资源
			msgconsumer.close();
			session.close();
			connection.close();


	  }

通过监听的方式 消费消息

//			通过监听的方式消费 消息
			msgconsumer.setMessageListener(new MessageListener() {
				  @Override
				  public void onMessage(Message message) {
						if(null != message && message instanceof TextMessage){
								TextMessage textMessage=(TextMessage) message;
								try {
									  System.out.println("消费者接收消息"+textMessage.getText());
								}catch (JMSException e){
									  e.printStackTrace();
								}
						}
				  }
			});
			//保证控制台不关闭 如果不使用 会监听不到消息
			System.in.read();
			msgconsumer.close();
			session.close();
			connection.close();

订阅 topic

仔细看图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xTdAKEdI-1630668014214)(C:\Users\16202\AppData\Roaming\Typora\typora-user-images\image-20210425164407778.png)]

前面都是队列 机制都是 1—1

本次是订阅 topic 机制为 1----多

**先启动消费者 再启动生产者 **

Topic生产者

   //MQ的地址
	  public static final String ACTIVEMQ_URL="tcp://159.75.49.61:61616";
	  //创建的队列名
	  public static final String TOPIC_NAME="topic--xiaolin01";
	  public static void main(String[] args) throws JMSException {
//	  	    1.创建连接工厂 指定URL
			ActiveMQConnectionFactory activemqFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//			2.连接工厂获取连接Connection
			Connection connection = activemqFactory.createConnection();
			connection.start();
//			3.创建会话Session   分别有两个参数 第一个为事务,第二个叫签收
			Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//			4.创建目的地(分两个一个是队列一个是主体topic)
			Topic topic = session.createTopic(TOPIC_NAME);
//			5.创建消息生产者
			MessageProducer msgproducer = session.createProducer(topic);

			for (int i = 1; i <=6 ; i++) {
//				  创建消息
				  TextMessage textMessage = session.createTextMessage("这是第" + i + "条消息------Topic消息订阅");
//				  生产者推送消息  发送给MQ
				  msgproducer.send(textMessage);
			}
//			6.释放资源
			msgproducer.close();
			session.close();
			connection.close();
			System.out.println("订阅--消息发送完成奥利给!");
	  }

Topic消费者

  //MQ的地址
	  public static final String ACTIVEMQ_URL="tcp://159.75.49.61:61616";
	  //跟生产者名字一样
	  public static final String TOPIC_NAME="topic--xiaolin01";
	  public static void main(String[] args) throws JMSException, IOException {
			System.out.println("------3号消费者");
//	  	    1.创建连接工厂 指定URL
			ActiveMQConnectionFactory activemqFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//			2.连接工厂获取连接Connection
			Connection connection = activemqFactory.createConnection();
			connection.start();
//			3.创建会话Session   分别有两个参数 第一个为事务,第二个叫签收
			Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//			4.创建目的地(分两个一个是队列一个是主体topic)
			Topic topic = session.createTopic(TOPIC_NAME);

//			5.创建消费者
			MessageConsumer msgconsumer = session.createConsumer(topic);

//			通过监听的方式消费 消息
			msgconsumer.setMessageListener(new MessageListener() {
				  @Override
				  public void onMessage(Message message) {
						if(null != message && message instanceof TextMessage){
							  TextMessage textMessage=(TextMessage) message;
							  try {
									System.out.println("消费者接收消息"+textMessage.getText());
							  }catch (JMSException e){
									e.printStackTrace();
							  }
						}
				  }
			});
			//保证控制台不关闭 如果不使用 会监听不到消息
			System.in.read();
			msgconsumer.close();
			session.close();
			connection.close();
	  }

小总结

1.启动对比

队列:先启动生产者再启动消费者

订阅:先启动消费者再启动订阅者

2.消费对比

队列:消息不会重复消费 多个消费者会平均分配 (消息不会丢失)

订阅:消息会分配给每一个订阅过的消费者 进行消费 (消息会丢失)

3.看图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0XYlX7El-1630668014217)(file:///C:\Users\16202\AppData\Local\Temp\ksohtml5820\wps1.jpg)]

JMS是什么

JMS是JavaEE的13个核心模块的一个规范工业标准

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0XejFbnK-1630668014219)(C:\Users\16202\AppData\Roaming\Typora\typora-user-images\image-20210425174634444.png)]

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等

用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IvNE73Aj-1630668014219)(C:\Users\16202\AppData\Roaming\Typora\typora-user-images\image-20210425174846939.png)]

mq中间件的其他的落地的产品

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m3Q2tYmZ-1630668014220)(C:\Users\16202\AppData\Roaming\Typora\typora-user-images\image-20210425175128906.png)]

消息队列的详细比较

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-42yaWIyz-1630668014221)(C:\Users\16202\AppData\Roaming\Typora\typora-user-images\image-20210425175519328.png)]

JMS的四大组成元素

JMS provider

实现 JMS 接口 和各种规范的消息中间件产品, 也就是 MQ服务器

JMS producer

消息的生产者,创建和发送JMS消息的客户端应用

JMS Consumer

消息的消费者,接收和处理晓得的客户端

JMS Message

JMS Message 分三大模块

  • 消息头

    1. JMSDestination

      消息发送的目的地,主要是指 Queue (队列)和Topic(主题)

    2. JMSDeliveryMode

      持久模式非持久模式

      持久性的消息:一次仅仅一次 ,如果JMS的提供者出现故障,该条消息不会丢失,他会在恢复之后再一次的传递!

      非持久性的消息:最多只会发送一次 ,如果提供者出现故障,该消息会永远丢失

    3. JMSExpiration

      可以设置消息在一定时间过期,默认是永不过期!

      消息过期时间,等于Destination的 send方法中的timeToLive值加上发送时刻的GMT时间值

      如果timeToLive值为0 表示该消息永不过期

      如果发送后,在消息过期的时间后 还没发送到目的地 这该消息会被清除!

    4. JMSPriority

      消息优先级

      分为0-9十个级别, 0-4为普通消息 5-9为加急消息

      JMS不要求MQ严格按照这个等级发送, 但是必须保证加急消息 先与普通消息到达!

    5. JMSMessageID

      唯一识别每个消息的标识由MQ产生

  • 消息体

    1. 封装具体的消息数据

    2. 五种消息体格式

      1. TextMessage

        普通字符串类型的消息 包含一个String

      2. MapMessage

        Map类型的消息,key为String 值为Java基本数据类型

      3. BytesMessage

        二进制的数据类型,包含一个byte[]

      4. StreamMessage

        Java数据流消息,用标准流操作来顺序的填充和读取

      5. ObjectMessage

        对象消息,包含一个可序列化的Java对象

    3. 发送和接收的消息体类型必须一致对应

  • 消息属性

    方便识别 ,去重,重点标注,如需除了消息头字段以外的值可以使用消息属性

JMS的可靠性

消息的持久性

默认消息为持久化

这是队列的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次 对于这些消息 可靠性是优先考虑的因素

可靠性的另一个重要方面是确保持久化消息传至目标后.消息服务在向消费者传送他们之前 不会丢失

  • 参数设置说明‘

    1. 非持久
      • messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 参数为1 ==非持久
      • 非持久化:当服务器宕机 消息不存在
    2. 持久
      • messageProducer.setDeliveryMode(PERSISTENT) 参数为2 ==持久
      • 持久化:当服务器宕机 消息依旧存在
  • 持久的 Queue

    	  //MQ的地址
    	  public static final String ACTIVEMQ_URL="tcp://47.113.102.7:61616";
    	  //创建的队列名
    	  public static final String QUEUE_NAME="queue01";
    	  public static void main(String[] args) throws JMSException {
    //	  	    1.创建连接工厂 指定URL
    			ActiveMQConnectionFactory activemqFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    //			2.连接工厂获取连接Connection
    			Connection connection = activemqFactory.createConnection();
    			connection.start();
    //			3.创建会话Session   分别有两个参数 第一个为事务,第二个叫签收
    			Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //			4.创建目的地(分两个一个是队列一个是主体topic)
    			Queue queue = session.createQueue(QUEUE_NAME);
    //			5.创建消息生产者
    			MessageProducer msgproducer = session.createProducer(queue);
    			//消息非持久化
    //			msgproducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    //			消息持久化
    			msgproducer.setDeliveryMode(2);
    			for (int i = 1; i <=6 ; i++) {
    //				  创建消息
    				  TextMessage textMessage = session.createTextMessage("这是第" + i + "条消息------这次丢给监听");
    //				  生产者推送消息  发送给MQ
    				  msgproducer.send(textMessage);
    			}
    //			6.释放资源
    			msgproducer.close();
    			session.close();
    			connection.close();
    			System.out.println("消息发送完成奥利给!");
    	  }
    
  • 持久的Topic

    一定要先运行一次消费者 等于向MQ注册 下次无论 消费者是否在线宕机 再次上线都会把未消费的 消息重新接收

      //MQ的地址
    	  public static final String ACTIVEMQ_URL="tcp://47.113.102.7:61616";
    	  //创建的队列名
    	  public static final String TOPIC_NAME="topic--xiaolin01";
    	  public static void main(String[] args) throws JMSException {
    //	  	    1.创建连接工厂 指定URL
    			ActiveMQConnectionFactory activemqFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    //			2.连接工厂获取连接Connection
    			Connection connection = activemqFactory.createConnection();
    
    //			3.创建会话Session   分别有两个参数 第一个为事务,第二个叫签收
    			Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //			4.创建目的地(分两个一个是队列一个是主体topic)
    			Topic topic = session.createTopic(TOPIC_NAME);
    //			5.创建消息生产者
    			MessageProducer msgproducer = session.createProducer(topic);
    			//订阅持久化
    			msgproducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    			connection.start();
    			for (int i = 1; i <=3 ; i++) {
    //				  创建消息
    				  TextMessage textMessage = session.createTextMessage("这是第" + i + "条消息------Topic消息订阅");
    //				  生产者推送消息  发送给MQ
    				  msgproducer.send(textMessage);
    			}
    //			6.释放资源
    			msgproducer.close();
    			session.close();
    			connection.close();
    			System.out.println("订阅--消息发送完成奥利给!");
    

事务

	   //创建会话Session   分别有两个参数 第一个为事务,第二个叫签收
         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

生产者事务

  1. true
    • 如果 为true 则消息进入缓冲区 不进入 消息队列 必须在结尾加上 session.commit();
  2. false
    • 为false则直接进入 队列

实用场景 如果消息发送出现异常 为true 可以使用 session.rollback();回滚

消费者事务

  1. true
    • 为true 且结尾未在结尾未添加 session.commit(); 消费者就会一直重复消费消息
  2. false
    • 为false 只消费一次

Acknowledge :签收

  1. 非事务

    • 自动签收(默认) ------------ Session.AUTO_ACKNOWLEDGE
    • 手动签收
      1. Session.CLIENT_ACKNOWLEDGE
      2. 客户端调用textMessage.acknowledge(); 如果不调用这个方法 则会重复签收
    • 允许重复消息--------------Session.DUPS_OK_ACKNOWLEDGE
  2. 事务

    //事务消费者  手动签收
    Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
    

    事务高于 签收

    如果事务不在结尾加上 session.commit() 哪怕签收为手动签收 且调用了textMessage.acknowledge()方法 也会导致重复消费

    如果有session.commit() 哪怕签收不调用textMessage.acknowledge() 也不会重复消费

    如果事务回滚 则消息会被再次传送

    非事务会话 则取决与 是否调用textMessage.acknowledge() ;

  3. 签收和事务关系

小总结

JMS的点对点总结

点对点的模型是基于队列,生产者发消息到队列,消费者从消息队列进行消费,队列的本身存在是的消息的 异步传输 成为了可能

类型qq微信与朋友聊天 或者 发短信

  1. 在Session关闭时有部分消息被接收 到了 但是未被签收acknowledge(),那么等下次消费者连接到相同的队列 还会进行消费
  2. 队列可以长久的保存消息知道消费者成功接收到了消息. 消费者不需要担心消息的丢失而时刻保持跟队列的连接.充分展示异步传输的优势

JMS Pub/Sub模型 定义了如何向一个内容节点发布和订阅消息,这个节点被称为订阅 topic

主题可被认作是消息传递的中介,发布者 发布消息到主题 订阅者 从主题订阅消息

主题使得消息订阅者与消息发布者互相独立 不需要接触就能保证消息的传送.

标签:Session,队列,基础,----,connection,session,消息,ActiveMq,public
来源: https://blog.csdn.net/xiaolinBk/article/details/120088600

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有