Spark的IndexedRDD,SparkIndexedRDD


昨天又看到了IndexedRDD,趁着恰好有时间就好好看了一下原理,顺便拿出来分享一下。

原理简介

这个问题的地址:https://issues.apache.org/jira/browse/SPARK-2365  目测后面会在正式版中放出来。

IndexedRDD的设计目的是为了解决下面这些问题:

1.join操作效率低下。

2.更新或删除RDD中某条记录需要全部复制一遍。

3.查找某条记录需要在整个RDD中扫描一遍。

4.直接在每个分区里存一个一个key-value pair很浪费。

这三个问题其实都还比较好理解就不多做解释了。

影响join效率最大的因素一个是shuffle,另外一个是本地的aggregate。join有两种,见下图:


这张图很熟悉了,可以看到如果inputs是co-partitioned 的,那么不会有shuffle动作,否则就会有shuffle。所以只要能够确保co-partitioned就能避免shuffle,但本地aggregate(要在本地把key-value建立一个hashtable才能开始join)还是无法避免。

对于这个问题,机智的spark开发人员想出一个办法:干嘛分区里面一定要存一个一个key-value pair呢,我直接存个hashtable不就完了么。这样一来,配合co-partitioned,join性能大幅度提高;另外也顺便解决了第3点问题和第4点问题。

现在问题就剩下第二点了。因为RDD是只读的,是不可更改的,这样的设计是为了保证数据一致性,简化不必要的锁机制。所以当发生update或者delete时你不可能直接在原先数据上操作,根本不能去修改原先的数据内容,那么以前的做法就是从原本的数据中copy一份出来改或者删了。

为了解决这个问题,spark的开发人员再次机智地对这个hashtable方案进行了改进:

class IndexedRDDPartition[V] {
  type Id = Long
  val index: OpenHashSet[Id] // implemented as an array with
  // hash probing
  val values: Array[V] // parallel to index
  val mask: BitSet 
  // parallel to index and values
  ...
}

上面这段代码组成了一个自定义的hashtable所需要的存储结构,那帮人称之为Local Hash Tables。稍微解释一下:

index是用来存key的,IndexedRDD规定所有的数据的Key必须是Long类型的,而且是唯一的。index实际上是个数组,每个key的存储位置是hash得到的。

values顾名思义就是用来在对应位置存值的。

mask平常情况下对应的bit位是被置为1的,如果某个位需要改变,比如要被删掉,那么对应的bit就被置为0。

以上三个东西物理上都是数组,这构成了一个不可修改的hashtable的基础,用它构建成RDD依然还是全量拷贝一份,于是这里用一个ImmutableVector这个结构来替代数组:


这实际上是一颗度为32的树,叶子上是长度为32的数组,由于整数最大4个字节,所以整棵树的深度不会超过7。如果现在发生某个节点要被删除的情况会发生什么事情呢,看下图:


只需要把该元素所在的数组copy一份就好了,其他数据根!本!不!用!动!   就是这么帅气!

好了,现在用这个数据结构替换掉之前的数组,那么前面第二个难点也就解决了。


代价

某些方面的性能提升总归要付出点代价的,具体如下:

1.对于得到指定位置的元素来说,在原本数组里面只要访问一次内存就可以了,而现在由于是树型结构,所以要多次访问内存,这会增加时间开销。

2.对于那种线性全量扫描,性能也必然有所下降。

3.由于这棵树除了叶子节点保存数据以外,树的中间节点也需要存储,这会增加存储空间。


性能测试

下面性能测试数据是开发人员的数据,我还没有自己测过:

测试条件:RDD A 8M的元素,RDDB 8个元素。

普通PairedRDD 做Join耗时11.8s,用IndexedRDD耗时0.126s,提高了94倍;


另外对于1M的元素,数据查询速度从47.9ms减少到了609ns,性能提高了70倍;数据更新从59.7ms减少到了1.03us,性能提高了60倍;数据删除从59.8ms减少到了557us,性能提高100倍。


差不多就是这样啦~


相关内容