ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

storm源码分析研究(十一)

2021-12-09 10:58:01  阅读:155  来源: 互联网

标签:十一 context TridentProcessor 源码 storm new nodes 节点 SubTopologyBolt


2021SC@SDUSC

Trident的Bolt节点分析

2021SC@SDUSC

SubTopologyBolt类型为Trident中运行的基本单位,但它并不是真正的Bolt节点,Trident会利用TridentBoltExecutor对SubTopologyBolt进行接口适配。

TridentBoltExecutor继承自IRichBolt接口,是Trident中真正运行的Bolt节点。它提供了类似于协调Bolt ( CoordinatedBolt )节点的功能,通过发送协调消息来对各个节点进行同步。

SubTopologyBolt主要用于对TridentProcessor的执行进行抽象。本篇文章将讨论SubTopologyBolt和TridentBoltExecutor的实现。

SubtopologyBolt.java

类的定义为:

public class SubtopologyBolt implements ITridentBatchBolt {
    private static final long serialVersionUID = 1475508603138688412L;
    @SuppressWarnings("rawtypes")
    final DirectedGraph<Node, IndexedEdge> graph;
    final Set<Node> nodes;
    final Map<String, InitialReceiver> roots = new HashMap<>();
    final Map<Node, Factory> outputFactories = new HashMap<>();
    final Map<String, List<TridentProcessor>> myTopologicallyOrdered = new HashMap<>();
    final Map<Node, String> batchGroups;


graph:整个Topology所对应的有向图。
nodes:该Bolt中所包含的处理节点。_nodesS_graph节点的子集。
roots:每种类型的输人流都会对应一个lnitalReceiver对象,用于表示如何处理该流的消息。
outputFactories:每个处理节点都会对应一个输出的工厂。 myTopologicallyOrdered:它的键为节点组序号,值为节点组所对应的TridentProcessor。
batchGroups:该变量保存了反向的索引,用来表示每个节点属于哪一个节点组。
BatchGroup对应于graph中的一个最大连通子图。

主要方法:

    public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector batchCollector) {
        int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
        for (Node n : nodes) {
            if (n.stateInfo != null) {
                State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
                context.setTaskData(n.stateInfo.id, s);
            }
        }
        DirectedSubgraph<Node, ?> subgraph = new DirectedSubgraph<>(graph, nodes, null);
        TopologicalOrderIterator<Node, ?> it = new TopologicalOrderIterator<>(subgraph);
        int stateIndex = 0;
        while (it.hasNext()) {
            Node n = it.next();
            if (n instanceof ProcessorNode) {
                ProcessorNode pn = (ProcessorNode) n;
                String batchGroup = batchGroups.get(n);
                if (!myTopologicallyOrdered.containsKey(batchGroup)) {
                    myTopologicallyOrdered.put(batchGroup, new ArrayList<>());
                }
                myTopologicallyOrdered.get(batchGroup).add(pn.processor);
                List<String> parentStreams = new ArrayList<>();
                List<Factory> parentFactories = new ArrayList<>();
                for (Node p : TridentUtils.getParents(graph, n)) {
                    parentStreams.add(p.streamId);
                    if (nodes.contains(p)) {
                        parentFactories.add(outputFactories.get(p));
                    } else {
                        if (!roots.containsKey(p.streamId)) {
                            roots.put(p.streamId, new InitialReceiver(p.streamId, getSourceOutputFields(context, p.streamId)));
                        }
                        roots.get(p.streamId).addReceiver(pn.processor);
                        parentFactories.add(roots.get(p.streamId).getOutputFactory());
                    }
                }
                List<TupleReceiver> targets = new ArrayList<>();
                boolean outgoingNode = false;
                for (Node cn : TridentUtils.getChildren(graph, n)) {
                    if (nodes.contains(cn)) {
                        targets.add(((ProcessorNode) cn).processor);
                    } else {
                        outgoingNode = true;
                    }
                }
                if (outgoingNode) {
                    targets.add(new BridgeReceiver(batchCollector));
                }

                TridentContext triContext = new TridentContext(
                    pn.selfOutFields,
                    parentFactories,
                    parentStreams,
                    targets,
                    pn.streamId,
                    stateIndex,
                    batchCollector
                );
                pn.processor.prepare(conf, context, triContext);
                outputFactories.put(n, pn.processor.getOutputFactory());
            }
            stateIndex++;
        }
    }

for (Node n : nodes):
在此循环中判断节点的statelnfo是否为空,并对存储的State对象进行初始化。初始化过后的State对象被存储于TopologyContext的taskData中,并以statelnfo.id为键。statelnfo.id是一个以串 “ state” 为前缀的在Topology中唯一的字符串。
subgraph:
根据SubTopologyBolt中包含的节点获得一个子图。
it:
对子图进行拓扑排序,TopologicalOrderlterator类型的it变量用来按照拓扑排序的顺序遍历子图。
if (n instanceof ProcessorNode):
SubTopologyBolt只对处理节点进行操作。处理节点中包含了一个TridentProcessor。Spout节点和分区节点则不在SubTopologyBolt的处理范围之内。
pn.processor.prepare(conf, context, triContext:
调用该TridentProcessor的prepare方法,它将新产生的TridentContext对象作为构造函数的一个参数传入。
outputFactories.put(n, pn.processor.getOutputFactory()):
将该TridentProcessor所对应的输出添加到SubTopologyBolt的输出中。于是该输出便可被其他的SubTopologyBolt作为输入了。
statelndex变量:
用于唯一地标识SubTopologyBolt中的每一个节点。

public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = roots.get(sourceStream);
        if (ir == null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }

首先,根据输人消息的流号,在roots中找到对应的InitialReceiver对象,并调用其receive方法。
所有等待该流消息的TridentProcessor的execute方法均会被调用。
在某个TridentProcessor的execute方法中,下游TridentProcessor的execute方法也会被依次调用到,于是构成了一个调用链,直到SubTopologyBolt完成对该消息的处理后结束。

public void finishBatch(BatchInfo batchInfo) {
        for (TridentProcessor p : myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[nodes.size()]);
        for (TridentProcessor p : myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }
public void declareOutputFields(OutputFieldsDeclarer declarer) {
        for (Node n : nodes) {
            declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
        }
    }

在initBatchState方法中,会对ProcessorContext的数据进行初始化,然后返回ProcessorContext对象。Trident中的聚集器均要基于ProcessorContext中state所存储的数据来实现。
declareOutputFields方法会对SubTopologyBolt中每一个节点的输出进行声明,它将$batchid作为第1列。可以看出SubTopologyBolt虽然作为一个整体而存在,可其中每一个节点的输出均可能成为最终的输出。

标签:十一,context,TridentProcessor,源码,storm,new,nodes,节点,SubTopologyBolt
来源: https://blog.csdn.net/qq_45849855/article/details/121716312

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有