当前位置 博文首页 > ?:Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐
熟悉 Audioscrobbler 数据集
基于该数据集选择合适的 MLlib 库算法进行数据处理
进行音乐推荐(或用户推荐)
计算AUC评分最高的参数
利用AUC评分最高的参数,给用户推荐艺术家
对多个用户进行艺术家推荐
利用AUC评分最高的参数,给艺术家推荐喜欢他的用户
1、安装Hadoop和Spark
具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:
Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894
提示:如果IDEA未构建Spark项目,可以转接到以下的博客:
IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536
2、启动Hadoop与Spark
查看3个节点的进程
master
slave1
slave2
3、将文件上传到 HDFS
Shell命令:
[root@master ~]# cd /opt/data/profiledata_06-May-2005/
[root@master profiledata_06-May-2005]# ls
[root@master profiledata_06-May-2005]# hadoop dfs -put artist_alias.txt artist_data.txt user_artist_data.txt /spark/input
4、实现音乐推荐
源代码:
package com.John.SparkProject
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
/**
* @author John
* @Date 2021/5/25 12:49
*/
object project02 {
def main(args: Array[String]): Unit = {
/**
* 前期环境配置以及数据准备
*/
// 创建一个SparkSession对象
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("Project02_RecommenderApp")
.set("spark.sql.crossJoin.enabled", "true")
val spark = new SparkSession.Builder()
.config(conf)
.getOrCreate()
import spark.implicits._
// 导入 artist_data.txt 文件 (每个艺术家的 ID 和对应的名字)
// 字段名分别是: artistid artist_name
val rawArtistData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/artist_data.txt")
val artistIdDF = transformArtistData(rawArtistData)
val artistIdDFtest = transformArtistData1(rawArtistData)
// 导入 artist_alias.txt 文件 (将拼写错误的艺术家 ID 或ID 变体对应到该艺术家的规范 ID)
// 字段名分别是: badid goodid
val rawAliasData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/artist_alias.txt")
val artistAlias = transformAliasData(rawAliasData).collect().toMap
// 导入 user_artist_data.txt 文件 (用户音乐收听数据)
// 字段名分别是 userid artistid playcount
val rawUserArtistData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/user_artist_data.txt")
// 整合数据
val allDF = transformUserArtistData(spark, rawUserArtistData, artistAlias)
allDF.persist()
// 拆分训练集和测试集
val Array(trainDF, testDF) = allDF.randomSplit(Array(0.9, 0.1))
trainDF.persist()
// 查看一下指定 user 听过的艺术家
allDF.join(artistIdDFtest,"artist").select("name").filter("user='2093760'").show(5)
/**
* 根据题目要求参数
* 给用户推荐艺术家
*/
// // 根据题目要求构建模型
// val als = new ALS()
// .setSeed(Random.nextLong())
// .setImplicitPrefs(true)
// .setRank(10) // 模型潜在因素个数
// .setRegParam(0.01) // 正则化参数
// .setAlpha(1.0) // 管理偏好观察值的 基线置信度
// .setMaxIter(5) // 最大迭代次数
// .setUserCol("user")
// .setItemCol("artist")
// .setRatingCol("count")
// .setPredictionCol("prediction")
// // 用训练数据训练模型
// val model = als.fit(trainDF)
// // 释放缓存资源
// trainDF.unpersist()
// // 开始推荐
// val userID = 2093760
// val artistNum = 5
// val recommendDF = recommend(model, userID, artistNum, artistIdDF)
//
// val strings = recommendDF.map(_.mkString("|")).collect()
// println(strings.toBuffer)
/**
* 计算AUC评分最高的参数
* 原理:循环指定参数暴力计算,根据AUC计算出评分最高的参数进行建模
*/
// // 艺术家id数据,用于AUC评分
// val allArtistIds = allDF.select("artist").as[Int].distinct().collect()
// val bAllArtistIds = spark.sparkContext.broadcast(allArtistIds)
//
// // 网格搜索
// val evaluations =
// // 利用for循环,生成不同的超参数配置
// for (rank <- Seq(5, 30);
// regParam <- Seq(4.0, 0.0001);
// alpha <- Seq(1.0, 40.0))
// yield {
// // 构建模型
// val als = new ALS()
// .setSeed(Random.nextLong())
// .setImplicitPrefs(true)
// .setRank(rank)
// .setRegParam(regParam)
// .setAlpha(alpha)
// .setMaxIter(5)
// .setUserCol("user")
// .setItemCol("artist")
// .setRatingCol("count")
// .setPredictionCol("prediction")
// val model = als.fit(trainDF)
// val auc = areaUnderCurve(testDF, bAllArtistIds, model.transform)
// // 释放资源
// model.userFactors.unpersist()
// model.itemFactors.unpersist()
// (auc, (rank, regParam, alpha))
// }
// // 按评分降序输出各参数信息
// evaluations.sorted.reverse.foreach(println)
// (0.9134340440577203,(30,4.0,40.0)) // 最优参数
// (0.9124295941963009,(30,4.0,1.0))
// (0.9121292259762062,(30,1.0E-4,40.0))
// (0.9111586767382363,(5,4.0,40.0))
// (0.9097682726329872,(5,1.0E-4,40.0))
// (0.9089218752871897,(5,4.0,1.0))
// (0.9038315464345514,(5,1.0E-4,1.0))
// (0.8951870697645603,(30,1.0E-4,1.0))
/**
* 利用AUC评分最高的参数
* 给用户推荐艺术家
*/
// // 利用AUC评分最高的参数进行模型构建
// val als = new ALS()
// .setSeed(Random.nextLong())
// .setImplicitPrefs(true)
// .setRank(30) // 模型潜在因素个数
// .setRegParam(4.0) // 正则化参数
// .setAlpha(40.0) // 管理偏好观察值的 基线置信度
// .setMaxIter(5) // 最大迭代次数
// .setUserCol("user")
// .setItemCol("artist")
// .setRatingCol("count")
// .setPredictionCol("prediction")
// // 训练模型
// val model = als.fit(trainDF)
// // 释放缓存资源
// trainDF.unpersist()
// // 开始推荐
// val userID = 2093760
// val artistNum = 5
// val recommendDF = recommend(model, userID, artistNum, artistIdDF)
// recommendDF.show()
//
// val strings = recommendDF.map(_.mkString("|")).collect()
// println(strings.toBuffer)
/**
* 用测试数据对10个用户进行推荐
*/
// // 利用AUC评分最高的参数进行模型构建
// val als = new ALS()
// .setSeed(Random.nextLong())
// .setImplicitPrefs(true)
// .setRank(30) // 模型潜在因素个数
// .setRegParam(4.0) // 正则化参数
// .setAlpha(40.0) // 管理偏好观察值的 基线置信度
// .setMaxIter(5) // 最大迭代次数
// .setUserCol("user")
// .setItemCol("artist")
// .setRatingCol("count")
// .setPredictionCol("prediction")
// // 训练模型
// val model = als.fit(trainDF)
// //用测试数据对100个用户进行推荐
// val someUsers = testDF.select("user").as[Int].distinct.take(10)
// // 推荐
// someUsers.map { user =>
// val recommendDF = recommend(model, user, 5, artistIdDF)
// val strings = recommendDF.map(_.mkString("|")).collect()
// (user, strings.toBuffer)
// }.foreach(println)
/**
* 利用AUC评分最高的参数
* 给艺术家推荐喜欢他的用户
*/
// //利用AUC评分最高的参数进行模型构建
// val als = new ALS()
// .setSeed(Random.nextLong())
// .setImplicitPrefs(true)
// .setRank(30) // 模型潜在因素个数
// .setRegParam(4.0) // 正则化参数
// .setAlpha(40.0) // 管理偏好观察值的 基线置信度
// .setMaxIter(5) // 最大迭代次数
// .setUserCol("artist")
// .setItemCol("user")
// .setRatingCol("count")
// .setPredictionCol("prediction")
// // 训练模型
// val model = als.fit(trainDF)
// // 释放缓存资源
// trainDF.unpersist()
// // 开始推荐
// val artistID = 930
// val userNum = 100
// val recommendDF = recommendArtist(model, artistID, userNum, allDF)
// val strings = recommendDF.select("user").map(_.mkString("|")).collect()
// println(strings.toBuffer)
// 关闭spark
spark.stop()
}
/**
* 规范艺术家的ID,合并数据,创建一个总的数据集
*
* @param spark SparkSession
* @param rawUserArtistDS 用户和艺术家的关系数据集
* @param artistAlias 艺术家别名id,用于补全
* @return
*/
def transformUserArtistData(spark: SparkSession, rawUserArtistDS: Dataset[String], artistAlias: Map[Int, Int]): DataFrame = {
import spark.implicits._
// 广播变量
// 广播变量主要用于在迭代中一直需要被访问的只读变量
// 它将此变量缓存在每个 executor 里,以减少集群网络传输消耗
val bArtistAlias = spark.sparkContext.broadcast(artistAlias)
rawUserArtistDS.map(line => {
val Array(userId, artistId, count) = line.split(' ').map(_.toInt) //以空格分隔每一行,并将数据转换为int类型
val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId) // 如果有值,那就可以得到这个值,如果没有就会得到一个默认值
(userId, finalArtistId, count)
}).toDF("user", "artist", "count").cache() // 设置字段名称, cache() 以指示 Spark 在 RDD 计算好后将其暂时存储在集群的内存里
}
/**
* 将 artist_data.txt 文件的数据转化为dataframe
*
* @param rawArtistData Dataset类型的artist_data数据
* @return
*/
def transformArtistData(rawArtistData: Dataset[String]): DataFrame = {
// 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
import rawArtistData.sparkSession.implicits._
// 将每一行数据以tap分隔开,最后将数据类型转换为dataframe
rawArtistData.flatMap(line => {
val (id, name) = line.span(_ != '\t')
try {
if (name.nonEmpty)
Some(id.toInt, name.trim)
else
None
} catch {
case _: Exception => None
}
}).toDF("id", "name").cache()
}
/**
* 将 artist_data.txt 文件的数据转化为dataframe
*
* @param rawArtistData Dataset类型的artist_data数据
* @return
*/
def transformArtistData1(rawArtistData: Dataset[String]): DataFrame = {
// 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
import rawArtistData.sparkSession.implicits.