ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java – 我正确实现ActiveMQ吗?实现事务处理会话并重试

2019-07-11 15:20:13  阅读:192  来源: 互联网

标签:java spring jms activemq


我正在尝试使用事务会话来支持回滚的JMS-ActiveMQ实现.
我是ActiveMQ的新手,我已经使用它的Java库进行了第一次实现.

当我运行我的应用程序时,我看到消息已成功入队并出列.我还可以看到相应的DLQ是自动生成的.但是,我不确定我是否正确配置了redeliverypolicy.截至目前它已在生产者上配置,但有些examples将重新传递策略与监听器容器联系起来,所以我不能完全确定在我的情况下(如果有的话)是否会将有毒消息放在DLQ上.摘要中包含详细注释.

此外,到目前为止我遇到的所有示例都使用Spring.但是,我没有选择使用它需要重新布线整个项目(如果只涉及最小的开销,我会打开).

任何关于如何使用ActiveMQ api在Java中做到这一点的见解将不胜感激.

制片人

public void publishUpdate(final MessageBody payload)
            throws JMSException {
        Session session = session(connection());
        try {
            Message message = message(session, payload);
            LOGGER.info("About to put message on queue");
            producer(session).send(message);
            // without session.commit()-- no messages get put on the queue.
            session.commit();// messages seem to be enqueued now.
            
        } catch ( BadRequestException e) { //to avoid badly formed requests?
            LOGGER.info("Badly formed request. Not attempting retry!");
            return;
        } catch (JMSException jmsExcpetion) {

            LOGGER.info("Caught JMSException will retry");
            session.rollback();// assume rollback is followed by a retry?
        }         
    }

  private MessageProducer producer(Session session) throws JMSException {
        return session.createProducer(destination());
    }

   private Connection connection() throws JMSException {
        ActiveMQConnectionFactory connectionFactory= new ActiveMQConnectionFactory();
        Connection connection = connectionFactory.createConnection();
     connectionFactory.setRedeliveryPolicy(getRedeliveryPolicy());//redelivery policy with three retries and redelivery time of 1000ms  
        return connection;
    }

 private Session session(Connection connection) throws JMSException {
        Session session = connection.createSession(true,
                Session.SESSION_TRANSACTED);
        connection.start();
        return session;
    } 

监听器:

public class UpdateMessageListener implements MessageListener{
….
    public void onMessage(Message message) {
        String json = null;
        try {
            //Does the listener need to do anything to deal with retry?
            json = ((TextMessage) message).getText();
            MessageBody request = SerializeUtils.deserialize(json, MessageBody.class);
            processTransaction(request.getUpdateMessageBody(), headers);//perform some additional processing.
            } catch (Throwable e) {
            LOGGER.error("Error processing request: {}", json);
        }
    }
}

消费者:

  private MessageConsumer consumer() throws JMSException {
        LOGGER.info("Creating consumer");
            MessageConsumer consumer = session().createConsumer(destination());
            consumer.setMessageListener(new UpdateMessageListener()); //wire listener to consumer
        return consumer;
    }
    private Session session() throws JMSException {
        Connection connection=connection();
         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//create an auto-ack  from the consumer side? Is this correct?
         connection.start();
         return session;
  }

如果有必要,我也愿意提供更多代码.

解决方法:

你的解决方案有一点缺陷.

根据JMS document,如果在onMessage函数中存在异常,则在Session.AUTO_ACKNOWLEDGE模式下,失败的执行消息将由消息队列(例如ActiveMQ)重新传递.但是这个流程被打破了,因为侦听器在onMessage函数中捕获了Throwable或Exception.数据流如下图所示:
enter image description here

ACK_TYPE

如果要实现本地事务,则在消息处理程序执行失败时应该抛出异常.在pesudo代码中,异步重新传递消息使用者可能如下所示:

Session session = connection.getSession(consumerId);  
sessionQueueBuffer.enqueue(message);  
Runnable runnable = new Ruannale(){  
    run(){  
        Consumer consumer = session.getConsumer(consumerId);  
        Message md = sessionQueueBuffer.dequeue();  
        try{  
            consumer.messageListener.onMessage(md);  
            ack(md);//send an STANDARD_ACK_TYPE, tell broker success
        }catch(Exception e){  
            redelivery();//send redelivery ack. DELIVERED_ACK_TYPE, require broker keep message and redeliver it.
    }  
}   
threadPool.execute(runnable);

标签:java,spring,jms,activemq
来源: https://codeday.me/bug/20190711/1433161.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有