当前位置 博文首页 > qq262593421的博客:Java线程池对多个目录下的相同文件按照时间
存在若干个文件夹,文件夹名称以年月为名(一个月份一个文件夹)
例:201901,201902,202011,202012。每个文件夹下有上万个txt文件,文件名均为9位数字
例:204125631.txt,315125620.txt,478125650.txt
每个txt文本有近上千行数据,并且每个文件夹(年月为名)下的9位数文件名都相同(只有少部分不一样)
现在需要将每个月的文件夹下具有相同文件名的txt文件按照时间排序进行合并(不要求源文件不变)
RenameMMSI
package com.xtd.file.Thread;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Renames every file found in the month directories under {@code basedir} by
 * dropping the first seven characters of its name.
 *
 * <p>Each rename runs as its own task on a fixed pool of 60 threads.
 * NOTE(review): the seven dropped characters are presumably a {@code yyyyMM_}
 * style prefix - confirm against the data before running.
 */
public class RenameMMSI {
    // Root directory whose month sub-directories contain the files to rename.
    private static final String basedir = "H:\\历史全量\\running";
    // File handle for the root directory.
    private static final File baseFile = new File(basedir);
    // Names of the month directories; null when the root cannot be listed.
    private static final String[] monthList = baseFile.list();
    // Fixed-size thread pool that executes the rename tasks.
    private static final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(60);
    // Number of leading characters removed from every file name.
    private static final int PREFIX_LENGTH = 7;

    /**
     * Runs the rename and prints the elapsed wall-clock time in milliseconds.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        long time1 = System.currentTimeMillis();
        rename();
        long time2 = System.currentTimeMillis();
        System.out.println("time:"+(time2-time1));
    }

    /**
     * Submits one rename task per file, shuts the pool down and waits until all
     * submitted tasks have finished, so callers can time the whole operation.
     *
     * <p>Directories that cannot be listed are reported on {@code System.err}
     * and skipped instead of raising a {@link NullPointerException}.
     */
    public static void rename(){
        if (monthList == null) {
            System.err.println("Cannot list base directory: " + basedir);
        } else {
            for (String month : monthList) {
                File monthDir = new File(baseFile, month);
                String[] names = monthDir.list();
                if (names == null) {
                    // Not a directory, or an I/O error occurred while listing it.
                    System.err.println("Skipping unreadable entry: " + monthDir);
                    continue;
                }
                for (String name : names) {
                    fixedThreadPool.execute(() -> renameOne(monthDir, name));
                }
            }
        }
        fixedThreadPool.shutdown();
        try {
            // shutdown() does not block; wait so that rename() returns only when the work is done.
            fixedThreadPool.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Renames a single file inside monthDir, reporting names that are too short and failed renames.
    private static void renameOne(File monthDir, String name) {
        if (name.length() <= PREFIX_LENGTH) {
            // Stripping the prefix would leave an empty name (or throw), so leave the file alone.
            System.err.println("Name too short to strip prefix, skipped: " + new File(monthDir, name));
            return;
        }
        File oldFile = new File(monthDir, name);
        File newFile = new File(monthDir, name.substring(PREFIX_LENGTH));
        if (!oldFile.renameTo(newFile)) {
            System.err.println("Rename failed: " + oldFile + " -> " + newFile);
        }
    }
}
MoveMMSI
package com.xtd.file.Thread;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Regroups the per-month files by file name (MMSI) so they can be merged later.
 *
 * <ol>
 *   <li>Collects every file name found in the month directories.</li>
 *   <li>Creates one directory per MMSI under {@code moveDir}.</li>
 *   <li>Moves each month's file into that directory, prefixing the name with
 *       the first six characters of the month directory and an underscore.</li>
 *   <li>Creates an empty {@code MMSI.txt} under {@code mergeDir} for each MMSI.</li>
 * </ol>
 *
 * <p>All work runs sequentially on the calling thread.
 */
public class MoveMMSI {
    // Union of all file names seen in the month directories (sized for roughly 30-40 thousand MMSIs).
    private static Set<String> set = new HashSet<>(46327);
    // Root directory that contains one sub-directory per month.
    private static final String basedir = "H:\\历史全量\\running";
    // File handle for the root directory.
    private static final File baseFile = new File(basedir);
    // Names of the month directories; null when the root cannot be listed.
    private static final String[] monthList = baseFile.list();
    // Directory that receives one sub-directory per MMSI.
    private static final String moveDir = baseFile.getParent()+"\\move";
    // Directory that receives the (initially empty) merged files.
    private static final String mergeDir = baseFile.getParent()+"\\merge";
    // Length of the month part used as prefix of a moved file.
    private static final int MONTH_LENGTH = 6;
    // Number of trailing characters (".txt") dropped to obtain the MMSI directory name.
    private static final int EXTENSION_LENGTH = 4;

    /**
     * Creates the output directories, regroups the files and prints the month
     * list followed by the elapsed time in milliseconds.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        long time1 = System.currentTimeMillis();
        // Create the move and merge directories.
        new File(mergeDir).mkdir();
        new File(moveDir).mkdir();
        dirSet(basedir);
        foreachSet();
        long time2 = System.currentTimeMillis();
        formothList(monthList);
        System.out.println( time2 - time1);
    }

    /**
     * Adds the name of every file found in the sub-directories of {@code dir}
     * to the shared set and prints how many entries were visited.
     *
     * <p>Entries of {@code dir} that cannot be listed (plain files, unreadable
     * directories) are skipped.
     *
     * @param dir root directory whose sub-directories are scanned
     */
    public static void dirSet(String dir){
        int total = 0;
        File root = new File(dir);
        String[] months = root.list();
        if (months == null) {
            System.err.println("Cannot list directory: " + dir);
        } else {
            for (String month : months) {
                String[] names = new File(root, month).list();
                if (names == null) {
                    continue;
                }
                for (String name : names) {
                    set.add(name);
                    ++total;
                }
            }
        }
        System.out.println(total);
    }

    /**
     * For every collected file name: creates the MMSI directory under
     * {@code moveDir}, creates the empty merge file under {@code mergeDir} and
     * moves the file of each month (when that month has one) into the MMSI
     * directory as {@code <month>_<name>}.
     */
    public static void foreachSet(){
        System.out.println("=============================================");
        if (monthList == null) {
            System.err.println("Cannot list base directory: " + basedir);
        }
        String[] months = monthList == null ? new String[0] : monthList;
        for (String name : set) {
            File mmsiDir = new File(moveDir, directoryName(name));
            mmsiDir.mkdir();
            File mergeFile = new File(mergeDir, name);
            try {
                mergeFile.createNewFile();
            } catch (IOException e) {
                System.err.println("Cannot create " + mergeFile + ": " + e);
            }
            for (String month : months) {
                File source = new File(new File(baseFile, month), name);
                if (!source.exists()) {
                    // Not every MMSI has data in every month.
                    continue;
                }
                File target = new File(mmsiDir, monthPrefix(month) + "_" + name);
                if (!source.renameTo(target)) {
                    System.err.println("Move failed: " + source + " -> " + target);
                }
            }
        }
        System.out.println(set.size());
    }

    /**
     * Prints each entry of {@code monthList} on its own line.
     *
     * @param monthList month directory names; {@code null} prints nothing
     */
    public static void formothList(String[] monthList){
        if (monthList == null) {
            return;
        }
        for (String s : monthList) {
            System.out.println(s);
        }
    }

    // Drops the trailing extension characters; names too short to have one are used unchanged.
    private static String directoryName(String fileName) {
        return fileName.length() > EXTENSION_LENGTH
                ? fileName.substring(0, fileName.length() - EXTENSION_LENGTH)
                : fileName;
    }

    // Returns at most the first six characters of a month directory name.
    private static String monthPrefix(String month) {
        return month.length() > MONTH_LENGTH ? month.substring(0, MONTH_LENGTH) : month;
    }
}
MergeMMSI
package com.xtd.file.Thread;
import java.io.*;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Merges the files of every MMSI directory under {@code moveDir} into one file
 * per MMSI under {@code mergeDir}.
 *
 * <p>The files of a directory are appended in ascending name order.
 * NOTE(review): this is chronological only if every name starts with a
 * fixed-width {@code yyyyMM_} prefix - confirm against the moved data.
 */
public class MergeMMSI {
    // Directory that holds one sub-directory per MMSI.
    private static final String moveDir = "H:\\历史全量\\move";
    // Directory that receives the merged files.
    private static final String mergeDir = "H:\\历史全量\\merge";
    // MMSI directories to process; null when moveDir cannot be listed.
    private static final String[] listDir = new File(moveDir).list();
    // Fixed-size thread pool; each task merges one MMSI directory.
    private static final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(60);
    // Number of leading characters dropped from a file name to get the merge file name.
    private static final int MONTH_PREFIX_LENGTH = 7;
    // Size of the buffer used when copying streams.
    private static final int BUFFER_SIZE = 8192;

    /**
     * Runs the merge and prints the elapsed wall-clock time in milliseconds.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        long time1 = System.currentTimeMillis();
        forMoveDir();
        long time2 = System.currentTimeMillis();
        System.out.println(time2-time1);
    }

    /**
     * Submits one merge task per MMSI directory, prints the number of
     * submitted tasks and waits until all of them have finished.
     *
     * <p>One task handles a whole directory and appends its files one after
     * another, so the appends coming from a single directory are never
     * interleaved.
     */
    public static void forMoveDir(){
        int total = 0;
        if (listDir == null) {
            System.err.println("Cannot list directory: " + moveDir);
        } else {
            for (String mmdir : listDir) {
                fixedThreadPool.execute(() -> mergeDirectory(mmdir));
                ++total;
            }
        }
        // No more tasks will be submitted.
        fixedThreadPool.shutdown();
        System.out.println(total);
        try {
            // shutdown() does not block; wait so that the caller can time the whole merge.
            fixedThreadPool.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Appends the complete content of {@code inName} to {@code outName},
     * creating {@code outName} when it does not exist. I/O failures are
     * reported on {@code System.err}; nothing is thrown.
     *
     * @param inName  path of the file to read
     * @param outName path of the file to append to
     */
    public static void appendWrite(String inName,String outName){
        // The input is opened first so that a missing input never creates an empty output file.
        try (InputStream in = new FileInputStream(inName);
             OutputStream out = new FileOutputStream(outName, true)) {
            copy(in, out);
        } catch (IOException e) {
            System.err.println("Cannot append " + inName + " to " + outName + ": " + e);
        }
    }

    /**
     * Appends {@code content}, encoded with the platform default charset, to
     * the file {@code inName} and prints a separator line afterwards.
     *
     * @param inName  path of the file to append to
     * @param content text to append
     */
    public static void outputSteam(String inName,String content){
        try (OutputStream out = new FileOutputStream(inName, true)) {
            out.write(content.getBytes());
            System.out.println("--------------------------------------------");
        } catch (IOException e) {
            System.err.println("Cannot write " + inName + ": " + e);
        }
    }

    /**
     * Reads the whole file and decodes it with the platform default charset.
     *
     * @param inName path of the file to read
     * @return the file content, or {@code null} when the file cannot be read
     */
    public static String inputStram(String inName){
        try (InputStream in = new FileInputStream(inName)) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            copy(in, buffer);
            return buffer.toString();
        } catch (IOException e) {
            System.err.println("Cannot read " + inName + ": " + e);
        }
        return null;
    }

    // Appends every file of one MMSI directory, in ascending name order, to its merge file.
    private static void mergeDirectory(String mmdir) {
        File dir = new File(moveDir, mmdir);
        String[] files = dir.list();
        if (files == null) {
            System.err.println("Skipping unreadable entry: " + dir);
            return;
        }
        Arrays.sort(files);
        for (String file : files) {
            if (file.length() <= MONTH_PREFIX_LENGTH) {
                System.err.println("Unexpected file name, skipped: " + new File(dir, file));
                continue;
            }
            // Read from the directory that was actually listed instead of rebuilding it from the name.
            String sourceFile = new File(dir, file).getPath();
            String targetFile = new File(mergeDir, file.substring(MONTH_PREFIX_LENGTH)).getPath();
            appendWrite(sourceFile, targetFile);
        }
    }

    // Copies all remaining bytes of in to out; available() is not a reliable length, so loop until EOF.
    private static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] chunk = new byte[BUFFER_SIZE];
        int read;
        while ((read = in.read(chunk)) != -1) {
            out.write(chunk, 0, read);
        }
    }
}
cs