ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

spark源码(七)Worker receive 方法

2022-09-12 23:04:30  阅读:275  来源: 互联网

标签:case val receive send 源码 masterRef master spark dir


receive 方法其实是大量的case,分别对应处理不同的场景

    case msg: RegisterWorkerResponse 
    case SendHeartbeat
    case WorkDirCleanup
    case MasterChanged
    case ReconnectWorker
    case LaunchExecutor
    case executorStateChanged: ExecutorStateChanged
    case KillExecutor(masterUrl, appId, execId)
    case LaunchDriver(driverId, driverDesc, resources_)
    case KillDriver(driverId)
    case driverStateChanged @ DriverStateChanged(driverId, state, exception)
    case ReregisterWithMaster
    case ApplicationFinished(id)
    case DecommissionWorker
    case WorkerSigPWRReceived

一. RegisterWorkerResponse 详解

    private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
        msg match {
            case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
                val preferredMasterAddress = if (preferConfiguredMasterAddress) {
                  masterAddress.toSparkURL
                } else {
                  masterRef.address.toSparkURL
                }
                if (duplicate) {
                  logWarning(s"Duplicate registration at master $preferredMasterAddress")
                }    
                logInfo(s"Successfully registered with master $preferredMasterAddress")
                registered = true
                changeMaster(masterRef, masterWebUiUrl, masterAddress)/*更新master信息*/
                forwardMessageScheduler.scheduleAtFixedRate(
                  () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
                  0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)/*启动一个定时任务  开始心跳*/
                if (CLEANUP_ENABLED) {
                  logInfo(
                    s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
                  forwardMessageScheduler.scheduleAtFixedRate(
                    () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
//给自己发送一个清理目录的消息??这是干嘛的
                    CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
                }
                val execs = executors.values.map { e =>
                  new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
                }
                //给master发送一个当前状态的信息
                masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
            case RegisterWorkerFailed(message) =>
                if (!registered) {
                  logError("Worker registration failed: " + message)
                  System.exit(1)
                }
            case MasterInStandby => // Ignore. Master not yet ready.
        }
    }

二. SendHeartbeat 详解

    if (connected) { sendToMaster(Heartbeat(workerId, self)) }

    private def sendToMaster(message: Any): Unit = {
        master match {
          case Some(masterRef) => masterRef.send(message)//给master发送一个心跳信息
          case None =>
            logWarning(
              s"Dropping $message because the connection to master has not yet been established")
        }
    }

三. WorkDirCleanup 详解

    //所有的executors + drivers 目录
    val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
    try {
      val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
        val appDirs = workDir.listFiles()
        if (appDirs == null) {
          throw new IOException("ERROR: Failed to list files in " + appDirs)
        }
        appDirs.filter { dir =>
          val appIdFromDir = dir.getName
          val isAppStillRunning = appIds.contains(appIdFromDir)
          //当前是一个目录,并且不运行了,APP_DATA_RETENTION_SECONDS 是过了这个时间就清理目录配置项
          dir.isDirectory && !isAppStillRunning &&
            !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
        }.foreach { dir =>
          logInfo(s"Removing directory: ${dir.getPath}")
          Utils.deleteRecursively(dir)
          if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
              conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
            //是移除shuffle服务中的文件的   并不是是清理所用文件的
            //我也记得是任务kill的时候会看到清理目录的日志的
            shuffleService.applicationRemoved(dir.getName)
          }
        }
      }(cleanupThreadExecutor)

      cleanupFuture.failed.foreach(e =>
        logError("App dir cleanup failed: " + e.getMessage, e)
      )(cleanupThreadExecutor)
    } catch {
      case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
        logWarning("Failed to cleanup work dir as executor pool was shutdown")
    }

四. MasterChanged 详解

    logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
    //worker节点只是 被动接受主节点改变的事实  改变自身的配置即可
    //不存在 消息发送
    changeMaster(masterRef, masterWebUiUrl, masterRef.address)

    val executorResponses = executors.values.map { e =>
      WorkerExecutorStateResponse(new ExecutorDescription(
        e.appId, e.execId, e.cores, e.state), e.resources)
    }
    val driverResponses = drivers.keys.map { id =>
      WorkerDriverStateResponse(id, drivers(id).resources)}
    //把当前任务的每个状态给 master汇报一下就行了
    masterRef.send(WorkerSchedulerStateResponse(
      workerId, executorResponses.toList, driverResponses.toSeq))

五. ReconnectWorker 详解

    registerWithMaster() //这个方法上面有介绍的

标签:case,val,receive,send,源码,masterRef,master,spark,dir
来源: https://www.cnblogs.com/wuxiaolong4/p/16687610.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有