当前位置 博文首页 > qq262593421的博客:Java线程池对多个目录下的相同文件按照时间

    qq262593421的博客:Java线程池对多个目录下的相同文件按照时间

    作者:[db:作者] 时间:2021-08-30 10:25

    一、问题描述

    存在若干个文件夹,文件夹名称以年月为名(一个月份一个文件夹)
    例:201901,201902,202011,202012

    每个文件夹下有上w个txt文件,文件名均为9位数数字
    例:204125631.txt,315125620.txt,478125650.txt
    每个txt文本有进上千行数据,并且每个文件夹(年月为名)下的9位数文件名都相同(只有少部分不一样)

    二、问题需求

    现在需要将每个月的文件夹下具有相同文件名的txt文件按照时间排序进行合并(不要求源文件不变)

    三、代码实现

    RenameMMSI?

    package com.xtd.file.Thread;
    
    import java.io.File;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class RenameMMSI {
    
        // 需要修改文件名称的文件夹根目录
        private static final String basedir = "H:\\历史全量\\running";
        // base文件操作对象
        private static final File baseFile = new File(basedir);
        // 每个月份的目录
        private static final String[] monthList = baseFile.list();
        // 定长线程池
        private static final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(60);
    
        public static void main(String[] args) {
            long time1 = System.currentTimeMillis();
            rename();
            long time2 = System.currentTimeMillis();
            System.out.println("time:"+(time2-time1));
        }
    
        public static void rename(){
            AtomicInteger total = new AtomicInteger();
            // 遍历每个月份的目录
            for(int i=0;i<monthList.length;i++){
                // 每个月份
                String[] listFile = new File(basedir+"\\"+monthList[i]).list();
                for(int j=0;j<listFile.length;j++){
                    final int finalJ = j;
                    int finalI = i;
                    fixedThreadPool.execute(() -> {
                        String currentFileName = listFile[finalJ];
                        File oldFile = new File(basedir+"\\"+monthList[finalI] + "\\" + currentFileName);
                        File newFile = new File(basedir+"\\"+monthList[finalI] + "\\" + currentFileName.substring(7,currentFileName.length()));
    //                    System.out.println(oldFile.getName());
    //                    System.out.println(newFile.getName());
                        oldFile.renameTo(newFile);
    //                    total.incrementAndGet();
                    });
                }
    //            System.out.println("---------------------------");
            }
            fixedThreadPool.shutdown();
    //        System.out.println("total:"+total);
        }
    }
    

    MoveMMSI?

    package com.xtd.file.Thread;
     
    import java.io.File;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    /**
     * 1、遍历文件夹下所有的文件名称
     * 2、按照 MMSI 创建文件夹
     * 3、将 MMSI 文件 放到一个文件夹
     * 4、遍历 moveDir 下的文件夹名称,在 mergeDir 下创建 MMSI.txt 文件
     */
    public class MoveMMSI {
     
        // 一共 3、4万个 MMSI
        private static Set<String> set = new HashSet(46327);
        // 文件路径
    //    private static final String basedir = "E:\\HistoryData\\SHGL\\javafile";
    //    private static final String basedir = "E:\\HistoryData\\ArcticOceanData\\javafile1";
    
        // "D:\\Hadoop\\ship\\上海钢联\\测试数据1"
        private static final String basedir = "H:\\历史全量\\running";
        // base文件操作对象
        private static final File baseFile = new File(basedir);
        // 每个月份的目录
        private static final String[] monthList = baseFile.list();
        // 移动的文件目录
        private static final String moveDir = baseFile.getParent()+"\\move";
        // 合并的文件目录
        private static final String mergeDir = baseFile.getParent()+"\\merge";
        // 定长线程池
        private static final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(60);
     
        public static void main(String[] args) {
     
            long time1 = System.currentTimeMillis();
            // 创建移动和合并的目录
            new File(mergeDir).mkdir();
            new File(moveDir).mkdir();
            dirSet(basedir);    // 遍历所有文件放到set集合中
    //        fixTheadPoolTest();
            foreachSet();      // 遍历 union MMSI,以 MMSI 为名创建目录
            long time2 = System.currentTimeMillis();
            formothList(monthList);
            System.out.println( time2 - time1);
    //        System.out.println(moveDir);
    //        moveFile("E:\\HistoryData\\ArcticOceanData\\movefile\\file1\\file002.txt","E:\\HistoryData\\ArcticOceanData\\movefile\\file2\\file003.txt");
        }
    
        // 遍历所有文件放到set集合中
        public static void dirSet(String dir){
            int total = 0;
            String[] listFile = null;
            // 遍历每个月份的目录
            for(int i=0;i<monthList.length;i++){
                listFile = new File(basedir+"\\"+monthList[i]).list();
                for(int j=0;j<listFile.length;j++){
                    set.add(listFile[j]);
                    ++total;
    //                System.out.println(listFile[j]);
                }
            }
            System.out.println(total);
        }
     
        /**
         * 多线程运行
         * 1、遍历 union MMSI,以 MMSI 为名创建目录
         * 2、不管有没有文件,将每个月份下的每个 union MMSI 文件 move 到 以 MMSI 为名的目录下
         */
        public static void foreachSet(){
            System.out.println("=============================================");
    //        Iterator<String> iterator = set.iterator();
            String mkdir = null;
            String sourcePath = null;
            String targePath = null;
            for(String next:set) {
    //        while(iterator.hasNext()){
    //            String next = iterator.next();
                mkdir = moveDir+"\\"+next.substring(0,next.length()-4);
    //            System.out.println(mkdir);
                new File(mkdir).mkdir();
                try {
                    new File(mergeDir+"\\"+next).createNewFile();
                } catch (IOException e) {
                    e.printStackTrace();
                }
    //            System.out.println("mergeDir\t"+mergeDir+"\\"+next);
                for (int i=0;i<monthList.length;i++){
                    String monthPath = monthList[i];
                    sourcePath = basedir+"\\"+monthPath+"\\"+next;
                    if(monthList[i].length() == 6){
                        targePath = mkdir+"\\"+monthPath+"_"+next;
                    }else {
                        targePath = mkdir+"\\"+monthPath.substring(0,6)+"_"+next;
                    }
    //                System.out.println("sourcePath\t" + sourcePath);
    //                System.out.println("targePath\t" + targePath);
                    new File(sourcePath).renameTo(new File(targePath));
                }
            }
            System.out.println(set.size());
        }
     
        public static void formothList(String[] monthList){
            for (String s : monthList) {
                System.out.println(s);
            }
        }
    }

    MergeMMSI?

    package com.xtd.file.Thread;
    
    import java.io.*;
    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 1、遍历一个moveDir下的所有文件
     */
    public class MergeMMSI {
    
    //    private static final String inName = "E:\\HistoryData\\SHGL\\java001.txt";
    //    private static final String outName = "E:\\HistoryData\\SHGL\\java002.txt";
        // 移动后文件的目录
    //    private static final String moveDir = "D:\\Hadoop\\ship\\SHGL\\move";
        private static final String moveDir = "H:\\历史全量\\move";
        // 合并文件的目录
    //    private static final String mergeDir = "D:\\Hadoop\\ship\\SHGL\\merge";
        private static final String mergeDir = "H:\\历史全量\\merge";
        // 需要遍历的 MMSI 目录
        private static final String[] listDir = new File(moveDir).list();
        // 定长线程池
        private static final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(60);
    //    private static final ExecutorService fixedThreadPool = Executors.newCachedThreadPool();
    
        public static void main(String[] args) {
    
    //       String content =  inputStram(inName);
    //       System.out.println(content);
    //       outputSteam(outName,content);
    //       appendWrite(inName,outName);
            long time1 = System.currentTimeMillis();
            forMoveDir();
            long time2 = System.currentTimeMillis();
            System.out.println(time2-time1);
        }
    
        /**
         * 1、遍历 MMSI 文件夹目录
         * 2、按照日期一次读取每个 MMSI 文件夹下的文件
         * 3、将读取的内容追加到merge文件中
         */
        public static void forMoveDir(){
            int total = 0;
            // 遍历每个文件夹
            for(String mmdir:listDir){
                // 每个线程处理一个 MMSI , 写入文件会按照顺序执行
                fixedThreadPool.execute(() -> {
                    String dir = moveDir+"\\"+mmdir;
                    String[] listfile = new File(dir).list();
                    Arrays.sort(listfile);
                    // 遍历每个文件
                    for(String file:listfile){
                        String sourceFile = moveDir+"\\"+file.substring(7,file.length()-4)+"\\"+file;
                        String tergeFile = mergeDir+"\\"+file.substring(7);
        //                System.out.println(sourceFile);
        //                System.out.println(tergeFile);
                        appendWrite(sourceFile,tergeFile);
                    }
                });
                ++total;
            }
            // 执行完毕,关闭线程池
            fixedThreadPool.shutdown();
            System.out.println(total);
        }
    
        public static void appendWrite(String inName,String outName){
            try {
                // 文件读取
                FileInputStream fileInputStream = new FileInputStream(inName);
                byte[] b = new byte[fileInputStream.available()];
                fileInputStream.read(b);
                fileInputStream.close();
                String content =  new String(b);
    //            System.out.println(content);
                // 文件写入
                FileOutputStream fileOutputStream = new FileOutputStream(outName,true);
                fileOutputStream.write(b);
    //            System.out.println("--------------------------------------------");
                fileOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static void outputSteam(String inName,String content){
            try{
                FileOutputStream fileOutputStream = new FileOutputStream(inName,true);
                byte[] b = content.getBytes();
                fileOutputStream.write(b);
                System.out.println("--------------------------------------------");
                fileOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static String inputStram(String inName){
            try {
                FileInputStream fileInputStream = new FileInputStream(inName);
                byte[] b = new byte[fileInputStream.available()];
                fileInputStream.read(b);
                fileInputStream.close();
                return new String(b);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    
    }
    

    cs