qq262593421's blog: Saving a Spark SQL DataFrame as a CSV File

    Posted: 2021-08-30 10:27


    ReadShipMMSITwo

    The listing below (object ReadRirWriteHBase) scans a month's worth of per-MMSI AIS CSV files, uses Spark SQL to build one WKT LINESTRING route per ship per day, and writes each daily route to HBase through GeoMesa (the author's SparkOpenGIS helper).

    package com.xtd.file
    
    import java.io.{BufferedWriter, File, FileWriter}
    import java.util
    
    import com.xtd.entity.RouteLine
    import com.xtd.example.SparkOpenGIS
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    import org.geotools.data.DataStore
    import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
    
    object ReadRirWriteHBase {
    
      /** mmsi date startTime endTime startPoint endPoint line acrossDays longitude latitude */
      /** mmsi */
      var mmsi:String = _
      /** previous date */
      var lastDate:String = _
      /** current date */
      var Date:String = _
      /** next date */
      var nextDate:String = _
      /** start time */
      var startTime:String = _
      /** end time */
      var endTime:String = _
      /** point coordinates */
      var Point:String = _
      /** start longitude */
      var startLongitude:String = _
      /** start latitude */
      var startLatitude:String = _
      /** end longitude */
      var endLongitude:String = _
      /** end latitude */
      var endLatitude:String = _
      /** start point */
      var startPoint:String = _
      /** end point */
      var endPoint:String = _
      /** one line per day */
      var line:String = _
      /** number of days spanned */
      var acrossDays:String = _
      /** ship longitude */
      var longitude:String = _
      /** ship latitude */
      var latitude:String = _
      /** first day of the month */
      var startDate:String = _
      /** last day of the month */
      var endDate:String = _
      /** total days in the month */
      var allDays:Int = -1
      /** records per day */
      var record:Long = -1
      /** records in the month */
      var total:Long = -1
      /** temp table queried from each CSV */
      var routeTable:DataFrame = _
      /** Array of the month's dates */
      var dateList:Array[String] = _
      /** RDD of the month's dates */
      var dateListRDD:RDD[String] = _
      /** RoutePointDataSet */
      //  var routePointDataSet:Dataset[RoutePoint] = null
      /** each day's start/end times and coordinates */
      var routePointMap:util.TreeMap[String,String] = _
      //  /** per-day time Map: date -> startTime+endTime */
      //  var routeDateMap:util.TreeMap[String,String] = null
      /** routeDataset */
      var routeDataset:Dataset[RouteRDD] = _
      /** listRouteRDD */
      //  var listRouteRDD = new util.ArrayList[RDD[RouteRDD]]
      /** the concatenated line (WKT) */
      var lineStringBuffer:StringBuffer = new StringBuffer()
      /** the day's longitude/latitude rows */
      var loglat:Dataset[Row] = _
    
      // GeoMesa-HBase helper object
      var sparkGIS:SparkOpenGIS = _
      // RouteLine helper object
      var routeLine: RouteLine = _
      // HBase data source
      var dataStore: DataStore = _
      // HBase table schema (SimpleFeatureType)
      var sft: SimpleFeatureType = _
      // create the HBase table from the schema
      //  dataStore.createSchema(sft)
      // RouteLine converted to a SimpleFeature
      var feature: SimpleFeature = _
      // result flag of writing to the dataStore
      var flag = false
      // file line count
      var count = Long.MaxValue
      // file list
      var files:Array[File] = null
      // current path
      var filePath:String = null
      // name of the file currently being processed (mmsi.csv)
      var fileName:String = null
      // fileRDD
      var fileRDD:RDD[String] = null
      // dateJavaList
      var dateJavaList:util.ArrayList[String] = null

      // writers used to append each MMSI to a record file
      var fileWriter:FileWriter = null
      var bufferedWriter:BufferedWriter = null
    
      def main(args: Array[String]): Unit = {
    
        // create the SparkSession
        val spark = SparkSession
          .builder()
          .appName("ReadRirWriteHBase")
          .master("local[*]")
          .config("spark.some.config.option", "some-value")
          .getOrCreate()
        val sc = spark.sparkContext
    
        // directory to scan
    //    val dirstr = "E:\\HistoryData\\ArcticOceanData\\ArcticOceanData\\spark\\finish\\201707"
    //    val dirstr = "E:\\HistoryData\\ArcticOceanData\\ArcticOceanData\\finish\\201707"
        val dirstr = args(0)
    
        /*************************************************************************************************************************************/
        val pathdir = new File(dirstr)
        val data = pathdir.getName
        val pardir = pathdir.getParent
        println("pathdir",pathdir)
        println("data",data)
        println("pardir",pardir)
        val filelist = pathdir.list
        // keep only names of the form <9-digit MMSI>.csv (9 + 4 characters)
        val filesRDD = sc.parallelize(filelist).filter(x => x.length == 9+4)
        println("------------files with 9-digit MMSI names: "+filesRDD.count())
        /************************************************************************************************************************************/
    
    //    val dir = "file:///D:/Hadoop/ship/EE船舶数据(英文字段).csv"
        val dir = args(1)
    //    val file = new File(dir)
        val df = spark.read.option("header","true").option("inferSchema","true").csv(dir)
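    //    Sketch (not in the original): the DataFrame read above can be written back
    //    out as a CSV with the DataFrameWriter API, which is what the post's title
    //    refers to, e.g.
    //      df.coalesce(1).write.mode("overwrite").option("header", "true").csv(outDir)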
    //    df.printSchema()
        // register a temp view
        df.createOrReplaceTempView("route")
        // query result
        val MMSIDF = spark.sql("SELECT ship_mobile_nineyard FROM route")
        println(MMSIDF.count())
        val MMSIRDD = MMSIDF.rdd.map(_.mkString(",")).map(_+".csv")
        println("------------total MMSI records: "+MMSIRDD.count())
        val ISRDD = MMSIRDD.intersection(filesRDD).cache()
        val longAccumulator = sc.longAccumulator("mmsi-account")
        longAccumulator.add(1)
        /******************************************************save the MMSI intersection*******************************************************************/
    //    val savafiledir = "D:\\Hadoop\\ship\\record"
        val savafiledir = args(2)
        // valid MMSIs
        ISRDD.coalesce(1).saveAsTextFile(savafiledir)
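    //    Sketch (not in the original): the same intersection could be saved as a real
    //    CSV via the DataFrame API instead of saveAsTextFile; toDF needs an implicit
    //    encoder, e.g.
    //      import spark.implicits._
    //      ISRDD.toDF("mmsi").coalesce(1)
    //        .write.mode("overwrite").option("header", "true").csv(savafiledir)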
        //
        val ISRDDCount = ISRDD.count()
        // valid record count
        sc.parallelize(List(ISRDDCount)).coalesce(1).saveAsTextFile(savafiledir+"/count")
        /************************************************************************************************************************************/
        println("------------number of valid MMSI entries: "+ISRDDCount)
        /*************************************************************************************************************************************/
        println("partition:"+ISRDD.getNumPartitions)
    //    filesRDD.foreach(x => println(x))
        // broadcast share
        val fileBroadcast =  sc.broadcast(ISRDD.collect())
        println("-------------------------------开始执行-------------------------------------")
        // 遍历 fileArray
        fileBroadcast.value.foreach({
    //    filesRDD.foreach({
    //    files.foreach({
    //      filePath = null;   fileName = null;  fileRDD = null;
          filestr => {
            var file:File = null
            var fileName:String = null
            var dir:String = null
            var mmsi:String = null
            var currentFileRDD:RDD[String] = null;
            try {
              file = new File(filestr)
              fileName = file.getName
              dir = "file:///" + pathdir + "/" + fileName
              mmsi = fileName.substring(0, fileName.length - 4)
              currentFileRDD = sc.textFile(dir)
              // count the file's lines
              val count = currentFileRDD.count()
              if (count > 1) {
                val verifyCSV = spark.read.csv(dir).rdd.first().toString()
                val f1 = verifyCSV.contains("Longitude")
                val f2 = verifyCSV.contains("Latitude")
    //            println("verifyCSV", f1, f2)
                if (f1 && f2) {
    
                  // append the MMSI to a record file
                  // substring(8) assumes the path starts with an 8-character scheme prefix such as "file:///"
                  fileWriter = new FileWriter(savafiledir.substring(8,savafiledir.length) + "/MMSIFile", true)
                  bufferedWriter = new BufferedWriter(fileWriter)
                  bufferedWriter.write(mmsi+"\n")
                  bufferedWriter.close(); bufferedWriter = null; fileWriter = null

    //              longAccumulator.add(1) ISRDDCount
                  println("============================== processing ship (MMSI) #" + longAccumulator.sum + "   " + (ISRDDCount - longAccumulator.sum) + " ships (MMSI) remaining =================================")

                  /** ******************************************************separator ********************************************************/
    //              println("/************************************************walking the directory*****************************************************/")
                  println("date:" + data + "    mmsi:" + mmsi + "   fileName:" + fileName + "    file number of rows:" + count)
    
                  // CSV to DataFrame
                  val df = spark.read.option("header", "true").option("inferSchema", "true").csv(dir)
                  // record count of this file
                  total = df.count()
                  println("mmsi:" + mmsi + " has " + total + " records in total!")

                  /** ********************************************collect the per-day date set*************************************************** */
                  // register the temp view "route"
                  df.createOrReplaceTempView("route")
                  // per-day dates (sorted, distinct); LEFT(Time,8) extracts the yyyyMMdd prefix
                  val dicData = spark.sql("SELECT LEFT(Time,8) AS Date FROM route GROUP BY Date ORDER BY Date").na.drop()
                  // DataFrame to Array (the month's dates)
                  dateList = dicData.collect().map(x => x.toString().substring(1, 9)).filter(_.substring(0, 6) == data)
                  dateList.foreach(x => print(x + " "))
                  // used for filtering by month
                  dateListRDD = sc.parallelize(dateList)
    
                  // days in the month        // first day of the month        // last day of the month
                  allDays = dateList.length;
                  startDate = dateList(0);
                  endDate = dateList(allDays - 1)
                  println("\nfirst day of month: " + startDate + " last day of month: " + endDate + " total days: " + allDays)

                  /** ********************************************build each day's start/end Time and Point *************************************************** */
                  // routePointMap stores each day's start and end coordinates
                  routePointMap = new util.TreeMap[String, String]
                  //  // routeDateMap stores each day's start and end times
                  //  routeDateMap = new util.TreeMap[String,String]
                  dateList.foreach({ x =>
                    // current date
                    Date = x
                    // query the day's times and coordinates, keeping only Message_ID 1, 2, 3, 18, 19, 27
                    routeTable = spark.sql("SELECT Time,TRIM(Message_ID) AS Message_ID,Time,TRIM(Longitude) AS Longitude,TRIM(Latitude) AS Latitude FROM route " +
                      "WHERE (Message_ID = '1' OR Message_ID = '2' OR Message_ID = '3' OR Message_ID = '18' OR Message_ID = '19' OR Message_ID = '27' ) AND Longitude IS NOT NULL AND Latitude IS NOT NULL AND LEFT(Time,8)=" + Date).na.drop()
    //                routeTable = spark.sql("SELECT Time,TRIM(Longitude) AS Longitude,TRIM(Latitude) AS Latitude FROM route " +
    //                  "WHERE Longitude IS NOT NULL AND Latitude IS NOT NULL AND LEFT(Time,8)=" + Date).na.drop()
                    // verify that Message_ID is one of 1, 2, 3, 18, 19, 27
    //                val Message_ID = routeTable.select("Message_ID")
    //                Message_ID.foreach(x => println(x))
                    // skip days without valid decimal coordinates
                    if ( !routeTable.select("Longitude").filter(_.toString().contains(".")).isEmpty && !routeTable.select("Latitude").filter(_.toString().trim.contains(".")).isEmpty) {
                      //            routeTable.show()
                      // start time
                      startTime = routeTable.select("Time").filter(_.toString().length > 8).first().toString().substring(1, 16)
                      // end time
                      endTime = routeTable.select("Time").filter(_.toString().length > 8).orderBy(df("Time").desc).first().toString().substring(1, 16)
                      // start longitude
                      startLongitude = routeTable.select("Longitude").filter(_.toString().contains(".")).first().toString().substring(1).replace("]", "")
                      // start latitude
                      startLatitude = routeTable.select("Latitude").filter(_.toString().contains(".")).first().toString().substring(1).replace("]", "")
                      // end longitude
                      endLongitude = routeTable.select("Longitude").filter(_.toString().contains(".")).orderBy(df("Time").desc).first().toString().substring(1).replace("]", "")
                      // end latitude
                      endLatitude = routeTable.select("Latitude").filter(_.toString().contains(".")).orderBy(df("Time").desc).first().toString().substring(1).replace("]", "")
                      // start point
                      startPoint = "POINT(" + startLongitude + " " + startLatitude + ")"
                      // end point
                      endPoint = "POINT(" + endLongitude + " " + endLatitude + ")"
                      // the day's start and end coordinates
                      routePointMap.put(Date, startTime + "," + endTime + "," + startPoint + "," + endPoint)
                      //    // each date's start and end  key:date value:startTime,endTime
                      //    routeDateMap.put(Date,startTime+","+endTime+","+startPoint+","+endPoint)
                      // debug output
    //                  println(Date, startTime, endTime, startPoint, endPoint)
                    } else {
                      println("filtered-out date: " + Date)
                    }
                  })
    
                  Date = null; startTime = null; endTime = null; startPoint = null; endPoint = null; startLongitude = null;
                  startLatitude = null; endLongitude = null; endLatitude = null; routeTable = null;
    
                  /** ****************************************the first day's line ************************************************ */
                  val dateListSet = routePointMap.keySet()
                  val dateJavaList = new util.ArrayList(dateListSet)
    
    //              if (null != dateListSet && null != dateJavaList) {
    //                println("-----------------------------------true-----------------------------------")
    //              }
    
                  //  mmsi = mmsi
                  Date = dateJavaList.get(0)
                  // the second day's date
                  nextDate = dateJavaList.get(1)
                  // start time and start point (the first day's start)  routePointMap.put(Date,startTime+","+endTime+","+startPoint+","+endPoint)
                  startTime = routePointMap.get(Date).split(",")(0)
                  startPoint = routePointMap.get(Date).split(",")(2).substring(6, routePointMap.get(Date).split(",")(2).length - 1)
                  // end time and end point (the second day's start)
                  endTime = routePointMap.get(nextDate).split(",")(0)
                  endPoint = routePointMap.get(nextDate).split(",")(2).substring(6, routePointMap.get(nextDate).split(",")(2).length - 1)
                  // number of days spanned by the first day's points
    //              println("nextDate:" + nextDate.substring(6, 8).toInt + " " + "Date:" + Date.substring(6, 8).toInt)
                  acrossDays = (nextDate.substring(6, 8).toInt - Date.substring(6, 8).toInt + 1).toString
    //              println("first day's date, start time, start point, end time, end point, days spanned: " + Date, startTime, startPoint, endTime, endPoint, acrossDays)
                  // e.g. "LINESTRING(10010 40040,10011 40041,10012 40042,10013 40043)"
                  lineStringBuffer.append("LINESTRING(")
                  // query the day's times and coordinates, keeping only Message_ID 1, 2, 3, 18, 19, 27
                  routeTable = spark.sql("SELECT Time,TRIM(Message_ID) AS Message_ID,Time,TRIM(Longitude) AS Longitude,TRIM(Latitude) AS Latitude FROM route " +
                    "WHERE (Message_ID = '1' OR Message_ID = '2' OR Message_ID = '3' OR Message_ID = '18' OR Message_ID = '19' OR Message_ID = '27' ) AND Longitude IS NOT NULL AND Latitude IS NOT NULL AND LEFT(Time,8)=" + Date).na.drop()
                  // to build the line, iterate over the day's coordinates
                  loglat = routeTable.select("Longitude", "Latitude").filter(_.toString().contains("."))
    //              loglat = routeTable.select("Longitude", "Latitude","Time").filter(x => x.toString().contains(".") && x.toString().contains("null") && x.toString().contains("|"))
                  // convert to an RDD and append each longitude/latitude pair to the line
                  (loglat.rdd).foreach({ x =>
                    println("(loglat.rdd).foreach \t"+x)
                    lineStringBuffer.append(x.toString().replace(",", " ").substring(1, x.toString().length - 1) + ",")
                  })
                  // the first day's end point (i.e. the second day's first point)
    //              println("the second day's first point: " + endPoint)
                  lineStringBuffer.append(endPoint + ",")
                  // StringBuffer to String
                  line = lineStringBuffer.toString().substring(0, lineStringBuffer.length - 1) + ")"
                  println( mmsi + " " + Date + " line:" + line)
                  /** *******************************************  write the data to HBase  ****************************************************** */
                  sparkGIS = new SparkOpenGIS
                  routeLine = new RouteLine
                  routeLine.mmsi = mmsi
                  routeLine.date = Date
                  routeLine.startTime = startTime
                  routeLine.endTime = endTime
                  routeLine.startPoint = startPoint
                  routeLine.endPoint = endPoint
                  routeLine.line = line
                  routeLine.acrossDays = acrossDays
                  routeLine.id = routeLine.mmsi + "-" + routeLine.startTime
                  // get the HBase data source
                  dataStore = sparkGIS.getDataStore
                  // get the HBase table schema
                  sft = sparkGIS.getSimpleFeatureTypesLine
                  // create the HBase table from the schema
                  dataStore.createSchema(sft)
                  // convert the RouteLine object to a Feature
                  feature = sparkGIS.convertToFeatureLine(routeLine: RouteLine)
                  // write the data to the dataStore
                  flag = sparkGIS.writeFeatureSingle(dataStore, sft, feature)
                  // once day one is written successfully, the ship counts as added (longAccumulator.add(1))
                  if (flag) println("write to hbase table successfully!") else println("write to hbase table failed!")
                  // clear objects and data
                  sparkGIS = null; routeLine = null; dataStore = null; sft = null; feature = null; flag = false;
    
                  /** *******************************************  write the data to HBase  ****************************************************** */
    
                  Date = null; nextDate = null; startTime = null; endTime = null; startPoint = null; endPoint = null; acrossDays = null;
                  routeTable = null; loglat = null; lineStringBuffer.delete(0, lineStringBuffer.length()); line = null;
                  println("------------------------------------------第一天处理完成---------------------------------------------------")
    
                  // day one succeeded, so start counting
                  longAccumulator.add(1)
    
                  // days in the month        // first day of the month        // last day of the month
                  allDays = dateJavaList.size();
                  startDate = dateJavaList.get(0);
                  endDate = dateJavaList.get(allDays - 1)
                  println("\nafter filtering - first day of month: " + startDate + " last day of month: " + endDate + " total days: " + allDays)

                  /** ****************************************  build each day's line ************************************************ */
                  // the first and last days are handled separately, so they are skipped here
                  for (i <- 1 to allDays - 2) {
                    // the previous day's date
                    lastDate = dateJavaList.get(i - 1)
                    // the current day's date
                    Date = dateJavaList.get(i)
                    // the next day's date
                    nextDate = dateJavaList.get(i + 1)
    //                println("previous day, current day, next day: " + lastDate, Date, nextDate)
                    // start time and start point (the previous day's end)  routePointMap.put(Date,startTime+","+endTime+","+startPoint+","+endPoint)
                    startTime = routePointMap.get(Date).split(",")(0)
                    startPoint = routePointMap.get(Date).split(",")(2).substring(6, routePointMap.get(Date).split(",")(2).length - 1)
                    println(" Date", Date, " startTime", startTime + " nextDate", nextDate, " startPoint", startPoint)
                    // end time and end point (the next day's start)
                    endTime = routePointMap.get(nextDate).split(",")(0)
                    endPoint = routePointMap.get(nextDate).split(",")(2).substring(6, routePointMap.get(nextDate).split(",")(2).length - 1)
                    // number of days spanned by the current day's points
                    print("lastDate:" + lastDate.substring(6, 8).toInt + " " + "nextDate:" + nextDate.substring(6, 8).toInt +" || ")
                    acrossDays = (nextDate.substring(6, 8).toInt - lastDate.substring(6, 8).toInt + 1).toString
    //                println("current day's date, start time, start point, end time, end point, days spanned: " + Date, startTime, startPoint, endTime, endPoint, acrossDays)
                    // "LINESTRING(10010 40040,10011 40041,10012 40042,10013 40043)"
                    lineStringBuffer.append("LINESTRING(")
                    // days spanned by the first day's points
                    //              println("Date:"+Date.substring(6,8).toInt+" "+"lastDate:"+lastDate.substring(6,8).toInt)
                    //              acrossDays = (Date.substring(6,8).toInt - lastDate.substring(6,8).toInt + 1).toString

                    // the current day's start (the previous day's end)
    //                println(Date + " start: " + startPoint)
                    //              lineStringBuffer.append(startPoint+",")

                    // query the day's times and coordinates, keeping only Message_ID 1, 2, 3, 18, 19, 27
                    routeTable = spark.sql("SELECT Time,TRIM(Message_ID) AS Message_ID,Time,TRIM(Longitude) AS Longitude,TRIM(Latitude) AS Latitude FROM route " +
                      "WHERE (Message_ID = '1' OR Message_ID = '2' OR Message_ID = '3' OR Message_ID = '18' OR Message_ID = '19' OR Message_ID = '27' ) AND Longitude IS NOT NULL AND Latitude IS NOT NULL AND LEFT(Time,8)=" + Date).na.drop()
                    // to build the line, iterate over the day's coordinates
                    loglat = routeTable.select("Longitude", "Latitude").filter(_.toString().contains("."))
    //                loglat = routeTable.select("Longitude", "Latitude","Time").filter(x => x.toString().contains(".") && x.toString().contains("null") && x.toString().contains("|"))
                    // convert to an RDD and append each longitude/latitude pair to the line
                    (loglat.rdd).foreach({ x =>
                      println("(loglat.rdd).foreach \t"+x)
                      lineStringBuffer.append(x.toString().replace(",", " ").substring(1, x.toString().length - 1) + ",")
                    })
    
                    // the current day's end (the next day's start)
    //                println(Date + " end: " + endPoint)
                    lineStringBuffer.append(endPoint + ",")
                    // StringBuffer to String
                    line = lineStringBuffer.toString().substring(0, lineStringBuffer.length - 1) + ")"
                    println( mmsi + " " + Date + " line:" + line)
    
                    /** *******************************************  write the data to HBase  ****************************************************** */
                    sparkGIS = new SparkOpenGIS
                    routeLine = new RouteLine
                    routeLine.mmsi = mmsi
                    routeLine.date = Date
                    routeLine.startTime = startTime
                    routeLine.endTime = endTime
                    routeLine.startPoint = startPoint
                    routeLine.endPoint = endPoint
                    routeLine.line = line
                    routeLine.acrossDays = acrossDays
                    routeLine.id = routeLine.mmsi + "-" + routeLine.startTime
                    // get the HBase data source
                    dataStore = sparkGIS.getDataStore
                    // get the HBase table schema
                    sft = sparkGIS.getSimpleFeatureTypesLine
                    // create the HBase table from the schema
                    dataStore.createSchema(sft)
                    // convert the RouteLine object to a Feature
                    feature = sparkGIS.convertToFeatureLine(routeLine: RouteLine)
                    // write the data to the dataStore
                    flag = sparkGIS.writeFeatureSingle(dataStore, sft, feature)
                    // debug output
                    if (flag) println("write to hbase table successfully!") else println("write to hbase table failed!")
                    // clear objects and data
                    sparkGIS = null; routeLine = null; dataStore = null; sft = null; feature = null; flag = false;
    
                    /** *******************************************  write the data to HBase  ****************************************************** */
    
                    lastDate = null; Date = null; nextDate = null; startTime = null; endTime = null; startPoint = null; endPoint = null;
                    acrossDays = null; routeTable = null; loglat = null; lineStringBuffer.delete(0, lineStringBuffer.length()); line = null;
                  }
                  println("------------------------------------------多个当天处理完成---------------------------------------------------")
    
                  /** ****************************************the last day's line ************************************************ */
                  //  mmsi = mmsi
                  // the second-to-last day's date (the last day's end point is not needed)
                  lastDate = dateJavaList.get(dateJavaList.size() - 2)
                  //          lastDate = dateList(dateList.length-2)
                  // the last day's date
                  Date = dateJavaList.get(dateJavaList.size() - 1)
                  // the last day's start (i.e. the second-to-last day's end)  routePointMap.put(Date,startTime+","+endTime+","+startPoint+","+endPoint)
                  startTime = routePointMap.get(Date).split(",")(0)
                  startPoint = routePointMap.get(Date).split(",")(2).substring(6, routePointMap.get(Date).split(",")(2).length - 1)
                  //          Date = dateList(dateList.length-1)
                  // number of days spanned by the last day's points
                  println("lastDate:" + lastDate.substring(6, 8).toInt + " " + "Date:" + Date.substring(6, 8).toInt)
                  acrossDays = (Date.substring(6, 8).toInt - lastDate.substring(6, 8).toInt + 1).toString
                  println("last day's start time, start point, days spanned: " + startTime, startPoint, acrossDays)
                  // "LINESTRING(10010 40040,10011 40041,10012 40042,10013 40043)"
                  lineStringBuffer.append("LINESTRING(")
                  // query the day's times and coordinates, keeping only Message_ID 1, 2, 3, 18, 19, 27
                  routeTable = spark.sql("SELECT Time,TRIM(Message_ID) AS Message_ID,Time,TRIM(Longitude) AS Longitude,TRIM(Latitude) AS Latitude FROM route " +
                    "WHERE (Message_ID = '1' OR Message_ID = '2' OR Message_ID = '3' OR Message_ID = '18' OR Message_ID = '19' OR Message_ID = '27' ) AND Longitude IS NOT NULL AND Latitude IS NOT NULL AND LEFT(Time,8)=" + Date).na.drop()
    //              println("last day's start: " + startPoint)
                  //            lineStringBuffer.append(startPoint+",")
                  // to build the line, iterate over the day's coordinates
                  loglat = routeTable.select("Longitude", "Latitude").filter(_.toString().contains("."))
    //              loglat = routeTable.select("Longitude", "Latitude","Time").filter(x => x.toString().contains(".") && x.toString().contains("null") && x.toString().contains("|"))
                  // convert to an RDD and append each longitude/latitude pair to the line
                  (loglat.rdd).foreach({ x =>
                    lineStringBuffer.append(x.toString().replace(",", " ").substring(1, x.toString().length - 1) + ",")
                  })
                  // StringBuffer to String
                  line = lineStringBuffer.toString().substring(0, lineStringBuffer.length - 1) + ")"
                  println( mmsi + " " + Date + " line:" + line)
                  println("------------------------------------------last day done---------------------------------------------------")
    
                  /** *******************************************  write the data to HBase  ****************************************************** */
                  sparkGIS = new SparkOpenGIS
                  routeLine = new RouteLine
                  routeLine.mmsi = mmsi
                  routeLine.date = Date
                  routeLine.startTime = startTime
                  routeLine.endTime = endTime
                  routeLine.startPoint = startPoint
                  routeLine.endPoint = endPoint
                  routeLine.line = line
                  routeLine.acrossDays = acrossDays
                  routeLine.id = routeLine.mmsi + "-" + routeLine.startTime
                  // get the HBase data source
                  dataStore = sparkGIS.getDataStore
                  // get the HBase table schema
                  sft = sparkGIS.getSimpleFeatureTypesLine
                  // create the HBase table from the schema
                  dataStore.createSchema(sft)
                  // convert the RouteLine object to a Feature
                  feature = sparkGIS.convertToFeatureLine(routeLine: RouteLine)
                  // write the data to the dataStore
                  flag = sparkGIS.writeFeatureSingle(dataStore, sft, feature)
                  // debug output
                  if (flag) println("write to hbase table successfully!") else println("write to hbase table failed!")
                  // clear objects and data
                  sparkGIS = null; routeLine = null; dataStore = null; sft = null; feature = null; flag = false;
    
                  /** *******************************************  write the data to HBase  ****************************************************** */
    
                  Date = null; lastDate = null; startTime = null; startPoint = null; endTime = null; endPoint = null; acrossDays = null;
                  routeTable = null; loglat = null; lineStringBuffer.delete(0, lineStringBuffer.length()); line = null;
    
                  /** ************************************************line building finished****************************************************** */
    
    //              println("each day's start/end times and coordinates:")
    //              (routePointMap).forEach({
    //                new BiConsumer[String, String] {
    //                  override def accept(t: String, u: String): Unit = {
    //                    println(t + "\t" + u)
    //                  }
    //                }
    //              })
                  routePointMap.clear(); dateList = null; dateJavaList.clear();
                }
                /** ******************************************************separator ********************************************************/
              }
            }catch {
              case e: NullPointerException =>
                e.printStackTrace()
              case e:IndexOutOfBoundsException =>
                e.printStackTrace()
              case e:ArrayIndexOutOfBoundsException =>
                e.printStackTrace()
              case e:IllegalArgumentException =>
                e.printStackTrace()
              case e:Exception =>
                e.printStackTrace()
            } finally {
              System.gc()
            }
          }
        })
        println("-------------------------------执行结束-------------------------------------")
    //    val partitioner = new HashPartitioner(6)
        sc.stop()
      }
    }
    
    //class CustomPartitioner(val num:Int) extends Partitioner{
    //  override def numPartitions: Int = num
    //  override def getPartition(key:Any): Int = {
    //    (key.hashCode() % num).toInt
    //  }
    //}
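
    Since the listing above persists its results with RDD.saveAsTextFile, here is a minimal sketch of what the post's title refers to: saving a DataFrame directly as a CSV file through the DataFrameWriter API. The paths and the app name are made-up examples, not taken from the code above.

    import org.apache.spark.sql.SparkSession

    object SaveDataFrameAsCSV {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SaveDataFrameAsCSV")
          .master("local[*]")
          .getOrCreate()

        // read a CSV with a header row, inferring column types (as the listing above does)
        val df = spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("file:///D:/Hadoop/ship/input.csv")

        // Spark writes a directory of part files, not a single bare file;
        // coalesce(1) funnels everything through one task so the output
        // directory contains exactly one part-*.csv
        df.coalesce(1)
          .write
          .mode("overwrite")
          .option("header", "true")
          .csv("file:///D:/Hadoop/ship/output")

        spark.stop()
      }
    }

    coalesce(1) is fine for small results such as the MMSI intersection saved above, but it serializes the write through a single task and should be avoided for large DataFrames.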