当前位置 博文首页 > ?:Spark大数据分析与实战:基于Spark框架实现TopN

    ?:Spark大数据分析与实战:基于Spark框架实现TopN

    作者:[db:作者] 时间:2021-07-03 19:14

    Spark大数据分析与实战:基于Spark框架实现TopN


    基于Spark框架实现TopN

    一、实验背景:

    基于Spark框架实现TopN

    二、实验目的:

    获取蜀国武将中武力值最高的5位,即通过分布式计算框架实现从原始数据查询出武力最高的Top5

    三、实验步骤:

    1. 安装Hadoop和Spark
    2. 启动Hadoop与Spark
    3. 创建 rank.txt 文件
    4. 将 rank.txt 文件上传到 HDFS 上
    5. 实现TopN计算
    6. 查看 HDFS 上的结果

    四、实验过程:

    1、安装Hadoop和Spark

    具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:

    Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
    Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894

    提示:如果IDEA未构建Spark项目,可以转接到以下的博客:

    IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536

    2、

    查看3个节点的进程

    master在这里插入图片描述
    slave1
    在这里插入图片描述
    slave2
    在这里插入图片描述

    3、创建 rank.txt 文件

    Shell命令:

    [root@master ~]# cd /opt/data
    [root@master data]# vim rank.txt
    

    在这里插入图片描述
    4、将 rank.txt 文件上传到 HDFS 上

    Shell命令:

    [root@master data]# hadoop dfs -mkdir /spark
    [root@master data]# hadoop dfs -mkdir /spark/input
    [root@master data]# hadoop dfs -put rank.txt /spark/input/
    

    在这里插入图片描述
    5、实现TopN计算

    package com.John.SparkProject
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    /**
     * @author John
     * @Date 2021/5/24 16:41
     */
    object project01 {
      def main(args: Array[String]): Unit = {
    
        // 创建一个SparkSession对象
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("Project01_TOP")
          .set("spark.sql.crossJoin.enabled", "true")
        val spark = new SparkSession.Builder()
          .config(conf)
          .getOrCreate()
    
        // 加载文件
        val text = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/rank.txt")
    
        // 跳过表头
        import spark.implicits._
        val header = text.first()
        val dsData = text.filter(_ != header)
    
        // 读取每一行,设置分隔符为空格,转换数据类型,并设置表头
        val dfData = dsData.map(line => {
          val Array(id, name, experience,country) = line.split(' ')//以空格分隔每一行,并将数据转换为int类型
          (id.toInt, name, experience.toInt,country) // 设置字段名称
        }).toDF("序号", "姓名", "武力值","国家")
    
        // 根据指定列倒序排列,并取前五条数据
        val setRes = dfData.orderBy(-dfData("武力值")).limit(5)
    
        //展示数据
        setRes.show()
    
        // 以json格式保存数据
        setRes.write.format("json").save("hdfs://192.168.254.122:9000/spark/output/")
      }
    }
    

    在这里插入图片描述
    6、查看 HDFS 上的结果

    Shell命令:

    [root@master ~]# hadoop dfs -cat /spark/output/part-00000-af1dda43-b059-4da1-997e-2196db88178e-c000.json
    

    在这里插入图片描述

    五、实验结论:

    实验主要考察了大家对HDFS的操作,和Spark对数据操作以及关联HDFS。

    实验难度不是特别大,大家可以进一步的推广尝试!

    cs