Spark技术内幕:Client,Master和Worker 通信源码解析


Spark的Cluster Manager可以有几种部署模式:

在向集群提交计算任务后,系统的运算模型就是Driver Program定义的SparkContext向APP Master提交,有APP Master进行计算资源的调度并最终完成计算。具体阐述可以阅读《Spark:大数据的电花火石! 》。

那么Standalone模式下,Client,Master和Worker是如何进行通信,注册并开启服务的呢?


1. node之间的RPC - akka

模块间通信有很多成熟的实现,现在很多成熟的Framework已经早已经让我们摆脱原始的Socket编程了。简单归类,可以归纳为基于消息的传递和基于资源共享的同步机制。

基于消息的传递的机制应用比较广泛的有Message Queue。Message Queue, 是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ和RabbitMQ(AMQP的开源实现,现在由Pivotal维护)。

还有不得不提的是ZeroMQ,一个致力于进入Linux内核的基于Socket的编程框架。官方的说法: “ZeroMQ是一个简单好用的传输层,像框架一样的一个socket library,它使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。

Spark在很多模块之间的通信选择是Scala原生支持的akka,一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。akka有以下5个特性:

在Spark中的Client,Master和Worker实际上都是一个actor,拿Client来说:

import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
  var masterActor: ActorSelection = _
  val timeout = AkkaUtils.askTimeout(conf)

  override def preStart() = {
    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))

    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")

    driverArgs.cmd match {
      case "launch" =>
        ...
        masterActor ! RequestSubmitDriver(driverDescription)

      case "kill" =>
        val driverId = driverArgs.driverId
        val killFuture = masterActor ! RequestKillDriver(driverId)
    }
  }

  override def receive = {

    case SubmitDriverResponse(success, driverId, message) =>
      println(message)
      if (success) pollAndReportStatus(driverId.get) else System.exit(-1)

    case KillDriverResponse(driverId, success, message) =>
      println(message)
      if (success) pollAndReportStatus(driverId) else System.exit(-1)

    case DisassociatedEvent(_, remoteAddress, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      System.exit(-1)

    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      println(s"Cause was: $cause")
      System.exit(-1)
  }
}

/**
 * Executable utility for starting and terminating drivers inside of a standalone cluster.
 */
object Client {
  def main(args: Array[String]) {
    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
    println("Use ./bin/spark-submit with \"--master spark://host:port\"")

    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
      conf.set("spark.akka.logLifecycleEvents", "true")
    }
    conf.set("spark.akka.askTimeout", "10")
    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
    //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
    val (actorSystem, _) = AkkaUtils.createActorSystem(
      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

    actorSystem.awaitTermination()
  }
}

其中第19行的含义就是向Master提交Driver的请求,

masterActor ! RequestSubmitDriver(driverDescription)

而Master将在receive里处理这个请求。当然了27行到44行的是处理Client Actor收到的消息。

可以看出,通过akka,可以非常简单高效的处理模块间的通信,这可以说是Spark RPC的一大特色。


2. Client,Master和Workerq启动通信详解

源码位置:spark-1.0.0\core\src\main\scala\org\apache\spark\deploy。主要涉及的类:Client.scala, Master.scala和Worker.scala。这三大模块之间的通信框架如下图。

Standalone模式下存在的角色:

实际上,Master和Worker要处理的消息要比这多得多,本图只是反映了集群启动和向集群提交运算时候的主要消息处理。

接下来将分别走读这三大角色的源码。


2.1 Client源码解析

Client启动:

object Client {
  def main(args: Array[String]) {
    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
    println("Use ./bin/spark-submit with \"--master spark://host:port\"")

    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
      conf.set("spark.akka.logLifecycleEvents", "true")
    }
    conf.set("spark.akka.askTimeout", "10")
    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
    //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
    val (actorSystem, _) = AkkaUtils.createActorSystem(
      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    // 使用ClientActor初始化actorSystem
    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
    //启动并等待actorSystem的结束
    actorSystem.awaitTermination()
  }
}

从行21可以看出,核心实现是由ClientActor实现的。Client的Actor是akka.Actor的一个扩展。对于Actor,从它对recevie的override就可以看出它需要处理的消息。

  override def receive = {

    case SubmitDriverResponse(success, driverId, message) =>
      println(message)
      if (success) pollAndReportStatus(driverId.get) else System.exit(-1)

    case KillDriverResponse(driverId, success, message) =>
      println(message)
      if (success) pollAndReportStatus(driverId) else System.exit(-1)

    case DisassociatedEvent(_, remoteAddress, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      System.exit(-1)

    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      println(s"Cause was: $cause")
      System.exit(-1)
  }


2.2 Master的源码分析

源码分析详见注释。

 override def receive = {
    case ElectedLeader => {
      // 被选为Master,首先判断是否该Master原来为active,如果是那么进行Recovery。
    }
    case CompleteRecovery => completeRecovery() // 删除没有响应的worker和app,并且将所有没有worker的Driver分配worker
    case RevokedLeadership => {
      // Master将关闭。
    }
    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {      
      // 如果该Master不是active,不做任何操作,返回
      // 如果注册过该worker id,向sender返回错误
      sender ! RegisterWorkerFailed("Duplicate worker ID")
      // 注册worker,如果worker注册成功则返回成功的消息并且进行调度
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
      // 如果worker注册失败,发送消息到sender
      sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
    }
    case RequestSubmitDriver(description) => {
        // 如果master不是active,返回错误
        sender ! SubmitDriverResponse(false, None, msg)
        // 否则创建driver,返回成功的消息
        sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
      }
    }
    case RequestKillDriver(driverId) => {
      if (state != RecoveryState.ALIVE) {
        // 如果master不是active,返回错误
        val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
        sender ! KillDriverResponse(driverId, success = false, msg)
      } else {
        logInfo("Asked to kill driver " + driverId)
        val driver = drivers.find(_.id == driverId)
        driver match {
          case Some(d) =>
              //如果driver仍然在等待队列,从等待队列删除并且更新driver状态为KILLED
            } else {
              // 通知worker kill driver id的driver。结果会由workder发消息给master ! DriverStateChanged
              d.worker.foreach { w => w.actor ! KillDriver(driverId) }
            }
            // 注意,此时driver不一定被kill,master只是通知了worker去kill driver。
            sender ! KillDriverResponse(driverId, success = true, msg)
          case None =>
            // driver已经被kill,直接返回结果
            sender ! KillDriverResponse(driverId, success = false, msg)
        }
      }
    }
    case RequestDriverStatus(driverId) => {
      // 查找请求的driver,如果找到则返回driver的状态
      (drivers ++ completedDrivers).find(_.id == driverId) match {
        case Some(driver) =>
          sender ! DriverStatusResponse(found = true, Some(driver.state),
            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
        case None =>
          sender ! DriverStatusResponse(found = false, None, None, None, None)
      }
    }
    case RegisterApplication(description) => {
        //如果是standby,那么忽略这个消息
        //否则注册application;返回结果并且开始调度
    }
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      // 通过idToApp获得app,然后通过app获得executors,从而通过execId获得executor
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {
          exec.state = state
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
          if (ExecutorState.isFinished(state)) {
            val appInfo = idToApp(appId)
            // Remove this executor from the worker and app
            logInfo("Removing executor " + exec.fullId + " because it is " + state)
            appInfo.removeExecutor(exec)
            exec.worker.removeExecutor(exec)
           }
      }
    }
    case DriverStateChanged(driverId, state, exception) => {
      //  如果Driver的state为ERROR | FINISHED | KILLED | FAILED, 删除它。
    }
    case Heartbeat(workerId) => {
      // 更新worker的时间戳 workerInfo.lastHeartbeat = System.currentTimeMillis()
    }
    case MasterChangeAcknowledged(appId) => {
      //  将appId的app的状态置为WAITING,为切换Master做准备。
      }
    case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
      // 通过workerId查找到worker,那么worker的state置为ALIVE,
      // 并且查找状态为idDefined的executors,并且将这些executors都加入到app中,
      // 然后保存这些app到worker中。可以理解为Worker在Master端的Recovery
      idToWorker.get(workerId) match {
        case Some(worker) =>
          logInfo("Worker has been re-registered: " + workerId)
          worker.state = WorkerState.ALIVE

          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
          for (exec <- validExecutors) {
            val app = idToApp.get(exec.appId).get
            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
            worker.addExecutor(execInfo)
            execInfo.copyState(exec)
          }
          // 将所有的driver设置为RUNNING然后加入到worker中。
          for (driverId <- driverIds) {
            drivers.find(_.id == driverId).foreach { driver =>
              driver.worker = Some(worker)
              driver.state = DriverState.RUNNING
              worker.drivers(driverId) = driver
            }
          }
      }
    }
    case DisassociatedEvent(_, address, _) => {
      // 这个请求是Worker或者是App发送的。删除address对应的Worker和App
      // 如果Recovery可以结束,那么结束Recovery      
    }
    case RequestMasterState => {
      //向sender返回master的状态
      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray, drivers.toArray, completedDrivers.toArray, state)
    }
    case CheckForWorkerTimeOut => {
      //删除超时的Worker
    }
    case RequestWebUIPort => {
      //向sender返回web ui的端口号
      sender ! WebUIPortResponse(webUi.boundPort)
    }
  }


2.3 Worker 源码解析

通过对Client和Master的源码解析,相信你也知道如何去分析Worker是如何和Master进行通信的了,没错,答案就在下面:

override def receive

参考资料:

Spark源码1.0.0。


请您支持:如果您读到这里,相信本文对您有所帮助,请点击投票支持一下吧。如果您已经在投票页面,请点击下面的投一票吧!

BTW,即使您没有CSDN的帐号,可以使用第三方登录的,包括微博,QQ,Gmail,GitHub,百度,等。

相关内容