ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

storm manual drpc 的远程调用

2019-07-02 14:00:08  阅读:179  来源: 互联网

标签:String manual storm conf new drpc Config public LOG


一.创建server端

public class ManualDRPC {
    
    private static final Logger LOG = LoggerFactory.getLogger(ManualDRPC.class);
    
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        
        TopologyBuilder builder = new TopologyBuilder();
        
        DRPCSpout spout = new DRPCSpout("add");
        builder.setSpout("drpc", spout);
        builder.setBolt("add", new AddBolt(),3).shuffleGrouping("drpc");
        builder.setBolt("return", new ReturnResults(),3).shuffleGrouping("add");
        
        Config conf = new Config();
        StormSubmitter.submitTopology("ManualDRPC", conf, builder.createTopology());
        LOG.warn("==================================================");
        LOG.warn("the topology {} is submitted.","ManualDRPC");
        LOG.warn("==================================================");
        
    }
    public static class AddBolt extends BaseBasicBolt{

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Object returnInfo = input.getValue(1);
            String params = input.getString(0);
            String[] numbers = params.split(",");
            String conversValue = String.valueOf(Integer.parseInt(numbers[0]) + Integer.parseInt(numbers[1]));
            collector.emit(new Values(conversValue,returnInfo));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result","return-info"));
        }
        
    }
}

二. client端

public class ManualClientDRPC {
    
    private static final Logger LOG = LoggerFactory.getLogger(ManualClientDRPC.class);
    
    public static void main(String[] args) {
        Config conf = new Config();
        conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
        
        try {
            DRPCClient client = new DRPCClient(conf, "master", 3777);
            String result = client.execute("add", "1,2");
            LOG.info("============== result:{}",result);
        } catch (Exception e) {
            LOG.info("ERR");
        }
    }
}

 

标签:String,manual,storm,conf,new,drpc,Config,public,LOG
来源: https://www.cnblogs.com/MrRightZhao/p/11120211.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有