spark core源码分析15 Shuffle详解-写流程,sparkshuffle



博客地址: http://blog.csdn.net/yueqian_zhu/


Shuffle是一个比较复杂的过程,有必要详细剖析一下内部写的逻辑

ShuffleManager分为SortShuffleManager和HashShuffleManager

一、SortShuffleManager

每个ShuffleMapTask不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个本地文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。

它在写入分区数据的时候,首先会根据实际情况对数据采用不同的方式进行排序操作,底线是至少按照Reduce分区Partition进行排序,这样同一个Map任务Shuffle到不同的Reduce分区中去的所有数据都可以写入到同一个外部磁盘文件中去,用简单的Offset标志不同Reduce分区的数据在这个文件中的偏移量。这样一个Map任务就只需要生成一个shuffle文件,从而避免了上述HashShuffleManager可能遇到的文件数量巨大的问题


/** Get a writer for a given partition. Called on executors by map tasks. */
  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
      : ShuffleWriter[K, V] = {
    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
    shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
    new SortShuffleWriter(
      shuffleBlockResolver, baseShuffleHandle, mapId, context)
  }

shuffleMapNumber是一个HashMap<shuffleId,numMaps>

SortShuffleWriter提供write接口用于真实数据的写磁盘,而在write接口中会使用shuffleBlockResolver与底层文件打交道


下面看获得SortShuffleWriter之后,调用write进行写

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
write的参数其实就是调用了rdd的compute方法进行计算,返回的这个partition的迭代器
/** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }

可以看到,设置了mapSideCombine的需要将aggregator和keyOrdering传入到ExternalSorter中,否则将上面两项参数设为None。接着调用insertAll方法

def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined

    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    } else if (bypassMergeSort) {
      // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
      if (records.hasNext) {
        spillToPartitionFiles(
          WritablePartitionedIterator.fromIterator(records.map { kv =>
            ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
          })
        )
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }
解释一下内部逻辑:

(1)  如果是shouldCombine,将k-v信息记录到一个Array中,默认是大小是64*2,存储格式为key0,value0,key1,value1,key2,value2...。map.changeValue方法就是将value的值不断的调用mergeValue方法,去更新array中指定位置的value值。如果k-v的量达到array size的0.7时,会自动扩容。

之后调用maybeSpillCollection,首先判断是否需要spill,依据是开启spillingEnabled标志(不开启有OOM风险,其实上面rehash扩容的时候就应该是有OOM风险了),且读取的元素是32的整数倍,且目前占用的内存大于设置的阀值(5M),就去向shuffleMemoryManager申请内存(shuffleMemoryManager中有一个阀值,每个shuffle task向他申请时都会记录一下),申请的容量是当前使用容量的2倍减去阀值(5M),如果申请成功就增加阀值。如果目前内存占用量还是大于新的阀值,则必须要进行spill了,否则认为内存还够用。真正spill操作之后,释放刚才从shuffleMemoryManager中申请的内存以及还原阀值到初始值(5M)。

spill方法:如果partition数量<=200,且没有设置map端的combine,就调用spillToPartitionFiles方法,否则调用spillToMergeableFile方法,之后会讲到。

所以在这个分支而言,我们是shouldCombine的,所以调用的是spillToMergeFile方法。

需要注意的是,在spill之前,我们是有一个数据结构来保存数据的,有map和buffer可选择。由于shouldCombine是有可能去更新数据的,即调用我们的mergeValue方法之类的,所以我们用map。

(2)  如果是bypassMergeSort(partition数量<=200,且没有设置map端的combine),调用的是spillToPartitionFiles方法。这种模式直接写partition file,就没有缓存这一说了。

(3)  如果是 非shouldCombine,非bypassMergeSort,这里因为我们不需要merge操作,直接使用buffer作为spill前的缓存结构。之后调用maybeSpillCollection方法。


看一下spillToMergeableFile方法:

(1) 在localDirs下面的子目录下创建一个写shuffle的文件

(2) 对缓存中的数据进行排序,原则是按partitionID和partition内的key排序,得到的数据格式为((partitionId_0,key_0),value_0),((partitionId_0,key_1),value_1)......((partitionId_100,key_100),value_100)。

(3) 逐步往文件里写,每写10000个,sync一把。同时保存一个spilledFile的结构在内存中。

也就是说,一个map task,每次spill都生成一个文件(因为有可能一个map task有多次spill),文件内有序。

这样,一次spill就完成了。


看一下spillToPartitionFiles方法:

每个map task对每一个reduce 分区都建立一个不同的文件,也不需要排序。


insertAll方法介绍完了,接着往下介绍。

根据shuffleId+ mapId信息创建data文件,调用writePartitionedFile方法:

(1) 如果之前是bypassMergeSort,即调用的是spillToPartitionFiles,就把剩余的buffer中的信息写到指定的reduce分区对应的文件。然后将所有的输出文件合并成一个data文件

(2) 如果内存中没有spilledFile的信息,即全部的信息都在内存中,就直接写到data文件即可

(3) 否则,也是最复杂的情况,将这个map task输出的所有文件,按partition进行整合到一个data文件中,格式大概为(partition0,这个map task中分区0的全部数据),(partition1,这个map task中分区1的全部数据)......


需要注意的是,(2)和(3)的情况写到一个data文件中时,每个partition在data文件中的的大小是记录下来的。

创建data文件相对应的index文件,index文件记录了data文件中的每个partition的起始offset。可以想象,记录了每个partition的offset,其实就是知道了每个partition在data文件中哪一部分。

最后将shuffleServerId(记录了host,port、executorId),每个partition的文件length封装成mapStatus返回。


二、HashShuffleManager

Spark在每一个Mapper中为每个Reducer创建一个bucket,并将RDD计算结果放进bucket中。每一个bucket拥有一个DiskObjectWriter,每个write handler拥有一个buffer size,使用这个write handler将Map output写入文件中。也就是说Map output的key-value pair是逐个写入到磁盘而不是预先把所有数据存储在内存中在整体flush到磁盘中去,这样对于内存的压力会小很多。当然,同时运行的map数受限于资源,所以所需内存大概为cores*reducer num*buffer size。但是,当reduce数量和map数量很大的时候,所需的内存开销也是惊人的。

hashShuffleManager写的流程相对而言就简单很多了

 /** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }

    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
      shuffle.writers(bucketId).write(elem._1, elem._2)
    }
  }
(1) 如果定义了mapSideCombine,同上insertAll方法中的shouldCombine分支类似,对k-v进行合并处理。否则就不做处理。

(2) 然后将所有的k-v计算需要输出到哪个分区,逐个写入指定的分区文件中。

这种模式自然不需要排序,merge等复杂操作,因为最终每个map task对每一个reduce分区输出一个文件。

最后还是同样组装成一个mapStatus结构返回。

至此,shuffle的写流程就介绍结束了。

下一节介绍shuffle的读流程。


版权声明:本文为博主原创文章,未经博主允许不得转载。

相关内容