ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Spark源码——通信环境——通信原理、通信组件

2022-02-18 13:02:16  阅读:139  来源: 互联网

标签:val create bindAddress 通信 源码 conf new Spark port


组件通信

Driver=>Executor

Executor=>Driver

Executor=>Executor

 

Netty:通信框架

到一个饭馆吃饭

BIO:要一份蛋炒饭,老板说前面还有十个人,那我等一会,不干其他的事

NIO:要一份蛋炒饭,老板说前面还有十个人,一个人五分钟,我等不了先告诉老板我五十分钟后来拿,我先去干别的事

AIO:不要去饭店找老板订餐了,打个电话给老板,老板说人多待会给你送过去,约定好送的地点,然后我先去干别的事(性能最好)

 

Linux对AIO支持不好,Windows支持较好

Linux为了模拟AIO异步操作采用Epoll操作

 

通信组件

SparkContext.scala

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)

SparkEnv.scala

private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the driver!")
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    val port = conf.get(DRIVER_PORT)
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

点击create函数
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)

def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}

点击create函数
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)

占用一个端口号提供服务
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
NettyRpcEnv.scala
创建服务器
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
server = transportContext.createServer(bindAddress, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

点击createServer函数
public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, host, port, this.rpcHandler, bootstraps);
}
 

至此,Driver TransportServer 服务器已经准备好了

server = transportContext.createServer(bindAddress, port, bootstraps)

下面需要RpcEndpoint 终端

dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))

RpcEndpoint作用是啥呢,点进去

RpcEndpoint.scala

中有

def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

由此可见,该终端是用来作接收的

还有

final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

点进去看看RpcEndpointRef是做什么用的?

RpcEndpointRef.scala

def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  /**
   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within a default timeout.
   *
   * This method only sends the message once and never retries.
   */
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

有很多ask函数,由此可见是用来作发送的。

 

接收数据的话,就有收件箱的概念

收件箱在Dispatcher.scala的

  new DedicatedMessageLoop(name, e, this)函数中
var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {
        case NonFatal(e) =>
          endpointRefs.remove(endpoint)
          throw e
      }

进入该函数就可以看到inbox

private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher)
  extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)

除了收件箱,也需要发件箱

在NettyRpcEnv.scala中new出来的,而且和RpcAddress绑定,有多个发件箱

private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

Driver看完了

回头再看Executor,在CoarseGrainedExecutorBackend.scala中,创建ExecutorEnv

val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

进入createExecutorEnv函数

val env = create(
      conf,
      executorId,
      bindAddress,
      hostname,
      None,
      isLocal,
      numCores,
      ioEncryptionKey
    )

再进入create函数

val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)

再进入环境创建函数

def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}

再进入create函数

val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }

又出现startServer,说明跟Driver一样,TransportServer、RpcEndpoint(接收)、RpcEndpoint(发送)、收件箱、发件箱,Executor也都有

同样的进行发件的也有相应的客户端,在outbox.scala中,就创建了TransportClient对象

private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {

  outbox => // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  private val messages = new java.util.LinkedList[OutboxMessage]

  @GuardedBy("this")
  private var client: TransportClient = null

然后客户端和Server建立连接,互相发送数据

 

标签:val,create,bindAddress,通信,源码,conf,new,Spark,port
来源: https://www.cnblogs.com/luostudy/p/15908170.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有