当前位置 博文首页 > 等你归去来:基于文件的表合并及行转列实现参考
用例:有N个文件,每个文件只有一列主键,每个文件代表一种属性。即当如PRI1主键在A文件中,说明PRI1具有A属性。这种场景,一般用于数据的筛选,比如需要既有属性A又有属性B的主键有哪些?就是这类场景。
如何处理该场景?
如果抛却如题所说文件限制,那我们如何解决?
比如,我们可以将每个文件数据导入到redis中,数据结构为hash, redis-key为pri主键,hash-key为属性X, hash-value为1或不存在。在做判定的时候,只需找到对应的key, 再去判断其是否具有对应属性即可解决问题了。
这个方案看起来比较合适,但有两个缺点:1. redis内存数据库,容量有限,不一定能满足大数据量的场景; 2. 针对反向查询的需求无法满足,即想要查找既含有A属性又含有B属性的主键列表,就很难办到。
再比如,我们可以使用类似于mysql之类的关系型数据,先将单文件数据导致单表中,表名以相应属性标识命名,然后以sql形式进行临时计算即可。sql参考如下:
select COALESCE(ta.id,tb.id) as id, case when ta.id is not null then 1 else 0 end as ta_flag, case when tb.id is not null then 1 else 0 end as tb_flag from table_a as ta full join table_b as tb on ta.id=tb.id;
应该说这种解决方案算是比较好的了,在计算不大的情况下,这种复杂度在数据库领域简直是小场面了。需要再次说明的是,在数据库会新建一个个的小表,它只有一列主键数据,然后在查询的时候再进行计算。这种方案的问题在于,当标识越来越多之后,就会导致小表会越来越多,甚至可能超出数据库限制。原本是一个一般的需求,却要要求非常好数据库支持,也不太好嘛。
不过,上面这个问题,也可以解决。比如我们可以使用行转列的形式,将以上小表转换成一张大表,随后将小表删除,从而达到数据库的普通要求。合并语句也不复杂。参考如下:
create table w_xx as select COALESCE(ta.id,tb.id) as id, case when ta.id is not null then 1 else 0 end as ta_flag, case when tb.id is not null then 1 else 0 end as tb_flag from table_a as ta full join table_b as tb on ta.id=tb.id;
如此,基本完美了。
如果我没有外部存储介质,那当如何?如题,直接基于文件,将多个合并起来。看起来并非难事。
如果不考虑内存问题,则可以将每个文件读入为list, 转换为map存储,和上面的redis实现方案类似。只是可能不太现实,也比较简单,忽略实现。
再简单化,如果我们每个文件中保存的主键都是有序的,要想合并就更简单了。
基本思路是,两两文件合并,依次读取行,然后比对是否有相等的值,然后写到新文件中即可。
另外,如果要做并行计算,可以考虑使用上一篇文章提到的 fork/join 框架,非常合场景呢。
主要算法为依次遍历各文件,进行数据判定,然后写目标文件。具体实现如下:
/** * 功能描述: 文件合并工具类 * */ @Slf4j public class FileJoiner { /** * router结果文件分隔符 */ private static final String CSV_RESULT_FILE_SEPARATOR = ","; /** * 合并文件语义,等价sql: * select coalesce(a.id, b.id, c.id...) id, * case when a.id is not null then '1' else '' end f_a, * case when b.id is not null then '1' else '' end f_b, * ... * from a * full join b on a.id = b.id * full join c on a.id = c.id * ... * ; */ public JoinFileDescriptor joinById(JoinFileDescriptor a, JoinFileDescriptor b) throws IOException { JoinFileDescriptor mergedDesc = new JoinFileDescriptor(); if(a.getLineCnt() <= 0 && b.getLineCnt() <= 0) { List<FileFieldDesc> fieldDesc = new ArrayList<>(); // 先a后b fieldDesc.addAll(a.getFieldInfo()); fieldDesc.addAll(b.getFieldInfo()); mergedDesc.setFieldInfo(fieldDesc); return mergedDesc; } if(a.getLineCnt() <= 0) { List<FileFieldDesc> fieldDesc = new ArrayList<>(); // 先b后a fieldDesc.addAll(b.getFieldInfo()); fieldDesc.addAll(a.getFieldInfo()); mergedDesc.setFieldInfo(fieldDesc); return mergedDesc; } if(b.getLineCnt() <= 0) { List<FileFieldDesc> fieldDesc = new ArrayList<>(); // 先a后b fieldDesc.addAll(a.getFieldInfo()); fieldDesc.addAll(b.getFieldInfo()); mergedDesc.setFieldInfo(fieldDesc); return mergedDesc; } // 正式合并 a b 表 String mergedPath = a.getPath() + ".m" + a.getDeep(); long cnt = -1; try(BufferedReader aReader = new BufferedReader(new FileReader(a.getPath()))) { try(BufferedReader bReader = new BufferedReader(new FileReader(b.getPath()))) { a.setReader(aReader); b.setReader(bReader); try(OutputStream outputStream = FileUtils.openOutputStream(new File(mergedPath))) { cnt = unionTwoBufferStream(a, b, outputStream); } } } mergedDesc.setPath(mergedPath); mergedDesc.setLineCnt(cnt); mergedDesc.incrDeep(); // 先a后b List<FileFieldDesc> fieldDesc = new ArrayList<>(); a.getFieldInfo().forEach(FileFieldDesc::writeOk); b.getFieldInfo().forEach(FileFieldDesc::writeOk); fieldDesc.addAll(a.getFieldInfo()); fieldDesc.addAll(b.getFieldInfo()); mergedDesc.setFieldInfo(fieldDesc); return mergedDesc; } /** * 合并多文件,无序的,但各字段位置可定位 * * @param fileList 待合并的文件列表 * @param orderedFieldList 需要按序排列 * @return 合并后文件信息及字段列表 * @throws Exception 合并出错抛出 */ public JoinFileDescriptor joinMultiFile(List<JoinFileDescriptor> fileList, List<String> orderedFieldList) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(); FileJoinFJTask fjTask = new FileJoinFJTask(fileList); ForkJoinTask<JoinFileDescriptor> future = forkJoinPool.submit(fjTask); JoinFileDescriptor mergedFile = future.get(); // List<String> orderedFieldList = new ArrayList<>(); // for (JoinFileDescriptor file1 : fileList) { // List<String> field1 = file1.getFieldInfo().stream() // .map(FileFieldDesc::getFieldName) // .collect(Collectors.toList()); // orderedFieldList.addAll(field1); // } return rewriteFileBySelectField(mergedFile, orderedFieldList); } /** * 按照要求字段顺序重写文件内容 * * @param originFile 当前文件描述 * @param orderedFields 目标字段序列 * @return 处理好的文件实例(元数据或获取) * @throws IOException 写文件异常抛出 */ public JoinFileDescriptor rewriteFileBySelectField(JoinFileDescriptor originFile, List<String> orderedFields) throws IOException { List<FileFieldDesc> fieldDescList = originFile.getFieldInfo(); if(checkIfCurrentFileInOrder(fieldDescList, orderedFields)) { log.info("当前文件已按要求排放好,无需再排: {}", orderedFields); return originFile; } Map<String, FieldOrderIndicator> indicatorMap = composeFieldOrderIndicator(fieldDescList, orderedFields); AtomicLong lineCounter = new AtomicLong(0); String targetFilePath = originFile.getPath() + ".of"; try(BufferedReader aReader = new BufferedReader(new FileReader(originFile.getPath()))) { try(OutputStream outputStream = FileUtils.openOutputStream(new File(targetFilePath))) { String lineData; while ((lineData = aReader.readLine()) != null) { String[] cols = StringUtils.splitPreserveAllTokens( lineData, CSV_RESULT_FILE_SEPARATOR); // 空行 if(cols.length == 0) { continue; } // id,1,... StringBuilder sb = new StringBuilder(cols[0]); for (String f1 : orderedFields) { sb.append(CSV_RESULT_FILE_SEPARATOR); FieldOrderIndicator fieldDescIndicator = indicatorMap.get(f1); if(fieldDescIndicator == null || (fieldDescIndicator.fieldIndex >= cols.length && fieldDescIndicator.fieldDesc.getWriteFlag() == 1)) { continue; } sb.append(cols[fieldDescIndicator.fieldIndex]); } writeLine(outputStream, sb.toString(), lineCounter); } } } JoinFileDescriptor mergedDesc = new JoinFileDescriptor(); mergedDesc.setPath(targetFilePath); mergedDesc.setLineCnt(lineCounter.get()); mergedDesc.setFieldInfo( orderedFields.stream() .map(r -> FileFieldDesc.newField(r, 1)) .collect(Collectors.toList())); return mergedDesc; } /** * 构造字段下标指示器 * * @param currentFieldDescList 当前字段排列情况 * @param orderedFields 目标序列的字段列表 * @return {"a":{"fieldIndex":1, "fieldDesc":{"name":"aaa", "writeFlag":1}}} */ private Map<String, FieldOrderIndicator> composeFieldOrderIndicator(List<FileFieldDesc> currentFieldDescList, List<String> orderedFields) { Map<String, FieldOrderIndicator> indicatorMap = new HashMap<>(orderedFields.size()); outer: for (String f1 : orderedFields) { for (int i = 0; i < currentFieldDescList.size(); i++) { FileFieldDesc originField1 = currentFieldDescList.get(i); if (f1.equals(originField1.getFieldName())) { indicatorMap.put(f1, new FieldOrderIndicator(i + 1, originField1)); continue outer; } } indicatorMap.put(f1, null); } return indicatorMap; } /** * 检测当前文件是按字段先后要求排放好 * * @param currentFieldDescList 现有文件字段排列情况 * @param orderedFields 期望排列的顺序列表 * @return true:已排好序,无需再排; false:未按要求排好 */ private boolean checkIfCurrentFileInOrder(List<FileFieldDesc> currentFieldDescList, List<String> orderedFields) { if(orderedFields.size() != currentFieldDescList.size()) { return true; } for (int j = 0; j < orderedFields.size(); j++) { String targetFieldName = orderedFields.get(j); FileFieldDesc possibleFieldDesc = currentFieldDescList.get(j); if(possibleFieldDesc != null && targetFieldName.equals(possibleFieldDesc.getFieldName()) && possibleFieldDesc.getWriteFlag() == 1) { continue; } return false; } return true; } /** * 计算两个数据流取并集 ( A ∪ B) * * 并将 A/B 标签位写到后置位置中, 1代表存在,空代表存在 * 如A存在且B存在,则写结果为: A,1,1 * 如A存在但B不存在, 则写结果为: A,1, * 如A不存在但B存在, 则写结果为: B,,1 * * 当A或B中存在多列时,以第一列为主键进行关联 * 如A为: 111 * B为: 111,,1,1 * 则合并后的结果为: 111,1,,1,1 * * @return 最终写入的文件行数 */ private long unionTwoBufferStream(JoinFileDescriptor a, JoinFileDescriptor b, OutputStream targetOutputStream) throws IOException { String lineDataLeft; String lineDataRight; // String lineDataLast = null; AtomicLong lineNumCounter = new AtomicLong(0); BufferedReader leftBuffer = a.getReader(); BufferedReader rightBuffer = b.getReader(); lineDataRight = rightBuffer.readLine(); // 主键固定在第一列 int idIndex = 1; String leftId = null; String rightId = getIdColumnValueFromLineData(lineDataRight, idIndex); String lastId = null; int cmpV; while ((lineDataLeft = leftBuffer.readLine()) != null) { // 以左表基础迭代,所以优先检查右表 leftId = getIdColumnValueFromLineData(lineDataLeft, idIndex); if(lineDataRight != null && (cmpV = leftId.compareTo(rightId)) >= 0) { do { if(rightId.equals(lastId)) { lineDataRight = rightBuffer.readLine(); rightId = getIdColumnValueFromLineData( lineDataRight, idIndex); // 合并左右数据 continue; } writeLine(targetOutputStream, joinLineData(cmpV == 0 ? lineDataLeft : null, lineDataRight, a.getFieldInfo(), b.getFieldInfo()), lineNumCounter); lastId = rightId; lineDataRight = rightBuffer.readLine(); rightId = getIdColumnValueFromLineData( lineDataRight, idIndex); }