Implementing Item-Based and User-Based Collaborative Filtering in Spark


1. Introduction

Mahout already provides an item-based CF algorithm, but modifying it (for example, improving the similarity measure or the formula for computing recommendations) is fairly cumbersome, and, more painfully, modified Mahout code is hard to debug. Spark is simpler and more convenient to implement with, and the Spark shell is especially handy during the debugging stage; Mahout also runs comparatively slowly. This post implements both item-based CF and user-based CF in Spark. It uses cosine similarity, with an improvement to the cosine formula described below.

2. Similarity Measures

To understand collaborative filtering you must first be clear about the various similarity measures; the common formulas are introduced below.

2.1 Euclidean Similarity

Euclidean similarity is derived from Euclidean distance: the smaller the distance, the higher the similarity, and vice versa.

Euclidean distance:
d(X,Y) = \sqrt{\sum_{i=1}^{n} (x_i - y_i)^2}

Euclidean similarity:
sim(X,Y) = \frac{1}{1 + d(X,Y)}
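As a standalone sanity check (plain Scala, not part of the Spark job below), the two formulas can be written as:

```scala
// Euclidean distance between two equal-length vectors.
def euclideanDistance(x: Seq[Double], y: Seq[Double]): Double =
  math.sqrt(x.zip(y).map { case (a, b) => (a - b) * (a - b) }.sum)

// Turn the distance into a similarity in (0, 1]: closer vectors score higher.
def euclideanSimilarity(x: Seq[Double], y: Seq[Double]): Double =
  1.0 / (1.0 + euclideanDistance(x, y))

// Identical vectors are at distance 0, so their similarity is exactly 1.
assert(euclideanSimilarity(Seq(1.0, 2.0, 3.0), Seq(1.0, 2.0, 3.0)) == 1.0)
```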

2.2 Pearson Similarity

The Pearson correlation coefficient is the correlation coefficient from probability theory, with values in [-1, +1]. A value greater than zero means the two variables are positively correlated; a value less than zero means they are negatively correlated.

The formula is:
\rho_{X,Y} = \frac{\mathrm{cov}(X,Y)}{\sigma_X \sigma_Y} = \frac{E[(X - \mu_X)(Y - \mu_Y)]}{\sigma_X \sigma_Y} = \frac{E[XY] - E[X]E[Y]}{\sqrt{E[X^2] - E^2[X]}\,\sqrt{E[Y^2] - E^2[Y]}}
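A direct Scala sketch of the covariance form, for intuition only (the Spark code below instead uses its `correlation` helper, which computes the equivalent sum-based form):

```scala
// Pearson correlation via cov(X, Y) / (stdDev(X) * stdDev(Y)),
// using population (divide-by-n) statistics.
def pearson(x: Seq[Double], y: Seq[Double]): Double = {
  val n = x.size.toDouble
  val (meanX, meanY) = (x.sum / n, y.sum / n)
  val cov  = x.zip(y).map { case (a, b) => (a - meanX) * (b - meanY) }.sum / n
  val stdX = math.sqrt(x.map(a => (a - meanX) * (a - meanX)).sum / n)
  val stdY = math.sqrt(y.map(b => (b - meanY) * (b - meanY)).sum / n)
  cov / (stdX * stdY)
}

// Perfectly linearly related vectors correlate at +1.
assert(math.abs(pearson(Seq(1.0, 2.0, 3.0), Seq(2.0, 4.0, 6.0)) - 1.0) < 1e-9)
```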

2.3 Cosine Similarity

Cosine similarity is computed from the angle between two vectors: the smaller the angle, the higher the similarity.

The formula is:
sim(X,Y) = \frac{X \cdot Y}{\|X\|\,\|Y\|} = \frac{\sum_{i=1}^{n} x_i y_i}{\sqrt{\sum_{i=1}^{n} x_i^2}\,\sqrt{\sum_{i=1}^{n} y_i^2}}

This post uses a modified cosine similarity:
sim(X,Y) = \frac{X \cdot Y}{\|X\|\,\|Y\|} \cdot \frac{num_{XY}}{num_X \cdot \log_{10}(10 + num_Y)}
The modification takes into account the number of individuals the two vectors have in common (num_{XY}) and the sizes of the X and Y vectors (num_X and num_Y). Note that sim(X,Y) \neq sim(Y,X).
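The plain and modified cosine formulas as standalone Scala; the adjusted variant mirrors the `cosSim * size / (numRaters * math.log10(numRaters2 + 10))` expression in the Spark code below:

```scala
// Plain cosine similarity: dot product over the product of norms.
def cosine(x: Seq[Double], y: Seq[Double]): Double = {
  val dot   = x.zip(y).map { case (a, b) => a * b }.sum
  val normX = math.sqrt(x.map(a => a * a).sum)
  val normY = math.sqrt(y.map(b => b * b).sum)
  dot / (normX * normY)
}

// Modified cosine: boost by the co-rater count numXY, penalize by X's rater
// count and a log-damped function of Y's rater count. Deliberately asymmetric.
def adjustedCosine(cosSim: Double, numXY: Int, numX: Int, numY: Int): Double =
  cosSim * numXY / (numX * math.log10(numY + 10.0))

// Parallel vectors have cosine 1; the adjustment then scales the score down.
assert(math.abs(cosine(Seq(1.0, 2.0), Seq(2.0, 4.0)) - 1.0) < 1e-9)
```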

2.4 Tanimoto Similarity

Tanimoto similarity, also known as the Jaccard coefficient, is an extension of cosine similarity and is often used for document similarity computation. It ignores rating values and considers only the number of individuals the two sets have in common.

The formula is:
sim(X,Y) = \frac{|X \cap Y|}{|X| + |Y| - |X \cap Y|}
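As a set operation this is straightforward in Scala (a standalone sketch; the `jaccardSimilarity` helper in the Spark code computes the same quantity from counts):

```scala
// Tanimoto / Jaccard similarity: |X ∩ Y| / |X ∪ Y|, ignoring rating values.
def tanimoto[A](x: Set[A], y: Set[A]): Double = {
  val common = (x intersect y).size.toDouble
  common / (x.size + y.size - common)  // |X| + |Y| - |X ∩ Y| = |X ∪ Y|
}

// Two of the four distinct users are shared: 2 / 4 = 0.5.
assert(tanimoto(Set("u1", "u2", "u3"), Set("u2", "u3", "u4")) == 0.5)
```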

3. Rating Prediction Formulas

The formulas below predict a user's rating for an unseen item from the similarity values and the user-item rating matrix.

The item-based formula is:
pred(u,p) = \frac{\sum_{i \in ratedItems(u)} sim(i,p) \cdot r_{u,i}}{\sum_{i \in ratedItems(u)} sim(i,p)}

Here u is a user, p is an item, ratedItems(u) is the set of items u has rated, sim is the (item-item) similarity, and r is a user's rating of an item.

The user-based formula is:
pred(u,p) = \frac{\sum_{i \in N} sim(u,i) \cdot r_{i,p}}{\sum_{i \in N} sim(u,i)}

Here u is a user, p is an item, i ranges over the users who are similar to u and have rated p, sim is the user-user similarity, and r is a user's rating of an item.
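Both formulas are the same similarity-weighted average over a neighbor set; a minimal standalone sketch with made-up (similarity, rating) pairs:

```scala
// pred(u, p) = sum(sim_i * r_i) / sum(sim_i), where i ranges over the
// neighbors: rated items of u (item-based) or users similar to u (user-based).
def predict(neighbors: Seq[(Double, Double)]): Double = {
  val weighted = neighbors.map { case (sim, r) => sim * r }.sum
  val simSum   = neighbors.map(_._1).sum
  weighted / simSum
}

// Hypothetical neighbors: similarities 0.5 and 0.25, ratings 4 and 2.
// (0.5 * 4 + 0.25 * 2) / (0.5 + 0.25) = 2.5 / 0.75 ≈ 3.33
assert(math.abs(predict(Seq((0.5, 4.0), (0.25, 2.0))) - 2.5 / 0.75) < 1e-9)
```

This is exactly what the `statistics.reduceByKey(...).map(...)` pipeline in the code below does: it accumulates Σsim and Σ(sim·rating) per (user, item) key and then divides.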

4. Spark Item-Based CF Code

package cf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object CollaborativeFilterItemBased_modify
{
  def main(args: Array[String]) {
    /**
     * Parameters to regularize correlation.
     */
//    val PRIOR_COUNT = 10
//    val PRIOR_CORRELATION = 0

    val sparkConf = new SparkConf().setAppName("cf item-based")
    val sc = new SparkContext(sparkConf)

    val hiveql = new HiveContext(sc)
    import hiveql.implicits._

    // extract (userid, itemid, rating) from ratings data
    val oriRatings = sc.textFile("/path/to/file").map(line => {
        val fields = line.split("\t")
        (fields(0).toLong, fields(1).toLong, fields(2).toInt)
    })

    // filter redundant (user,item,rating) triples: keep only each user's 100 highest-rated items
    val ratings = oriRatings.groupBy(k=>k._1).flatMap(x=>(x._2.toList.sortWith((x,y)=>x._3>y._3).take(100)))


    // get num raters per item, keyed on item id; item2manyUser is formatted as (item, Iterable[(user, item, rating)])
    val item2manyUser = ratings.groupBy(tup => tup._2)
    val numRatersPerItem = item2manyUser.map(grouped => (grouped._1, grouped._2.size))

    // join ratings with num raters on item id; ratingsWithSize is formatted as (user, item, rating, numRaters)
    val ratingsWithSize = item2manyUser.join(numRatersPerItem).
      flatMap(joined => {
        joined._2._1.map(f => (f._1, f._2, f._3, joined._2._2))
    })
   // ratingsWithSize now contains the following fields: (user, item, rating, numRaters).

    // dummy copy of ratings for self join, keyed by user id
    val ratings2 = ratingsWithSize.keyBy(tup => tup._1)

    // join on userid and filter item pairs so that we don't double-count, excluding self-pairs

    // *** only half the similarity matrix is computed, cutting the work in half
    val ratingPairs =ratings2.join(ratings2).filter(f => f._2._1._2 < f._2._2._2)

    // compute raw inputs to similarity metrics for each item pair

    val tempVectorCalcs =
      ratingPairs.map(data => {
        val key = (data._2._1._2, data._2._2._2)
        val stats =
          (data._2._1._3 * data._2._2._3, // rating 1 * rating 2
            data._2._1._3,                // rating item 1
            data._2._2._3,                // rating item 2
            math.pow(data._2._1._3, 2),   // square of rating item 1
            math.pow(data._2._2._3, 2),   // square of rating item 2
            data._2._1._4,                // number of raters item 1
            data._2._2._4)                // number of raters item 2
        (key, stats)
      })
    val vectorCalcs = tempVectorCalcs.groupByKey().map(data => {
        val key = data._1
        val vals = data._2
        val size = vals.size
        val dotProduct = vals.map(f => f._1).sum
        val ratingSum = vals.map(f => f._2).sum
        val rating2Sum = vals.map(f => f._3).sum
        val ratingSq = vals.map(f => f._4).sum
        val rating2Sq = vals.map(f => f._5).sum
        val numRaters = vals.map(f => f._6).max
        val numRaters2 = vals.map(f => f._7).max
        (key, (size, dotProduct, ratingSum, rating2Sum, ratingSq, rating2Sq, numRaters, numRaters2))
      })
      //.filter(x=>x._2._1>1)

    val inverseVectorCalcs = vectorCalcs.map(x=>((x._1._2,x._1._1),(x._2._1,x._2._2,x._2._4,x._2._3,x._2._6,x._2._5,x._2._8,x._2._7)))
    val vectorCalcsTotal = vectorCalcs ++ inverseVectorCalcs

    // compute similarity metrics for each item pair
    // modify formula as : cosSim *size/(numRaters*math.log10(numRaters2+10))
    val tempSimilarities =
      vectorCalcsTotal.map(fields => {
        val key = fields._1
        val (size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, numRaters, numRaters2) = fields._2
        val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingNormSq), scala.math.sqrt(rating2NormSq))*size/(numRaters*math.log10(numRaters2+10))
        (key._1,(key._2, cosSim))
      })

     val similarities = tempSimilarities.groupByKey().flatMap(x=>{
       x._2.map(temp=>(x._1,(temp._1,temp._2))).toList.sortWith((a,b)=>a._2._2>b._2._2).take(50)
     })

    val similarTable = similarities.map(x=>(x._1,x._2._1,x._2._2)).toDF()
    hiveql.sql("use DatabaseName")
    similarTable.insertInto("similar_item_test",true)

      // ratingsInverse format: (item, (user, rating))
      val ratingsInverse = ratings.map(rating=>(rating._2,(rating._1,rating._3)))

    // statistics format: ((user,item), (sim, sim*rating)); ratingsInverse.join(similarities) is formatted as (item, ((user,rating), (item2,similar)))
      val statistics = ratingsInverse.join(similarities).map(x=>((x._2._1._1,x._2._2._1),(x._2._2._2,x._2._1._2*x._2._2._2)))

    // predictResult format: ((user,item), predict)
      val predictResult = statistics.reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).map(x=>(x._1,x._2._2/x._2._1))
//      val predictResult = statistics.reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).map(x=>(x._1,x._2._2))

      // mark already-rated (user,item) pairs with NaN; after the union below,
      // reduceByKey turns any rated pair's score into NaN so it can be filtered out
      val filterItem = oriRatings.map(x=>((x._1,x._2),Double.NaN))
      val totalScore = predictResult ++ filterItem

//      val temp = totalScore.reduceByKey(_+_)

      val finalResult = totalScore.reduceByKey(_+_).filter(x=> !(x._2 equals(Double.NaN))).
        map(x=>(x._1._1,x._1._2,x._2)).groupBy(x=>x._1).flatMap(x=>(x._2.toList.sortWith((a,b)=>a._3>b._3).take(50)))

//     val aa = finalResult.map(x=>x._1).distinct().count

      val recommendTable = finalResult.toDF()
      hiveql.sql("use DatabaseName")
      recommendTable.insertInto("recommend_item_test",true)
  }

  // *************************
  // * SIMILARITY MEASURES
  // *************************

  /**
   * The correlation between two vectors A, B is
   *   cov(A, B) / (stdDev(A) * stdDev(B))
   *
   * This is equivalent to
   *   [n * dotProduct(A, B) - sum(A) * sum(B)] /
   *     sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
   */
  def correlation(size : Double, dotProduct : Double, ratingSum : Double,
                  rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double) = {

    val numerator = size * dotProduct - ratingSum * rating2Sum
    val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
      scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)

    numerator / denominator
  }

  /**
   * Regularize correlation by adding virtual pseudocounts over a prior:
   *   RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
   * where w = # actualPairs / (# actualPairs + # virtualPairs).
   */
  def regularizedCorrelation(size : Double, dotProduct : Double, ratingSum : Double,
                             rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double,
                             virtualCount : Double, priorCorrelation : Double) = {

    val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
    val w = size / (size + virtualCount)

    w * unregularizedCorrelation + (1 - w) * priorCorrelation
  }

  /**
   * The cosine similarity between two vectors A, B is
   *   dotProduct(A, B) / (norm(A) * norm(B))
   */
  def cosineSimilarity(dotProduct : Double, ratingNorm : Double, rating2Norm : Double) = {
    dotProduct / (ratingNorm * rating2Norm)
  }

  /**
   * The Jaccard Similarity between two sets A, B is
   *   |Intersection(A, B)| / |Union(A, B)|
   */
  def jaccardSimilarity(usersInCommon : Double, totalUsers1 : Double, totalUsers2 : Double) = {
    val union = totalUsers1 + totalUsers2 - usersInCommon
    usersInCommon / union
  }
}

5. Spark User-Based CF Code

package cf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object CollaborativeFilterUserBased_modify
{
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("cf user-based")
    val sc = new SparkContext(sparkConf)

    val hiveql = new HiveContext(sc)
    import hiveql.implicits._

    // extract (userid, itemid, rating) from ratings data
    val oriRatings = sc.textFile("/path/to/file").map(line => {
      val fields = line.split("\t")
      (fields(0).toLong, fields(1).toLong, fields(2).toInt)
    })

    // filter redundant (user,item,rating) triples: keep only each user's 100 highest-rated items
    val ratings = oriRatings.groupBy(k=>k._1).flatMap(x=>(x._2.toList.sortWith((x,y)=>x._3>y._3).take(100)))


    // each user maps to many items
    val user2manyItem = ratings.groupBy(tup=>tup._1)
    // number of items per user
    val numPrefPerUser = user2manyItem.map(grouped=>(grouped._1,grouped._2.size))
    //join ratings with user's pref num
    //ratingsWithSize now contains the following fields: (user, item, rating, numPrefs).
    val ratingsWithSize = user2manyItem.join(numPrefPerUser).
      flatMap(joined=>{
        joined._2._1.map(f=>(f._1,f._2,f._3,joined._2._2))
      })
    //(user, item, rating, numPrefs) ==>(item,(user, item, rating, numPrefs))
    val ratings2 = ratingsWithSize.keyBy(tup=>tup._2) 
    //ratingPairs format: (t, ((u1,t,pref1,numpref1), (u2,t,pref2,numpref2))) with u1 < u2,
    //which avoids double-counting and excludes self-pairs
    val ratingPairs = ratings2.join(ratings2).filter(f=>f._2._1._1<f._2._2._1)


    val tempVectorCalcs = ratingPairs.map(data=>{
      val key = (data._2._1._1,data._2._2._1)
      val stats = 
        (data._2._1._3*data._2._2._3,//rating 1 * rating 2
         data._2._1._3, //rating user 1
         data._2._2._3, //rating user 2
         math.pow(data._2._1._3, 2), //square of rating user 1
         math.pow(data._2._2._3,2), //square of rating user 2
         data._2._1._4,  //num prefs of user 1
         data._2._2._4) //num prefs of user 2
        (key,stats)
    })
      val vectorCalcs = tempVectorCalcs.groupByKey().map(data=>{
      val key = data._1
      val vals = data._2
      val size = vals.size
      val dotProduct = vals.map(f=>f._1).sum
      val ratingSum = vals.map(f=>f._2).sum
      val rating2Sum = vals.map(f=>f._3).sum
      val ratingSeq = vals.map(f=>f._4).sum
      val rating2Seq = vals.map(f=>f._5).sum
      val numPref = vals.map(f=>f._6).max
      val numPref2 = vals.map(f=>f._7).max
      (key,(size,dotProduct,ratingSum,rating2Sum,ratingSeq,rating2Seq,numPref,numPref2))
    })

    // the similarity matrix is not symmetric, so build the full matrix from the half matrix
    val inverseVectorCalcs = vectorCalcs.map(x=>((x._1._2,x._1._1),(x._2._1,x._2._2,x._2._4,x._2._3,x._2._6,x._2._5,x._2._8,x._2._7)))
    val vectorCalcsTotal = vectorCalcs ++ inverseVectorCalcs

    // compute similarity metrics for each user pair; each score is the similarity of user2 with respect to user1
    val tempSimilarities =
      vectorCalcsTotal.map(fields => {
        val key = fields._1
        val (size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, numRaters, numRaters2) = fields._2
        val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingNormSq), scala.math.sqrt(rating2NormSq))*
          size/(numRaters*math.log10(numRaters2+10))
//        val corr = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
        (key._1,(key._2, cosSim))
      })



    val similarities = tempSimilarities.groupByKey().flatMap(x=>{
      x._2.map(temp=>(x._1,(temp._1,temp._2))).toList.sortWith((a,b)=>a._2._2>b._2._2).take(50)
    })
//    val temp = similarities.filter(x=>x._2._2.equals(Double.PositiveInfinity))   // leftover debug check, unused

    val similarTable = similarities.map(x=>(x._1,x._2._1,x._2._2)).toDF()
    hiveql.sql("use DatabaseName")
    similarTable.insertInto("similar_user_test",true)


    // ratingsInverse format: (user, (item, rating))
    val ratingsInverse = ratings.map(rating=>(rating._1,(rating._2,rating._3)))

    // statistics format: ((user,item), (sim, sim*rating)); ratingsInverse.join(similarities) is formatted as (user, ((item,rating), (user2,similar)))
    val statistics = ratingsInverse.join(similarities).map(x=>((x._2._2._1,x._2._1._1),(x._2._2._2,x._2._1._2*x._2._2._2)))

    // predictResult format: ((user,item), predict)
    val predictResult = statistics.reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).map(x=>(x._1,x._2._2/x._2._1))


    // mark already-rated (user,item) pairs with NaN; after the union below,
    // reduceByKey turns any rated pair's score into NaN so it can be filtered out
    val filterItem = ratings.map(x=>((x._1,x._2),Double.NaN))
    val totalScore = predictResult ++ filterItem

    val finalResult = totalScore.reduceByKey(_+_).filter(x=> !(x._2 equals(Double.NaN))).
      map(x=>(x._1._1,x._1._2,x._2)).groupBy(x=>x._1).flatMap(x=>(x._2.toList.sortWith((x,y)=>x._3>y._3).take(50)))

    val recommendTable = finalResult.toDF()
    hiveql.sql("use DatabaseName")
    recommendTable.insertInto("recommend_user_test",true)
  }

  // *************************
  // * SIMILARITY MEASURES
  // *************************

  /**
   * The correlation between two vectors A, B is
   *   cov(A, B) / (stdDev(A) * stdDev(B))
   *
   * This is equivalent to
   *   [n * dotProduct(A, B) - sum(A) * sum(B)] /
   *     sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
   */
  def correlation(size : Double, dotProduct : Double, ratingSum : Double,
                  rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double) = {

    val numerator = size * dotProduct - ratingSum * rating2Sum
    val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
      scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)+1

    numerator / denominator
  }

  /**
   * Regularize correlation by adding virtual pseudocounts over a prior:
   *   RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
   * where w = # actualPairs / (# actualPairs + # virtualPairs).
   */
  def regularizedCorrelation(size : Double, dotProduct : Double, ratingSum : Double,
                             rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double,
                             virtualCount : Double, priorCorrelation : Double) = {

    val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
    val w = size / (size + virtualCount)

    w * unregularizedCorrelation + (1 - w) * priorCorrelation
  }

  /**
   * The cosine similarity between two vectors A, B is
   *   dotProduct(A, B) / (norm(A) * norm(B))
   */
  def cosineSimilarity(dotProduct : Double, ratingNorm : Double, rating2Norm : Double) = {
    dotProduct / (ratingNorm * rating2Norm)
  }

  /**
   * The Jaccard Similarity between two sets A, B is
   *   |Intersection(A, B)| / |Union(A, B)|
   */
  def jaccardSimilarity(usersInCommon : Double, totalUsers1 : Double, totalUsers2 : Double) = {
    val union = totalUsers1 + totalUsers2 - usersInCommon
    usersInCommon / union
  }
}

References: http://mlnick.github.io/blog/2013/04/01/movie-recommendations-and-more-with-spark/

