spark core源码分析16 Shuffle详解-读流程,sparkshuffle
spark core源码分析16 Shuffle详解-读流程,sparkshuffle
博客地址: http://blog.csdn.net/yueqian_zhu/
shuffle的读流程也是从compute方法开始的
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }
目前来说,不管是sortShuffleManager还是hashShuffleManager,getReader方法返回的都是HashShuffleReader。
接着调用read方法,如下:
/** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { val ser = Serializer.getSerializer(dep.serializer) val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser) val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context)) } else { new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") // Convert the Product2s to pairs since this is what downstream RDDs currently expect iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2)) } // Sort the output if there is a sort ordering defined. dep.keyOrdering match { case Some(keyOrd: Ordering[K]) => // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, // the ExternalSorter won't spill to disk. val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) sorter.insertAll(aggregatedIter) context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled) sorter.iterator case None => aggregatedIter } }
该方法首先调用了fetch方法,介绍一下
1、在task运行那节介绍过,shuffleMapTask运行完成后,会将shuffleId及mapstatus的映射注册到mapOutputTracker中
2、fetch方法首先尝试在本地mapstatuses中查找是否有该shuffleId的信息,有则本地取;否则想master的mapOutputTracker请求并读取,返回块管理器的地址和对应partition的文件长度
3、然后根据我们得到的shuffleId等信息去remote或者local通过netty/nio读取,返回一个迭代器
4、返回的迭代器中的数据并不是全部在内存中的,读取时会根据配置的内存最大值来读取。内存不够的话,下一个待读取
fetch方法返回一个迭代器后,根据是否mapSideCombine来区分时候需要将读取到的数据进行合并操作。合并过程与写流程类似,内存放不下就写入本地磁盘。
如果还需要keyOrdering的,new一个ExternalSorter进行外部排序。之后也是同shuffle写流程的insertAll。
版权声明:本文为博主原创文章,未经博主允许不得转载。
评论暂时关闭