Spark编程指南V1.4.0(翻译),编程指南v1.4.0


Spark编程指南V1.4.0

·        简介

·        接入Spark

·        Spark初始化

        ·        使用Shell

        ·        在集群上部署代码

·        弹性分布式数据集

        ·        并行集合(Parallelized Collections)

        ·        其他数据集

        ·        RDD的操作

                 ·        基础操作

                 ·        Spark传递函数

                 ·        处理键值对

                 ·        转换

                 ·        动作

        ·        RDD的持久化

                 ·        存储级别的选择

                 ·        移除数据

·        共享变量

        ·        广播变量

        ·        累加器

·        部署到一个集群上

·        单元测试

·        1.0之前版本的Spark迁移

·        下一步该怎么做

 

简介

总的来说,每一个Spark的应用,都是由一个驱动程序(driver program)构成,它运行用户的main函数,在一个集群上执行各种各样的并行操作。Spark提出的最主要抽象概念是弹性分布式数据集 (resilientdistributed dataset,RDD),它是一个元素集合,划分到集群的各个节点上,可以被并行操作。RDDs的创建可以从HDFS(或者任意其他支持Hadoop文件系统)上的一个文件开始,或者通过转换驱动程序(driver program)中已存在的Scala集合而来。用户也可以让Spark保留一个RDD在内存中,使其能在并行操作中被有效的重复使用。最后,RDD能自动从节点故障中恢复。

Spark的第二个抽象概念是共享变量(shared variables),可以在并行操作中使用。在默认情况下,Spark通过不同节点上的一系列任务来运行一个函数,它将每一个函数中用到的变量的拷贝传递到每一个任务中。有时候,一个变量需要在任务之间,或任务与驱动程序之间被共享。Spark支持两种类型的共享变量:广播变量,可以在内存的所有的结点上缓存变量;累加器:只能用于做加法的变量,例如计数或求和。

本指南将用每一种Spark支持的语言来展示这些特性。这都是很容易来跟着做的如果你启动了Spark的交互式Shell或者Scalabin/spark-shell或者Pythonbin/pyspark

 

接入Spark

Scala

Spark1.2.1需要和Scala2.10一起使用。如果你要用Scala来编写应用,你需要用一个相应版本的Scala(例如2.10.X)。

要写一个Spark应用程序,你需要在添加SparkMaven依赖,Spark可以通过Maven中心库来获得:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.2.1

除此之外,如果你想访问一个HDFS集群,你需要根据你的HDFS版本,添加一个hadoop-client的依赖。一些通用的HDFS版本标签在第三方发行版页面列出。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,你需要将一些Spark的类和隐式转换导入到你的程序中。通过如下语句:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

 

Java

Spark1.2.1需要运行在Java6及更高版本上。如果你正在使用Java8Spark支持使用Lambda表达式简洁地编写函数,或者你可以使用在org.apache.spark.api.java.function包中的类。

要使用Java编写Spark应用程序,你需要添加一个Spark的依赖。Spark可以通过Maven中心库获得:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.2.1

此外,如果你想访问一个HDFS集群,你需要根据你的HDFS版本,添加一个hadoop-client的依赖。一些通用的HDFS版本标签在第三方发行版页面列出。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,你需要将Spark的类导入到你的程序中。添加如下行:

importorg.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.SparkConf

 

Python

Spark1.2.1需要和Python2.6或者更高的版本(但不是Python3)一起使用。它使用标准的CPython解释器,因此像NumPy这类的C语言库可以用。要用Python的方式运行Spark应用程序,可以使用在Spark目录下的bin/spark-submit脚本。这个脚本会装载SparkJavaScala库并允许你将程序提交到集群。你也可以使用bin/pyspark来启动一个交互式Python Shell

如果你想要访问HDFS数据,你需要根据你的HDFS版本使用一个PySpark的构建。一些通用的HDFS版本标签在第三方发行版页面列出。针对通用的HDFS版本的预先构建的包在Spark主页上也是可获得。

最后,你需要导入一些Spark相关的类到你的程序中。添加如下的行:

from pyspark import SparkContext, SparkConf

 

初始化Spark

Scala

Spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉Spark如何访问一个集群。要创建一个SparkContext你首先需要建立一个SparkConf对象,这个对象包含你的程序的信息。

每个JVM只能有一个活动的SparkContext。在创建一个新的SparkContext之前你必须stop()活动的SparkContext

val conf = newSparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName是你的应用的名称,将会在集群的Web监控UI中显示。master参数,是一个用于指定所连接的Spark,Mesos or Mesos 集群URL的字符串,也可以是一个如下面所描述的用于在local模式运行的特殊字符串“local”。在实践中,当运行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。然而,对于本地测试和单元测试,你可以通过“local”模式运行Spark

Java

Spark程序需要做的第一件事情,就是创建一个JavaSparkContext对象,它将告诉Spark如何访问一个集群。要创建一个SparkContext你首先需要建立一个SparkConf对象,这个对象包含你的程序的信息。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = newJavaSparkContext(conf);

appName是你的应用的名称,将会在集群的Web监控UI中显示。master参数,是一个用于指定所连接的Spark,Mesos or Mesos 集群URL的字符串,也可以是一个如下面所描述的用于在local模式运行的特殊字符串“local”。在实践中,当运行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。然而,对于本地测试和单元测试,你可以通过“local”模式运行Spark

Python

Spark程序需要做的第一件事情,就是创建一个JavaSparkContext对象,它将告诉Spark如何访问一个集群。要创建一个SparkContext你首先需要建立一个SparkConf对象,这个对象包含你的程序的信息。

conf =SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName是你的应用的名称,将会在集群的Web监控UI中显示。master参数,是一个用于指定所连接的Spark,Mesos or Mesos 集群URL的字符串,也可以是一个如下面所描述的用于在local模式运行的特殊字符串“local”。在实践中,当运行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。然而,对于本地测试和单元测试,你可以通过“local”模式运行Spark

 

使用Shell

Scala

Spark shell中,一个特殊的解释器感知的SparkContext已经为你创建好了,变量名叫做sc。创建自己的SparkContext将不会生效。你可以使用-master参数设置context连接到那个master,并且你可以使用-jars参数把用逗号分隔的一个jar包列表添加到classpath中。例如,如果在四核CPU上运行spark-shell,使用:

$ ./bin/spark-shell --master local[4]

或者,同时在classpath中加入code.jar,使用:

$ ./bin/spark-shell --master local[4] --jarscode.jar

想要获得完整的选项列表,运行spark-shell –help。在背后,spark-shell调用更一般的spark-submit脚本。

 

Python

PySpark shell中,一个特殊的解释器感知的SparkContext已经为你创建好了,变量名叫做sc。创建自己的SparkContext将不会生效。你可以使用-master参数设置context连接到那个master,并且你可以使用—py-files参数把用逗号分隔的一个Python .zip.egg或者.py文件列表添加到classpath中。例如,如果在四核CPU上运行bin/pyspark,使用:

$ ./bin/pyspark --master local[4]

或者,同时将code.py添加到搜索路径中(为了以后使用import code),使用:

$ ./bin/pyspark --master local[4] --py-filescode.py

想要获得完整的选项列表,运行pyspark –help。在背后,pyspark调用更一般的spark-submit脚本。

也可以在IPython中启动Pyspark shell,一个增强的Python解释器。PySpark要使用IPython1.0.0及其之后的版本。要使用IPython,当运行bin/pyspark时要设置PYSPARK_DRIVER_PYTHON变量为ipython

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

你可以通过设置PYSPARK_DRIVER_PYTHON_OPTS参数来自定义ipython命令。例如,启动有PyLab支持的IPython Notebook支持:

$PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook--pylab inline" ./bin/pyspark

 

弹性分布式数据集(RDDs

Spark围绕的概念是弹性分布式数据集(RDD),是一个有容错机制并可以被并行操作的元素集合。目前有两种创建RDDs的方法:并行化一个在你的驱动程序中已经存在的集合,或者引用在外部存储系统上的数据集,例如共享文件系统,HDFSHBase,或者任何以Hadoop输入格式提供的数据源。

并行集合

Scala

并行集合是通过调用SparkContextparallelize方法,在一个已经存在的集合上创建的(一个Scala Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。例如,下面展示了怎样创建一个含有数字15的并行集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一旦创建了分布式数据集(distData),就可以对其执行并行操作。例如,我们可以调用distData.reduce((a,b)=>a+b)来累加数组的元素。后续我们会进一步地描述对分布式数据集的操作。

并行集合的一个重要参数是分区数(the number of partitions),表示数据集切分的份数。Spark将在集群上为每个分区数据起一个任务。典型情况下,你希望集群的每个CPU分布2-4个分区(partitions)。通常,Spark会尝试基于集群状况自动设置分区数。然而,你也可以进行手动设置,通过将分区数作为第二个参数传递给parallelize方法来实现。(例如:sc.parallelize(data,10))。注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。

 

Java

并行集合是通过对存在于驱动程序中的集合调用JavaSparkContextparallelize方法来构建的。构建时会拷贝集合中的元素,创建一个可以被并行操作的分布式数据集。例如,这里演示了如何创建一个包含数字15的并行集合:

List<Integer> data = Arrays.asList(1, 2,3, 4, 5);
JavaRDD<Integer> distData =sc.parallelize(data);

一旦创建了分布式数据集(distData),就可以对其执行并行操作。例如,我们可以调用distData.reduce((a,b)=>a+b)来累加数组的元素。后续我们会进一步地描述对分布式数据集的操作。

注意:在本指南中,我们会经常使用简洁地Java8lambda语法来指明Java函数,而在Java的旧版本中,你可以实现org.apache.spark.api.java.function包中的接口。下面我们将在把函数传递到Spark中描述更多的细节。

并行集合的一个重要参数是分区数(the number of partitions),表示数据集切分的份数。Spark将在集群上为每个分区数据起一个任务。典型情况下,你希望集群的每个CPU分布2-4个分区(partitions)。通常,Spark会尝试基于集群状况自动设置分区数。然而,你也可以进行手动设置,通过将分区数作为第二个参数传递给parallelize方法来实现。(例如:sc.parallelize(data,10))。注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。

 

Python

并行集合是通过对存在于驱动程序中的迭代器(iterable)或集合(collection),调用SparkContextparallelize方法来构建的。构建时会拷贝迭代器或集合中的元素,创建一个可以被并行操作的分布式数据集。例如,这里演示了如何创建一个包含数字15的并行集合:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

一旦创建了分布式数据集(distData),就可以对其执行并行操作。例如,我们可以调用distData.reduce(lambda a,b:a+b)来累加列表的元素。后续我们会进一步地描述对分布式数据集的操作。

并行集合的一个重要参数是分区数(the number of partitions),表示数据集切分的份数。Spark将在集群上为每个分区数据起一个任务。典型情况下,你希望集群的每个CPU分布2-4个分区(partitions)。通常,Spark会尝试基于集群状况自动设置分区数。然而,你也可以进行手动设置,通过将分区数作为第二个参数传递给parallelize方法来实现。(例如:sc.parallelize(data,10))。注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。

 

外部数据集

Scala

Spark可以从Hadoop支持的任何存储源中构建出分布式数据集,包括你的本地文件系统,HDFSCassandreHBaseAmazon S3等。Spark支持text filesSequence files,以及其他任何一种Hadoop InputFormat

Text file RDDs的创建可以使用SparkContexttextFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径,或者是一个hdfs://s3n://URI)作为参数,并读取文件的每一行数据,放入集合中,下面是一个调用例子:

scala> val distFile =sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08

一旦创建完成,就可以在distFile上执行数据集操作。例如,要相对所有行的长度进行求和,我们可以通过如下的mapreduce操作来完成:

distFile.map(s => s.length).reduce((a, b)=> a + b)

Spark读文件时的一些注意事项:

1.         如果文件使用本地文件系统上的路径,那么该文件必须在工作节点的相同路径下也可以访问。可以将文件拷贝到所有的worker节点上,或者使用network-mounted共享文件系统。

2.         Spark的所有基于文件的输入方法,包括textFile,支持在目录上运行,压缩文件和通配符。例如,你可以使用textFile”/my/directory”,textFile(“/my/directory/*.txt”),和textFile(“/my/directory/*.gz”)

3.         textFile方法也带有可选的第二个参数,用于控制文件的分区数。默认情况下,Spark会为文件的每一个block创建一个分区,但是你也可以通过传入更大的值,来设置更高的分区数。注意,你设置的分区数不能比文件的块数小。

除了text文件,SparkScala API也支持其他几种数据格式:

1.    SparkContext.wholeTextFiles可以让你读取包含多个小text文件的目录,并且每个文件对应返回一个(filename,content)对。而对应的textFile方法,文件的每一行对应返回一条记录(record)。

2.    对于Sequence文件,使用SparkContextsequenceFile[K,V]方法,其中KV分别对应文件中keyvalues的类型。这些类型必须是HadoopWritable接口的子类,如IntWritableText。另外,Spark允许你使用一些常见的Writables的原生类型;例如,sequenceFile[Int,String]会自动的转换为类型IntWritablesTexts

3.    对于其他的Hadoop InputFormats,你可以使用SparkContext.hadoopRDD方法,它可以接受一个任意类型的JobConf和输入格式类,key类和value类。像Hadoop Job设置输入源那样去设置这些参数即可。对基于“新”的MapReduce APIorg.apache.hadoop.mapreduce)的InputFormats,你也可以使用SparkContex.newHadoopRDD

4.    RDD.saveAsObjectFileSparkContext.objectFile支持由序列化的Java对象组成的简单格式来保存RDD。虽然这不是一种像Avro那样有效的序列化格式,但是她提供了一种可以存储任何RDD的简单方式。

 

Java

Spark可以从Hadoop支持的任何存储源中构建出分布式数据集,包括你的本地文件系统,HDFSCassandreHBaseAmazon S3等。Spark支持text filesSequence files,以及其他任何一种Hadoop InputFormat

Text file RDDs的创建可以使用SparkContexttextFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径,或者是一个hdfs://s3n://URI)作为参数,并读取文件的每一行数据,放入集合中,下面是一个调用例子:

JavaRDD<String> distFile =sc.textFile("data.txt");

一旦创建完成,就可以在distFile上执行数据集操作。例如,要相对所有行的长度进行求和,我们可以通过如下的mapreduce操作来完成:

distFile.map(s -> s.length()).reduce((a, b)-> a + b)

Spark读文件时的一些注意事项:

1.    如果文件使用本地文件系统上的路径,那么该文件必须在工作节点的相同路径下也可以访问。可以将文件拷贝到所有的worker节点上,或者使用network-mounted共享文件系统。

2.    Spark的所有基于文件的输入方法,包括textFile,支持在目录上运行,压缩文件和通配符。例如,你可以使用textFile(”/my/directory”),textFile(/my/directory/*.txt),和textFile(/my/directory/*.gz)

3.    textFile方法也带有可选的第二个参数,用于控制文件的分区数。默认情况下,Spark会为文件的每一个block创建一个分区,但是你也可以通过传入更大的值,来设置更高的分区数。注意,你设置的分区数不能比文件的块数小。

除了text文件,SparkJava API也支持其他几种数据格式:

1.    JavaSparkContext.wholeTextFiles可以让你读取包含多个小text文件的目录,并且每个文件对应返回一个(filename,content)对。而对应的textFile方法,文件的每一行对应返回一条记录(record)。

2.    对于Sequence文件,使用SparkContextsequenceFile[K,V]方法,其中KV分别对应文件中keyvalues的类型。这些类型必须是HadoopWritable接口的子类,如IntWritableText。另外,Spark允许你使用一些常见的Writables的原生类型;例如,sequenceFile[Int,String]会自动的转换为类型IntWritablesTexts

3.    对于其他的Hadoop InputFormats,你可以使用JavaSparkContext.hadoopRDD方法,它可以接受一个任意类型的JobConf和输入格式类,key类和value类。像Hadoop Job设置输入源那样去设置这些参数即可。对基于“新”的MapReduce APIorg.apache.hadoop.mapreduce)的InputFormats,你也可以使用JavaSparkContex.newHadoopRDD

4.    JavaRDD.saveAsObjectFileJavaSparkContext.objectFile支持由序列化的Java对象组成的简单格式来保存RDD。虽然这不是一种像Avro那样有效的序列化格式,但是她提供了一种可以存储任何RDD的简单方式。

 

Python

PySpark可以从Hadoop支持的任何存储源中构建出分布式数据集,包括你的本地文件系统,HDFSCassandreHBaseAmazon S3等。Spark支持text filesSequence files,以及其他任何一种Hadoop InputFormat

Text file RDDs的创建可以使用SparkContexttextFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径,或者是一个hdfs://s3n://URI)作为参数,并读取文件的每一行数据,放入集合中,下面是一个调用例子:

>>> distFile =sc.textFile("data.txt")

一旦创建完成,就可以在distFile上执行数据集操作。例如,要相对所有行的长度进行求和,我们可以通过如下的mapreduce操作来完成:

distFile.map(lambda s: len(s)).reduce(lambda a,b: a + b)


Spark读文件时的一些注意事项:

1.    如果文件使用本地文件系统上的路径,那么该文件必须在工作节点的相同路径下也可以访问。可以将文件拷贝到所有的worker节点上,或者使用network-mounted共享文件系统。

2.    Spark的所有基于文件的输入方法,包括textFile,支持在目录上运行,压缩文件和通配符。例如,你可以使用textFile(”/my/directory”),textFile(/my/directory/*.txt),和textFile(/my/directory/*.gz)

3.    textFile方法也带有可选的第二个参数,用于控制文件的分区数。默认情况下,Spark会为文件的每一个block创建一个分区,但是你也可以通过传入更大的值,来设置更高的分区数。注意,你设置的分区数不能比文件的块数小。

除了text文件,SparkPython  API也支持其他几种数据格式:

1.              JavaSparkContext.wholeTextFiles可以让你读取包含多个小text文件的目录,并且每个文件对应返回一个(filename,content)对。而对应的textFile方法,文件的每一行对应返回一条记录(record)。

2.              RDD.saveAsPickleFileSparkContext.pickleFile支持由pickled Python对象组成的简单格式保存RDD。使用批量的方式处理pickle模块的对象序列化,默认批处理大小为10.

3.              SequenceFileHadoop输入/输出格式

注意,此功能当前标识为试验性的,是为高级用户而提供的。在将来的版本中,可能会因为支持基于SparkSQL的读写而被取代,在这种情况下,SparkSQL是首选的方法。

 

Writable支持

PySparkSequenceFile支持加载Java中的键值(key-value)对RDD,可以将Writable转换为基本的Java类型,并且通过Pyrolite,在结果Java对象上执行pickles序列化操作。当将一个键值对的RDD保存为SequenceFIle时,PySpark会对其进行反操作。它会unpickles Python的对象为Java对象,然后再将它们转换为Writables。下表中的Writables会被自动地转换:

Writable Type

Python Type

Text

unicode str

IntWritable

int

FloatWritable

float

DoubleWritable

float

BooleanWritable

bool

BytesWritable

bytearray

NullWritable

None

MapWritable

dict

数组不支持开箱(out-of-the-box)处理。当读或写数组时,用户需要指定自定义的ArrayWritable子类。当写数组时,用户也需要指定自定义的转换器(converters),将数组转换为自定义的ArrayWritable子类。当读数组时,默认的转换器会将自定义的ArrayWritable子类转换为JavaObject[],然后被pickledPython的元组。如果要获取包含基本数据类型的数组,Pythonarray.array的话,用户需要为该数组指定自定义的转换器。

 

保存和加载SequenFiles

类似于text filesSequenceFiles可以被保存和加载到指定的路径下。可以指定keyvalue的类型,但对标准的Writables类型则不需要指定。

>>> rdd = sc.parallelize(range(1,4)).map(lambda x: (x, "a" * x ))
>>>rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

 

保存和加载其他的Hadoop输入/输出格式

PySpark也可以读任何Hadoop InputFormat或者写任何Hadoop OutputFormat,包括“新”和“旧”两个Hadoop MapReduce APIs。如果需要的话,可以将传递进来的一个Hadoop配置当成一个Python字典。这里有一个使用了Elasticsearch ESInputFormat的样例:

SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar./bin/pyspark
>>> conf = {"es.resource" :"index/type"}   # assumeElasticsearch is running on localhost defaults
>>> rdd =sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
   "org.apache.hadoop.io.NullWritable","org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that isconverted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
 u'field2': u'Some Text',
 u'field3': 12345})

注意,如果这个InputFormat只是简单地依赖于Hadoop配置和/或输入路径,以及keyvalue的类型,它就可以很容易地根据上面的表格进行转换,那么这种方法应该可以很好地处理这些情况。

如果你有一个定制序列化的二进制数据(比如加载自Cassandra/HBase的数据),那么你首先要做的,是在Scala/Java侧将数据转换为可以用Pyrolitepickler处理的东西。Converter特质提供了这一转换功能。简单地extend该特质,然后在convert方法中实现你自己的转换代码。记住要确保该类,以及访问你的InputFormat所需的依赖,都需要被打包到你的Spark作业的jar包,并且包含在PySpark的类路径中。

Python样例和Converter样例上给出了带自定义转换器的Cassandra/HBaseInputFormatOutputFormat的使用样例。

RDD操作

RDDs支持两种操作:转换(transformations),可以从已有的数据集创建一个新的数据集;而动作(actions),在数据集上运行计算后,会向驱动程序返回一个值。例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集来表示结果。另一方面,reduce是一种动作,通过一些函数将所有的元素聚合起来,并将最终结果返回给驱动程序(不过还有一个并行的reduceByKey,能返回一个分布式数据集)。

Spark中的所有转换都是惰性的,也就是说,它们并不会马上计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只有当发生一个要求返回结果给驱动程序的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。例如,我们对map操作创建的数据集进行reduce操作时,只会向驱动返回reduce操作的结果,而不是返回更大的map操作创建的数据集。

默认情况下,每一个转换过的RDD都会在你对它执行一个动作时被重新计算。不过,你也可以使用持久化或者缓存方法,把一个RDD持久化到内存中。在这种情况下,Spark会在集群中保存相关元素,以便你下次查询这个RDD时,能更快速地访问。对于把RDDs持久化到磁盘上,或在集群中复制到多个节点也是支持的。

 

基础操作

Scala

为了描述RDD的基础操作,可以考虑下面的简单程序:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b)=> a + b)

第一行通过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行操作:lines仅仅指向这个文件。第二行定义了lineLengths作为map转换结果。此外,由于惰性,不会立即计算lineLengths。最后,我们运行reduce,这是一个动作。这时候,Spark才会将这个计算拆分成不同的task,并运行在独立的机器上,并且每台机器运行它自己的map部分和本地的reducatin,仅仅返回它的结果给驱动程序。

如果我们希望以后可以复用lineLengths,可以添加:

lineLengths.persist()

reduce之前,这将导致lineLengths在第一次被计算之后,被保存在内存中。

 

Java

为了描述RDD的基础操作,可以考虑下面的简单程序:

JavaRDD<String> lines =sc.textFile("data.txt");
JavaRDD<Integer> lineLengths =lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b)-> a + b);

第一行通过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行操作:lines仅仅指向这个文件。第二行定义了lineLengths作为map转换结果。此外,由于惰性,不会立即计算lineLengths。最后,我们运行reduce,这是一个动作。这时候,Spark才会将这个计算拆分成不同的task,并运行在独立的机器上,并且每台机器运行它自己的map部分和本地的reducatin,仅仅返回它的结果给驱动程序。

如果我们希望以后可以复用lineLengths,可以添加:

lineLengths.persist();

reduce之前,这将导致lineLengths在第一次被计算之后,被保存在内存中。

 

Python

为了描述RDD的基础操作,可以考虑下面的简单程序:

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a+ b)

第一行通过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行操作:lines仅仅指向这个文件。第二行定义了lineLengths作为map转换结果。此外,由于惰性,不会立即计算lineLengths。最后,我们运行reduce,这是一个动作。这时候,Spark才会将这个计算拆分成不同的task,并运行在独立的机器上,并且每台机器运行它自己的map部分和本地的reducatin,仅仅返回它的结果给驱动程序。

如果我们希望以后可以复用lineLengths,可以添加:

lineLengths.persist()

reduce之前,这将导致lineLengths在第一次被计算之后,被保存在内存中。


把函数传递到Spark

Scala

SparkAPI,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。这有两种推荐的实现方式:

使用匿名函数的语法,这可以用来替换简短的代码。

●使用全局单例对象的静态方法。比如,你可以定义函数对象objectMyFunctions,然后传递该对象的方法MyFunction.func1,如下所示:

object MyFunctions {
  deffunc1(s: String): String = { ... }
}
 
myRdd.map(MyFunctions.func1)

注意:由于可能传递的是一个类实例方法的引用(而不是一个单例对象),在传递方法的时候,应该同时传递包含该方法的对象。比如,考虑:

class MyClass {
  deffunc1(s: String): String = { ... }
  defdoStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

这里,如果我们创建了一个类实例new MyClass,并且调用了实例的doStuff方法,该方法中的map处调用了这个MyClass实例的func1方法,所以需要将整个对象传递到集群中。类似于写成:rdd.map(x=>this.func1(x))

类似地,访问外部对象的字段时将引用整个对象:

class MyClass {
  valfield = "Hello"
  defdoStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

等同于写成rdd.map(x=>this.field+x),引用了整个this。为了避免这种问题,最简单的方式是把field拷贝到本地变量,而不是去外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  valfield_ = this.field
 rdd.map(x => field_ + x)
}

 

Java

SparkAPI,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。在Java中,函数由那些实现了org.apache.spark.api.java.function包中的接口的类表示。有两种创建这样的函数的方式:

在你自己的类中实现Function接口,可以是匿名内部类,或者命名类,并且传递类的一个实例到Spark

●在Java8中,使用lambda表达式来简明地定义函数的实现。

为了保持简洁性,本指南中大量使用了lambda语法,这在长格式中很容易使用所有相同的APIs。比如,我们可以把上面的代码写成:

JavaRDD<String> lines =sc.textFile("data.txt");
JavaRDD<Integer> lineLengths =lines.map(new Function<String, Integer>() {
  publicInteger call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(newFunction2<Integer, Integer, Integer>() {
  publicInteger call(Integer a, Integer b) { return a + b; }
});

或者,如果不方便编写内联函数的话,可以写成:

class GetLength implements Function<String,Integer> {
  publicInteger call(String s) { return s.length(); }
}
class Sum implements Function2<Integer,Integer, Integer> {
  publicInteger call(Integer a, Integer b) { return a + b; }
}
 
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths =lines.map(new GetLength());
int totalLength = lineLengths.reduce(newSum());

注意,Java中的匿名内部类也可以访问封闭域中的变量,只要这些变量标识为final即可。Spark会像处理其他语言一样,将这些变量拷贝到每个工作节点上。

 

Python

SparkAPI,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。有三种推荐方法可以使用:

使用Lambda表达式来编写可以写成一个表达式的简单函数(Lambdas不支持没有返回值的多语句函数或表达式)。

●Spark调用的函数中的Local defs,可以用来代替更长的代码。

●模块中的顶级函数。

例如,如果想传递一个支持使用lambda表达式的更长的函数,可以考虑以下代码:

"""MyScript.py"""
if __name__ == "__main__":
    defmyFunc(s):
       words = s.split(" ")
       return len(words)
 
    sc =SparkContext(...)
   sc.textFile("file.txt").map(myFunc)

注意:由于可能传递的是一个类实例方法的引用(而不是一个单例对象(singleton object)),在传递方法的时候,应该同时传递包含该方法的对象。比如,考虑:

class MyClass(object):
    deffunc(self, s):
       return s
    defdoStuff(self, rdd):
       return rdd.map(self.func)

这里,如果我们创建了一个类实例new MyClass,并且调用了实例的doStuff方法,该方法中的map处调用了这个MyClass实例的func1方法,所以需要将整个对象传递到集群中。

类似地,访问外部对象的字段时将引用整个对象:

class MyClass(object):
    def__init__(self):
       self.field = "Hello"
    defdoStuff(self, rdd):
       return rdd.map(lambda s: self.field + x)

为了避免这种问题,最简单的方式是把field拷贝到本地变量,而不是去外部访问它:

def doStuff(self, rdd):
    field= self.field
   return rdd.map(lambda s: field + x)

 

理解闭包

关于Spark的一个更困难的问题是理解当在一个集群上执行代码的时候,变量和方法的范围以及生命周期。修改范围之外变量的RDD操作经常是造成混乱的源头。在下面的例子中我们看一下使用foreach()来增加一个计数器的代码,不过同样的问题也可能有其他的操作引起。

例子

考虑下面的单纯的RDD元素求和,根据是否运行在一个虚拟机上,它们的行为完全不同。一个平常的例子是在local模式(--master=local[n])下运行Spark对比将Spark程序部署到一个集群上(例如通过spark-submit提交到YARN)。

Scala

var counter = 0
var rdd = sc.parallelize(data)
 
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
 
println("Counter value: " + counter)

Java

int counter = 0;
JavaRDD<Integer> rdd =sc.parallelize(data);
 
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
 
println("Counter value: " + counter);

Python

counter = 0
rdd = sc.parallelize(data)
 
# Wrong: Don't do this!!
rdd.foreach(lambda x: counter += x)
 
print("Counter value: " + counter)

 

本地模式VS集群模式

     主要的挑战是,上述代码的行为是未定义的。在使用单个JVM的本地模式中,上面的代码会在RDD中计算值的总和并把它存储到计数器中。这是因为RDD和计数器变量在驱动节点的同一个内存空间中。

然而,在集群模式下,发生的事情更为复杂,上面的代码可能不会按照目的工作。要执行作业,SparkRDD操作分成任务——每个任务由一个执行器操作。在执行前,Spark计算闭包。闭包是指执行器要在RDD上进行计算时必须对执行节点可见的那些变量和方法(在这里是foreach())。这个闭包被序列化并发送到每一个执行器。在local模式下,只有一个执行器因此所有东西都分享同一个闭包。然而在其他的模式中,就不是这个情况了,运行在不同工作节点上的执行器有它们自己的闭包的一份拷贝。

这里发生的事情是闭包中的变量被发送到每个执行器都是被拷贝的,因此,当计数器在foreach函数中引用时,它不再是驱动节点上的那个计数器了。在驱动节点的内存中仍然有一个计数器,但它对执行器来说不再是可见的了!执行器只能看到序列化闭包中的拷贝。因此,计数器最终的值仍然是0,因为所有在计数器上的操作都是引用的序列化闭包中的值。

在这种情况下要确保一个良好定义的行为,应该使用累加器。Spark中的累加器是一个专门用来在执行被分散到一个集群中的各个工作节点上的情况下安全更新变量的机制。本指南中的累加器部分会做详细讨论。

一般来说,闭包-构造像循环或者本地定义的方法,不应该用来改变一些全局状态。Spark没有定义或者是保证改变在闭包之外引用的对象的行为。一些这样做的代码可能会在local模式下起作用,但那仅仅是个偶然,这样的代码在分布式模式下是不会按照期望工作的。如果需要一些全局的参数,可以使用累加器。

 

打印RDD中的元素

另一个常见的用法是使用rdd.foreach(println)方法或者rdd.map(println)方法试图打印出RDD中的元素。在一台单一的机器上,这样会产生期望的输出并打印出RDD中的元素。然而,在集群模式中,被执行器调用输出到stdout的输出现在被写到了执行器的stdout,并不是在驱动上的这一个,因此驱动上的stdout不会显示这些信息!要在驱动上打印所有的元素,可以使用collect()方法首先把RDD取回到驱动节点如:rdd.collect().foreach(println)。然而,这可能导致驱动内存溢出,因为collect()将整个RDD拿到了单台机器上;如果你只需要打印很少几个RDD的元素,一个更安全的方法是使用take()方法:rdd.take(100).foreach(println)

 

 

键值对的使用

Scala

虽然,在包含任意类型的对象的RDDs中,可以使用大部分的Spark操作,但也有一些特殊的操作只能在键值对的RDDs上使用。最常见的一个就是分布式的洗牌(shuffle)操作,诸如基于key值对元素进行分组或聚合的操作。

Scala中,包含二元组(Tuple2)对象(可以通过简单地(a,b)代码,来构建内置于语言中的元组的RDDs支持这些操作),只要你在程序中导入了org.apache.spark.SparkContext._,就能进行隐式转换。PairRDDFunction类支持键值对的操作,如果你导入了隐式转换,该类型就能自动地对元组RDD的元素进行转换。

比如,下列代码在键值对上使用了reduceByKey操作,来计算在一个文件中每行文本出现的总次数:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a +b)

我们也可以使用counts.sortByKey(),比如,将键值对以字典序进行排序。最后使用counts.collect()转换成对象的数组形式,返回给驱动程序。

注意:在键值对操作中,如果使用了自定义对象作为建,你必须确保该对象实现了自定义的equals()和对应的hashCode()方法。更多详情请查看Object.hashCode()文档大纲中列出的规定。

 

Java

虽然,在包含任意类型的对象的RDDs中,可以使用大部分的Spark操作,但也有一些特殊的操作只能在键值对的RDDs上使用。最常见的一个就是分布式的洗牌(shuffle)操作,诸如基于key值对元素进行分组或聚合的操作。

java中,可以使用Scala标准库中的scala.Tuple2类来表示键值对,你可以简单地调用new Tuple2(a,b)来创建一个元组,然后使用tuple._1()tuple._2()方法来访问元组的字段。

使用JavaPairRDD来表示键值对RDDs。你可以使用指定版本的map操作,从JavaRDDs构建JavaPairRDDs,比如mapToPairflatMapToPairJavaPairRDD支持标准的RDD函数,也支持特殊的键值函数。

例如,下面的代码在键值(key-value)对上使用 reduceByKey操作来计算在一个文件中每行文本出现的总次数:

JavaRDD<String> lines =sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs =lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts =pairs.reduceByKey((a, b) -> a + b);

我们也可以使用 counts.sortByKey()例如,将键值对以字典序(alphabetically)进行排序。后调用 counts.collect() 转换成对象的数组形式,返回给驱动程序(driverprogram)

注意:在键值(key-value)对操作中,如果使用了自定义对象作为键,你必须确保该对象实现了自定义的 equals()和对应的 hashCode()方法。更多详情请查看 Object.hashCode() documentation文档大纲中列出的规定。

 

Python

虽然在包含任意类型的对象的 RDDs中,可以使用大部分的 Spark操作,但也有一

些特殊的操作只能在键值(key-value)对的 RDDs上使用。最常见的一个就是分布式的洗牌("shuffle")操作,诸如基于 key值对元素进行分组或聚合的操作。

Python中, RDDs支持的操作包含 Python内置的元组(tuples)操作,比如 (1, 2)。你可以简单地创建这样的元组,然后调用期望的操作。

例如,下面的代码在键值(key-value)对上使用 reduceByKey操作来计算在一个文件中每行文本出现的总次数:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

我们也可以使用 counts.sortByKey()例如,按照字典序(alphabetically)排序键值对。最后调用 counts.collect() 转换成对象的数组形式,返回给驱动程序(driver program)

 

转换

下表中列出了 Spark支持的一些常见的转换 (Transformations)。详情请参考 RDDAPI文档 (Scala, Java, Python) pair RDD函数文档 (Scala, Java)

Transformation

Meaning

map(func)

返回一个新分布式数据集,由每一个输入元素经过 func函数转换后组成。

filter(func)

返回一个新数据集,由经过 func函数计算后返回值为 true的输入元素组成。

flatMap(func)

类似于 map,但是每一个输入元素可以被映射为 0或多个输出元素(因此 func应该返回一个序列(Seq),而不是单一元素)

mapPartitions(func)

类似于 map,但独立地在 RDD的每一个分区(partition,对应块(block))上运行,当在类型为 T RDD上运行时, func的函数类型必须是Iterator<T> => Iterator<U>

mapPartitionsWithIndex(func)

类似于 mapPartitions func带有一个整数参数表示分区(partition)的索引值。当在类型为 T RDD上运行时, func的函数类型必须是(Int, Iterator<T>) => Iterator<U>

sample(withReplacementfractionseed)

根据 fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换, seed用于指定随机数生成器种子。

union(otherDataset)

返回一个新的数据集,新数据集由源数据集和参数数据集的元素联合(union)而成。

intersection(otherDataset)

返回一个新的数据集,新数据集由源数据集和参数数据集的元素的交集(intersection)组成。

distinct([numTasks]))

返回一个新的数据集,新数据集由源数据集过滤掉多余的重复元素只保留一个而成。

groupByKey([numTasks])

在一个 (K, V)对的数据集上调用,返回一个 (K, Iterable<V>)对的数据集。

注意:如果你想在每个key上分组执行聚合(如总和或平均值)操作,使用 reduceByKey combineByKey会产生更好的性能。

注意:默认情况下,输出的并行数依赖于父 RDD(parent RDD)的分区数(number of partitions)你可以通过传递可选的第二个参数 numTasks来设置不同的任务数。

reduceByKey(func, [numTasks])

在一个 (K, V)对的数据集上调用时,返回一个 (K, V)对的数据集,使用指定的 reduce函数 func将相同 key的值聚合到一起,该函数的类型必须是 (V,V) => V。类似 groupByKey reduce的任务个数是可以通过第二个可选参数来配置的。

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])

在一个 (K, V)对的数据集上调用时,返回一个 (K, U)对的数据集,对每个键的值使用给定的组合函数(combine functions)和一个中性的“零”值进行聚合。允许聚合后的值类型不同于输入的值类型,从而避免了不必要的内存分配。如同 groupByKey可以通过设置第二个可选参数来配置 reduce任务的个数。

sortByKey([ascending], [numTasks])

在一个 (K, V)对的数据集上调用,其中, K必须实现Ordered,返回一个

按照 Key进行排序的 (K, V)对数据集,升序或降序由布尔参数 ascending决定。

join(otherDataset, [numTasks])

在类型为 (K, V) (K, W)类型的数据集上调用时,返回一个相同 key

对应的所有元素对在一起的 (K, (V, W))对的数据集。也支持外联(Outer

joins)通过使用 leftOuterJoin rightOuterJoin.

cogroup(otherDataset, [numTasks])

在类型为 (K, V) (K, W)的数据集上调用,返回一个 (K, Iterable<V>,

Iterable<W>)元组(tuples)的数据集。这个操作也可以称之为 groupWith

cartesian(otherDataset)

笛卡尔积,在类型为 T U类型的数据集上调用时,返回一个 (T, U)对的数据集(所有元素交互进行笛卡尔积)

pipe(command[envVars])

以管道(Pipe)方式将 RDD的各个分区(partition)传递到 shell命令,比如一个 Perl bash脚本中。 RDD的元素会被写入进程的标准输入(stdin),并且将作为字符串 RDD(RDD of strings),在进程的标准输出(stdout)上输出一行行数据。

coalesce(numPartitions)

RDD的分区数降低到指定的 numPartitions过滤掉一个大数据集

之后再执行操作会更加有效。

repartition(numPartitions)

随机地对 RDD的数据重新洗牌(Reshuffle)以便创建更多或更少的分区,对它们进行平衡。总是对网络上的所有数据进行洗牌(shuffles)

repartitionAndSortWithinPartitions(partitioner)

根据给定的分区器对RDD进行重新分区,在每个结果分区中,将记录按照key值进行排序。这在每个分区中比先调用repartition再排序效率更高,因为它可以推动排序到分牌机器上。

 

动作

下表中列出了 Spark支持的一些常见的动作 (actions)。详情请参考 RDD API文档

(Scala,Java, Python) pair RDD函数文档(Scala, Java)

Action

Meaning

reduce(func)

通过函数 func (接受两个参数,返回一个参数),聚集数据集中的所有元素。该函数应该是可交换和可结合的,以便它可以正确地并行计算。

collect()

在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作,并返回一个足够小的数据子集后再使用会比较有用

count()

返回数据集的元素的个数。

first()

返回数据集的第一个元素。 (类似于 take(1)).

take(n)

返回一个由数据集的前 n个元素组成的数组。注意,这个操作目前不能并行执行,而是由驱动程序(driver program)计算所有的元素。

takeSample(withReplacement,num, [seed])

返回一个数组,由数据集中随机采样的 num个元素组成,可以选择是否用随机数替换不足的部分,可以指定可选参数seed,预先指定一个随机数生成器的种子。

takeOrdered(n[ordering])

返回一个由数据集的前 n个元素,并使用自然顺序或定制顺序对这些元素进行排序。

saveAsTextFile(path)

将数据集的元素,以 text file ( text file的集合)的形式,保存到本地文件系统的指定目录, Spark会对每个元素调用 toString方法,然后转换为文件中的文本行。

saveAsSequenceFile(path
(Java and Scala)

将数据集的元素,以 Hadoop sequencefile的格式,保存到各种文件系统的指定路径下,包括本地系统, HDFS或者任何其它 hadoop支持的文件系统。该方法只能用于键值(key-value)对的 RDDs,或者实现了 HadoopWritable接口的情况下。在 Scala中,也可以用于支持隐式转换为 Writable的类型。 (Spark包括了基本类型的转换,例如 Int Double String,等等)

saveAsObjectFile(path
(Java and Scala)

以简单地 Java序列化方式将数据集的元素写入指定的路径,对应的可以用 SparkContext.objectFile()加载该文件。

countByKey()

只对 (K,V)类型的 RDD有效。返回一个 (K Int)对的 hashmap,其中 (K,Int)对表示每一个 key对应的元素个数。

foreach(func)

在数据集的每一个元素上,运行 func函数。这通常用于副作用(sideeffects),例如更新一个累加器变量(accumulator variable)(参见下文),或者和外部存储系统进行交互.

 

洗牌操作

Spark触发一个事件后进行的一些操作成为洗牌。洗牌是Spark重新分配数据的机制,这样它就可以跨分区分组。这通常涉及在执行器和机器之间复制数据,这就使得洗牌是一个复杂和高代价的操作。

 

背景

为了理解在洗牌的时候发生了什么,我们可以考虑reduceByKey操作的例子。reduceByKey操作产生了一个新的RDD,在这个RDD中,所有的单个的值被组合成了一个元组,key和执行一个reduce函数后的结果中与这个key有关的所有值。面临的挑战是一个key的所有的值并不都是在同一个分区上的,甚至不是一台机器上的,但是他们必须是可连接的以计算结果。

Spark中,数据一般是不会跨分区分布的,除非是在一个特殊的地方为了某种特定的目的。在计算过程中,单个任务将在单个分区上操作——因此,为了组织所有数据执行单个reduceByKey中的reduce任务,Spark需要执行一个all-to-all操作。它必须读取所有分区,找到所有key的值,并跨分区把这些值放到一起来计算每个key的最终结果——这就叫做洗牌。

尽管在每个分区中新洗牌的元素集合是确定性的,分区本身的顺序也同样如此,这些元素的顺序就不一定是了。如果期望在洗牌后获得可预测的有序的数据,可以使用:

mapPartitions 来排序每个分区,例如使用.sorted

repartitionAndSortWithinPartitions 在重新分区的同时有效地将分区排序

sortBy来创建一个全局排序的RDD

可以引起洗牌的操作有重分区例如repartitioncoalesce,‘ByKey操作(除了计数)像groupByKeyreduceByKey,还有join操作例如cogroupjoin

 

性能影响

Shuffle是一个代价高昂的操作,因为它调用磁盘I/O,数据序列化和网络I/O。要组织shuffle的数据,Spark生成一个任务集合——map任务来组织数据,并使用一组reduce任务集合来聚合它。它的命名来自与MapReduce,但并不直接和Sparkmapreduce操作相关。

在内部,单个的map任务的结果被保存在内存中,直到他们在内存中存不下为止。然后,他们基于目标分区进行排序,并写入到一个单个的文件中。在reduce这边,任务读取相关的已经排序的块。

某些shuffle操作会消耗大量的堆内存,因为他们用在内存中的数据结构在转换操作之前和之后都要对数据进行组织。特别的,reduceByKeyaggregateByKeymap侧创建这些结构,‘ByKey操作在reduce侧生成这些结构。当数据在内存中存不下时,Spark会将他们存储到磁盘,造成额外的磁盘开销和增加垃圾收集。

Shuffle也会在磁盘上产生大量的中间文件。在Spark1.3中,这些文件直到Spark停止运行时才会从Spark的临时存储中清理掉,这意味着长时间运行Spark作业会消耗可观的磁盘空间。这些做了之后如果lineage重新计算了,那shuffle不需要重新计算了。在配置Spark上下文时,临时存储目录由spark.local.dir配置参数指定。

Shuffle的行为可以通过调整各种配置参数来调整。请看Spark配置指南中的Shuffle Behavior部分。

 

RDD持久化

Spark最重要的一个功能,就是在不同操作间,将一个数据集持久化(persisting) (缓存(caching))到内存中。当你持久化(persist)一个 RDD,每一个节点都会把它计算的所有分区(partitions)存储在内存中,并在对数据集 (或者衍生出的数据集)执行其他动作(actioins)时重用。这将使得后续动作(actions)的执行变得更加迅速(通常快 10 )缓存(Caching)是用 Spark 构建迭代算法和快速地交互使用的关键。

你可以使用 persist() cache()方法来持久化一个 RDD。在首次被一个动作(action)触发计算后,它将会被保存到节点的内存中。 Spark 的缓存是带有容错机制的,如果 RDD丢失任何一个分区的话,会自动地用原先构建它的转换(transformations)操作来重新进行计算。

此外,每一个被持久化的 RDD都可以用不同的存储级别(storage level)进行存储,比如,允许你持久化数据集到硬盘,以序列化的 Java对象(节省空间)存储到内存,跨节点复制,或者以off-heap的方式存储在 Tachyon。这些级别的选择,是通过将一个 StorageLevel对象 (Scala Java, Python)传递到 persist()方法中进行设置的。 cache()方法是使用默认存储级别的快捷方法,也就是 StorageLevel.MEMORY_ONLY (将反序列化 (deserialized)的对象存入内存)。完整的可选存储级别如下:

Storage Level

Meaning

MEMORY_ONLY

RDD以反序列化(deserialized)Java对象存储到 JVM。如果 RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算。这是默认的级别。

MEMORY_AND_DISK

RDD以反序列化(deserialized) Java对象存储到 JVM。如果 RDD不能被内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取。

MEMORY_ONLY_SER

RDD序列化(serialized) Java对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化(deserialized)的空间利用率更高,尤其当使用快速序列化器(fast serializer),但在读取时会比较耗 CPU

MEMORY_AND_DISK_SER

类似于 MEMORY_ONLY_SER,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算。

DISK_ONLY

只将 RDD分区存储在硬盘上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

与上述的存储级别一样,但是将每一个分区都复制到两个集群节点上。

OFF_HEAP (experimental)

(serialized format) RDD TachyonMEMORY_ONLY_SER OFF_HEAP 降低了垃圾收集(garbage collection)的开销,并使 executors变得更小而且共享内存池,这在大堆(heaps)和多应用并行的环境下是非常吸引人的。而且,由于 RDDs驻留于 Tachyon中, executor的崩溃不会导致内存中的缓存丢失。在这种模式下, Tachyon中的内存是可丢弃的。因此, Tachyon不会尝试重建一个在内存中被清除的分块。

注意: Python中,存储对象时总是使用 Pickle库来序列化(serialized),而不管你是否选择了一个序列化的级别。Spark也会自动地持久化一些洗牌(shuffle)操作(比如,reduceByKey )的中间数据,即使用户没有调用 persist。这么做是为了避免在一个节点上的洗牌(shuffle)过程失败时,重新计算整个输入。我们仍然建议用户在结果 RDD 上调用 persist如果希望重用它的话。

 

如何选择存储级别?

Spark 的存储级别旨在满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下方法进行选择:

如果你的 RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是 CPU使用效率最高的选项,它使得 RDDs的操作尽可能的快。

如果不行,试着使用 MEMORY_ONLY_SER并且选择一个快速序列化库使对象在有比较高的空间使用率(space-efficient)的情况下,依然可以较快被访问。

尽可能不要存储到硬盘上,除非计算数据集的函数的计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,可能和从硬盘中读取差不多快。

如果你想有快速的故障恢复能力,使用复制存储级别(例如:用 Spark来响应 web应用的请求)。所有的存储级别都有通过重新计算丢失的数据来恢复错误的容错机制,但是复制的存储级别可以让你在 RDD 上持续地运行任务,而不需要等待丢失的分区被重新计算。

在大量的内存或多个应用程序的环境下,试验性的 OFF_HEAP模式具有以下几个优点:

o 允许多个 executors共享 Tachyon中相同的内存池。

o 极大地降低了垃圾收集器(garbage collection)的开销。

o 即使个别的 executors崩溃了,缓存的数据也不会丢失。

 

移除数据

Spark 会自动监控各个节点上的缓存使用情况,并使用最近最少使用算法(least-recently-used (LRU))删除老的数据分区。如果你想手动移除一个 RDD,而不是等它自动从缓存中清除,可以使用 RDD.unpersist()方法。

 

共享变量

一般来说,当一个函数被传递给一个在远程集群节点上运行的 Spark操作(例如 map reduce) 时,它操作的是这个函数用到的所有变量的独立拷贝。这些变量会被拷贝到每一台机器,而且在远程机器上对变量的所有更新都不会被传播回驱动程序。通常看来,读-写任务间的共享变量显然不够高效。然而,Spark还是为两种常见的使用模式,提供了两种有限的共享变量:广播变量(broadcast variables)和累加器(accumulators)

 

广播变量

广播变量允许程序员保留一个只读的变量,缓存在每一台机器上,而不是每个任务保存一份拷贝。它们可以这样被使用,例如,以一种高效的方式给每个节点一个大的输入数据集。Spark会尝试使用一种高效的广播算法来传播广播变量,从而减少通信的代价。

Spark动作的执行是通过一个阶段的集合,通过分布式的Shuffle操作分离。Spark自动广播在每个阶段里任务需要的共同数据。以这种方式广播的数据以序列化的形式缓存并在运行每个任务之前进行反序列化。这意味着显式地创建广播变量只在当多个阶段之间需要相同的数据或者是当用反序列化的形式缓存数据特别重要的时候。

广播变量是通过调用 SparkContext.broadcast(v)方法从变量 v创建的。广播变量是一个 v的封装器,它的值可以通过调用 value方法获得。如下代码展示了这个:

Scala

scala>val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
 
scala>broadcastVar.value
res0:Array[Int] = Array(1, 2, 3)

 

Java

Broadcast<int[]>broadcastVar = sc.broadcast(new int[] {1, 2, 3});
 
broadcastVar.value();
// returns[1, 2, 3]

Python

>>>broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcastobject at 0x102789f10>
 
>>>broadcastVar.value
[1, 2, 3]

在广播变量被创建后,它应该在集群运行的任何函数中,代替 v值被调用,从而 v值不需要被再次传递到这些节点上。另外,对象 v不能在广播后修改,这样可以保证所有节点具有相同的广播变量的值(比如,后续如果变量被传递到一个新的节点)

 

累加器

累加器是一种只能通过具有结合性的操作(associative operation)进行“(added)”的变量,因此可以高效地支持并行。它们可以用来实现计数器( MapReduce )和求和器。 Spark原生就支持数值类型的累加器,开发者可以自己添加新的支持类型。如果创建了一个命名的累加器(accumulators)这些累加器将会显示在 Spark UI 界面上。这对于了解当前运行阶段(stages)的进展情况是非常有用的(注意:这在 Python 中尚未支持)

一个累加器可以通过调用 SparkContext.accumulator(v)方法从一个初始值 v中创建。运行在集群上的任务,可以通过使用 add方法或 +=操作符( Scala Python)来给它加值。然而,它们不能读取这个值。只有驱动程序可以使用 value方法来读取累加器的值。

以下代码展示了如何利用一个累加器,将一个数组里面的所有元素相加:

Scala

scala> val accum = sc.accumulator(0,"My Accumulator")
accum: spark.Accumulator[Int] = 0
 
scala> sc.parallelize(Array(1, 2, 3,4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasksfinished in 0.317106 s
 
scala> accum.value
res2: Int = 10

 

Java

Accumulator<Integer> accum =sc.accumulator(0);
 
sc.parallelize(Arrays.asList(1, 2, 3,4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasksfinished in 0.317106 s
 
accum.value();
// returns 10

 

Python

>>> accum = sc.accumulator(0)
Accumulator<id=0, value=0>
 
>>> sc.parallelize([1, 2, 3,4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasksfinished in 0.317106 s
 
scala> accum.value
10

 

虽然代码可以使用内置支持的 Int类型的累加器,但程序员也可以通过子类化(subclassing) AccumulatorParam来创建自己的类型。 AccumulatorParam接口有两个方法: zero,为你的数据类型提供了一个“零值(zero value)”,以及 addInPlace提供了两个值相加的方法。比如,假设我们有一个表示数学上向量的 Vector类,我们可以这么写:

Scala

object VectorAccumulatorParam extendsAccumulatorParam[Vector] {
  defzero(initialValue: Vector): Vector = {
   Vector.zeros(initialValue.size)
  }
  defaddInPlace(v1: Vector, v2: Vector): Vector = {
    v1 +=v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(newVector(...))(VectorAccumulatorParam)

Scala中, Spark也支持更通用的 Accumulable接口去累加数据,其结果类型和累加的元素不同(比如,构建一个包含所有元素的列表),并且SparkContext.accumulableCollection方法可以累加普通的 Scala集合(collection)类型。

 

Java

class VectorAccumulatorParam implementsAccumulatorParam<Vector> {
  publicVector zero(Vector initialValue) {
   return Vector.zeros(initialValue.size());
  }
  publicVector addInPlace(Vector v1, Vector v2) {
   v1.addInPlace(v2); return v1;
  }
}

// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum =sc.accumulator(new Vector(...), new VectorAccumulatorParam());

Java中, Spark也支持更通用的 Accumulable接口去累加数据,其结果类型和累加的元素不同(比如,构建一个包含所有元素的列表)

 

Python

class VectorAccumulatorParam(AccumulatorParam):
    defzero(self, initialValue):
       return Vector.zeros(initialValue.size)
 
    defaddInPlace(self, v1, v2):
       v1 += v2
       return v1
 
# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...),VectorAccumulatorParam())

 

因为累加器的更新只在action中执行,Spark确保每个任务对累加器的更新都只会被应用一次,例如,重启任务将不会更新这个值。在转换中,用户应该清楚如果任务或者作业阶段是重复运行的,每个任务的更新可能会应用不止一次。

累加器不会改变Spark的懒惰评价模型。如果它们在一个RDD的操作中正在被更新,他们的值只会被更新一次,RDD作为动作的一部分被计算。因此,累加器更新当在执行一个懒惰转换,例如map()时,并不保证被执行。下面的代码段演示了这个属性:

Scala

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actionshave caused the `map` to be computed.

 

Java

Accumulator<Integer> accum =sc.accumulator(0);
data.map(x -> { accum.add(x); return f(x);});
// Here, accum is still 0 because no actionshave caused the `map` to be computed.

 

Python

accum = sc.accumulator(0)
def g(x):
 accum.add(x)
  returnf(x)
data.map(g)
# Here, accum is still 0 because no actionshave caused the `map` to be computed.

 

把代码部署到集群上

应用程序提交指南(application submission guide)描述了如何将应用程序提交到一个集群,简单地说,一旦你将你的应用程序打包成一个 JAR(对于 Java/Scala)或者一组的 .py .zip文件 (对于 Python) bin/spark-submit 脚本可以让你将它提交到支持的任何集群管理器中。

 

Java/Scala中启动Spark作业

Org.apache.spark.launcher包中提供了相关类来启动Spark作业作为子线程的简单的Java API

 

单元测试

Spark 对单元测试非常友好,可以使用任何流行的单元测试框架。在你的测试中简单地创建一个 SparkContext并将 master URL设置成 local,运行你的各种操作,然后调用 SparkContext.stop()结束测试。确保在 finally块或测试框架的 tearDown方法中调用 context stop方法,因为 Spark不支持在一个程序中同时运行两个contexts

 

Spark1.0之前版本的迁移

Scala

Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API,现在,其中的 API,除了标识为试验性(experimental)开发者的(developer) API”的,在将来的版本中都会被支持。 Scala用户而言,唯一的改变在于组操作(grouping operations)比如, groupByKey cogroup join,其返回值已经从 (Key, Seq[Value])对修改为 (Key,Iterable[Value])

迁移指南也可以从 Spark Streaming MLlib GraphX获取。

 

Java

Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API,现在,其中的 API,只要不是标识为试验性(experimental)开发者的(developer) API”的,在将来的版本中都会被支持。其中对 Java API做了一些修改:

•对于 org.apache.spark.api.java.function中的类函数(Function classes),在 1.0版本中变成了接口,这意味着旧的代码中 extends Function应该需要为 implement Function

•增加了 map转换(transformations)的新变体,如 mapToPair mapToDouble用于创建指定数据类型的 RDDs

•组操作(grouping operations) groupByKey cogroup join的返回值也被修改了,从原先返回 (Key, List<Value>)对改为 (Key,Iterable<Value>)

迁移指南也可以从 Spark Streaming MLlib GraphX获取。

 

Python

Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API,现在,其中的 API,只要不是

标识为试验性(experimental)开发者的(developer) API”的,在将来的版本中

都会被支持。 Python用户而言,唯一的修改在于分组操作(grouping operations),比

groupByKeycogroupjoin其返回值从 (key, list of values)对修改为 (key,

iterableof values)

迁移指南也可以从 Spark Streaming MLlib GraphX获取。

 

下一步

你可以在 Spark的网站上看到 spark程序的样例。另外,Spark examples目录 (Scala, Java, PythonR)中也包含了一些样例。你可以通过将类名传递给 spark bin/run-example脚本来运行 Java Scala的样例,例如:

<span lang="EN-US" style="font-size: 10.5pt; font-family: Helvetica, sans-serif; color: rgb(87, 87, 87); background-image: initial; background-attachment: initial; background-size: initial; background-origin: initial; background-clip: initial; background-position: initial; background-repeat: initial;">./bin/run-example SparkPi</span>

对于 Python样例,要使用 spark-submit

./bin/spark-submitexamples/src/main/python/pi.py

对于R样例,使用spark-submit

./bin/spark-submitexamples/src/main/r/dataframe.R

为了帮助优化你的程序,在配置(configuration)调优(tuning)的指南上提供了最佳实践信息。它们在确保将你的数据用一个有效的格式存储在内存上,是非常重要的。对于部署的帮助信息,可以查看集群模式概述(cluster mode overview)描述了分布式操作以及支持集群管理器所涉及的组件。

最后,完整的 API文档可以查看 Scala, JavaPythonR

相关内容