利用spark构建分布式电影协同过滤推荐系统

一、数据采集

使用MovieLen的开放数据集作为数据源,包含了6000个用户对4000个电影的评分数据,大概有100万条评分数据。数据集也可以从这个网址下载。数据集一共有3个文件:

movie.date(电影ID::电影名称::标签)

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy

rating.dat(用户id::电影ID::评分::时间戳)

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291

user.dat(用户ID::性别::年龄::Occupation::Zip-code)

1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455

二、数据处理

使用spark的Mlib库需要添加依赖,主要需要注释掉scope这一行,不然还是会报错

object mllib is not a member of package org.apache.spark

 <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.4.0</version>
      <!-- <scope>runtime</scope> -->
    </dependency>  
  • 1.读取评分数据和电影数据
package Sparkstreaming
 
import org.apache.spark.{SparkConf,SparkContext}
object Movie {
  def main(args: Array[String]): Unit = {
    //设置spark环境
    val conf=new SparkConf().setMaster("local[2]").setAppName("movie")
    val sc=new SparkContext(conf)
    //读取电影,转成map类型
    val movie=sc.textFile("D:\\Python\\ml-1m\\movies.dat").map{
      line=>
        val str=line.split("::")
        //获取电影ID和电影名称
        (str(0).toInt,str(1))
    }.collect().toMap
    //println(movie)
    //读取评分数据
    val rate=sc.textFile("D:\\Python\\ml-1m\\ratings.dat").map{
      line=>
        val str=line.split("::")
        //生成根据时间戳排列的用户电影评分数据(0,4344,1974,1.0)
        val rating=Rating(str(0).toInt,str(1).toInt,str(2).toDouble)
        val timestamp=str(3).toLong % 10
        (timestamp,rating)
    }
    //rate.foreach(print)
    val moviecount=rate.map(_._3).distinct().count()
    val usercount=rate.map(_._2).distinct().count()
    val totalcount=rate.count()
    println("一共有"+totalcount+"条数据,其中电影条数为"+moviecount+"用户数为"+usercount)
 
  }
}  

结果:

image

  • 2.划分测试集和训练集,计算均方误差RMSE 得出最佳参数:
// 划分训练集和测试集(训练集60%,测试集40%)
    val splits = rate.randomSplit(Array(0.6, 0.4), seed =2)
    val training= splits(0).repartition(4).cache().map(_._2)
    val test = splits(1).repartition(4).cache().map(_._2)
    //定义函数计算均方误差RMSE
    def Rmse(model: MatrixFactorizationModel,data:RDD[Rating]):
    Double={
      val predict=model.predict(data.map(x=>(x.user,x.product)))map{x=>((x.user,x.product),x.rating)}
      val actual=data.map(x=>((x.user,x.product),x.rating))
      val predictrate=predict.join(actual).values
      math.sqrt(predictrate.map(x=>(x._1-x._2)*(x._1-x._2)).mean())
    }
    //测试合适的参数
    val ranks=List(8,12)//矩阵的秩
    val regus=List(1.0,10.0)//正则系数
    val iters=List(10,20)//迭代次数
    var bestmodel:Option[MatrixFactorizationModel]=None
    var bestRmse=Double.MaxValue
    var bestrank=0
    var bestregu = -1.0
    var bestiter = -1
    for (rank<-ranks;regu<-regus;iter<-iters){
      val model=ALS.train(training,rank,iter,regu)
      val valiRmse=Rmse(model,test)
      if (valiRmse<bestRmse){
        bestmodel=Some(model)
        bestRmse=valiRmse
        bestrank=rank
        bestregu=regu
        bestiter=iter
      }
 
    }
    val testRmse=Rmse(bestmodel.get,test)
    println("bestrank:"+bestrank+",bestiter:"+bestiter+",bestregu:"+bestregu+",testRmse:"+testRmse)
    

过程可能有的久,结果:

image

所以轶是8,正则系数是1.0,迭代次数是10,均方误差为1.35

  • 3.使用该模型为用户进行推荐
//使用该模型为用户推荐电影
      val userid=1
      // 求出第一个用户所有评分过的电影
      val user1RatedMovieIds = rate.filter(_._2.user==userid).map(_._2.product).collect.toSeq
      // 求出第一个用户没有评分过的电影
      val cands = sc.parallelize(movie.keys.filter(!user1RatedMovieIds.contains(_)).toSeq)
      // 使用模型来对潜在推荐电影进行评分,并按照预测评分进行排序,取前10
      val recommendations = model.predict(cands.map((userid,_))).collect.sortBy(- _.rating).take(10)
      var i =1
      println("的推荐结果为:")
      recommendations.foreach{rec => println("%2d".format(i)+": "+movie(rec.product)+", predictRating: "+rec.rating);i+=1}

结果:

image

完整代码:

package Sparkstreaming
 
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
object Movie {
  def main(args: Array[String]): Unit = {
    //设置spark环境
    val conf=new SparkConf().setMaster("local[2]").setAppName("movie")
    val sc=new SparkContext(conf)
    sc.setLogLevel("WARN")
    //读取电影,转成map类型
    val movie=sc.textFile("D:\\Python\\ml-1m\\movies.dat").map{
      line=>
        val str=line.split("::")
        //获取电影ID和电影名称
        (str(0).toInt,str(1))
    }.collect().toMap
    //println(movie)
    //读取评分数据
    val rate=sc.textFile("D:\\Python\\ml-1m\\ratings.dat").map{
      line=>
        val str=line.split("::")
        //生成根据时间戳排列的用户电影评分数据(0,4344,1974,1.0)
        val rating=Rating(str(0).toInt,str(1).toInt,str(2).toDouble)
        val timestamp=str(3).toLong % 10
        (timestamp,rating)
 
    }
    //rate.foreach(print)
    val moviecount=rate.map(_._2.product).distinct().count()
    val usercount=rate.map(_._2.user).distinct().count()
    val totalcount=rate.count()
    println("一共有"+totalcount+"条数据,其中电影条数为"+moviecount+"用户数为"+usercount)
    // 划分训练集和测试集(训练集60%,测试集40%)
    val splits = rate.randomSplit(Array(0.6, 0.4), seed =2)
    val training= splits(0).repartition(4).cache().map(_._2)
    val test = splits(1).repartition(4).cache().map(_._2)
    //定义函数计算均方误差RMSE
    def Rmse(model: MatrixFactorizationModel,data:RDD[Rating]):
    Double={
      val predict=model.predict(data.map(x=>(x.user,x.product)))map{x=>((x.user,x.product),x.rating)}
      val actual=data.map(x=>((x.user,x.product),x.rating))
      val predictrate=predict.join(actual).values
      math.sqrt(predictrate.map(x=>(x._1-x._2)*(x._1-x._2)).mean())
    }
    //测试合适的参数
    val ranks=List(8,12)//矩阵的秩
    val regus=List(1.0,10.0)//正则系数
    val iters=List(10,20)//迭代次数
    var bestmodel:Option[MatrixFactorizationModel]=None
    var bestRmse=Double.MaxValue
    var bestrank=0
    var bestregu = -1.0
    var bestiter = -1
    for (rank<-ranks;regu<-regus;iter<-iters){
      val model=ALS.train(training,rank,iter,regu)
      val valiRmse=Rmse(model,test)
      if (valiRmse<bestRmse){
        bestmodel=Some(model)
        bestRmse=valiRmse
        bestrank=rank
        bestregu=regu
        bestiter=iter
      }
      val testRmse=Rmse(bestmodel.get,test)
      println("bestrank:"+bestrank+",bestiter:"+bestiter+",bestregu:"+bestregu+",testRmse:"+testRmse)
      //bestrank:8,bestiter:10,bestregu:1.0,testRmse:1.352360900171576
      //使用该模型为用户推荐电影
      val userid=1
      // 求出第一个用户所有评分过的电影
      val user1RatedMovieIds = rate.filter(_._2.user==userid).map(_._2.product).collect.toSeq
      // 求出第一个用户没有评分过的电影
      val cands = sc.parallelize(movie.keys.filter(!user1RatedMovieIds.contains(_)).toSeq)
      // 使用模型来对潜在推荐电影进行评分,并按照预测评分进行排序,取前10
      val recommendations = model.predict(cands.map((userid,_))).collect.sortBy(- _.rating).take(10)
      var i =1
      println("推荐结果为:")
      recommendations.foreach{rec => println("%2d".format(i)+": "+movie(rec.product)+", predictRating: "+rec.rating);i+=1}
 
 
    }
 
 
 
 
 
 
  }
}