ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

activemq 事务和签收

2022-08-11 18:04:55  阅读:144  来源: 互联网

标签:事务 Session jms 签收 session import activemq javax


activemq事务:

Connection类中的createSession有两个参数,第一个参数是是否开启事务(true/false);第二个参数是消息确认机制;当第一个参数设置为true,即开启事务;当开启事务时,activemq不会主动提交事务,需要我们手动提交。即需要额外执行commit方法;

  • 生产者开启事务后,先执行send方法,再执行commit方法,这批消息才真正的被提交。不执行commit方法,这批消息不会提交。执行roolback方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要,如果被成功提交,那么消息就自动签收,如果回滚了,则改消息需要被重新发送。
  • 消费者开启事务后,先执行send方法,再执行commit方法,这批消息才算真正的被消费。不执行commit方法,这些消息不会标记已消费,下次还会被消费。执行rollback方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和roolback方法,甚至能够违反一个消费者只能消费一次消息的原理;

生产者事务代码示例:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.JMSException;
 6 import javax.jms.MessageProducer;
 7 import javax.jms.ObjectMessage;
 8 import javax.jms.Queue;
 9 import javax.jms.Session;
10 
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class TestActiveMqProducerTransactionCanDelete {
14     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15     private static final String QUEUE_NAME = "queue_01";
16 
17     public static void main(String[] args) {
18         // 创建连接工厂,按照给定的URL,采用默认用户名密码
19         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
20         // 通过连接工厂 获取connection 并启动访问
21         Connection conn = null;
22         Session session = null;
23         try {
24             conn = activeMQConnectionFactory.createConnection();
25 
26             conn.start();
27             // 创建session会话 开启事务
28             session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
29             // 创建目的地 (具体是队列还是主题topic)
30             Queue queue = session.createQueue(QUEUE_NAME);
31             // 创建消息的生产者
32             MessageProducer messageProducer = session.createProducer(queue);
33             messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
34             // Byte类型的数据
35             ObjectMessage message = session.createObjectMessage();
36             User user = new User();
37             user.setAddress("嘉兴");
38             user.setName("Joy");
39             message.setObject(user);
40             message.setStringProperty("StringProperty", "我是 属性xxxxxxx");
41             messageProducer.send(message);
42 
43             messageProducer.close();
44 
45             // 模拟异常
46             // int a = 10 / 0;
47             session.commit();
48         } catch (Exception e) {
49             try {
50                 session.rollback();
51             } catch (JMSException e1) {
52                 e1.printStackTrace();
53             }
54             e.printStackTrace();
55         } finally {
56             try {
57                 if (session != null) {
58                     session.close();
59                 }
60                 if (conn != null) {
61                     conn.close();
62                 }
63             } catch (JMSException e) {
64                 e.printStackTrace();
65             }
66         }
67 
68         System.out.println("发送消息成功");
69     }
70 
71 }
View Code

消费者代码示例:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.JMSException;
 5 import javax.jms.MessageConsumer;
 6 import javax.jms.ObjectMessage;
 7 import javax.jms.Queue;
 8 import javax.jms.Session;
 9 
10 import org.apache.activemq.ActiveMQConnectionFactory;
11 
12 public class TestActiveMqConsumerTxCanDelete {
13     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
14     private static final String QUEUE_NAME = "queue_01";
15 
16     @SuppressWarnings("unchecked")
17     public static void main(String[] args) {
18         // 创建连接工厂,按照给定的URL,采用默认用户名密码
19         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
20         activeMQConnectionFactory.setTrustAllPackages(true);
21         // 通过连接工厂 获取connection 并启动访问
22         Connection conn = null;
23         Session session = null;
24         try {
25             conn = activeMQConnectionFactory.createConnection();
26 
27             conn.start();
28             // 创建session会话
29             session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
30             // 创建目的地 (具体是队列还是主题topic)
31             Queue queue = session.createQueue(QUEUE_NAME);
32 
33             // 创建消息的生产者
34             MessageConsumer messageConsumer = session.createConsumer(queue);
35             /**
36              * 同步阻塞方式(receive()) 订阅者或者接受者调用MessageConsumer的receive()方法来接受消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞;
37              */
38             while (true) {
39                 ObjectMessage message = (ObjectMessage) messageConsumer.receive();
40                 System.out.println("*********************");
41                 if (message != null) {
42                     System.out.println(message.getStringProperty("StringProperty"));
43                     User user = (User) message.getObject();
44                     System.out.println(user);
45                 } else {
46                     break;
47                 }
48                 System.out.println("*********************");
49             }
50 
51             messageConsumer.close();
52             // 模拟异常
53             // int a = 10 / 0;
54             // 必须要加commit,这样才算正在被消费
55             session.commit();
56         } catch (JMSException e) {
57             try {
58                 session.rollback();
59             } catch (JMSException e1) {
60                 e1.printStackTrace();
61             }
62             e.printStackTrace();
63         } finally {
64             try {
65                 if (session != null) {
66                     session.close();
67                 }
68                 if (conn != null) {
69                     conn.close();
70                 }
71             } catch (JMSException e) {
72                 e.printStackTrace();
73             }
74 
75         }
76     }
77 
78 }
View Code

 

签收:

签收一般应用在消费者,对于生产者,无论是自动签收还是手动签收,消息都能进入到队列中;

在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务的优先级大于签收,当事务模式,如果事务没有提交,即使客户端签收也不会消费消息。非事务性会话中,消息何时被确认取决于创建会话时的应答模式(ACKNOWLEDGE),所以这里所讲的签收,前提是非事务模式;即Connection类中的createSession的两个参数,第一个参数是是否开启事务是false;第二个参数是设置签收模式;

签收模式:

1.Session.AUTO_ACKNOWLEDGE

当消息从MessageConsumer的receive方法返回或者从MessageListener接口的onMessage方法返回时,会话自动确认消息签收

2.Session.CLIENT_ACKNOWLEDGE

需要消费者客户端主动调用acknowledge方法签收消息,这种模式实在Session层面进行签收的,签收一个已经消费的消息会自动的签收这个Session已消费的所有消息:

例如一个消费者在一个Session中消费了5条消息,然后确认第3条消息,所有这5条消息都会被签收

3.Session.DUPS_OK_ACKNOWLEDGE(一般不建议使用)

这种方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递.

这种方式会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息才可使用。(如果ActiveMQ再次传送同一消息,那么消息头中的JMSRedelivered将被设置为true)

4.Session.DUPS_OK_ACKNOWLEDGE(很少使用)

事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到

消费者自动签收Session.AUTO_ACKNOWLEDGE:

activemq 之hello world 显示的消费者签收都是自动签收,这里不再赘述;

消费者手动签收Session.CLIENT_ACKNOWLEDGE:

设置手动签收,需要设置签收(message.acknowledge());如果不设置签收,则后台管理会显示 消费未消费;如果又多个消费者,都是手动签收,其中又一个消费者没有设置签收(即没有message.acknowledge()),但是还是会显示所有消息已签收;

 

消费者签收代码示例:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.JMSException;
 5 import javax.jms.MessageConsumer;
 6 import javax.jms.ObjectMessage;
 7 import javax.jms.Queue;
 8 import javax.jms.Session;
 9 
10 import org.apache.activemq.ActiveMQConnectionFactory;
11 
12 public class TestActiveMqConsumerTxCanDelete {
13     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
14     private static final String QUEUE_NAME = "queue_01";
15 
16     public static void main(String[] args) throws JMSException {
17         // 创建连接工厂,按照给定的URL,采用默认用户名密码
18         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
19         activeMQConnectionFactory.setTrustAllPackages(true);
20         // 通过连接工厂 获取connection 并启动访问
21         Connection conn = activeMQConnectionFactory.createConnection();
22 
23         conn.start();
24         // 创建session会话
25         Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
26         // 创建目的地 (具体是队列还是主题topic)
27         Queue queue = session.createQueue(QUEUE_NAME);
28 
29         // 创建消息的生产者
30         MessageConsumer messageConsumer = session.createConsumer(queue);
31         /**
32          * 同步阻塞方式(receive()) 订阅者或者接受者调用MessageConsumer的receive()方法来接受消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞;
33          */
34         while (true) {
35             ObjectMessage message = (ObjectMessage) messageConsumer.receive();
36             System.out.println("*********************");
37             if (message != null) {
38                 System.out.println(message.getStringProperty("StringProperty"));
39                 User user = (User) message.getObject();
40                 System.out.println(user);
41                 message.acknowledge();// 设置签收
42             } else {
43                 break;
44             }
45             System.out.println("*********************");
46         }
47 
48         messageConsumer.close();
49         session.close();
50         conn.close();
51     }
52 
53 }

 

标签:事务,Session,jms,签收,session,import,activemq,javax
来源: https://www.cnblogs.com/lixiuming521125/p/16566022.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有