ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java操作storm入门

2021-03-13 17:33:22  阅读:162  来源: 互联网

标签:Java 入门 void storm new collector public resMap String


1.导入依赖

<!--https://mvnrepository.com/artifact/org.apache.storm/storm-core-->

<dependency>

        <groupId>org.apache.storm</groupId>

        <artifactId>storm-core</artifactId>

        <version>2.2.0</version>

</dependency>

2.创建 spout 继承  BaseRichSpout

 

public class WordSpout extends BaseRichSpout {

 

    //模拟数据来源

    String[] init_data = {"hello java", "hello python", "hello C++", "hello scala"};

 

    /**

     * 放射方法在里面,应该在nextTuple中调用,可以把他提出来在初始化中赋值

     */

    private SpoutOutputCollector collector;

 

    /**

     * 初始化方法,只执行一次

     * @param map

     * @param topologyContext

     * @param spoutOutputCollector

     */

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {

        collector = spoutOutputCollector;

    }

 

    /**

     * 死循环,storm 内部一直在调用

     * 数据来源 kafka flume ...

     */

    public void nextTuple() {

        //拿数据

        String init_datum = init_data[new Random().nextInt(init_data.length)];

        //拆分

        String[] split = init_datum.split(" ");

        //循环发射到 bolt

        for (String str:split){

//            List list = Arrays.asList(str);

//            collector.emit(list);

            //第二种

            collector.emit(new Values(str));

        }

    }

 

    /**

     * 定义发射出去,tuple 的字段名

     * @param outputFieldsDeclarer

     */

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("word"));

    }

}

 

3.创建求和 bolt 继承 BaseRichBolt

 

public class WordBolt extends BaseRichBolt {

 

    /**

     * 临时解决方案 结果集

     */

    private Map<String, Long> resMap;

 

    private OutputCollector collector;

 

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {

        resMap = new HashMap<String, Long>();

        collector = outputCollector;

    }

 

    public void execute(Tuple tuple) {

        //根据字段名拿到每一个 tuple

        String word = tuple.getStringByField("word");

        //给每一个单词次数累加

        Long time = resMap.get(word);

        if (time != null){

            resMap.put(word, time + 1L);

        }else {

            resMap.put(word, 1L);

        }

        //发射

        collector.emit(new Values(resMap));

    }

 

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("resMap"));

    }

}

 

4.创建输出 bolt

 

public class PrintBolt extends BaseRichBolt {

 

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {

 

    }

 

    public void execute(Tuple tuple) {

        //取值

        Map<String, Long> resMap = (Map<String, Long>)tuple.getValueByField("resMap");

        //处理

        for (String key:resMap.keySet()){

            System.out.println(key + " ==> " + resMap.get(key));

        }

    }

 

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

 

    }

}

 

5.创建 topology

 

public class WordTopoigy {

 

    public static void main(String[] args) throws Exception {

        //调用主的 Api

        TopologyBuilder builder = new TopologyBuilder();

        //关联 spout bolt

        builder.setSpout("spout01", new WordSpout());

        builder.setBolt("count01", new WordBolt())

                //关联线(放射方向)

                .shuffleGrouping("spout01");

        builder.setBolt("print01", new PrintBolt()).shuffleGrouping("count01");

        //本地发布 开发时用

        LocalCluster cluster = new LocalCluster();

        LocalCluster.LocalTopology topology01 = cluster.submitTopology("topology01",

                new HashMap<String, Object>(), builder.createTopology());

    }

}

 

 

标签:Java,入门,void,storm,new,collector,public,resMap,String
来源: https://blog.csdn.net/qiouou/article/details/114753870

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有