ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ActiveMQ----->如何保证消息可靠性

2021-03-22 14:57:37  阅读:208  来源: 互联网

标签:事务 可靠性 Session 签收 topic connection session ----- ActiveMQ


一 持久化

1.1 API设置

java默认是持久化的

 

1.2 Topic持久化

默认情况下,topic发布时,如果订阅者不在线。消息就会丢失

但我们可以用持久化Topic来解决这个问题。

先启动订阅在启动生产

生产者

package com.ww.activemqdemo.consumer;





import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ClassName JmsProduce
 * @Description TODO
 * @Author DuanYueFeng
 * @Version 1.0
 **/
public class JmsProduce_Topic_p {
    public static final String URL = "tcp://192.168.196.4:61616";
    public static final String topic_name = "topic_p";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        Connection connection = factory.createConnection();

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topic_name);
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        connection.start();//start放后面

        for (int i = 1; i <=3; i++) {
            TextMessage textMessage = session.createTextMessage("Topic mag..." + i);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();
        System.out.println("完成....");
    }
}




订阅者

package com.ww.activemqdemo.consumer;


import org.apache.activemq.ActiveMQConnectionFactory;


import javax.jms.*;
import java.io.IOException;

/**
 * @ClassName JmsConsumer
 * @Description TODO
 * @Author DuanYueFeng
 * @Version 1.0
 **/
public class JmsConsumer_Topic_p {
    public static final String URL = "tcp://192.168.196.4:61616";
    public static final String topic_name = "topic_p";

    public static void main(String[] args) throws JMSException, IOException, InterruptedException {
        System.out.println("我是3号消费者。。。");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        Connection connection = factory.createConnection();
        connection.setClientID("z3");//订阅者的客户端id
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topic_name);
        TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "remark..");//后面是订阅者的名称

        connection.start();//start也要延后

        Message message = durableSubscriber.receive();

        while (null!=message){
            TextMessage textMessage = (TextMessage)message;
            System.out.println("收到的持久化:"+textMessage.getText());
            message = durableSubscriber.receive(1000L);
        }

        System.in.read();
//        Thread.sleep(1000);
        session.close();
        connection.close();
        System.out.println("完成....");
    }
}

二 事务

事务偏生产者

2.1 生产者事务

事务的配置优先度高于应答模式(签收)

		//第一个参数是是否创建session,第二个参数表示应答模式
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

不开启事务时
只要执行send就进入到队列中

开启事务后
需要在最后使用session.commit();,某一条异常也可以用session.rollback();

2.2 消费者事务

不开启事务时
消息拿到就算作被消费了

开启事务时
没有commit,消息就不会算作被消费。会有重复消费的可能

 

三 签收

签收偏消费者

3.1 非事务模式

自动签收(默认)

Session.AUTO_ACKNOWLEDGE

手动签收

Session.CLIENT_ACKNOWLEDGE
客户端调用acknowledge方法手动签收

允许重复消息


Session.DUPS_OK_ACKNOWLEDGE

消息可重复确认,意思是此模式下,可能会出现重复消息,并不是一条消息需要发送多次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,而且具有“延迟”确认的特点。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE一样,不需要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。

3.2 事务模式下
消费者事务开启,只有commit后才能将全部消息变为已消费,写ack没有作用,即使写了签收也不会签收出队

3.3 和事务的联系
在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送
非事务会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement)
 

总结:在ActiveMQ保证消息可靠性的四种方式

 

1)进行消息的持久化:queue默认为持久化,topic模式下需要手动设置

2)开启消息的事务

3)在消费者的非事务状态下要开启消息手动签收功能

4)集群搭建实现高可用

 

标签:事务,可靠性,Session,签收,topic,connection,session,-----,ActiveMQ
来源: https://blog.csdn.net/m0_46405589/article/details/115078495

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有