【Spark1.3官方翻译】Spark快速入门,spark1.3spark


英文标题:Quick Start

英文原址:http://spark.apache.org/docs/latest/quick-start.html

Spark Version:1.3.0

1,       使用Spark-Shell进行交互式分析

1.1   基本使用

Spark-shell为学习API提供了简单的方式,它是一个非常强大的进行交互式数据分析的工具,在Scala或Python中都有提供。在Spark目录中运行下面的命令:

./bin/spark-shell

Spark的主要抽象是一个分布式数据集合,叫做弹性分布式数据集(RDD),可以通过读取HDFS文件或转换其它RDD来创建RDD,下面示例表示的是通过读取一个HDFS文件来创建RDD:

val textFile = sc.textFile(“README.md”)

RDD有transformation和action两种操作,前者返回一个指向新RDD的指针,后者返回值。下面是一些action操作:

textFile.count()

textFile.first()

下面是一些transformation操作,用filter过滤出文件中的Spark行得到一个新的RDD:

val linesWithSpark = textFile.filter(line => line.contains(“Spark”))

可以把transformation和action串起来:

textFile.filter(line => line.contains(“Spark”)).count()//计算了README.md文件中有多少行包含Spark这个单词

 

1.2   更多RDD操作

可以用transformation和action来做很多更复杂的计算,例如下面的示例是找出所有行中单行的最多单词数

textFile.map(line => line.split(“ “).size).reduce((a,b) => if(a>b) a else b)

这行代码首先将一行映射成一个整数值(单词数)生成一个RDD,在这个RDD上调用reduce找出最大size最大的一个。Map和reduce的参数是Scala中的函数,可以使用其大量的语言特性或Scala/Java库,例如,可以使用其他语言声明的包使得上面的代码更易理解:

import java.lang.Math

textFile.map(line => line.split(“ “).size).reduce((a,b) => Math.max(a,b))

一个通用的数据模型是MapReduce,Spark可以轻松地实现MapReduce:

val wordCounts = textFile.flatMap(line => line.split(“ “)).map(word => (word,1)).reduceByKey((a,b) => a+b)

这里结合了flatMap,map,reduceByKey几个转换计算每一个单词的数以(String,Int)对作为一个RDD,在Shell中合并这些单词数,可以用collect这个action:

wordCounts.collect

 

1.3   缓存(Caching)

Spark支持将数据集放到集群的内存中进行缓存,这对于要重复使用的数据非常管用,例如查询小的热点数据集或者运行类似于PageRank的迭代算法,下面的示例将linesWithSpark数据集进行缓存:

linesWithSpark.cache()

linesWithSpark.count()

linesWithSpark.count()

看上去用Spark缓存一个100行的文件挺愚蠢的,但是有意思的是相同的函数也可以被用在很大的数据上,而且效果一样,即使它们跨上百个节点存储,你可以用bin/spark-shell交互式的连接到一个集群。

2,       独立的应用程序

使用Spark API写一个独立应用程序,下面从一个简单的程序开始(Scala&SBT):

/* SimpleApp.scala */

importorg.apache.spark.SparkContext

importorg.apache.spark.SparkContext._

importorg.apache.spark.SparkConf

 

objectSimpleApp{

 def main(args:Array[String]){

   val logFile="YOUR_SPARK_HOME/README.md"// Should be some file on your system

   val conf=newSparkConf().setAppName("Simple Application")

   val sc=newSparkContext(conf)

   val logData= sc.textFile(logFile,2).cache()

   val numAs= logData.filter(line=> line.contains("a")).count()

   val numBs= logData.filter(line=> line.contains("b")).count()

    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

 }

}

注意,应用程序必须定义一个main()函数而不是继承scala.App,scala.App的子类可能不会正确的运行(Why?)

这个程序分别计算了README.md文件中包含a和b的行数,在程序中你要用自己的Spark_HOME替换掉YOUR_SPARK_HOME。独立程序不像Spark-Shell启动时会初始化自己的SparkContext,而是需要将初始化SparkContext作为程序的一部分。

给SparkContext构造器传递一个SparkConf对象,这个对象包含了应用程序的相关信息。应用程序依赖于Spark API,所以还要有一个SBT的配置文件,下面的simple.sbt解释了Spark依赖,此依赖还添加了Spark依赖的一些库。

name:="Simple Project"
version:="1.0"
scalaVersion:="2.10.4"
libraryDependencies+="org.apache.spark"%%"spark-core"%"1.3.0"

为了使SBT正常工作,得根据目录结构来布局SimpleApp.scala和simple.sbt,如果位置正确的话,就可以创建包含独立应用程序的Jar包了,然后使用spark-submit脚本运行我们的程序。

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp"\
  --master local[4]\
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

转载请注明出处。

 

相关内容

    暂无相关文章