spark core源码分析9 从简单例子看action操作,sparkcore


上一节举例讲解了transformation操作,这一节以reduce为例讲解action操作

首先看submitJob方法,它将我们reduce中写的处理函数随JobSubmitted消息传递出去,因为每个分区都需要调用它进行计算;

而resultHandler是指最后合并的方法,在每个task完成后,需要调用resultHandler将最终结果合并。所以它不需要随JobSubmitted消息传递,而是保存在JobWaiter中

/**
   * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
   * can be used to block until the the job finishes executing or can be used to cancel the job.
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
首先介绍一下handleJobSubmitted方法的参数

finalRDD:触发该action之前的RDD

func:对于每个分区中的元素执行的函数

partitions:分区号Array

listener:这里指JobWaiter

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job)
      } else {
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    }
    submitWaitingStages()
  }

这是一个比较重要的过程,先讲finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)

这里有一个stage的概念。Task是在集群上运行的基本单位。一个Task负责处理RDD的一个partition。RDD的多个patition会分别由不同的Task去处理,这一组可以同时运行的Task就组成了一个Stage。偷一个官方的图。


finalRDD为参数构建一个ResultStage

private def newResultStage(
      rdd: RDD[_],
      numTasks: Int,
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
解释一下getParentStagesAndId内部的处理逻辑:从finalRDD开始,查找它的所有依赖中的shuffle依赖,如果是普通依赖,则继续往前找,直到找到shuffle依赖为止。这样,就能获取到与finalRDD相邻的所有shuffle依赖。在上图中,即是groupBy和join两个操作产生的依赖。
得到这些shuffle依赖之后,再往前获取整个job所有shuffle依赖,并以shuffle依赖为边界创建ShuffleMapStage,将每个shuffleId注册到mapOutputTracker中,它是跟踪每个shuffleMapStage输出的位置等信息。
在newResultStage方法中,getParentStagesAndId只返回与finalRDD最近的stage
之后再通过父stages,分区数目,stageId,finalRDD,jobId等构建ResultStage。将jobId保存到所有stage的jobIds成员中。一个stage还能有多个jobId???
至此,finalStage的建设就完成了。

接着创建了ActiveJob,它只是将那些参数信息封装起来,并有一个成员记录每个partition是否完成。

最后就是调用submitStage将finalStage提交

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    //waitingStages的意思是它还有依赖的父stage还没执行完成时,会先放进这里
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      //如果没有父stage未完成,则提交本身的stage
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {//如果还有未完成的父stage,则递归调用submitStage,先提交父stage,把自己放进waitingStages中
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}

submitMissingTasks代码解析见注释

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.
  //如果是ShuffleMapStage,计算这个stage中哪些分区是需要计算的。如果某个分区计算完成了,则会向该stage中记录
  //该分区的MapStatus。所以这里返回的是需要计算的分区号
  //如果是ResultStage,返回这个stage中标记是未完成的分区号
  val partitionsToCompute: Seq[Int] = {
    stage match {
      case stage: ShuffleMapStage =>
        (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).orNull

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
  outputCommitCoordinator.stageStart(stage.id)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  //将Task执行所要用到的数据序列化,再进行广播出去,在Executor端真正执行时反序列化
  //下面说的很清楚了,对于ShuffleMapTask而言,包括rdd和shuffle的依赖;对于ResultStage而言,包括rdd和执行函数
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString)
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }
  //对参数中的stage类型的不同,构建不同的tasks,每个分区new一个ShuffleMapTask或者ResultTask
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = getPreferredLocs(stage.rdd, id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, taskBinary, part, locs)
        }

      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        partitionsToCompute.map { id =>
          val p: Int = job.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = getPreferredLocs(stage.rdd, p)
          new ResultTask(stage.id, taskBinary, part, locs, id)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }

  //注意,这里将所有的tasks放进stage的pendingTasks中,之后每完成一个任务就删除一个。最后将这些tasks,stageId,attemptId,jobId等信息封装成TaskSet,调用taskScheduler.submitTasks进行任务调度
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage : ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
  }
}
看Standalone下的taskSchedulerImpl的submitTasks方法
<pre name="code" class="java">override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    //这里创建一个TaskSetManager,用来管理这个taskset整个生命周期。在新建这个manager时,会根据我们设置的
preferredLocations放进各种不同本地性的HashMap中,作为之后调度的依据。
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    //我们之前章节讲过,pool是用来调度taskset的,调度的顺序就是依靠实际的builder来管理的(FIFO/FAIR)。这里就是往调度池中放入一个taskset
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      //间隔15s之后启动定时器,如果还没有启动任务,发出警告;如果启动任务了,关闭定时器
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()//见下面分析
}
最重要的是调用了reviveOffers,实际最终还是调用了CoarseGrainedSchedulerBackend的makeOffers方法
// Make fake resource offers on all executors
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

makeOffers方法看似简单,实则处理的逻辑非常多。我们先看里面的resourceOffers,再看外面的launchTasks

1、resourceOffers

将之前注册上来的每个Executor包装成WorkerOffer。

他的参数其实就是Executor的一个list,我们的任务就是下发到这些Executor上去执行

/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  //将所有的这些Executor随机化,
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  //针对每一个Executor,新建一个ArrayBuffer存放TaskDescription。因为每个Executor上运行不止一个任务
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  //每个Executor上剩余的cores
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  //根据配置的调度模式,从中取出一组taskSet。因为没有依赖关系的多个taskset是可以并发运行的。
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  //这部分代码是根据规则分配任务。
  //这里根据PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY的先后顺序,依次在Executor上分配task,最后将形成一个tasks{Seq[Seq[TaskDescription]]结构},第一个Seq是Executor的序号,下一个Seq是在每个Executor上分配的tasks信息
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      //针对一个maxLocality,在每个executor上分配一个task;再针对下一个maxLocality,。。。
      launchedTask = resourceOfferSingleTaskSet(
          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true//这里看到,hasLaunchTask被置为true了,那前面间隔15s的定时器就可以关闭了
  }
  return tasks
}
这样,resourceOffers就介绍完了。之后将调用launchTasks将上面的tasks启动起来。
2、launchTasks
这部分的介绍见代码注释,主要的工作还是将任务序列化,之后发送到Executor端执行

// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    //这里将TaskDescription进行序列化,内容包含ExecutorId,task index等。之前是将每个task的rdd及依赖或者方法序列化,注意区分。
    val serializedTask = ser.serialize(task)
    //如果序列化之后的大小超出限制,abort
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {//向Executor发送LaunchTask消息
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
查看Executor端收到LaunchTask消息之后的逻辑

case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    //将TaskDescription反序列化出来
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    //这里的参数taskDesc.serializedTask就是第一次序列化的rdd及依赖或者执行的方法的结果和执行该task是需要的第三方jar包等
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
查看executor.launchTask,创建TaskRunner,之后从线程池中取线程运行
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
至此,我们了解了action动作触发之后的处理流程。
下一节介绍具体task运行的流程以及获取结果。

版权声明:本文为博主原创文章,未经博主允许不得转载。

相关内容