可能有一个愚蠢的简单答案,但我正在尝试使用ActiveMQ在生产者和消费者之间传递消息.我将有许多生产者和许多消费者,但我希望每个消息只在消费者中传递一次.这似乎意味着我不能使用主题,因为它们会向正在收听的所有消费者传递消息,并且我只希望一个消费者接收每条消息.
我的问题是我能够接收消息,但消息没有出列.因此,如果我重新启动我的消费者流程,则会重新处理所有消息. This answer似乎是相关的,但似乎并不适用,因为我无法创建持久的队列使用者,只有持久的主题消费者(除非我在API文档中遗漏了一些内容).
我的代码如下.
TopicConnectionFactory factory = new ActiveMQConnectionFactory(props.getProperty("mq.url"));
Connection conn = factory.createConnection();
Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(props.getProperty("mq.source_queue"));
conn.start();
MessageConsumer consumer = session.createConsumer(queue);
然后是
Message msg = consumer.receive();
msg.acknowledge();
if (!(msg instanceof TextMessage)) continue;
String msgStr = ((TextMessage)msg).getText();
这是我目前的代码.我尝试过Session.AUTO_ACKNOWLEDGE而没有使用msg.acknowledge().此代码的任何工作排列似乎都会检索消息,但是当我重新启动消费者时,即使在重新启动之前已收到所有消息,也会再次收到所有消息.
解决方法:
您将会话创建为事务会话,因此如果要通知代理现在已消耗所有消息且不需要重新传递,则需要调用session.commit.如果你没有将createSession的第一个参数设置为true,那么Ack模式会被尊重,否则它会被忽略,这是我害怕的JMS API的一个奇怪之处.如果你这样做:
ConnectionFactory factory = new ActiveMQConnectionFactory(props.getProperty("mq.url"));
Connection conn = factory.createConnection();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(props.getProperty("mq.source_queue"));
conn.start();
MessageConsumer consumer = session.createConsumer(queue);
然后这将工作:
Message msg = consumer.receive();
msg.acknowledge();
否则你需要做:
Message msg = consumer.receive();
session.commit();
但请记住,对于单个消息,事务对客户端确实没有意义,没有事务是更好的选择.
标签:java,jms,activemq 来源: https://codeday.me/bug/20190902/1787711.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。