Exploring the MovieLens Data with Spark and Zeppelin


The MovieLens 100k dataset contains 100,000 movie ratings from users.
First, download and unpack the data:

wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip ml-100k.zip
cd ml-100k
# User file (ID, age, gender, occupation, zip code)
zhf@ubuntu:~/Downloads/ml-100k$ head -5 u.user 
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213
# Movie file (ID, title, release date, etc.)
zhf@ubuntu:~/Downloads/ml-100k$ head -5 u.item
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
# Rating file (user ID, movie ID, rating 1-5, timestamp), tab-separated
zhf@ubuntu:~/Downloads/ml-100k$ head -5 u.data 
196     242     3       881250949
186     302     3       891717742
22      377     1       878887116
244     51      2       880606923
166     346     1       886397596

Start Zeppelin:

zhf@ubuntu:~/Downloads/incubator-zeppelin$ bin/zeppelin-daemon.sh start
Zeppelin start                                             [  OK  ]

Zeppelin listens on port 8080, so just open http://localhost:8080. Zeppelin defaults to the Scala interpreter, which we leave unchanged here, and a SparkContext is loaded automatically (available as sc).
Read in the user file:

val user_data = sc.textFile("/home/zhf/Downloads/ml-100k/u.user")
user_data.first()

This prints the following:

user_data: org.apache.spark.rdd.RDD[String] = /home/zhf/Downloads/ml-100k/u.user MapPartitionsRDD[1] at textFile at <console>:23
res0: String = 1|24|M|technician|85711

Run some simple statistics on the user file:

val user_fields = user_data.map(line => line.split("\\|"))
val num_users = user_fields.map(fields => fields(0)).count()
val num_genders = user_fields.map(fields => fields(2)).distinct().count()
val num_occupations = user_fields.map(fields => fields(3)).distinct().count()
val num_zipcodes = user_fields.map(fields => fields(4)).distinct().count()

Result:

user_fields: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[81] at map at <console>:25
num_users: Long = 943
num_genders: Long = 2
num_occupations: Long = 21
num_zipcodes: Long = 795
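
As a small follow-up (a sketch reusing the user_fields RDD above; the name ages is introduced here for illustration), the numeric age column can also be summarized in a single pass:

// fields(1) is the age column; stats() returns count, mean,
// stdev, max and min in one pass over the RDD.
val ages = user_fields.map(fields => fields(1).toInt)
println(ages.stats())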

Zeppelin supports plotting; the feature set is simple but powerful, rendering the same output as a table, bar chart, line chart, pie chart, area chart, or scatter plot.
Below we plot the number of users at each age. A chart is driven by printing tab-separated rows prefixed with the %table directive:

println("%table column_1\tcolumn_2\n" +
        "value_1\tvalue_2\n" + ...)

val ages_table = user_fields.map(fields => (fields(1).toInt, 1)).reduceByKey(_ + _).collect().sortBy(_._1).map(line => line._1 + "\t" + line._2).mkString("\n")
println("%table age\tsize\n" + ages_table)

Chart: user count by age.
As the chart shows, young adults make up the largest group of users.
Next, look at the users' occupations:

val count_by_occupation = user_fields.map(fields => (fields(3), 1)).reduceByKey(_ + _).collect().sortBy(_._2).map(line => line._1 + "\t" + line._2).mkString("\n")
println("%table occupation\tsize\n" + count_by_occupation)

Chart: user count by occupation.
Students, educators, and programmers are among the most common.
Next, take a look at the movie data:

val movie_data = sc.textFile("/home/zhf/Downloads/ml-100k/u.item")
movie_data.first()
val num_movies = movie_data.count()

Output:

movie_data: org.apache.spark.rdd.RDD[String] = /home/zhf/Downloads/ml-100k/u.item MapPartitionsRDD[260] at textFile at <console>:24
res118: String = 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
num_movies: Long = 1682

Some release dates in the movie file are missing, so they need handling. First define a function that converts any bad date to the year 1900:

def convert_year(date: String): Int = {
    try {
        date.split("-")(2).toInt // dates look like "01-Jan-1995"
    } catch {
        case e: Exception => 1900 // missing or malformed date
    }
}
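
A quick sanity check of the fallback behavior (illustrative inputs, not dataset output):

convert_year("01-Jan-1995") // returns 1995
convert_year("")            // the failed split is caught, returns 1900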

Now look at how many years ago each movie was released (the data was collected in 1998, so we use 1998 minus the release year, and filter out the 1900 placeholder values):

val movie_years = movie_data.map(line => line.split("\\|")).map(fields => fields(2)).map(date => convert_year(date)).filter(year => year != 1900)
val movie_ages = movie_years.map(year => 1998 - year).countByValue().toList.sortBy(_._1).map(line => line._1 + "\t" + line._2).mkString("\n")
println("%table age\tsize\n" + movie_ages)

Chart: distribution of movie age in years.
Movies from the last five years dominate; that may reflect a real increase in output, or simply how the data was collected.
Next, the rating data:

val rating_data_raw = sc.textFile("/home/zhf/Downloads/ml-100k/u.data")
rating_data_raw.first()
val num_ratings = rating_data_raw.count()

Output:

rating_data_raw: org.apache.spark.rdd.RDD[String] = /home/zhf/Downloads/ml-100k/u.data MapPartitionsRDD[246] at textFile at <console>:24
res114: String = 196    242 3   881250949
num_ratings: Long = 100000

Compute some summary statistics:

val rating_data = rating_data_raw.map(line => line.split("\t"))
val ratings  = rating_data.map(fields => fields(2).toInt)
val max_rating = ratings.reduce((x,y) => Math.max(x,y))
val min_rating = ratings.reduce((x,y) => Math.min(x,y))
val mean_rating = ratings.reduce(_ + _) / num_ratings.toDouble // mean
val median_rating = ratings.collect().toList.sorted.drop(num_ratings.toInt/2).head // median (collects all ratings to the driver)
val ratings_per_user = num_ratings / num_users
val ratings_per_movie = num_ratings / num_movies

Output:

rating_data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[265] at map at <console>:25
ratings: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[266] at map at <console>:27
max_rating: Int = 5
min_rating: Int = 1
mean_rating: Double = 3.52986
median_rating: Int = 4
ratings_per_user: Long = 106
ratings_per_movie: Long = 59
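
Collecting all ratings to the driver for the median is fine at 100,000 rows, but would not scale to much larger data. Here is a hedged sketch of a distributed alternative (the names sorted_ratings and median_distributed are introduced here, not from the original):

// Sort the ratings, index each element, and look up the middle one,
// without collecting the whole RDD to the driver.
val sorted_ratings = ratings.sortBy(identity).zipWithIndex().map(_.swap)
val median_distributed = sorted_ratings.lookup(num_ratings / 2).head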

Instead of computing these by hand, Spark also ships with a built-in summary-statistics function:

ratings.stats()

Output:

res121: org.apache.spark.util.StatCounter = (count: 100000, mean: 3.529860, stdev: 1.125668, max: 5.000000, min: 1.000000)
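
The returned StatCounter also exposes each statistic individually, which is handy for further computation, e.g.:

val stats = ratings.stats()
println(stats.mean)  // 3.52986
println(stats.stdev) // 1.125668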

Distribution of rating values:

val count_by_rating = ratings.countByValue().toList.sortBy(_._1).map(line => line._1 + "\t" + line._2).mkString("\n")
println("%table rating\tcount\n" + count_by_rating)

Chart: distribution of rating values.
Extreme ratings (1 and 5) are relatively rare.

How many movies has each user rated:

val user_ratings_by_user = rating_data.map(fields => (fields(0).toInt,fields(1).toInt)).groupByKey().map(x => (x._1,x._2.size))
val user_rating_table = user_ratings_by_user.collect().map(line => line._1 + "\t" + line._2).mkString("\n")
println("%table user\tcount\n" + user_rating_table)

Chart: number of ratings per user.
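
As an aside, since only the counts are needed, the same table can be built without materializing each user's full list of movie IDs through groupByKey; a sketch using countByKey instead (the same idea applies to the per-movie counts below):

// countByKey reduces partial counts on the executors and returns
// a Map[Int, Long] to the driver, avoiding the full group shuffle.
val user_rating_counts = rating_data.map(fields => (fields(0).toInt, 1)).countByKey()
val user_rating_table_2 = user_rating_counts.toList.sortBy(_._1).map(line => line._1 + "\t" + line._2).mkString("\n")
println("%table user\tcount\n" + user_rating_table_2)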

How many users have rated each movie:

val movie_ratings_by_user = rating_data.map(fields => (fields(1).toInt,fields(0).toInt)).groupByKey().map(x => (x._1,x._2.size))
val movie_rating_table = movie_ratings_by_user.collect().map(line => line._1 + "\t" + line._2).mkString("\n")
println("%table movie\tcount\n" + movie_rating_table)

Chart: number of ratings per movie.

As the above shows, Spark makes this kind of data exploration very convenient, and Zeppelin has strong visualization capabilities. While it cannot yet match matplotlib's fully customizable plot parameters, you can switch a paragraph to the %pyspark interpreter in Zeppelin and use Python's packages.

Reference: Machine Learning with Spark

