Spark技术内幕:Worker源码与架构解析,sparkworker
Spark技术内幕:Worker源码与架构解析,sparkworker
首先通过一张Spark的架构图来了解Worker在Spark中的作用和地位:
Worker所起的作用有以下几个:
1. 接受Master的指令,启动或者杀掉Executor
2. 接受Master的指令,启动或者杀掉Driver
3. 报告Executor/Driver的状态到Master
4. 心跳到Master,心跳超时则Master认为Worker已经挂了不能工作了
5. 向GUI报告Worker的状态
说白了,Worker就是整个集群真正干活的。首先看一下Worker重要的数据结构:
val executors = new HashMap[String, ExecutorRunner] val finishedExecutors = new HashMap[String, ExecutorRunner] val drivers = new HashMap[String, DriverRunner] val finishedDrivers = new HashMap[String, DriverRunner]
这些Hash Map存储了名字和实体时间的对应关系,方便通过名字直接找到实体进行调用。
看一下如何启动Executor:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host, appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ memoryUsed += memory_ masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, manager.state, None, None) } } catch { case e: Exception => { logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId } masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) } } }
1行到3行是验证该命令是否发自一个合法的Master。7到10行定义了一个ExecutorRunner,实际上系统并没有一个类叫做Executor,我们所说的Executor实际上是由ExecutorRunner实现的,这个名字起得也比较贴切。11行将新建的executor放到上面提到的Hash Map中。然后12行启动这个Executor。13行和14行将现在已经使用的core和memory进行的统计。15到17行实际上是向Master报告Executor的状态。这里需要加锁。
如果在这过程中有异常抛出,那么需要check是否是executor已经加到Hash Map中,如果有则首先停止它,然后从Hash Map中删除它。并且向Master report Executor是FAILED的。Master会重新启动新的Executor。
接下来看一下Driver的Hash Map的使用,通过KillDriver:
case KillDriver(driverId) => { logInfo(s"Asked to kill driver $driverId") drivers.get(driverId) match { case Some(runner) => runner.kill() case None => logError(s"Asked to kill unknown driver $driverId") } }
这个KillDirver的命令实际上由Master发出的,而Master实际上接收了Client的kill driver的命令。这个也可以看出Scala语言的简洁性。
多级目录的编译顺序也是有Makefile控制的, 如下这个例子(3个子目录, 一个Makefile), 子目录的编译顺序由这个Makefile控制.
sub_a sub_b sub_c Makefile
限元
有限元法(FEA,Finite Element Analysis)的基本概念是用较简单的问题代替复杂问题后再求解。它将求解域看成是由许多称为有限元的小的互连子域组成,对每一单元假定一个合适的(较简单的)近似解,然后推导求解这个域总的满足条件(如结构的平衡条件),从而得到问题的解。这个解不是准确解,而是近似解,因为实际问题被较简单的问题所代替。由于大多数实际问题难以得到准确解,而有限元不仅计算精度高,而且能适应各种复杂形状,因而成为行之有效的工程分析手段。
英文:Finite Element 有限单元法是随着电子计算机的发展而迅速发展起来的一种现代计算方法。它是50年代首先在连续体力学领域--飞机结构静、动态特性分析中应用的一种有效的数值分析方法,随后很快广泛的应用于求解热传导、电磁场、流体力学等连续性问题。 有限元法分析计算的思路和做法可归纳如下:
编辑本段1) 物体离散化
将某个工程结构离散为由各种单元组成的计算模型,这一步称作单元剖分。离散后单元与单元之间利用单元的节点相互连接起来;单元节点的设置、性质、数目等应视问题的性质,描述变形形态的需要和计算进度而定(一般情况单元划分越细则描述变形情况越精确,即越接近实际变形,但计算量越大)。所以有限元中分析的结构已不是原有的物体或结构物,而是同新材料的由众多单元以一定方式连接成的离散物体。这样,用有限元分析计算所获得的结果只是近似的。如果划分单元数目非常多而又合理,则所获得的结果就与实际情况相符合。
编辑本段2) 单元特性分析
A、 选择位移模式 在有限单元法中,选择节点位移作为基本未知量时称为位移法;选择节点力作为基本未知量时称为力法;取一部分节点力和一部分节点位移作为基本未知量时称为混合法。位移法易于实现计算自动化,所以,在有限单元法中位移法应用范围最广。 当采用位移法时,物体或结构物离散化之后,就可把单元总的一些物理量如位移,应变和应力等由节点位移来表示。这时可以对单元中位移的分布采用一些能逼近原函数的近似函数予以描述。通常,有限元法我们就将位移表示为坐标变量的简单函数。这种函数称为位移模式或位移函数。 B、 分析单元的力学性质 根据单元的材料性质、形状、尺寸、节点数目、位置及其含义等,找出单元节点力和节点位移的关系式,这是单元分析中的关键一步。此时需要应用弹性力学中的几何方程和物理方程来建立力和位移的方程式,从而导出单元刚度矩阵,这是有限元法的基本步骤之一。 C、 计算等效节点力 物体离散化后,假定力是通过节点从一个单元传递到另一个单元。但是,对于实际的连续体,力是从单元的公共边传递到另一个单元中去的。因而,这种作用在单元边界上的表面力、体积力和集中力都需要等效的移到节点上去,也就是用等效的节点力来代替所有作用在单元上的力。
编辑本段3) 单元组集
利用结构力的平衡条件和边界条件把各个单元按原来的结构重新连接起来,形成整体的有限元方程 (1-1) 式中,K是整体结构的刚度矩阵;q是节点位移列阵;f是载荷列阵。
编辑本段4) 求解未知节点位移
解有限元方程式(1-1)得出位移。这里,可以根据方程组的具体特点来选择合适的计算方法。 通过上述分析,可以看出,有限单元法的基本思想是"一分一合",分是为了就进行单元分析,合则为了对整体结构进行综合分析。 有限元的发展概况 1943年 courant在论文中取定义在三角形域上分片连续函数,利用最小势能原理研究St.Venant的扭转问题。 1960年......余下全文>>
评论暂时关闭