ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ActiveMQ--消息接收端(二)

2021-07-01 11:58:37  阅读:296  来源: 互联网

标签:MqObservable 接收端 -- jms public org import ActiveMQ javax


结合观察者模式(Observable 和 Observer ) ​​​​​​​以及LinkedBlockingQueue处理接收的消息。

接收消息

package com.xxx.controller;

import java.util.Observable;
import java.util.Set;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.ApplicationContext;

import com.zchz.testbase.Consts;

import net.sf.json.JSONObject;

public class MqObservable extends Observable implements  MessageListener{
    public static MqObservable instance;

    public static MqObservable getInstance(){
        if(instance == null){
              synchronized(MqObservable.class){
                   if (instance == null){
                       ApplicationContext factory=new ClassPathXmlApplicationContext("activemq.xml");
     
                       instance = (MqObservable)factory.getBean("jmsListener");
                   }
               }
        }
        return instance;
    }
    

    private MqObservable() {
    }

    @Override
    public void onMessage(Message message) {
        TextMessage msg = (TextMessage) message;//获取的mq消息
                //JSONObject jsonMsg =  JSONObject.fromObject(msg.getText());//获取的mq消息
               //到此为止关于activemq的消息接收完成;下面是触发MqListener类对消息的处理存储
                this.setChanged();//会触发Observer中的update(Observable arg0, Object arg1)
                this.notifyObservers(msg);
    }

}

Observer中处理消息

package com.xxx.controller;

import java.util.Observable;
import java.util.Observer;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.ApplicationContext;

import net.sf.json.JSONObject;

public class MqListener implements Observer{
    public static String EVENT_CREATE="EVENT_CREATE";//消息发送方给发送的MQ消息定义的名称

    public LinkedBlockingQueue<JSONObject> queue = new LinkedBlockingQueue<JSONObject>();
    
     MqObservable mqObservable;
     public String type="";//消息中带有的信息,后面过滤使用的

    public MqListener(String type) throws JMSException{
         this.type=type;
       
        // ConnectionFactory :连接工厂,JMS用它创建连接  
         MqObservable mqObservable = MqObservable.getInstance();
       mqObservable.addObserver(this);//addObserver后,MqObservable发生变化能通知MqListener
      
    }  
    /**
     * 重置队列
     */
    public void resetQueue() {
        queue.clear();
    }
    @Override
    public void update(Observable arg0, Object arg1) {
        //收到消息,触发update,其中update的第二个参数就是收到的消息的内容
        TextMessage msg =(TextMessage) arg1;
        //System.out.println(msg.toString());
        try {
            JSONObject jsonText = JSONObject.fromObject(msg.getText());
            if(msg==null){
                return;
            }
            if(("queue://"+type).equals(msg.getJMSDestination().toString()){
                queue.add(jsonText);//过滤后的消息存到队列中,待用
            }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        
    }
}

使用

MqListener txMq = new MqListener(MqListener.EVENT_CREATE);//监听注册
//消息发送方发送消息
JSONObject txMsg = txMq.queue.poll(40, TimeUnit.SECONDS);//获取queue中存入的消息
logger.info("txMsg:" + txMsg);

标签:MqObservable,接收端,--,jms,public,org,import,ActiveMQ,javax
来源: https://blog.csdn.net/leminfei/article/details/118381677

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有