ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

azkaban整体工作流程

2021-10-19 17:35:42  阅读:234  来源: 互联网

标签:run 流程 flow 整体 azkaban exflow job 提交 executor


1、工作流程


WEB:   ExecutorServlet web端执行一个流的入口   1、ajaxExecuteFlow执行这个方法     1、getProjectAjaxByPermission执行这个方法,判断用户是否有权限执行这个工程     2、final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);获取一个ExecutableFlow对象     3、executorManagerAdapter.submitExecutableFlow(exflow, user.getUserId()); 提交这个flow ExecutorManager  flow的管理类,上面把flow提交到了这个类   1、submitExecutableFlow     1、exflow.isLocked(),判断flow的状态     2、this.queuedFlows,判断放flow的队列是不是满了,满了就报错     3、this.queuedFlows.enqueue(exflow, reference);把flow放到队列中   2、QueueProcessorThread这个线程负责消费queuedFlows队列里面的flow     1、if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow             || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {  根据当前时间和最后一次更新executor的时间与更新时间窗口做对比,已提交的flow跟允许提交的flow数量做对比,满足其一,向exector发送请求,获取exector的状态,包括cpu,mem,上次是否提交flow,以及运行的flow总数     2、if (exflow.getUpdateTime() > lastExecutorRefreshTime) 这块用来判断flow的最后更新时间,跟最后一次刷新executor的时间做对比,如果更新时间晚的话,则sleep到刷新executor状态的时候再进行提交     3、selectExecutorAndDispatchFlow(reference, exflow); 选择executor并且提交flow       1、selectExecutor(exflow, remainingExecutors);          1、getUserSpecifiedExecutor(exflow.getExecutionOptions(),               exflow.getExecutionId() 指定executorId,直接去数据库里面查找wxwcutor是否存在          2、final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,             ExecutorManager.this.comparatorWeightsMap); 上述方式没有找到executor的情况下,创建一个ExecutorSelector选择器,          3、choosenExecutor = selector.getBest(availableExecutors, exflow); 根据executor的状态,上述刷新executor状态时候获取到的各项指标,进行对比,选择一个合适的executor       2、dispatch(reference, exflow, selectedExecutor); 找到executor以后进行flow的分配,发送到相应的executor     EXEC:   ExecutorServlet exec端执行一个流的入口   1、handleAjaxExecute     1、this.flowRunnerManager.submitFlow(execId); 具体执行的那个流的id,后面通过这个id去数据库获取flow的具体配置信息。 FlowRunnerManager   1、submitFlow     1、isAlreadyRunning(execId) 判断这个flow是否正在运行     2、final FlowRunner runner = createFlowRunner(execId); 通过execId创建一个FlowRunner对象     3、final Future<?> future = this.executorService.submit(runner); 往线程池里面提交这个FlowRunner对象     4、submitFlowRunner(runner); 提交这个flow去运行   2、submitFlowRunner     1、this.submittedFlows.put(future, runner.getExecutionId()); 往submittedFlows队列里面提交这个flow,submittedFlows只是为了记录已经提交的flow FlowRunner   1、run(),直接运行run方法     1、setupFlowExecution 添加配置信息     2、runFlow(); 真正的运行这个flow       1、runReadyJob 判断flow里面node的状态,也就是job的状态,从第一个job开始运行       2、runExecutableNode 运行具体的job         1、 prepareJobProperties(node); 准备job的配置文件         2、final JobRunner runner = createJobRunner(node); 创建一个JobRunner对象,         3、this.executorService.submit(runner); executorService是一个指定线程数量的线程池         this.executorService = Executors.newFixedThreadPool(this.numJobThreads,             new ThreadFactoryBuilder().setNameFormat("azk-job-pool-%d").build());         4、this.activeJobRunners.add(runner); activeJobRunners是一个记录正在运行job的队列 JobRunner 提交到executorService线程池以后开始运行   1、run(),直接运行run方法     1、doRun();       1、createAttachmentFile(); 创建job的工作目录       2、createLogger(); 创建一个job运行日志的追加器       3、uploadExecutableNode(); 往数据库中插入正在运行的job       4、prepareJob() 判断job是否准备好         1、finalStatus = runJob(); 运行这个job   2、runJob()     1、this.job.run();       1、执行的是ProcessJob的实现类 ProcessJob   1、run() 执行job的方法,     1、resolveProps(); 解析配置文件     2、this.process.run(); 执行的方法   2、public void run()  一直堵塞直到job执行完成     1、ProcessBuilder 使用java自带的实现来执行cmd     2、LogGobbler 一个获取job日志的方法      判断任务是否执行完成 JobRunner对象生成的时候有一个FlowWatcher对象,监听job的状态 FlowRunner在判断到执行成功的job时,会执行这个方法,finalizeFlow(),如果这个job是这个flow的最下游job的话,那么就把这个flow的状态改成执行成功,如果不是,就拿到job的下游job,继续执行。       

 

标签:run,流程,flow,整体,azkaban,exflow,job,提交,executor
来源: https://www.cnblogs.com/study-wen/p/15425632.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有