ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java框架-ActiveMQ

2020-10-13 15:32:33  阅读:259  来源: 互联网

标签:Java 框架 Session 对象 创建 connection session 消息 ActiveMQ


 适用场景: 商品修改了名称,商品详情模块、搜索模块、其他业务模块中的“商品名称”也修改   ActiveMQ的消息形式 对于消息的传递有两种类型: 一种是点对点的,即一个生产者和一个消费者一一对应; 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。 JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。   · StreamMessage -- Java原始值的数据流   · MapMessage--一套名称-值对   · TextMessage--一个字符串对象   · ObjectMessage--一个序列化的 Java对象   · BytesMessage--一个字节的数据流   安装步骤: 第一步: 把ActiveMQ 的压缩包上传到Linux系统。 第二步:解压缩。 第三步:启动。 使用bin目录下的activemq命令   启动: [root@localhost bin]# ./activemq start 关闭: [root@localhost bin]# ./activemq stop 查看状态: [root@localhost bin]# ./activemq status   工程需要添加jar包:     1.1. Queue 1.1.1.    Producer 生产者:生产消息,发送端。 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 第二步:使用ConnectionFactory对象创建一个Connection对象。 第三步:开启连接,调用Connection对象的start方法。 第四步:使用Connection对象创建一个Session对象。 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。 第六步:使用Session对象创建一个Producer对象。 第七步:创建一个Message对象,创建一个TextMessage对象。 第八步:使用Producer对象发送消息。 第九步:关闭资源。
@Test       public void testQueueProducer() throws Exception {            // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。            //brokerURL服务器的ip及端口号            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");            // 第二步:使用ConnectionFactory对象创建一个Connection对象。            Connection connection = connectionFactory.createConnection();            // 第三步:开启连接,调用Connection对象的start方法。            connection.start();            // 第四步:使用Connection对象创建一个Session对象。            //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。            //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。            //参数:队列的名称。            Queue queue = session.createQueue("queue-test");            // 第六步:使用Session对象创建一个Producer对象。            MessageProducer producer = session.createProducer(queue);            // 第七步:创建一个Message对象,创建一个TextMessage对象。            /*TextMessage message = new ActiveMQTextMessage();            message.setText("hello activeMq,this is my first test.");*/            TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");            // 第八步:使用Producer对象发送消息。            producer.send(textMessage);            // 第九步:关闭资源。            producer.close();            session.close();            connection.close();       }
  1.1.2.    Consumer 消费者:接收消息。 第一步:创建一个ConnectionFactory对象。 第二步:从ConnectionFactory对象中获得一个Connection对象。 第三步:开启连接。调用Connection对象的start方法。 第四步:使用Connection对象创建一个Session对象。 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。 第六步:使用Session对象创建一个Consumer对象。 第七步:接收消息。 第八步:打印消息。 第九步:关闭资源
public class QueueCustomer {       @Test       public void recieve() throws Exception{            // 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");            // 2.获取连接            Connection connection = connectionFactory.createConnection();            // 3.开启连接            connection.start();            // 4.根据连接对象创建session            // 第一个参数:表示是否使用分布式事务(JTA)            // 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);             // 5.根据session创建Destination(目的地,queue topic,这里使用的是queue)            Queue queue = session.createQueue("queue-test");            // 6.创建消费者            MessageConsumer consumer = session.createConsumer(queue);            //7.接收消息            //第一种 //               while(true){ //                    //接收消息 (参数的值表示的是超过一定时间 以毫秒为单位就断开连接) //                    Message message = consumer.receive(10000); //                    //如果message为空,没有接收到消息了就跳出 //                    if(message==null){ //                          break; //                    } //                    //                    if(message instanceof TextMessage){ //                          TextMessage messaget = (TextMessage)message; //                          System.out.println(">>>获取的消息内容:"+messaget.getText());//获取消息内容 //                    } //               }            System.out.println("start");            //第二种:                  //设置监听器,其实开启了一个新的线程。            consumer.setMessageListener(new MessageListener() {                  //接收消息,如果有消息才进入,如果没有消息就不会进入此方法                  @Override                  public void onMessage(Message message) {                       if(message instanceof TextMessage){                             TextMessage messaget = (TextMessage)message;                             try {                                   //获取消息内容                                   System.out.println(">>>获取的消息内容:"+messaget.getText());                             } catch (JMSException e) {                                   e.printStackTrace();                             }                       }                  }            });       System.out.println("end");            Thread.sleep(10000);//睡眠10秒钟。                       // 9.关闭资源            consumer.close();            session.close();            connection.close();       } }
  1.2. Topic     1.2.1.    Producer 使用步骤: 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 第二步:使用ConnectionFactory对象创建一个Connection对象。 第三步:开启连接,调用Connection对象的start方法。 第四步:使用Connection对象创建一个Session对象。 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。 第六步:使用Session对象创建一个Producer对象。 第七步:创建一个Message对象,创建一个TextMessage对象。 第八步:使用Producer对象发送消息。 第九步:关闭资源。  
public class TopicProducer {       @Test       public void send() throws Exception{            // 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");            // 2.获取连接            Connection connection = connectionFactory.createConnection();            // 3.开启连接            connection.start();            // 4.根据连接对象创建session            // 第一个参数:表示是否使用分布式事务(JTA)            // 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            // 5.根据session创建Destination(目的地,queue topic,这里使用的是topic)            Topic topic = session.createTopic("topic-test");//---------------------            // 6.创建生产者            MessageProducer producer = session.createProducer(topic);            // 7.构建消息对象,(构建发送消息的内容) 字符串类型的消息格式(TEXTMessage)            TextMessage textMessage = new ActiveMQTextMessage();            textMessage.setText("发送消息123");// 消息的内容            // 8.发送消息            producer.send(textMessage);            // 9.关闭资源            producer.close();            session.close();            connection.close();       } }
  1.2.2.    Consumer 消费者:接收消息。 第一步:创建一个ConnectionFactory对象。 第二步:从ConnectionFactory对象中获得一个Connection对象。 第三步:开启连接。调用Connection对象的start方法。 第四步:使用Connection对象创建一个Session对象。 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。 第六步:使用Session对象创建一个Consumer对象。 第七步:接收消息。 第八步:打印消息。 第九步:关闭资源
public class TopicCustomer1 {       @Test       public void reieve() throws Exception{              // 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");            // 2.获取连接            Connection connection = connectionFactory.createConnection();            // 3.开启连接            connection.start();            // 4.根据连接对象创建session            // 第一个参数:表示是否使用分布式事务(JTA)            // 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            // 5.根据session创建Destination(目的地,queue topic,这里使用的是queue)            Topic topic = session.createTopic("topic-test");//---------------------            // 6.创建消费者            MessageConsumer consumer = session.createConsumer(topic);            // 7.接收消息            while(true){                  //接收消息 (参数的值表示的是超过一定时间 以毫秒为单位就断开连接)                  Message message = consumer.receive(100000);                  //如果message为空,没有接收到消息了就跳出                  if(message==null){                       break;                  }                                   if(message instanceof TextMessage){                       TextMessage messaget = (TextMessage)message;                       System.out.println(">>>获取的消息内容:"+messaget.getText());//获取消息内容                  }            }            // 第二种:            // 设置监听器,其实开启了一个新的线程。 //         consumer.setMessageListener(new MessageListener() { //               // 接收消息,如果有消息才进入,如果没有消息就不会进入此方法 //               @Override //               public void onMessage(Message message) { //                    if (message instanceof TextMessage) { //                          TextMessage messaget = (TextMessage) message; //                          try { //                                // 获取消息内容 //                                System.out.println(">>>获取的消息内容:" + messaget.getText()); //                          } catch (JMSException e) { //                                e.printStackTrace(); //                          } //                    } //               } //         });            //Thread.sleep(10000);// 睡眠10秒钟。              // 9.关闭资源            consumer.close();            session.close();            connection.close();         } }
  1.3. 小结 queue 是点对点模式,只能是一个生产者产生一个消息,被一个消费者消费。 topic 是发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。   queue 默认是存在于MQ的服务器中的,发送消息之后,消费者随时取。但是一定是一个消费者取,消费完消息也就没有了。 topic 默认是不存在于MQ服务器中的,一旦发送之后,如果没有订阅,消息则丢失。 ---------------------- ---------------------   与Spring整合     1》JMS 1.1》什么是JMS JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。   1.2》为什么要学JMS 在JAVA中,如果两个应用程序之间对各自都不了解,甚至这两个程序可能部署在不同的大洲上,那么它们之间如何发送消息呢?举个例子,一个应用程序A部署在印度,另一个应用程序部署在美国,然后每当A触发某件事后,B想从A获取一些更新信息。当然,也有可能不止一个B对A的更新信息感兴趣,可能会有N个类似B的应用程序想从A中获取更新的信息。在这种情况下,JAVA提供了最佳的解决方案-JMS,完美解决了上面讨论的问题。 JMS与RMI不同,发送消息的时候,接收者不需要在线。服务器发送了消息,然后就不管了;等到客户端上线的时候,能保证接收到服务器发送的消息。这是一个很强大的解决方案,能处理当今世界很多普遍问题。   1.3》JMS有什么优势 异步:JMS天生就是异步的,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端。   可靠:JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题,只是避免而不是杜绝,所以在一些糟糕的环境下还是有可能会出现重复。   1.4》JMS数据交互的两种方式 点对点消息模型 (1)、每个消息只有一个接受者(自己测试了一下,可以有多个接受者,但是当有多个接收者时,每个接收者只能获取随机的几条信息)   (2)、消息发送者和消息接受者并没有时间依赖性。   (3)、当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息;                                          (4)、当接收者收到消息的时候,会发送确认收到通知(acknowledgement)。   (5)、点对点消息模型图:     发布/订阅消息模型 (1)、一个消息可以传递给多个订阅者   (2)、发布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态以接收消息。   (3)、为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。   (4)、发布/订阅消息模型图:     2》ActiveMQ 2.1》什么是ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。   2.2》ActiveMQ的下载 主页:http://activemq.apache.org/ 目前最新版本:5.15.3   开发包及源码下载地址:http://activemq.apache.org/activemq-5111-release.html   ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin   2.3》ActiveMQ的点对点使用 Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。   Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。   Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝的确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置   为 true。 消息生产者
public class JMSProducer {           private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名       private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码       private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址           public static void main(String[] args) {                   ConnectionFactory connectionFactory;//连接工厂           Connection connection = null;//连接           Session session;//会话           Destination destination;//消息目的地           MessageProducer messageProducer;//消息生产者               connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);               try {               connection = connectionFactory.createConnection();//通过工厂获取连接               connection.start();//启动连接               session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session               destination = session.createQueue("新闻队列");               messageProducer = session.createProducer(destination);//创建消息生产者                   //发送消息               for(int i=0;i<10;i++){                   TextMessage msg = session.createTextMessage("郭永峰IT工作室客服" + (i + 1) +"号");                   messageProducer.send(destination,msg);               }                   session.commit();               }catch (Exception e){               e.printStackTrace();           }finally {               if(connection!=null){                   try {                       connection.close();                   } catch (JMSException e) {                       // TODO Auto-generated catch block                       e.printStackTrace();                   }               }           }       }   }  
    消息消费者
public class JMSConsumer {           private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名       private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码       private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址           public static void main(String[] args) {                   ConnectionFactory connectionFactory;//连接工厂           Connection connection = null;//连接           Session session;//会话           Destination destination;//消息目的地           MessageConsumer consumer;//消息消费者               connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);               try {               connection = connectionFactory.createConnection();//通过工厂获取连接               connection.start();//启动连接               session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session               destination = session.createQueue("新闻队列");               consumer = session.createConsumer(destination);//创建消息消费者                   //发送消息              while(true){                  TextMessage message = (TextMessage) consumer.receive(10000);                  if(message != null){                      System.out.println(message.getText());//获取消息                  }              }                   }catch (Exception e){               e.printStackTrace();           }   }  
    接收消息监听器  

 

 

等级: 二级丙等 类型: 公立
医保定点: 医保 类别: 综合

标签:Java,框架,Session,对象,创建,connection,session,消息,ActiveMQ
来源: https://www.cnblogs.com/YangBinChina/p/13809111.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有