Spark技术内幕:Stage划分及提交源码分析,sparkstage


当触发一个RDD的action后,以count为例,调用关系如下:

其中步骤五的DAGSchedulerEventProcessActor是DAGScheduler 的与外部交互的接口代理,DAGScheduler在创建时会创建名字为eventProcessActor的actor。这个actor的作用看它的实现就一目了然了:

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties) // 提交job,来自与RDD->SparkContext->DAGScheduler的消息。之所以在这需要在这里中转一下,是为了模块功能的一致性。

    case StageCancelled(stageId) => // 消息源org.apache.spark.ui.jobs.JobProgressTab,在GUI上显示一个SparkContext的Job的执行状态。
      // 用户可以cancel一个Stage,会通过SparkContext->DAGScheduler 传递到这里。
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) => // 来自于org.apache.spark.scheduler.JobWaiter的消息。取消一个Job
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) => // 取消整个Job Group
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled => //取消所有Job
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) => // TaskScheduler得到一个Executor被添加的消息。具体来自org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) => //来自TaskScheduler
      dagScheduler.handleExecutorLost(execId)

    case BeginEvent(task, taskInfo) => // 来自TaskScheduler
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) => //处理获得TaskResult信息的消息
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => //来自TaskScheduler,报告task是完成或者失败
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) => //来自TaskScheduler,要么TaskSet失败次数超过阈值或者由于Job Cancel。
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages => //当一个Stage处理失败时,重试。来自org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion
      dagScheduler.resubmitFailedStages()
  }

总结一下org.apache.spark.scheduler.DAGSchedulerEventProcessActor的作用:可以把他理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节,也更易于理解其逻辑;也降低了维护成本,将DAGScheduler的比较复杂功能接口化。


handleJobSubmitted

org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted首先会根据RDD创建finalStage。finalStage,顾名思义,就是最后的那个Stage。然后创建job,最后提交。提交的job如果满足一下条件,那么它将以本地模式运行:

1)spark.localExecution.enabled设置为true  并且 2)用户程序显式指定可以本地运行 并且 3)finalStage的没有父Stage 并且 4)仅有一个partition

3)和 4)的话主要为了任务可以快速执行;如果有多个stage或者多个partition的话,本地运行可能会因为本机的计算资源的问题而影响任务的计算速度。

要理解什么是Stage,首先要搞明白什么是Task。Task是在集群上运行的基本单位。一个Task负责处理RDD的一个partition。RDD的多个patition会分别由不同的Task去处理。当然了这些Task的处理逻辑完全是一致的。这一组Task就组成了一个Stage。有两种Task:

ShuffleMapTask根据Task的partitioner将计算结果放到不同的bucket中。而ResultTask将计算结果发送回Driver Application。一个Job包含了多个Stage,而Stage是由一组完全相同的Task组成的。最后的Stage包含了一组ResultTask。

在用户触发了一个action后,比如count,collect,SparkContext会通过runJob的函数开始进行任务提交。最后会通过DAG的event processor 传递到DAGScheduler本身的handleJobSubmitted,它首先会划分Stage,提交Stage,提交Task。至此,Task就开始在运行在集群上了。

一个Stage的开始就是从外部存储或者shuffle结果中读取数据;一个Stage的结束就是由于发生shuffle或者生成结果时。


创建finalStage

handleJobSubmitted 通过调用newStage来创建finalStage:

finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)

创建一个result stage,或者说finalStage,是通过调用org.apache.spark.scheduler.DAGScheduler#newStage完成的;而创建一个shuffle stage,需要通过调用org.apache.spark.scheduler.DAGScheduler#newOrUsedStage。 

private def newStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: Option[ShuffleDependency[_, _, _]],
      jobId: Int,
      callSite: CallSite)
    : Stage =
  {
    val id = nextStageId.getAndIncrement()
    val stage =
      new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

对于result 的final stage来说,传入的shuffleDep是None。

我们知道,RDD通过org.apache.spark.rdd.RDD#getDependencies可以获得它依赖的parent RDD。而Stage也可能会有parent Stage。看一个RDD论文的Stage划分吧:


一个stage的边界,输入是外部的存储或者一个stage shuffle的结果;输入则是Job的结果(result task对应的stage)或者shuffle的结果。

上图的话stage3的输入则是RDD A和RDD F shuffle的结果。而A和F由于到B和G需要shuffle,因此需要划分到不同的stage。

从源码实现的角度来看,通过触发action也就是最后一个RDD创建final stage(上图的stage 3),我们注意到new Stage的第五个参数就是该Stage的parent Stage:通过rdd和job id获取:

// 生成rdd的parent Stage。没遇到一个ShuffleDependency,就会生成一个Stage
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage] //存储parent stage
    val visited = new HashSet[RDD[_]] //存储已经被访问到得RDD
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => // 在ShuffleDependency时需要生成新的stage
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              waitingForVisit.push(dep.rdd) //不是ShuffleDependency,那么就属于同一个Stage
          }
        }
      }
    }
    waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd
    while (!waitingForVisit.isEmpty) { //只要stack不为空,则一直处理。
      visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage
    }
    parents.toList
  }

生成了finalStage后,就需要提交Stage了。

  // 提交Stage,如果有parent Stage没有提交,那么递归提交它。
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      // 如果当前stage不在等待其parent stage的返回,并且 不在运行的状态, 并且 没有已经失败(失败会有重试机制,不会通过这里再次提交)
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing == Nil) { // 如果所有的parent stage都已经完成,那么提交该stage所包含的task
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) { // 有parent stage为完成,则递归提交它
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }


DAGScheduler将Stage划分完成后,提交实际上是通过把Stage转换为TaskSet,然后通过TaskScheduler将计算任务最终提交到集群。其所在的位置如下图所示。


接下来,将分析Stage是如何转换为TaskSet,并最终提交到Executor去运行的。


BTW,最近工作太忙了,基本上到家洗漱完都要10点多。也再没有精力去进行源码解析了。幸运的是周末不用加班。因此以后的博文更新都要集中在周末了。加油。



彩色图像分割源码

你说的是什么东西的彩色图像分割源码啊?说清楚了我帮你查
=======================================
OpenCV学习笔记(三)人脸检测的代码分析

OpenCV学习笔记(三)人脸检测的代码分析
一、预备知识:
1、动态内存存储及操作函数
CvMemStorage
typedef struct CvMemStorage
{
struct CvMemBlock* bottom;/* first allocated block */
struct CvMemBlock* top; /* the current memory block - top of the stack */
struct CvMemStorage* parent; /* borrows new blocks from */
int block_size; /* block size */
int free_space; /* free space in the top block (in bytes) */
} CvMemStorage;
内存存储器是一个可用来存储诸如序列,轮廓,图形,子划分等动态增长数据结构的底层结构。它是由一系列以同等大小的内存块构成,呈列表型 ---bottom 域指的是列首,top 域指的是当前指向的块但未必是列尾.在bottom和top之间所有的块(包括bottom, 不包括top)被完全占据了空间;在 top和列尾之间所有的块(包括块尾,不包括top)则是空的;而top块本身则被占据了部分空间 -- free_space 指的是top块剩余的空字节数。新分配的内存缓冲区(或显示的通过 cvMemStorageAlloc 函数分配,或隐示的通过 cvSeqPush, cvGraphAddEdge等高级函数分配)总是起始于当前块(即top块)的剩余那部分,如果剩余那部分能满足要求(够分配的大小)。分配后,free_space 就减少了新分配的那部分内存大小,外加一些用来保存适当列型的附加大小。当top块的剩余空间无法满足被分配的块(缓冲区)大小时,top块的下一个存储块被置为当前块(新的top块) -- free_space 被置为先前分配的整个块的大小。如果已经不存在空的存储块(即:top块已是列尾),则必须再分配一个新的块(或从parent那继承,见 cvCreateChildMemStorage)并将该块加到列尾上去。于是,存储器(memory storage)就如同栈(Stack)那样, bottom指向栈底,(top, free_space)对指向栈顶。栈顶可通过 cvSaveMemStoragePos保存,通过 cvRestoreMemStoragePos 恢复指向, 通过 cvClearStorage 重置。
CvMemBlock
内存存储块结构
typedef struct CvMemBlock
{
struct CvMemBlock* prev;
struct CvMemBlock* next;
} CvMemBlock;
CvMemBlock 代表一个单独的内存存储块结构。 内存存储块中的实际数据存储在 header块 之后(即:存在一个头指针 head 指向的块 header ,该块不存储数据),于是,内存块的第 i 个字节可以通过表达式 ((char*)(mem_block_p......余下全文>>
 

寻找一篇软件文档需分析的文章,要英汉对照各位大虾速度帮忙

程序文档合一与动态文档
很多企业已经建立了许多庞大的计算机管理系统,而且将不断地推出新的系统。满足经营的需求须不断维护、改造计算机系统,但同时又要不影响现行生产,所以必须建立一整套机制来评价、控制和完成对系统的维护。在软件维护过程中,提出程序与文档合一的概念在软件开发的同时建立动态文档。

程序与文档合一概念的提出

一、目前软件的状况

程序与文档的形式分离,不仅是用各自独立的形式存放,而且使用不同的工具在不同的时间里书写和检索。维护程序时不能方便地得到文档的帮助,不能同步修改文档。

程序与文档的内容分离,由于程序与文档采用不同的描述,既有计算机语言也有自然语言。维护过程中不能及时、一致地更新文档或程序,使文档不能准确地描述程序而几乎成为废纸甚至带来负面价值。

软件开发与维护的分离,绝大多数软件在设计、开发时不太考虑以后可能的修改,加大了软件维护的难度,而且使维护容易引入新的错误。

这些分离也表现在设计、开发的不同阶段的文档之间的不相容性,例如:需求分析说明书是纸上的东西,在概要设计阶段不能很好地继承、利用需求分析说明书,设计、编制概要设计时必须从零开始,需要重新分析、理解需求分析,这种思维上的脱节,不仅延缓开发进度、加重设计人员的负担,而且由于理解上的不同导致不同阶段描述的对象有许多不相容情况。这些分离使得文档在系统的设计、开发、维护中的作用下降,这也是很多软件人员不愿意编写文档的主要原因。

二、程序与文档合一的概念提出

怎样才是好的文档系统呢?应当具备以下属性:
1. 能够准确地描述软件、并且简单易懂;
2. 能够迅速错误定位、影响分析、修正设计;
3. 能够提高软件维护质量;
4. 能够方便程序修改时理解程序。

为此提出了程序与文档合一的概念。这概念使软件成为真正意义上的软件:程序+文档,程序就是文档,文档集成在程序中。它要求在选择开发环境时不仅要考虑环境对设计、开发的完美支持,而且要考虑对维护、文档的支持;它要求软件人员在设计、开发过程中要考虑维护问题、文档问题;它要求程序与文档存储在同一位置、同一系统中;它要求使用相同工具进行程序与文档的书写、检索;它要求在编写和维护程序的同时形成文档,在书写文档时编写、维护程序。程序与文档合一的概念不仅存在于系统的设计、开发阶段而且存在于系统的维护阶段,它贯穿软件的生命周期。

动态文档系统是建立在程序与文档合一的概念基础上的、文档与程序一致的、简单易懂的联机文档系统。它包括构件说明与数据描述、对构件与构件之间、构件与数据之间的关系进行的描述。动态文档系统是提高了文档的可用性、易用性和连贯性,使文档更加有效,是解决维护问题的有效途径。

动态文档系统问题分析

需要解决的问题是:软件文档的内容划分与获取、文档的存储与维护、文档的检索、软件文档的生成打印。

一、软件文档的内容划分成:语义文档、结构文档、过程文档

语义文档是对软件的功能、概念、总体设计、流程、规约等用自然语言的描述,是软件人员根据规范在使用CASE工具编写并填入程序的文档,它也是为更全面的解释文档而灵活加入的额外信息。

结构文档是在软件设计工具、开发环境中对象的属性、构件间接口、构件间引用关系、软件的结构等的描述。利用词法、语法分析程序对整个系统的对象、构件进行识别、分析,获取上述描述并形成表格文件。

过程文档是对软件的设计、编码、维护过程中形成的过程描述和程序注释,如设计目的、设计人、时间等说明,利用开发环境对软件人员在设计、开发、维护过程中操作的记......余下全文>>
 

相关内容