Spark MapReduce Programming: Self-Join (Scala)


For the SQL and Hadoop implementations, see the earlier post MapReduce编程-自连接 (MapReduce Programming: Self-Join).

Here the same idea is implemented with Spark. I am still new to Scala, so the code may not be ideal; corrections are welcome.
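The core of the self-join is a tagging trick: each input line "child parent" is emitted twice, once keyed by the child (carrying its parent) and once keyed by the parent (carrying its child), so that grouping by key later brings a person's children and parents together. A minimal sketch of just this step, runnable without Spark (the names `KeyTrick` and `emit` are illustrative, not from the original code):

```scala
// Sketch of the tagging step used by the self-join.
// Assumption: each input line has the form "child parent".
object KeyTrick {
  def emit(line: String): Seq[(String, String)] = {
    val strs = line.split(" ")
    // Emit under both names: the child's record names its parent,
    // and the parent's record names its child.
    Seq(strs(0) -> s"parent_${strs(1)}", strs(1) -> s"child_${strs(0)}")
  }

  def main(args: Array[String]): Unit = {
    println(emit("Tom Lucy"))
    // List((Tom,parent_Lucy), (Lucy,child_Tom))
  }
}
```

Grouping these pairs by key then puts, under "Tom", both Tom's parents and Tom's children, which is exactly the equality match a SQL self-join would perform.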

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object SelfUnion {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SelfUnion")
    val sc = new SparkContext(conf)
    val cpFile = sc.textFile("cp.txt")

    cpFile
      // 1) Build the two "tables": emit each "child parent" line under both
      //    names, tagged so the two roles can be told apart later
      .flatMap { line =>
        val strs = line.split(" ")
        if (strs.length == 2)
          Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0))
        else Array.empty[String]
      }
      // 2) Convert to key-value pairs
      .map { line =>
        val kv = line.split(" ")
        (kv(0), kv(1))
      }
      // 3) The equality match on the join column
      .groupByKey()
      // 4) Parse the values: cross every child with every parent of the key,
      //    stripping the "child_" / "parent_" tags
      .flatMapValues { values =>
        val childList = new ArrayBuffer[String]()
        val parentList = new ArrayBuffer[String]()
        values.foreach { name =>
          if (name.startsWith("child_")) childList += name
          else if (name.startsWith("parent_")) parentList += name
        }
        for (c <- childList; p <- parentList)
          yield c.substring(6) + " " + p.substring(7)
      }
      .values
      .saveAsTextFile("selfunion_out")
  }
}

The contents of cp.txt:

Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Tom


The output is:

Terry Lucy
Terry Jack
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse

Pure Scala implementation

In addition, here is the same logic in plain functional Scala. Note that the map, flatMap, etc. below are Scala's own collection methods, not RDD operations.

An ArrayBuffer[String] stands in for the input file, one line per item.

  def main(args: Array[String]): Unit = {
    // In-memory stand-in for cp.txt, one line per item
    val list = ArrayBuffer(
      "Tom Lucy", "Tom Jack", "Jone Lucy", "Jone Jack", "Lucy Mary",
      "Lucy Ben", "Jack Alice", "Jack Jesse", "Terry Tom")

    // Same tagging step as the Spark flatMap above
    val list2 = list.flatMap { line =>
      val strs = line.split(" ")
      if (strs.length == 2)
        Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0))
      else Array.empty[String]
    }
    val mapout = list2.map(_.split(" "))
    // Same as groupByKey: group on the first element of each Array
    val groupbyout = mapout.groupBy(_(0))
    val res = groupbyout.values.flatMap { values =>
      val childList = new ArrayBuffer[String]()
      val parentList = new ArrayBuffer[String]()
      values.foreach { names =>
        if (names(1).startsWith("child_")) childList += names(1)
        else if (names(1).startsWith("parent_")) parentList += names(1)
      }
      for (c <- childList; p <- parentList) yield Array(c, p)
    }

    for (arr <- res)
      println(arr(0).substring(6) + " " + arr(1).substring(7))
  }
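The string tags are only bookkeeping; the same join can be expressed more directly by grouping the (child, parent) edges from both sides and crossing them on the shared middle person. A compact alternative sketch of the same self-join (the names `GrandparentsSketch` and `grandPairs` are illustrative, not from the original code):

```scala
// Alternative sketch: self-join on (child, parent) edges without tag strings.
// For each middle person m, every child of m pairs with every parent of m.
object GrandparentsSketch {
  def grandPairs(edges: Seq[(String, String)]): Seq[(String, String)] = {
    // Index the edges from both directions
    val childrenOf = edges.groupBy(_._2).map { case (p, es) => p -> es.map(_._1) }
    val parentsOf  = edges.groupBy(_._1).map { case (c, es) => c -> es.map(_._2) }
    for {
      m <- (childrenOf.keySet & parentsOf.keySet).toSeq // the join column match
      c <- childrenOf(m)                                // m's children
      g <- parentsOf(m)                                 // m's parents
    } yield (c, g)                                      // grandchild, grandparent
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq("Tom" -> "Lucy", "Terry" -> "Tom")
    grandPairs(edges).foreach { case (c, g) => println(c + " " + g) }
    // Terry Lucy
  }
}
```

On the full cp.txt data this yields the same ten grandchild-grandparent pairs as the versions above, just without the substring stripping.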

As you can see, the MapReduce model and functional programming have a great deal in common.
