Azkaban: A Source-Code Walkthrough of How a Single Flow Is Executed

2021-06-21 17:52:15


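Azkaban abstracts each Flow as a FlowRunner and submits that FlowRunner to a thread pool, where it runs asynchronously. While it runs, the status of each job is modified many times and persisted to the metadata database. This article walks through the whole process from the source code.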

We start with azkaban.execapp.FlowRunner#runFlow:

/**
 * Main method that executes the jobs.
 */
private void runFlow() throws Exception {
    this.logger.info("Starting flows");
    runReadyJob(this.flow); // kick off every job that is ready to run
    updateFlow(); // refresh the flow's update time
    // Block until the flow has finished
    while (!this.flowFinished) {
        synchronized (this.mainSyncObj) {
            if (this.flowPaused) {
                try {
                    this.mainSyncObj.wait(CHECK_WAIT_MS);
                } catch (final InterruptedException e) {
                }

                continue;
            } else {
                if (this.retryFailedJobs) {
                    retryAllFailures();
                } else if (!progressGraph()) {
                    try {
                        this.mainSyncObj.wait(CHECK_WAIT_MS);
                    } catch (final InterruptedException e) {
                    }
                }
            }
        }
    }
    // Shut down the flow and update its status
    this.logger.info("Finishing up flow. Awaiting Termination");
    this.executorService.shutdown();

    updateFlow();
    this.logger.info("Finished Flow");
}
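
The loop in runFlow() is a timed wait on a monitor: it sleeps for at most CHECK_WAIT_MS whenever no progress can be made, then checks again. A minimal sketch of that pattern follows. The class, its fields, and the 50 ms timeout are hypothetical stand-ins, not Azkaban code, and progress() only imitates the role of progressGraph().

```java
/** Hypothetical stand-in for the FlowRunner main loop; this is not Azkaban code. */
class PollingLoopSketch {
    private static final long CHECK_WAIT_MS = 50; // assumed value, for illustration only
    private final Object mainSyncObj = new Object();
    private boolean finished = false;
    private int pendingJobs;

    PollingLoopSketch(int pendingJobs) {
        this.pendingJobs = pendingJobs;
    }

    /** Called when one job completes; wakes the main loop before the timeout expires. */
    void jobFinished() {
        synchronized (mainSyncObj) {
            pendingJobs--;
            mainSyncObj.notifyAll();
        }
    }

    /** Plays the role of progressGraph(): returns true if the loop made progress. */
    private boolean progress() {
        if (pendingJobs <= 0) {
            finished = true;
            return true;
        }
        return false;
    }

    /** Blocks until every pending job has reported in; returns the number of loop passes. */
    int runLoop() {
        int passes = 0;
        while (true) {
            synchronized (mainSyncObj) {
                if (finished) {
                    return passes;
                }
                passes++;
                if (!progress()) {
                    try {
                        mainSyncObj.wait(CHECK_WAIT_MS); // releases the lock while waiting
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return passes;
                    }
                }
            }
        }
    }
}
```

The timeout means the loop still re-checks periodically even if a notification is missed, which is presumably why the original waits with CHECK_WAIT_MS rather than waiting indefinitely.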

The most important call in the method above is runReadyJob(this.flow), which is the main body of Flow execution:

private boolean runReadyJob(final ExecutableNode node) throws IOException {
      // Return false if the node has already finished or is currently running
      if (Status.isStatusFinished(node.getStatus())
              || Status.isStatusRunning(node.getStatus())) {
          return false;
      }
      /**
       * Determines what the state of the next node should be. Returns null if the node should not be
       * run.
       */
      // A non-null nextNodeStatus means the node can be handled now (cancelled, skipped, or run).
      final Status nextNodeStatus = getImpliedStatus(node);
      if (nextNodeStatus == null) {
          return false;
      }

      if (nextNodeStatus == Status.CANCELLED) {
          this.logger.info("Cancelling '" + node.getNestedId()
                  + "' due to prior errors.");
          node.cancelNode(System.currentTimeMillis());
          finishExecutableNode(node);
      } else if (nextNodeStatus == Status.SKIPPED) {
          this.logger.info("Skipping disabled job '" + node.getId() + "'.");
          node.skipNode(System.currentTimeMillis());
          finishExecutableNode(node);
      } else if (nextNodeStatus == Status.READY) {
          // The node is ready to run.
          // If it is a flow, recursively run every start node under it.
          if (node instanceof ExecutableFlowBase) {
              final ExecutableFlowBase flow = ((ExecutableFlowBase) node);
              this.logger.info("Running flow '" + flow.getNestedId() + "'.");
              // Mark the flow as RUNNING and record its start time
              flow.setStatus(Status.RUNNING);
              flow.setStartTime(System.currentTimeMillis());
              prepareJobProperties(flow);
              for (final String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {
                  final ExecutableNode startNode = flow.getExecutableNode(startNodeId);
                  runReadyJob(startNode);
              }
          } else {
              runExecutableNode(node);
          }
      }
      return true;
}
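
The recursion in runReadyJob() can be hard to see in the full listing: a flow node is marked RUNNING and its start nodes are visited, while a plain job is handed off for execution. The sketch below reduces that to a toy tree. Node, Status, and the submitted list are hypothetical simplifications, and the CANCELLED and SKIPPED branches are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the recursive descent in runReadyJob(); not Azkaban code. */
class NestedFlowSketch {
    enum Status { READY, QUEUED, RUNNING }

    static class Node {
        final String id;
        final List<Node> startNodes = new ArrayList<>(); // non-empty means "this node is a flow"
        Status status = Status.READY;

        Node(String id) {
            this.id = id;
        }

        boolean isFlow() {
            return !startNodes.isEmpty();
        }
    }

    /** Returns false if the node is not READY, mirroring the early return for running/finished nodes. */
    static boolean runReady(Node node, List<String> submitted) {
        if (node.status != Status.READY) {
            return false;
        }
        if (node.isFlow()) {
            node.status = Status.RUNNING;
            for (Node start : node.startNodes) {
                runReady(start, submitted); // descend into the embedded flow
            }
        } else {
            node.status = Status.QUEUED;
            submitted.add(node.id); // stands in for runExecutableNode(node)
        }
        return true;
    }
}
```

Calling runReady on a root flow whose start nodes are a job and an embedded flow submits the leaf jobs in depth-first order and leaves both flows in RUNNING.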

In the code above, executable nodes are actually run by runExecutableNode():

@SuppressWarnings("FutureReturnValueIgnored")
private void runExecutableNode(final ExecutableNode node) throws IOException {
    // Collect output props from the job's dependencies.
    prepareJobProperties(node);
    // First mark the node as QUEUED
    node.setStatus(Status.QUEUED);
    // Create the JobRunner for this node
    final JobRunner runner = createJobRunner(node);
    this.logger.info("Submitting job '" + node.getNestedId() + "' to run.");
    try {
        // Submit the JobRunner to the thread pool so that it runs asynchronously
        this.executorService.submit(runner);
        this.activeJobRunners.add(runner);
    } catch (final RejectedExecutionException e) {
        this.logger.error(e);
    }
}

The executorService used here is simply a thread pool.
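
The article does not show how executorService is constructed, so the sketch below just assumes an ordinary java.util.concurrent pool. It illustrates the one behaviour visible in runExecutableNode(): submit() throws RejectedExecutionException once the pool no longer accepts work, and the caller catches it. SubmitSketch and trySubmit are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/** Sketch of submitting work to a pool and tolerating rejection; not Azkaban code. */
class SubmitSketch {
    /** Returns true if the pool accepted the job, false if it was rejected. */
    static boolean trySubmit(ExecutorService pool, Runnable job) {
        try {
            pool.submit(job);
            return true;
        } catch (RejectedExecutionException e) {
            // A pool that has been shut down rejects new tasks.
            return false;
        }
    }
}
```

With a pool from Executors.newFixedThreadPool(2), trySubmit returns true before shutdown() and false afterwards, which matches the order in runFlow(): jobs are submitted during the loop and the pool is shut down only once the flow has finished.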

As the code above shows, Azkaban wraps each task in a JobRunner object and submits it to the thread pool, so the next place to look is azkaban.execapp.JobRunner#run:

/**
 * The main run thread.
 */
@Override
public void run() {
  try {
    doRun();
  } catch (final Exception e) {
    serverLogger.error("Unexpected exception", e);
    throw e;
  }
}

This simply calls doRun(), so we follow it next:

private void doRun() {
  // Set the thread name
  Thread.currentThread().setName(
      "JobRunner-" + this.jobId + "-" + this.executionId);

  // Check the node status again: if the job is cancelled, disabled, or killed,
  // return immediately. No log is created in this case.
  if (handleNonReadyStatus()) {
    return;
  }
  // Create the attachment file and the job logger
  createAttachmentFile();
  createLogger();
  boolean errorFound = false;
  // Delay execution if necessary. Will return a true if something went wrong.
  errorFound |= delayExecution();

  // For pipelining of jobs. Will watch other jobs. Will return true if
  // something went wrong.
  errorFound |= blockOnPipeLine();

  // Start executing the node.
  this.node.setStartTime(System.currentTimeMillis());
  Status finalStatus = this.node.getStatus();
  // Insert a row for this node into MySQL; the method body is covered later
  uploadExecutableNode();
  if (!errorFound && !isKilled()) {
    // Fire a JOB_STARTED event to notify the registered listeners
    fireEvent(Event.create(this, EventType.JOB_STARTED, new EventData(this.node)));

    // Prepare for execution and build the Job object; prepareJob() is analyzed in detail later
    final Status prepareStatus = prepareJob();
    // In the normal case prepareJob() returns RUNNING.
    // If the job was interrupted, cancelled, or otherwise could not be prepared, it returns null;
    // see the analysis of prepareJob() below.
    if (prepareStatus != null) {
      // Persist the status to the database
      writeStatus();
      fireEvent(Event.create(this, EventType.JOB_STATUS_CHANGED,
          new EventData(prepareStatus, this.node.getNestedId())));
       
      // Finally, actually run the job; runJob() is analyzed below
      finalStatus = runJob();
    } else {
      finalStatus = changeStatus(Status.FAILED);
      logError("Job run failed preparing the job.");
    }
  }
  this.node.setEndTime(System.currentTimeMillis());

  if (isKilled()) {
    // even if it's killed, there is a chance that the job failed is marked as
    // failure,
    // So we set it to KILLED to make sure we know that we forced kill it
    // rather than
    // it being a legitimate failure.
    finalStatus = changeStatus(Status.KILLED);
  }

  logInfo(
      "Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
          + " with status " + this.node.getStatus());

  try {
    finalizeLogFile(this.node.getAttempt());
    finalizeAttachmentFile();
    writeStatus();
  } finally {
    // note that FlowRunner thread does node.attempt++ when it receives the JOB_FINISHED event
    fireEvent(Event.create(this, EventType.JOB_FINISHED,
        new EventData(finalStatus, this.node.getNestedId())), false);
  }
}
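
doRun() reports its progress through events, and the code comment notes that FlowRunner reacts to JOB_FINISHED. The sketch below shows the general listener pattern and why firing the final event from a finally block matters. It is a simplified assumption about the mechanism: JobEventSketch, addListener, and the Consumer-based listener are hypothetical, and the real Event and EventData classes are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Minimal listener pattern in the spirit of fireEvent(); not Azkaban code. */
class JobEventSketch {
    enum EventType { JOB_STARTED, JOB_FINISHED }

    private final List<Consumer<EventType>> listeners = new ArrayList<>();

    void addListener(Consumer<EventType> listener) {
        listeners.add(listener);
    }

    private void fireEvent(EventType type) {
        for (Consumer<EventType> listener : listeners) {
            listener.accept(type);
        }
    }

    /** Runs the body and always announces JOB_FINISHED, even if the body throws. */
    void run(Runnable jobBody) {
        fireEvent(EventType.JOB_STARTED);
        try {
            jobBody.run();
        } finally {
            fireEvent(EventType.JOB_FINISHED);
        }
    }
}
```

Because the last event is fired in finally, a listener waiting for JOB_FINISHED is notified even when the body fails, so whoever drives the flow is not left waiting on a job that has already died.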

The doRun() method above contains three important methods: uploadExecutableNode(), prepareJob(), and runJob(). Each is described below.

uploadExecutableNode()
This call can be traced to the following method, which shows that Azkaban inserts a row of job metadata into MySQL before the job runs:

public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
    throws ExecutorManagerException {
  final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "
      + "(exec_id, project_id, version, flow_id, job_id, start_time, "
      + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";

  byte[] inputParam = null;
  if (inputProps != null) {
    try {
      final String jsonString =
          JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
      inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
    } catch (final IOException e) {
      throw new ExecutorManagerException("Error encoding input params");
    }
  }

  final ExecutableFlow flow = node.getExecutableFlow();
  final String flowId = node.getParentFlow().getFlowPath();
  logger.info("Uploading flowId " + flowId);
  try {
    this.dbOperator.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
        flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
        node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
        inputParam, node.getAttempt());
  } catch (final SQLException e) {
    throw new ExecutorManagerException("Error writing job " + node.getId(), e);
  }
}
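
The input_params column is filled with a gzip-compressed JSON string. The sketch below shows that encoding step, and its inverse, using only the JDK. It assumes that GZIPUtils.gzipString produces standard gzip output, which the article does not show, and GzipParamsSketch is a hypothetical class name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** JDK-only sketch of gzip-encoding a JSON string for a BLOB column; not Azkaban code. */
class GzipParamsSketch {
    /** Compresses the UTF-8 bytes of the given text. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // safe: the gzip stream was closed above
    }

    /** Reverses gzip(): decompresses and decodes as UTF-8. */
    static String gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gz.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the assumption holds, a value read back from the input_params column should be readable with the same kind of decompression when debugging by hand.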

azkaban.execapp.JobRunner#prepareJob

private Status prepareJob() throws RuntimeException {
 // Check preconditions again: the job properties must exist and the job must not have been killed
 if (this.props == null || this.isKilled()) {
   logError("Failing job. The job properties don't exist");
   return null;
 }
 // Return early if the node has already failed or the job was killed
 final Status finalStatus;
 synchronized (this.syncObject) {
   if (this.node.getStatus() == Status.FAILED || this.isKilled()) {
     return null;
   }
   // Log the start of the job
   logInfo("Starting job " + this.jobId + getNodeRetryLog() + " at " + this.node.getStartTime());

   // If it's an embedded flow, we'll add the nested flow info to the job
   // conf
   if (this.node.getExecutableFlow() != this.node.getParentFlow()) {
     final String subFlow = this.node.getPrintableId(":");
     this.props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
   }
   // The next two calls put the job's metadata and JVM arguments into the JobRunner's props
   insertJobMetadata();
   insertJVMAargs();

   this.props.put(CommonJobProperties.JOB_ID, this.jobId);
   this.props.put(CommonJobProperties.JOB_ATTEMPT, this.node.getAttempt());
   this.props.put(CommonJobProperties.JOB_METADATA_FILE,
       createMetaDataFileName(this.node));
   this.props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, this.attachmentFileName);
   this.props.put(CommonJobProperties.JOB_LOG_FILE, this.logFile.getAbsolutePath());
   // Change the status to RUNNING
   finalStatus = changeStatus(Status.RUNNING);

   // Ability to specify working directory
   if (!this.props.containsKey(AbstractProcessJob.WORKING_DIR)) {
     this.props.put(AbstractProcessJob.WORKING_DIR, this.workingDir.getAbsolutePath());
   }
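   // Decide which user the job runs as: if a proxy user is set, check that it is permitted;
   // otherwise default to the user who submitted the flow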
   if (this.props.containsKey(JobProperties.USER_TO_PROXY)) {
     final String jobProxyUser = this.props.getString(JobProperties.USER_TO_PROXY);
     if (this.proxyUsers != null && !this.proxyUsers.contains(jobProxyUser)) {
       final String permissionsPageURL = getProjectPermissionsURL();
       this.logger.error("User " + jobProxyUser
           + " has no permission to execute this job " + this.jobId + "!"
           + " If you want to execute this flow as " + jobProxyUser
           + ", please add it to Proxy Users under project permissions page: " +
           permissionsPageURL);
       return null;
     }
   } else {
     final String submitUser = this.getNode().getExecutableFlow().getSubmitUser();
     this.props.put(JobProperties.USER_TO_PROXY, submitUser);
     this.logger.info("user.to.proxy property was not set, defaulting to submit user " +
         submitUser);
   }

   try {
     // Build the Job object; it is executed later in azkaban.execapp.JobRunner#runJob
     this.job = this.jobtypeManager.buildJobExecutor(this.jobId, this.props, this.logger);
   } catch (final JobTypeManagerException e) {
     this.logger.error("Failed to build job type", e);
     return null;
   }
 }
 // Return the job status; in the normal case this is RUNNING
 return finalStatus;
}
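
The proxy-user branch of prepareJob() boils down to a small decision: honour user.to.proxy if it is permitted, fall back to the submit user if it is absent, and fail otherwise. The sketch below isolates that logic with plain collections. The property key is taken from the log message in the listing; ProxyUserSketch and resolve are hypothetical names, and returning null stands in for prepareJob() returning null.

```java
import java.util.Map;
import java.util.Set;

/** Isolated sketch of the proxy-user decision in prepareJob(); not Azkaban code. */
class ProxyUserSketch {
    static final String USER_TO_PROXY = "user.to.proxy";

    /**
     * Returns the user the job should run as, or null if the requested
     * proxy user is not in the permitted set.
     */
    static String resolve(Map<String, String> props, Set<String> proxyUsers, String submitUser) {
        if (props.containsKey(USER_TO_PROXY)) {
            String requested = props.get(USER_TO_PROXY);
            // A null set means no restriction is configured, as in the original check.
            if (proxyUsers != null && !proxyUsers.contains(requested)) {
                return null;
            }
            return requested;
        }
        // No proxy user requested: default to whoever submitted the flow.
        props.put(USER_TO_PROXY, submitUser);
        return submitUser;
    }
}
```

Note that the fallback writes the submit user back into the props, so later code can read user.to.proxy without caring which branch was taken.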

azkaban.execapp.JobRunner#runJob
The runJob() method of JobRunner:

private Status runJob() {
 Status finalStatus;
 try {
   // Run the job object that prepareJob() built earlier
   this.job.run();
   // Read the node status after the job returns
   finalStatus = this.node.getStatus();
 } catch (final Throwable e) {
   synchronized (this.syncObject) {
     if (this.props.getBoolean("job.succeed.on.failure", false)) {
       finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
       logError("Job run failed, but will treat it like success.");
       logError(e.getMessage() + " cause: " + e.getCause(), e);
     } else {
       if (isKilled() || this.node.getStatus() == Status.KILLED) {
         finalStatus = Status.KILLED;
         logError("Job run killed!", e);
       } else {
         finalStatus = changeStatus(Status.FAILED);
         logError("Job run failed!", e);
       }
       logError(e.getMessage() + " cause: " + e.getCause());
     }
   }
 }

 if (this.job != null) {
   this.node.setOutputProps(this.job.getJobGeneratedProperties());
 }

 synchronized (this.syncObject) {
   // If the job is still running (but not killed), set the status to Success.
   if (!Status.isStatusFinished(finalStatus) && !isKilled()) {
     finalStatus = changeStatus(Status.SUCCEEDED);
   }
 }
 return finalStatus;
}
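
The catch block in runJob() picks one of three statuses, and the order of the checks matters: job.succeed.on.failure is consulted before the kill check. The sketch below restates only that decision as a pure function. FailureStatusSketch and onException are hypothetical names, and the enum lists just the three statuses involved.

```java
/** Pure-function restatement of the catch block in runJob(); not Azkaban code. */
class FailureStatusSketch {
    enum Status { FAILED, FAILED_SUCCEEDED, KILLED }

    /** Chooses the status after job.run() has thrown. */
    static Status onException(boolean succeedOnFailure, boolean killed) {
        if (succeedOnFailure) {
            return Status.FAILED_SUCCEEDED; // checked first, as in the listing
        }
        return killed ? Status.KILLED : Status.FAILED;
    }
}
```

This only covers runJob(). In the doRun() listing, a later isKilled() check sets the final status to KILLED regardless, so a killed job with job.succeed.on.failure set still ends up as KILLED.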

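The method above calls this.job.run(), and what happens next depends on the job type: different job types run different code. We take the simplest type, command, as the example: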

azkaban.jobExecutor.ProcessJob

When the job's type is command, the job here is an instance of ProcessJob.

The run method of ProcessJob is as follows:

  @Override
  public void run() throws Exception {
    try {
      resolveProps();
    } catch (final Exception e) {
      handleError("Bad property definition! " + e.getMessage(), e);
    }

    if (this.getSysProps().getBoolean(MEMCHECK_ENABLED, true)
        && this.getJobProps().getBoolean(AZKABAN_MEMORY_CHECK, true)) {
      final Pair<Long, Long> memPair = getProcMemoryRequirement();
      final long xms = memPair.getFirst();
      final long xmx = memPair.getSecond();
      // retry backoff in ms
      final String oomMsg = String
          .format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
              xms, xmx, getId());
      int attempt;
      boolean isMemGranted = true;

      //todo HappyRay: move to proper Guice after this class is refactored.
      final SystemMemoryInfo memInfo = SERVICE_PROVIDER.getInstance(SystemMemoryInfo.class);
      for (attempt = 1; attempt <= Constants.MEMORY_CHECK_RETRY_LIMIT; attempt++) {
        isMemGranted = memInfo.canSystemGrantMemory(xmx);
        if (isMemGranted) {
          info(String.format("Memory granted for job %s", getId()));
          if (attempt > 1) {
            this.commonMetrics.decrementOOMJobWaitCount();
          }
          break;
        }
        if (attempt < Constants.MEMORY_CHECK_RETRY_LIMIT) {
          info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s",
              TimeUnit.MILLISECONDS.toSeconds(
                  Constants.MEMORY_CHECK_INTERVAL_MS), attempt,
              Constants.MEMORY_CHECK_RETRY_LIMIT));
          if (attempt == 1) {
            this.commonMetrics.incrementOOMJobWaitCount();
          }
          synchronized (this) {
            try {
              this.wait(Constants.MEMORY_CHECK_INTERVAL_MS);
            } catch (final InterruptedException e) {
              info(String
                  .format("Job %s interrupted while waiting for memory check retry", getId()));
            }
          }
          if (this.killed) {
            this.commonMetrics.decrementOOMJobWaitCount();
            info(String.format("Job %s was killed while waiting for memory check retry", getId()));
            return;
          }
        }
      }

      if (!isMemGranted) {
        this.commonMetrics.decrementOOMJobWaitCount();
        handleError(oomMsg, null);
      }
    }

    List<String> commands = null;
    try {
      commands = getCommandList();
    } catch (final Exception e) {
      handleError("Job set up failed: " + e.getMessage(), e);
    }

    final long startMs = System.currentTimeMillis();

    if (commands == null) {
      handleError("There are no commands to execute", null);
    }

    info(commands.size() + " commands to execute.");
    final File[] propFiles = initPropsFiles();

    // change krb5ccname env var so that each job execution gets its own cache
    final Map<String, String> envVars = getEnvironmentVariables();
    envVars.put(KRB5CCNAME, getKrb5ccname(this.getJobProps()));

    // determine whether to run as Azkaban or run as effectiveUser,
    // by default, run as effectiveUser
    String executeAsUserBinaryPath = null;
    String effectiveUser = null;
    final boolean isExecuteAsUser = this.getSysProps().getBoolean(EXECUTE_AS_USER, true);

    //Get list of users we never execute flows as. (ie: root, azkaban)
    final Set<String> blackListedUsers = new HashSet<>(
        Arrays.asList(
            this.getSysProps()
                .getString(Constants.ConfigurationKeys.BLACK_LISTED_USERS, "root,azkaban")
                .split(",")
        )
    );

    // nativeLibFolder specifies the path for execute-as-user file,
    // which will change user from Azkaban to effectiveUser
    if (isExecuteAsUser) {
      final String nativeLibFolder = this.getSysProps().getString(AZKABAN_SERVER_NATIVE_LIB_FOLDER);
      executeAsUserBinaryPath = String.format("%s/%s", nativeLibFolder, "execute-as-user");
      effectiveUser = getEffectiveUser(this.getJobProps());
      // Throw exception if Azkaban tries to run flow as a prohibited user
      if (blackListedUsers.contains(effectiveUser)) {
        throw new RuntimeException(
            String.format("Not permitted to proxy as '%s' through Azkaban", effectiveUser)
        );
      }
      // Set parent directory permissions to <uid>:azkaban so user can write in their execution directory
      // if the directory is not permissioned correctly already (should happen once per execution)
      if (!canWriteInCurrentWorkingDirectory(effectiveUser)) {
        info("Changing current working directory ownership");
        assignUserFileOwnership(effectiveUser, getWorkingDirectory());
      }
      // Set property file permissions to <uid>:azkaban so user can write to their prop files
      // in order to pass properties from one job to another
      for (final File propFile : propFiles) {
        info("Changing properties files ownership");
        assignUserFileOwnership(effectiveUser, propFile.getAbsolutePath());
      }
    }

    for (String command : commands) {
      AzkabanProcessBuilder builder = null;
      if (isExecuteAsUser) {
        command =
            String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser,
                command);
        info("Command: " + command);
        builder =
            new AzkabanProcessBuilder(partitionCommandLine(command))
                .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog())
                .enableExecuteAsUser().setExecuteAsUserBinaryPath(executeAsUserBinaryPath)
                .setEffectiveUser(effectiveUser);
      } else {
        info("Command: " + command);
        builder =
            new AzkabanProcessBuilder(partitionCommandLine(command))
                .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
      }

      if (builder.getEnv().size() > 0) {
        info("Environment variables: " + builder.getEnv());
      }
      info("Working directory: " + builder.getWorkingDir());

      // print out the Job properties to the job log.
      this.logJobProperties();

      synchronized (this) {
        // Make sure that checking if the process job is killed and creating an AzkabanProcess
        // object are atomic. The cancel method relies on this to make sure that if this.process is
        // not null, this block of code which includes checking if the job is killed has not been
        // executed yet.
        if (this.killed) {
          info("The job is killed. Abort. No job process created.");
          return;
        }
        this.process = builder.build();
      }
      try {
        this.process.run();
        this.success = true;
      } catch (final Throwable e) {
        for (final File file : propFiles) {
          if (file != null && file.exists()) {
            file.delete();
          }
        }
        throw new RuntimeException(e);
      } finally {
        info("Process completed "
            + (this.success ? "successfully" : "unsuccessfully") + " in "
            + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
      }
    }

    // Get the output properties from this job.
    generateProperties(propFiles[1]);
  }
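
The first half of ProcessJob.run() is a bounded retry loop: ask whether memory can be granted, and if not, wait and ask again up to a limit. The sketch below keeps only that control flow. The BooleanSupplier stands in for memInfo.canSystemGrantMemory(xmx), Thread.sleep replaces the original this.wait(...), and an interrupt aborts the wait here whereas the original only logs it; all names are hypothetical.

```java
import java.util.function.BooleanSupplier;

/** Bounded retry loop modelled on the memory check in ProcessJob.run(); not Azkaban code. */
class MemoryCheckRetrySketch {
    /**
     * Returns the attempt number (starting at 1) on which the check passed,
     * or -1 if it never passed within retryLimit attempts.
     */
    static int waitForGrant(BooleanSupplier canGrant, int retryLimit, long intervalMs) {
        for (int attempt = 1; attempt <= retryLimit; attempt++) {
            if (canGrant.getAsBoolean()) {
                return attempt;
            }
            if (attempt < retryLimit) { // no point sleeping after the last attempt
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

As in the original, the loop does not wait after the final attempt; it falls through so the caller can report the failure immediately.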

There is one more layer that could be analyzed, namely this.process = builder.build(); and this.process.run();, but it is easy to follow by reading the source, so it is not covered here.
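
For readers who want a rough picture of what configuring such a process involves, the sketch below does the equivalent setup with the JDK's own ProcessBuilder. This is a stand-in: the article does not show how AzkabanProcessBuilder or AzkabanProcess work internally. The execute-as-user prefix follows the String.format call in the listing, while split() is a naive whitespace tokenizer and not the real partitionCommandLine.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** JDK ProcessBuilder used as a stand-in for AzkabanProcessBuilder; not Azkaban code. */
class CommandBuilderSketch {
    /** Prefixes the command with the execute-as-user binary and the target user. */
    static String asUser(String executeAsUserBinaryPath, String effectiveUser, String command) {
        return String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser, command);
    }

    /** Naive tokenizer: splits on whitespace and ignores quoting entirely. */
    static List<String> split(String command) {
        return Arrays.asList(command.trim().split("\\s+"));
    }

    /** Configures, but does not start, a process with extra environment variables. */
    static ProcessBuilder build(String command, Map<String, String> env, File workingDir) {
        ProcessBuilder builder = new ProcessBuilder(split(command));
        builder.environment().putAll(env); // added on top of the inherited environment
        builder.directory(workingDir);
        return builder;
    }
}
```

Calling start() on the returned builder would launch the process; the sketch stops short of that so it has no side effects.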

Source: https://blog.51cto.com/u_15278282/2933648