当前位置 博文首页 > ?:Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐

    ?:Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐

    作者:[db:作者] 时间:2021-07-03 19:14

    Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐


    基于Spark MLlib 实现音乐推荐

    一、实验背景:

    熟悉 Audioscrobbler 数据集

    基于该数据集选择合适的 MLlib 库算法进行数据处理

    进行音乐推荐(或用户推荐)

    二、实验目的:

    计算AUC评分最高的参数

    利用AUC评分最高的参数,给用户推荐艺术家

    对多个用户进行艺术家推荐

    利用AUC评分最高的参数,给艺术家推荐喜欢他的用户

    三、实验步骤:

    1. 安装Hadoop和Spark
    2. 启动Hadoop与Spark
    3. 将文件上传到 HDFS
    4. 实现音乐推荐

    四、实验过程:

    1、安装Hadoop和Spark

    具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:

    Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
    Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894

    提示:如果IDEA未构建Spark项目,可以转接到以下的博客:

    IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536

    2、启动Hadoop与Spark

    查看3个节点的进程

    master在这里插入图片描述
    slave1
    在这里插入图片描述
    slave2
    在这里插入图片描述

    3、将文件上传到 HDFS

    Shell命令:

    [root@master ~]# cd /opt/data/profiledata_06-May-2005/
    [root@master profiledata_06-May-2005]# ls
    [root@master profiledata_06-May-2005]# hadoop dfs -put artist_alias.txt artist_data.txt user_artist_data.txt /spark/input
    

    在这里插入图片描述

    4、实现音乐推荐

    源代码:

    package com.John.SparkProject
    
    import org.apache.spark.SparkConf
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.ml.recommendation.{ALS, ALSModel}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random
    
    
    /**
     * @author John
     * @Date 2021/5/25 12:49
     */
    object project02 {
      def main(args: Array[String]): Unit = {
    
        /**
         * 前期环境配置以及数据准备
         */
        // 创建一个SparkSession对象
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("Project02_RecommenderApp")
          .set("spark.sql.crossJoin.enabled", "true")
        val spark = new SparkSession.Builder()
          .config(conf)
          .getOrCreate()
    
        import spark.implicits._
        // 导入 artist_data.txt 文件 (每个艺术家的 ID 和对应的名字)
        // 字段名分别是: artistid artist_name
        val rawArtistData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/artist_data.txt")
        val artistIdDF = transformArtistData(rawArtistData)
        val artistIdDFtest = transformArtistData1(rawArtistData)
    
        // 导入 artist_alias.txt 文件 (将拼写错误的艺术家 ID 或ID 变体对应到该艺术家的规范 ID)
        // 字段名分别是: badid goodid
        val rawAliasData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/artist_alias.txt")
        val artistAlias = transformAliasData(rawAliasData).collect().toMap
    
        // 导入 user_artist_data.txt 文件 (用户音乐收听数据)
        // 字段名分别是 userid artistid playcount
        val rawUserArtistData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/user_artist_data.txt")
    
        // 整合数据
        val allDF = transformUserArtistData(spark, rawUserArtistData, artistAlias)
        allDF.persist()
    
        // 拆分训练集和测试集
        val Array(trainDF, testDF) = allDF.randomSplit(Array(0.9, 0.1))
        trainDF.persist()
    
        // 查看一下指定 user 听过的艺术家
        allDF.join(artistIdDFtest,"artist").select("name").filter("user='2093760'").show(5)
    
        /**
         * 根据题目要求参数
         * 给用户推荐艺术家
         */
        //        // 根据题目要求构建模型
        //        val als = new ALS()
        //              .setSeed(Random.nextLong())
        //              .setImplicitPrefs(true)
        //              .setRank(10) // 模型潜在因素个数
        //              .setRegParam(0.01) // 正则化参数
        //              .setAlpha(1.0) // 管理偏好观察值的 基线置信度
        //              .setMaxIter(5) // 最大迭代次数
        //              .setUserCol("user")
        //              .setItemCol("artist")
        //              .setRatingCol("count")
        //              .setPredictionCol("prediction")
        //        // 用训练数据训练模型
        //        val model = als.fit(trainDF)
        //        // 释放缓存资源
        //        trainDF.unpersist()
        //        // 开始推荐
        //        val userID = 2093760
        //        val artistNum = 5
        //        val recommendDF = recommend(model, userID, artistNum, artistIdDF)
        //
        //        val strings = recommendDF.map(_.mkString("|")).collect()
        //        println(strings.toBuffer)
    
        /**
         * 计算AUC评分最高的参数
         * 原理:循环指定参数暴力计算,根据AUC计算出评分最高的参数进行建模
         */
        //    // 艺术家id数据,用于AUC评分
        //    val allArtistIds = allDF.select("artist").as[Int].distinct().collect()
        //    val bAllArtistIds = spark.sparkContext.broadcast(allArtistIds)
        //
        //    // 网格搜索
        //    val evaluations =
        //    // 利用for循环,生成不同的超参数配置
        //      for (rank <- Seq(5, 30);
        //           regParam <- Seq(4.0, 0.0001);
        //           alpha <- Seq(1.0, 40.0))
        //        yield {
        //          // 构建模型
        //          val als = new ALS()
        //            .setSeed(Random.nextLong())
        //            .setImplicitPrefs(true)
        //            .setRank(rank)
        //            .setRegParam(regParam)
        //            .setAlpha(alpha)
        //            .setMaxIter(5)
        //            .setUserCol("user")
        //            .setItemCol("artist")
        //            .setRatingCol("count")
        //            .setPredictionCol("prediction")
        //          val model = als.fit(trainDF)
        //          val auc = areaUnderCurve(testDF, bAllArtistIds, model.transform)
        //          // 释放资源
        //          model.userFactors.unpersist()
        //          model.itemFactors.unpersist()
        //          (auc, (rank, regParam, alpha))
        //        }
        //    // 按评分降序输出各参数信息
        //    evaluations.sorted.reverse.foreach(println)
        //        (0.9134340440577203,(30,4.0,40.0)) // 最优参数
        //        (0.9124295941963009,(30,4.0,1.0))
        //        (0.9121292259762062,(30,1.0E-4,40.0))
        //        (0.9111586767382363,(5,4.0,40.0))
        //        (0.9097682726329872,(5,1.0E-4,40.0))
        //        (0.9089218752871897,(5,4.0,1.0))
        //        (0.9038315464345514,(5,1.0E-4,1.0))
        //        (0.8951870697645603,(30,1.0E-4,1.0))
    
        /**
         * 利用AUC评分最高的参数
         * 给用户推荐艺术家
         */
        //    // 利用AUC评分最高的参数进行模型构建
        //    val als = new ALS()
        //      .setSeed(Random.nextLong())
        //      .setImplicitPrefs(true)
        //      .setRank(30) // 模型潜在因素个数
        //      .setRegParam(4.0) // 正则化参数
        //      .setAlpha(40.0) // 管理偏好观察值的 基线置信度
        //      .setMaxIter(5) // 最大迭代次数
        //      .setUserCol("user")
        //      .setItemCol("artist")
        //      .setRatingCol("count")
        //      .setPredictionCol("prediction")
        //    // 训练模型
        //    val model = als.fit(trainDF)
        //    // 释放缓存资源
        //    trainDF.unpersist()
        //    // 开始推荐
        //    val userID = 2093760
        //    val artistNum = 5
        //    val recommendDF = recommend(model, userID, artistNum, artistIdDF)
        //    recommendDF.show()
        //
        //    val strings = recommendDF.map(_.mkString("|")).collect()
        //    println(strings.toBuffer)
    
        /**
         * 用测试数据对10个用户进行推荐
         */
    //    // 利用AUC评分最高的参数进行模型构建
    //    val als = new ALS()
    //      .setSeed(Random.nextLong())
    //      .setImplicitPrefs(true)
    //      .setRank(30) // 模型潜在因素个数
    //      .setRegParam(4.0) // 正则化参数
    //      .setAlpha(40.0) // 管理偏好观察值的 基线置信度
    //      .setMaxIter(5) // 最大迭代次数
    //      .setUserCol("user")
    //      .setItemCol("artist")
    //      .setRatingCol("count")
    //      .setPredictionCol("prediction")
    //    // 训练模型
    //    val model = als.fit(trainDF)
    //    //用测试数据对100个用户进行推荐
    //    val someUsers = testDF.select("user").as[Int].distinct.take(10)
    //    // 推荐
    //    someUsers.map { user =>
    //      val recommendDF = recommend(model, user, 5, artistIdDF)
    //      val strings = recommendDF.map(_.mkString("|")).collect()
    //      (user, strings.toBuffer)
    //    }.foreach(println)
    
    
        /**
         * 利用AUC评分最高的参数
         * 给艺术家推荐喜欢他的用户
         */
    //    //利用AUC评分最高的参数进行模型构建
    //    val als = new ALS()
    //      .setSeed(Random.nextLong())
    //      .setImplicitPrefs(true)
    //      .setRank(30) // 模型潜在因素个数
    //      .setRegParam(4.0) // 正则化参数
    //      .setAlpha(40.0) // 管理偏好观察值的 基线置信度
    //      .setMaxIter(5) // 最大迭代次数
    //      .setUserCol("artist")
    //      .setItemCol("user")
    //      .setRatingCol("count")
    //      .setPredictionCol("prediction")
    //    // 训练模型
    //    val model = als.fit(trainDF)
    //    // 释放缓存资源
    //    trainDF.unpersist()
    //    // 开始推荐
    //    val artistID = 930
    //    val userNum = 100
    //    val recommendDF = recommendArtist(model, artistID, userNum, allDF)
    //    val strings = recommendDF.select("user").map(_.mkString("|")).collect()
    //    println(strings.toBuffer)
    
    
        // 关闭spark
        spark.stop()
      }
    
      /**
       * 规范艺术家的ID,合并数据,创建一个总的数据集
       *
       * @param spark           SparkSession
       * @param rawUserArtistDS 用户和艺术家的关系数据集
       * @param artistAlias     艺术家别名id,用于补全
       * @return
       */
      def transformUserArtistData(spark: SparkSession, rawUserArtistDS: Dataset[String], artistAlias: Map[Int, Int]): DataFrame = {
        import spark.implicits._
    
        // 广播变量
        // 广播变量主要用于在迭代中一直需要被访问的只读变量
        // 它将此变量缓存在每个 executor 里,以减少集群网络传输消耗
        val bArtistAlias = spark.sparkContext.broadcast(artistAlias)
    
        rawUserArtistDS.map(line => {
          val Array(userId, artistId, count) = line.split(' ').map(_.toInt) //以空格分隔每一行,并将数据转换为int类型
          val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId) // 如果有值,那就可以得到这个值,如果没有就会得到一个默认值
          (userId, finalArtistId, count)
        }).toDF("user", "artist", "count").cache() // 设置字段名称, cache() 以指示 Spark 在 RDD 计算好后将其暂时存储在集群的内存里
      }
    
      /**
       * 将 artist_data.txt 文件的数据转化为dataframe
       *
       * @param rawArtistData Dataset类型的artist_data数据
       * @return
       */
      def transformArtistData(rawArtistData: Dataset[String]): DataFrame = {
        // 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
        import rawArtistData.sparkSession.implicits._
    
        // 将每一行数据以tap分隔开,最后将数据类型转换为dataframe
        rawArtistData.flatMap(line => {
          val (id, name) = line.span(_ != '\t')
          try {
            if (name.nonEmpty)
              Some(id.toInt, name.trim)
            else
              None
          } catch {
            case _: Exception => None
          }
        }).toDF("id", "name").cache()
      }
    
      /**
       * 将 artist_data.txt 文件的数据转化为dataframe
       *
       * @param rawArtistData Dataset类型的artist_data数据
       * @return
       */
      def transformArtistData1(rawArtistData: Dataset[String]): DataFrame = {
        // 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
        import rawArtistData.sparkSession.implicits.