ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java – 在camel中使用队列的线程DSL行为

2019-06-23 15:49:22  阅读:143  来源: 互联网

标签:java multithreading apache-camel


在下面的路由中,我期望来自queue1的10 msg应该同时进行处理,但是一次只有一个进程.

我错了吗?或做错了什么?

context.addRoutes(new RouteBuilder() {
        public void configure() {                                       
            from("test-jms:queue:test.queue1").threads(10)
            .process(sleep(1)); // sleep id is 1                
        }

        private Processor sleep(final int sleepId) {
            return new Processor() {                    
                @Override
                public void process(Exchange exchange) throws Exception {                       
                    System.out.println(curTime() + " Going for sleep sleepid=" + sleepId );
                    Thread.sleep(5000l);                        
                    System.out.println(curTime() + " Done sleep sleepid=" + sleepId );
                }
            };
        }

使用以下方式呼叫上述路线:

   ExecutorService ec = Executors.newFixedThreadPool(5);

    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));

static class Task  implements Runnable{
    CamelContext context;
    ProducerTemplate template;
    public Task(CamelContext context, ProducerTemplate template) {
        super();
        this.context = context;
        this.template = template;
    }
    @Override
    public void run() {         
           Exchange exchange = new DefaultExchange(context);
           exchange.setPattern(ExchangePattern.InOnly);
           exchange.getIn().setBody("Test Message: " + Thread.currentThread().getName());
           System.out.println(Thread.currentThread().getName());
           Exchange send = template.send("test-jms:queue:test.queue1",exchange);
           System.out.println("completed");           
    }

}

来自代码的OutPut:

10:24:11 Going for sleep sleepid=1
10:24:16 Done sleep sleepid=1

10:24:16 Going for sleep sleepid=1
10:24:21 Done sleep sleepid=1

10:24:21 Going for sleep sleepid=1
10:24:26 Done sleep sleepid=1

10:24:26 Going for sleep sleepid=1
10:24:31 Done sleep sleepid=1

10:24:31 Going for sleep sleepid=1
10:24:36 Done sleep sleepid=1

如果我们观察时间戳,我们将看到该路由仅处理1 msg.

解决方法:

您需要在JMS端点上启用asyncConsumer以允许它为异步.执行此操作时,可以不按顺序处理从队列中消耗的消息,从而默认情况下订购消费者的原因.

代码应该是

 public void configure() {                                       
            from("test-jms:queue:test.queue1?asyncConsumer=true").threads(10)
            .process(sleep(1)); // sleep id is 1                
        }

但是JMS组件具有内置并发性,通常可以更好地使用,因为它可以使用并发JMS使用者和并发网络.有关更多详细信息,请参阅concurrentConsumers和maxConcurrentConsumers选项.

标签:java,multithreading,apache-camel
来源: https://codeday.me/bug/20190623/1272273.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有