Spark on YARN: WordCount and TopK


Original article: http://blog.csdn.net/cklsoft/article/details/25568621

1. First, in the Eclipse (Scala) development environment set up as described at http://dongxicheng.org/framework-on-yarn/spark-eclipse-ide/, write a Scala source file with the following content:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object HdfsWordCount {
  def main(args: Array[String]) {
    // args(0): master (e.g. yarn-standalone), args(1): input path, args(2): output path
    val sc = new SparkContext(args(0), "myWordCount", System.getenv("SPARK_HOME"),
      SparkContext.jarOfClass(this.getClass))
    val logFile = sc.textFile(args(1))            // e.g. hdfs://master:9101/user/root/spam.data
    val counts = logFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile(args(2))                // e.g. hdfs://master:9101/user/root/out
  }
}
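
Before submitting to YARN it can help to sanity-check the job logic in local mode. The following is a minimal sketch under that assumption; the "local" master and the test.txt path are placeholders, not part of the original post:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object LocalWordCountCheck {
  def main(args: Array[String]) {
    // "local" runs the driver and executor in a single JVM, no cluster needed
    val sc = new SparkContext("local", "localWordCount")
    val counts = sc.textFile("test.txt")          // any small local text file
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach(println)             // print (word, count) pairs to stdout
    sc.stop()
  }
}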

2. Use Eclipse's Export > JAR file function to compile the Scala source into class files and package them as sc.jar. (The HdfsWordCount object above has no package declaration, so the script below can refer to it simply as --class HdfsWordCount.)

3. Run the run_wc.sh script:

#! /bin/bash
# Submit the WordCount jar to YARN via the Spark 0.9 YARN Client (run from $SPARK_HOME)
SPARK_HADOOP_VERSION=2.2.0
SPARK_YARN=true
export SPARK_JAR=$SPARK_HOME/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
export EXEC_JAR=$SPARK_HOME/sc.jar

./bin/spark-class org.apache.spark.deploy.yarn.Client \
 --jar $EXEC_JAR \
 --class HdfsWordCount \
 --args yarn-standalone \
 --args hdfs://master:9101/user/root/spam.data \
 --args hdfs://master:9101/user/root/out2 \
 --num-workers 1 \
 --master-memory 512m \
 --worker-memory 512m \
 --worker-cores 1
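
When the application completes, the word counts are written as part-* files under hdfs://master:9101/user/root/out2, and the driver output can be found in the YARN application logs (in yarn-standalone mode the driver runs inside the ApplicationMaster).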


Appendix:

TopK code (select the k most frequent words):

package sc
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopK {
  def main(args: Array[String]) {
    // example args: yarn-standalone hdfs://master:9101/user/root/spam.data 5
    val sc = new SparkContext(args(0), "myWordCount", System.getenv("SPARK_HOME"),
      SparkContext.jarOfClass(this.getClass))
    val logFile = sc.textFile(args(1))
    val counts = logFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // swap to (count, word) so the ordering is by frequency
    val sorted = counts.map { case (key, value) => (value, key) }.sortByKey(true, 1)
    // take the k pairs with the highest counts
    val topK = sorted.top(args(2).toInt)
    topK.foreach(println)
  }
}
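
Note that top(k) already returns the k largest elements under the default tuple ordering, so the sortByKey step above is not strictly required for correctness. A small self-contained check, assuming a local master and toy data (both illustrative only, not from the original post):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopKLocalCheck {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "topKCheck")
    // toy (word, count) pairs standing in for the reduceByKey output
    val counts = sc.parallelize(Seq(("a", 3), ("b", 7), ("c", 1), ("d", 5)))
    // swap to (count, word) and take the two pairs with the highest counts
    val top2 = counts.map { case (word, n) => (n, word) }.top(2)
    top2.foreach(println)   // prints (7,b) then (5,d)
    sc.stop()
  }
}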


Appendix 2: join operation (see the problem statement at http://dongxicheng.org/framework-on-yarn/spark-scala-writing-application/):

package sc
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkJoinTest {
  def main(args: Array[String]) {
    // args(0): master, args(1): ratings file, args(2): movies file, args(3): output path
    val sc = new SparkContext(args(0), "SparkJoinTest", System.getenv("SPARK_HOME"),
      SparkContext.jarOfClass(this.getClass))

    // each rating line: userId::movieId::rating::timestamp (MovieLens format)
    val txtFile = sc.textFile(args(1))
    val rating = txtFile.map(line => {
      val fields = line.split("::")
      (fields(1).toInt, fields(2).toDouble)   // (movieId, rating); a block's value is its last expression
    })

    // average rating per movie
    val movieScores = rating.groupByKey().map(data => {
      val avg = data._2.sum / data._2.size
      (data._1, avg)
    })

    // each movie line: movieId::title::genres
    val moviesFile = sc.textFile(args(2))
    val moviesKey = moviesFile.map(line => {
      val fields = line.split("::")
      (fields(0).toInt, fields(1))            // (movieId, title)
    }).keyBy(tuple => tuple._1)               // key by movieId

    // join: (<k,v>, <k,w>) => <k, (v, w)>, then keep movies with average score above 4.0
    val res = movieScores.keyBy(tuple => tuple._1).join(moviesKey)
      .filter(f => f._2._1._2 > 4.0)
      .map(f => (f._1, f._2._1._2, f._2._2._2))   // (movieId, avgScore, title)
    res.saveAsTextFile(args(3))
  }
}
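
A design note on the averaging step: groupByKey collects every rating for a movie before computing the mean. A sketch of the same per-movie average using reduceByKey over (sum, count) pairs, which avoids materializing all values per key, is shown below; the local master and toy ratings are illustrative assumptions, not part of the original example:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object AvgRatingSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "avgRatingSketch")
    // toy (movieId, rating) pairs
    val rating = sc.parallelize(Seq((1, 5.0), (1, 3.0), (2, 4.0)))
    val movieScores = rating
      .map { case (movieId, score) => (movieId, (score, 1)) }              // seed (sum, count)
      .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }     // combine partial sums
      .map { case (movieId, (sum, count)) => (movieId, sum / count) }      // final average
    movieScores.collect().foreach(println)   // prints (1,4.0) and (2,4.0)
    sc.stop()
  }
}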

