ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

storm源码分析研究(二)

2021-10-07 23:00:11  阅读:188  来源: 互联网

标签:分析 spout 调用 msgId void 元组 源码 storm Spout


2021SC@SDUSC

spout源码分析(一)

2021SC@SDUSC

文章目录


2021SC@SDUSC

核心概念介绍

1、结构:
Spout是storm的核心组件之一,最源头的接口是IComponent。

2、发送:
当Spout从外部获取数据后,向Topology中发出的Tuple可以是可靠的,也可以是不可靠的。Spout可以发射多个流,可以定义多个流(即定义多个stream),也可以使用方法来发射指定的流。

3、重要结构:
Spout的重要方法是nextTuple,nextTuple方法发射一个新的元组到Topology,如果没有新的元组发射,则直接返回。注意任务Spout的nextTuple方法都不要实现成阻塞的,因为storm是在相同的线程中调用spout的方法。
Spout的另外两个重要方法是ack和fail方法,当spout发射的元组被拓扑成功处理时,调用ack方法,当处理失败时,调用fail方法,此外,ack和fail方法仅被可靠的spout调用。

ISpout.java

ISpout接口:
storm实现主要依靠以下几个函数,全局代码如下:

package org.apache.storm.spout;

import java.io.Serializable;
import java.util.Map;
import org.apache.storm.task.TopologyContext;
public interface ISpout extends Serializable {
 
    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);

    void close();

    void activate();

    void deactivate();

    void nextTuple();

    void ack(Object msgId);

    void fail(Object msgId);
}

Strom支持所有的基本类型,当它使用元组作为数据模型,元组中的每个字段都可以是任何类型的对象。而如果要使用自己定义的类型,需要为自己定义的类型实现并且注册一个serializer。每个节点还必须要为输出的元组定义字段名称。
部分函数解释:
open():
当该组件的任务在集群上初始化时调用。它为spout提供了执行spout的环境。
close():
当ISpout即将关闭时调用。不能保证会调用close,因为supervisor会杀死集群上的的worker进程。
activate():
当spout从非激活模式被激活时调用。
deactivate():
当spout失效时调用。

ShellSpout.java

重载函数如下:

    public void open(Map<String, Object> topoConf, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;

        if (topoConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
            workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
        } else {
            workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
        }

        process = new ShellProcess(command);
        if (!env.isEmpty()) {
            process.setEnv(env);
        }

        Number subpid = process.launch(topoConf, context, changeDirectory);
        LOG.info("Launched subprocess with pid " + subpid);

        logHandler = ShellUtils.getLogHandler(topoConf);
        logHandler.setUpContext(ShellSpout.class, process, this.context);

        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    @Override
    public void close() {
        heartBeatExecutorService.shutdownNow();
        process.destroy();
        running = false;
    }

    @Override
    public void nextTuple() {
        this.sendSyncCommand("next", "");
    }

    @Override
    public void ack(Object msgId) {
        this.sendSyncCommand("ack", msgId);
    }

    @Override
    public void fail(Object msgId) {
        this.sendSyncCommand("fail", msgId);
    }

 @Override
    public void activate() {
        LOG.info("Start checking heartbeat...");
        // prevent timer to check heartbeat based on last thing before activate
        setHeartbeat();
        if (heartBeatExecutorService.isShutdown()) {
            //In case deactivate was called before
            heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
        }
        heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
        this.sendSyncCommand("activate", "");
    }

    @Override
    public void deactivate() {
        this.sendSyncCommand("deactivate", "");
        heartBeatExecutorService.shutdownNow();
    }



void open(Map<String, Object> topoConf, TopologyContext context,SpoutOutputCollector collector)
参数:
topoconf :
Storm关于这个Spout的配置
context :
用来获取该Spout任务的信息,包括任务id,组件id,输入输出信息等等
collector :
用来从这个Spout里发送元组,元组可以在任何时间里发送,包括open和close函数里。collector是线程安全的,应该被作为一个实例对象保存到Spout对象里。

void ack(Object msgId):
以msgId消息告诉Storm这个Spout已经成功输出了该元组
void activate():
激活Spout,Spout从deactivate模式转化为activate模式,Spout开始调用nextTuple输出数据。
void close():
关闭Spout
void deactivate():
解除激活Spout,Spout从activate模式转化为deactivate模式,Spout停止调用nextTuple输出数据
void fail(Object msgId):
以msgId消息告诉Storm这个Spout输出该元组失败,主要用于将该元组重新放回消息队列,以在一段时间后重发该元组
void nextTuple():
调用该函数请求Storm发送元组到Output Collector,这个函数不应该是阻塞的,当没有元组发送时,一般调用sleep,以充分利用CPU。

参考链接:
https://blog.csdn.net/wdasdaw/article/details/48896321
https://xlucas.blog.csdn.net/article/details/55301577

标签:分析,spout,调用,msgId,void,元组,源码,storm,Spout
来源: https://blog.csdn.net/qq_45849855/article/details/120642454

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有