标签:Java 入门 void storm new collector public resMap String
1.导入依赖
<!--https://mvnrepository.com/artifact/org.apache.storm/storm-core-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.2.0</version>
</dependency>
2.创建 spout 继承 BaseRichSpout
public class WordSpout extends BaseRichSpout {
//模拟数据来源
String[] init_data = {"hello java", "hello python", "hello C++", "hello scala"};
/**
* 放射方法在里面,应该在nextTuple中调用,可以把他提出来在初始化中赋值
*/
private SpoutOutputCollector collector;
/**
* 初始化方法,只执行一次
* @param map
* @param topologyContext
* @param spoutOutputCollector
*/
public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
collector = spoutOutputCollector;
}
/**
* 死循环,storm 内部一直在调用
* 数据来源 kafka flume ...
*/
public void nextTuple() {
//拿数据
String init_datum = init_data[new Random().nextInt(init_data.length)];
//拆分
String[] split = init_datum.split(" ");
//循环发射到 bolt 中
for (String str:split){
// List list = Arrays.asList(str);
// collector.emit(list);
//第二种
collector.emit(new Values(str));
}
}
/**
* 定义发射出去,tuple 的字段名
* @param outputFieldsDeclarer
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
}
3.创建求和 bolt 继承 BaseRichBolt
public class WordBolt extends BaseRichBolt {
/**
* 临时解决方案 结果集
*/
private Map<String, Long> resMap;
private OutputCollector collector;
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
resMap = new HashMap<String, Long>();
collector = outputCollector;
}
public void execute(Tuple tuple) {
//根据字段名拿到每一个 tuple
String word = tuple.getStringByField("word");
//给每一个单词次数累加
Long time = resMap.get(word);
if (time != null){
resMap.put(word, time + 1L);
}else {
resMap.put(word, 1L);
}
//发射
collector.emit(new Values(resMap));
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("resMap"));
}
}
4.创建输出 bolt
public class PrintBolt extends BaseRichBolt {
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
}
public void execute(Tuple tuple) {
//取值
Map<String, Long> resMap = (Map<String, Long>)tuple.getValueByField("resMap");
//处理
for (String key:resMap.keySet()){
System.out.println(key + " ==> " + resMap.get(key));
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
5.创建 topology
public class WordTopoigy {
public static void main(String[] args) throws Exception {
//调用主的 Api
TopologyBuilder builder = new TopologyBuilder();
//关联 spout bolt
builder.setSpout("spout01", new WordSpout());
builder.setBolt("count01", new WordBolt())
//关联线(放射方向)
.shuffleGrouping("spout01");
builder.setBolt("print01", new PrintBolt()).shuffleGrouping("count01");
//本地发布 开发时用
LocalCluster cluster = new LocalCluster();
LocalCluster.LocalTopology topology01 = cluster.submitTopology("topology01",
new HashMap<String, Object>(), builder.createTopology());
}
}
标签:Java,入门,void,storm,new,collector,public,resMap,String 来源: https://blog.csdn.net/qiouou/article/details/114753870
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。