Spark下的FP-Growth和Apriori(频繁项集挖掘并行化算法),fp-growthapriori


     频繁项集挖掘是一个关联式规则挖掘问题。关联挖掘是数据挖掘中研究最早也是最活跃的领域,其中频繁模式的挖掘是关联挖掘的核心和基础,是产生关联规则挖掘的基础。频繁项集最经典的应用就是超市的购物篮分析。

首先要理解频繁项集中的以下概念。

       频繁项:在多个集合中,频繁出现的元素项。

       频繁项集:在一系列集合中每项都含有某些相同的元素,这些元素形成一个子集,满足一定阀值就是频繁项集。

       K项集:K个频繁项组成的一个集合。

       支持度:包含频繁项集(F)的集合的数目。

       可信度:频繁项与某项的并集的支持度与频繁项集支持度的比值。

      简单来说。频繁项集的挖掘就是将数据集(一般是多行数据,每行数据的第一个元素的交易编号,后面的是物品编号)中出现频率超过支持度的频繁项找出来,而首先找出的单个频繁项的集合就是1-频繁项集。而2-频繁项就是某两个频繁项都同时出现在一行中并且出现频率超过支持度的,那么2-频繁项集就是这些2-频繁项的集合,依次类推,K-频繁项集就是K-频繁项的集合。

      目前针对频繁项集的算法,主要有Apriori,FP-Growth和Eclat算法。

Aporiori

       首先来了解一下Apriori算法的思路:Apriori算法需要对数据集进行多步处理。第一步,统计所有含一个元素项目集出现的频数,并找出那些不小于最小支持度的项目集即1-频繁项集,从第二部开始循环处理直到再没有频繁项集生成。循环过程是:第K步中,根据K-1步生成的(K-1)维频繁项集产生K候选项目集,然后对数据及进行搜索,得到候选项目集的项集支持度,与最小支持度进行对比,从而得到K-频繁项集。

FP-Grwoth

       Apriori算法虽然思路很简单,但是该算法有以下缺点:(1)在每一步产生侯选项目集时循环产生的组合过多,没有排除不应该参与组合的元素;(2)每次计算项集的支持度时,都对数据集中的全部记录进行了一遍扫描比较,

由于本题中数据集较大,这样扫描下来会大大增加计算机的IO开销。这种代价会让计算机奔溃。

针对Apriori算法的性能瓶颈——需要产生大量候选集和需要重复扫描数据集,FP-growth算法应用而生。该算法只进行2次数据集扫描而且不使用候选集,直接压缩数据集成一个频繁模式树(FP树),最后通过这个FP树生成频繁项集。对于本题较大的数据集,FP-growth算法是个不错的选择。

那么首先需要理解FP树的结构和建树过程。

表1

交易ID

物品ID

过滤后

01

a b c e f h i k

a b c

02

b e g j m o s

e j

03

a e h j s

a e j

04

a c

a c

05

a b c j l p

a b c j

06

b e f m n o

b e

假设最小支持度是40%,那么b e j c的支持度就是50%,a是67%。过滤掉非频繁项集后即如上表中最后一列。并使这些元素按照出现的次序排序。

表2

元素

支持度

a

67%

b

50%

c

50%

e

50%

j

50%

这一步其实就是预处理,减少需要计算的频繁项集的候选集,排序的目的是频繁项集关注的是组合而不是排列,在后面生成树的时候需要避免生成重复不必要的分支。

然后遍历过滤后的候选集和出现的次序,构建FP-tree如下。

图1

   

      发现频繁项集的过程和Apriori一样,也是逐步递增的发现,即先找到1频繁项集,然后再在1频繁项集的基础上找2频繁项集。基于上面的FP-tree树,其实已经找到了1频繁项集,即表2中的所有元素。

在找2频繁项集时,需要先抽取条件模式基(以每个频繁项为结尾的,在FP树中所有的前缀路径)

表3

频繁项

条件模式基

A

{}:4

B

{A}:2

C

{A}:1,{A,B}:2,

E

{A}:1,{}:1,{B}:1

J

{A,B,C}:1,{A,E}:1,{E}:1

 

       然后对于表3中的频繁项集元素,用它的条件模式基建立FP树,再找2频繁项集。

如图2为j的FP树

图2

   

        建树的过程和上面是一样的,然后再得到2频繁项集。依次类推,挖掘K频繁项集只需要在K-1频繁项集上挖掘,重复上面的过程即可。

        从这里就可以发现一旦建立了FP树之后就可以不断递归挖掘K频繁项集,对于Hadoop就会产生多次IO操作,严重影响程序运行效率,而Spark这种弹性式内存计算框架可以试中间输出和结果保存在内存中,不需要重复读写HDFS,所以Spark能更好地适用于数据挖掘需要递归的Map-Reduce算法。

Spark下FP-Growth

        在spark下部署FP-growth算法的主要思路分为五步,涉及三步MR。(流程图见图3)

        第一步:计算F_list,也就是计算所有item的support,这一步可以通过MD得到,实质和WordCount一样。

        第二步:数据分组,将F_list中的条目分成G个组,就形成了一个Group_list,这其中每一个Group都包含一组item的集合。

        第三步:并行执行FP-growth,这步和上面所说的普通FP-growth是一样的,只是需要MR来完成。这一步中mapper完成的主要功能是数据集分区,逐个处理数据分区中的事务,将事务分为item,每个item根据Group_list映射到合适的group中去,然后在reduce中并行执行FP-growth算法。

        第四步:聚合,这一步将各台机器上的结果聚合成最终的结果。这一步也需要MR来完成,将各台机器上的频繁项集聚合在一起,并计算支持度。

 图3


算法实现

          算法实现同样也分为四步(详细注释见源代码):

(1)   计算每个item的支持度。

val f = file.map(line => (line.split(" ") .drop(1) .toList.sortWith(_ < _), 1)

 .reduceByKey(_ + _)

在计算支持度之前先将重复的行合并重复行并删除事务编号(每行第一个为事务编号)。

 

 

计算支持度并保存在内存中。

 

val g = f.flatMap(line => {

      var l = List[(String, Int)]()

      for (i <- line._1) {

        l = (i, line._2) :: l

      }

      l

    }).reduceByKey(_ + _) .sortBy(_._2, false) .cache()


(2)   数据分组

其中g_size是分组的个数。item是数据集中事务和出现次数的键值对。将item中数据分成g_size个组,将相关的数据分成一组,每个group中都包含一组item。

val f_list = item.flatMap(t => {

      var pre = -1

      var i = t._1.length - 1

      var result = List[(Int, (List[Int], Int))]()

      while (i >= 0) {

        if ((t._1(i) - 1) / g_size != pre) {

          pre = (t._1(i) - 1) / g_size

          result = (pre, (t._1.dropRight(t._1.length - i - 1), t._2)) :: result

        }

        i -= 1

      }

      result

    })

      .groupByKey()

      .cache()

(3)   并行化执行FP-Growth

其中fp_growth是一个fp树的函数,其中主要包含建树,前缀处理,单支处理和挖掘频繁项。

val d_result = f_list.flatMap(t => {

   fp_growth(t._2, support_num, t._1 * g_size + 1 to (((t._1 + 1) * g_size) min g_count))

})

(4)   合并结果

d_result中包含了所有数据分组的频繁项集,这一步将所有的频繁项集合并,并将结果转换为要求的数据格式。

val temp_result = d_result.map(t => (t._1.map(a => g_list(a - 1)._1), t._2))

val result = temp_result.map( t => ( listToString(t._1)._2,

listToString(t._1)._1 + ":" + t._2.toFloat/line_num.toFloat ) )

.groupBy(_._1)

  代码

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import scala.collection.mutable.Map
import scala.tools.ant.sabbus.Break

object FP_Growth {

  def main(args: Array[String]) = {
    val support_percent = 0.85
    val pnum = 32
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val file = sc.textFile(args(0))
    val line_num = file.count()
    
    //remove the repeat line
    //line key:transcationID 
    //line value:itemIDs
    val f = file.map(line => (
      line.split(" ")
      .drop(1)
      .toList
      .sortWith(_ < _), 1))
      .reduceByKey(_ + _)
    
    
    //compute support rate of every item
    //line key:itemIDs
    //line value:the number of line appear
    //g's element key:itemID
    //g's element value:support rate of every item
    val g = f.flatMap(line => {
      var l = List[(String, Int)]()
      for (i <- line._1) {
        l = (i, line._2) :: l
      }
      l
    })
      .reduceByKey(_ + _)
      .sortBy(_._2, false) //disascend sort
      .cache() //persist this RDD with the default storage level

    //convert g to a array
    val g_list = g.collect.toArray

    //the number of g_list' item
    var g_count = 0
    //convert g_list to a map
    //g_map's element key : itemID
    //g_map's element value : serial number
    val g_map = Map[String, Int]()

    //number for the itemID
    for (i <- g_list) {
      g_count = g_count + 1
      g_map(i._1) = g_count
    }

    //compute the number of serial items
    //t key:itemIDs
    //t value:the number of line appear
    //item's element key : the serial numbers of item
    //item's element value : the number of item appear
    val item = f.map(t => {
      var l = List[Int]()
      for (i <- t._1)
        l = g_map(i) :: l
      (l.sortWith(_ < _), t._2)
    })

    //======================================
    //compute the min_support rating
    val support_num: Int = item.map(t => (1, t._2))
      .reduceByKey(_ + _)
      .first()._2 * support_percent toInt

    val g_size = (g_count + pnum - 1) / pnum
    //divide items into groups
    //t key : the serial numbers of item
    //t value : the number of item appear
    //f_list's element key : groupID
    //f_list's element value : the prefix items and corresponding number of the group
    val f_list = item.flatMap(t => {
      var pre = -1
      var i = t._1.length - 1
      var result = List[(Int, (List[Int], Int))]()
      while (i >= 0) {
        if ((t._1(i) - 1) / g_size != pre) {
          pre = (t._1(i) - 1) / g_size
          result = (pre, (t._1.dropRight(t._1.length - i - 1), t._2)) :: result
        }
        i -= 1
      }
      result
    })
      .groupByKey()
      .cache()

    //parallelize FP-Growth
    //t key : groupID
    //t value : the prefix items and corresponding number of the group
    //d_result's element key : frequent itemset
    //d_result's element value : the number of frequent itemset appear
    val d_result = f_list.flatMap(t => {
      fp_growth(t._2, support_num, t._1 * g_size + 1 to (((t._1 + 1) * g_size) min g_count))
    })

    //save result into required format  
    val temp_result = d_result.map(t => (t._1.map(a => g_list(a - 1)._1), t._2))
    val result = temp_result.map( t => ( listToString(t._1)._2, listToString(t._1)._1 + ":" + t._2.toFloat/line_num.toFloat ) ).groupBy(_._1)
    result.map(t => t._2.map(s => s._2)).saveAsTextFile(args(1))

    sc.stop()
  }
  
  //convert list to string
  def listToString(l: List[String]): (String, Int) = {
    var str = ""
    var count = 0
    for (i <- l) {
      str += i + ","
      count += 1
    }
    str = "\r\n" + str.substring(0, str.size - 1)
    return (str, count)
  }
  /**
  /* fp-tree' growth  
  /* contains make tree,prefix cut,deal with target,deal with single branche and mining frequent itemset
  **/															
  def fp_growth(v: Iterable[(List[Int], Int)], min_support: Int, target: Iterable[Int] = null): List[(List[Int], Int)] = {
    val root = new tree(null, null, 0)
    val tab = Map[Int, tree]()
    val tabc = Map[Int, Int]()
    //make tree
    for (i <- v) {
      var cur = root;
      var s: tree = null
      var list = i._1
      while (!list.isEmpty) {
        if (!tab.exists(_._1 == list(0))) {
          tab(list(0)) = null
        }
        if (!cur.son.exists(_._1 == list(0))) {
          s = new tree(cur, tab(list(0)), list(0))
          tab(list(0)) = s
          cur.son(list(0)) = s
        } else {
          s = cur.son(list(0))
        }
        s.support += i._2
        cur = s
        list = list.drop(1)

      }
    }
    //prefix cut
    for (i <- tab.keys) {
      var count = 0
      var cur = tab(i)
      while (cur != null) {
        count += cur.support
        cur = cur.Gnext
      }
      //modify
      tabc(i) = count
      if (count < min_support) {
        var cur = tab(i)
        while (cur != null) {
          var s = cur.Gnext
          cur.Gparent.son.remove(cur.Gv)
          cur = s
        }
        tab.remove(i)
      }
    }
    //deal with target
    var r = List[(List[Int], Int)]()
    var tail: Iterable[Int] = null
    if (target == null)
      tail = tab.keys
    else {
      tail = target.filter(a => tab.exists(b => b._1 == a))
    }
    if (tail.count(t => true) == 0)
      return r
    //deal with the single branch
    var cur = root
    var c = 1
    while (c < 2) {
      c = cur.son.count(t => true)
      if (c == 0) {
        var res = List[(Int, Int)]()
        while (cur != root) {
          res = (cur.Gv, cur.support) :: res
          cur = cur.Gparent
        }

        val part = res.partition(t1 => tail.exists(t2 => t1._1 == t2))
        val p1 = gen(part._1)
        if (part._2.length == 0)
          return p1
        else
          return decare(p1, gen(part._2)) ::: p1
      }
      cur = cur.son.values.head
    }
    //mining the frequent itemset
    for (i <- tail) {
      var result = List[(List[Int], Int)]()
      var cur = tab(i)
      while (cur != null) {
        var item = List[Int]()
        var s = cur.Gparent
        while (s != root) {
          item = s.Gv :: item
          s = s.Gparent
        }
        result = (item, cur.support) :: result
        cur = cur.Gnext
      }
      r = (List(i), tabc(i)) :: fp_growth(result, min_support).map(t => (i :: t._1, t._2)) ::: r

    }
    r
  }

  def gen(tab: List[(Int, Int)]): List[(List[Int], Int)] = {
    if (tab.length == 1) {
      return List((List(tab(0)._1), tab(0)._2))
    }
    val sp = tab(0)
    val t = gen(tab.drop(1))
    return (List(sp._1), sp._2) :: t.map(s => (sp._1 :: s._1, s._2 min sp._2)) ::: t
    //TODO: sp._2 may not be min
  }

  //笛卡尔积
  def decare[T](a: List[(List[T], Int)], b: List[(List[T], Int)]): List[(List[T], Int)] = {
    var res = List[(List[T], Int)]()
    for (i <- a)
      for (j <- b)
        res = (i._1 ::: j._1, i._2 min j._2) :: res
    res
  }
}

class tree(parent: tree, next: tree, v: Int) {
  val son = Map[Int, tree]()
  var support = 0
  def Gparent = parent
  def Gv = v
  def Gnext = next
}


相关内容