当前位置 博文首页 > 等你归去来:基于文件的表合并及行转列实现参考

    等你归去来:基于文件的表合并及行转列实现参考

    作者:等你归去来 时间:2021-06-29 19:04

      用例:有N个文件,每个文件只有一列主键,每个文件代表一种属性。即当如PRI1主键在A文件中,说明PRI1具有A属性。这种场景,一般用于数据的筛选,比如需要既有属性A又有属性B的主键有哪些?就是这类场景。

      如何处理该场景?

     

    1. 解题思路

      如果抛却如题所说文件限制,那我们如何解决?

      比如,我们可以将每个文件数据导入到redis中,数据结构为hash, redis-key为pri主键,hash-key为属性X, hash-value为1或不存在。在做判定的时候,只需找到对应的key, 再去判断其是否具有对应属性即可解决问题了。

      这个方案看起来比较合适,但有两个缺点:1. redis内存数据库,容量有限,不一定能满足大数据量的场景; 2. 针对反向查询的需求无法满足,即想要查找既含有A属性又含有B属性的主键列表,就很难办到。

      再比如,我们可以使用类似于mysql之类的关系型数据,先将单文件数据导致单表中,表名以相应属性标识命名,然后以sql形式进行临时计算即可。sql参考如下:

     select COALESCE(ta.id,tb.id) as id, 
         case when ta.id is not null then 1 else 0 end as ta_flag, 
         case when tb.id is not null then 1 else 0 end as tb_flag
       from table_a as ta 
        full join table_b as tb on ta.id=tb.id;

      应该说这种解决方案算是比较好的了,在计算不大的情况下,这种复杂度在数据库领域简直是小场面了。需要再次说明的是,在数据库会新建一个个的小表,它只有一列主键数据,然后在查询的时候再进行计算。这种方案的问题在于,当标识越来越多之后,就会导致小表会越来越多,甚至可能超出数据库限制。原本是一个一般的需求,却要要求非常好数据库支持,也不太好嘛。

      不过,上面这个问题,也可以解决。比如我们可以使用行转列的形式,将以上小表转换成一张大表,随后将小表删除,从而达到数据库的普通要求。合并语句也不复杂。参考如下:

    create table w_xx as 
     select COALESCE(ta.id,tb.id) as id, 
         case when ta.id is not null then 1 else 0 end as ta_flag, 
         case when tb.id is not null then 1 else 0 end as tb_flag
       from table_a as ta 
        full join table_b as tb on ta.id=tb.id;

      如此,基本完美了。

     

    2. 基于文件的行转列数据join

      如果我没有外部存储介质,那当如何?如题,直接基于文件,将多个合并起来。看起来并非难事。

      如果不考虑内存问题,则可以将每个文件读入为list, 转换为map存储,和上面的redis实现方案类似。只是可能不太现实,也比较简单,忽略实现。

      再简单化,如果我们每个文件中保存的主键都是有序的,要想合并就更简单了。
      基本思路是,两两文件合并,依次读取行,然后比对是否有相等的值,然后写到新文件中即可。

      另外,如果要做并行计算,可以考虑使用上一篇文章提到的 fork/join 框架,非常合场景呢。

     

    2.1. 文件行转列合并主体框架

      主要算法为依次遍历各文件,进行数据判定,然后写目标文件。具体实现如下:

    /**
     * 功能描述: 文件合并工具类
     *
     */
    @Slf4j
    public class FileJoiner {
    
        /**
         * router结果文件分隔符
         */
        private static final String CSV_RESULT_FILE_SEPARATOR = ",";
    
        /**
         * 合并文件语义,等价sql:
         * select coalesce(a.id, b.id, c.id...) id,
         *      case when a.id is not null then '1' else '' end f_a,
         *      case when b.id is not null then '1' else '' end f_b,
         *      ...
         *  from a
         *      full join b on a.id = b.id
         *      full join c on a.id = c.id
         *      ...
         *   ;
         */
        public JoinFileDescriptor joinById(JoinFileDescriptor a,
                                           JoinFileDescriptor b) throws IOException {
            JoinFileDescriptor mergedDesc = new JoinFileDescriptor();
            if(a.getLineCnt() <= 0 && b.getLineCnt() <= 0) {
                List<FileFieldDesc> fieldDesc = new ArrayList<>();
                // 先a后b
                fieldDesc.addAll(a.getFieldInfo());
                fieldDesc.addAll(b.getFieldInfo());
                mergedDesc.setFieldInfo(fieldDesc);
                return mergedDesc;
            }
            if(a.getLineCnt() <= 0) {
                List<FileFieldDesc> fieldDesc = new ArrayList<>();
                // 先b后a
                fieldDesc.addAll(b.getFieldInfo());
                fieldDesc.addAll(a.getFieldInfo());
                mergedDesc.setFieldInfo(fieldDesc);
                return mergedDesc;
            }
            if(b.getLineCnt() <= 0) {
                List<FileFieldDesc> fieldDesc = new ArrayList<>();
                // 先a后b
                fieldDesc.addAll(a.getFieldInfo());
                fieldDesc.addAll(b.getFieldInfo());
                mergedDesc.setFieldInfo(fieldDesc);
                return mergedDesc;
            }
            // 正式合并 a b 表
            String mergedPath = a.getPath() + ".m" + a.getDeep();
            long cnt = -1;
            try(BufferedReader aReader = new BufferedReader(new FileReader(a.getPath()))) {
                try(BufferedReader bReader = new BufferedReader(new FileReader(b.getPath()))) {
                    a.setReader(aReader);
                    b.setReader(bReader);
                    try(OutputStream outputStream = FileUtils.openOutputStream(new File(mergedPath))) {
                        cnt = unionTwoBufferStream(a, b, outputStream);
                    }
                }
            }
            mergedDesc.setPath(mergedPath);
            mergedDesc.setLineCnt(cnt);
            mergedDesc.incrDeep();
            // 先a后b
            List<FileFieldDesc> fieldDesc = new ArrayList<>();
            a.getFieldInfo().forEach(FileFieldDesc::writeOk);
            b.getFieldInfo().forEach(FileFieldDesc::writeOk);
            fieldDesc.addAll(a.getFieldInfo());
            fieldDesc.addAll(b.getFieldInfo());
            mergedDesc.setFieldInfo(fieldDesc);
            return mergedDesc;
        }
    
        /**
         * 合并多文件,无序的,但各字段位置可定位
         *
         * @param fileList 待合并的文件列表
         * @param orderedFieldList 需要按序排列
         * @return 合并后文件信息及字段列表
         * @throws Exception 合并出错抛出
         */
        public JoinFileDescriptor joinMultiFile(List<JoinFileDescriptor> fileList,
                                                List<String> orderedFieldList) throws Exception {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            FileJoinFJTask fjTask = new FileJoinFJTask(fileList);
            ForkJoinTask<JoinFileDescriptor> future = forkJoinPool.submit(fjTask);
            JoinFileDescriptor mergedFile = future.get();
    //        List<String> orderedFieldList = new ArrayList<>();
    //        for (JoinFileDescriptor file1 : fileList) {
    //            List<String> field1 = file1.getFieldInfo().stream()
    //                                        .map(FileFieldDesc::getFieldName)
    //                                        .collect(Collectors.toList());
    //            orderedFieldList.addAll(field1);
    //        }
            return rewriteFileBySelectField(mergedFile, orderedFieldList);
        }
    
        /**
         * 按照要求字段顺序重写文件内容
         *
         * @param originFile 当前文件描述
         * @param orderedFields 目标字段序列
         * @return 处理好的文件实例(元数据或获取)
         * @throws IOException 写文件异常抛出
         */
        public JoinFileDescriptor rewriteFileBySelectField(JoinFileDescriptor originFile,
                                                           List<String> orderedFields) throws IOException {
            List<FileFieldDesc> fieldDescList = originFile.getFieldInfo();
            if(checkIfCurrentFileInOrder(fieldDescList, orderedFields)) {
                log.info("当前文件已按要求排放好,无需再排: {}", orderedFields);
                return originFile;
            }
            Map<String, FieldOrderIndicator> indicatorMap = composeFieldOrderIndicator(fieldDescList, orderedFields);
            AtomicLong lineCounter = new AtomicLong(0);
            String targetFilePath = originFile.getPath() + ".of";
            try(BufferedReader aReader = new BufferedReader(new FileReader(originFile.getPath()))) {
                try(OutputStream outputStream = FileUtils.openOutputStream(new File(targetFilePath))) {
                    String lineData;
                    while ((lineData = aReader.readLine()) != null) {
                        String[] cols = StringUtils.splitPreserveAllTokens(
                                            lineData, CSV_RESULT_FILE_SEPARATOR);
                        // 空行
                        if(cols.length == 0) {
                            continue;
                        }
                        // id,1,...
                        StringBuilder sb = new StringBuilder(cols[0]);
                        for (String f1 : orderedFields) {
                            sb.append(CSV_RESULT_FILE_SEPARATOR);
                            FieldOrderIndicator fieldDescIndicator = indicatorMap.get(f1);
                            if(fieldDescIndicator == null
                                    || (fieldDescIndicator.fieldIndex >= cols.length
                                        && fieldDescIndicator.fieldDesc.getWriteFlag() == 1)) {
                                continue;
                            }
                            sb.append(cols[fieldDescIndicator.fieldIndex]);
                        }
                        writeLine(outputStream, sb.toString(), lineCounter);
                    }
                }
            }
            JoinFileDescriptor mergedDesc = new JoinFileDescriptor();
            mergedDesc.setPath(targetFilePath);
            mergedDesc.setLineCnt(lineCounter.get());
            mergedDesc.setFieldInfo(
                    orderedFields.stream()
                            .map(r -> FileFieldDesc.newField(r, 1))
                            .collect(Collectors.toList()));
            return mergedDesc;
        }
    
        /**
         * 构造字段下标指示器
         *
         * @param currentFieldDescList 当前字段排列情况
         * @param orderedFields 目标序列的字段列表
         * @return {"a":{"fieldIndex":1, "fieldDesc":{"name":"aaa", "writeFlag":1}}}
         */
        private Map<String, FieldOrderIndicator> composeFieldOrderIndicator(List<FileFieldDesc> currentFieldDescList,
                                                                            List<String> orderedFields) {
            Map<String, FieldOrderIndicator> indicatorMap = new HashMap<>(orderedFields.size());
            outer:
            for (String f1 : orderedFields) {
                for (int i = 0; i < currentFieldDescList.size(); i++) {
                    FileFieldDesc originField1 = currentFieldDescList.get(i);
                    if (f1.equals(originField1.getFieldName())) {
                        indicatorMap.put(f1, new FieldOrderIndicator(i + 1, originField1));
                        continue outer;
                    }
                }
                indicatorMap.put(f1, null);
            }
            return indicatorMap;
        }
    
        /**
         * 检测当前文件是按字段先后要求排放好
         *
         * @param currentFieldDescList 现有文件字段排列情况
         * @param orderedFields 期望排列的顺序列表
         * @return true:已排好序,无需再排; false:未按要求排好
         */
        private boolean checkIfCurrentFileInOrder(List<FileFieldDesc> currentFieldDescList,
                                                  List<String> orderedFields) {
            if(orderedFields.size() != currentFieldDescList.size()) {
                return true;
            }
            for (int j = 0; j < orderedFields.size(); j++) {
                String targetFieldName = orderedFields.get(j);
                FileFieldDesc possibleFieldDesc = currentFieldDescList.get(j);
                if(possibleFieldDesc != null
                        && targetFieldName.equals(possibleFieldDesc.getFieldName())
                        && possibleFieldDesc.getWriteFlag() == 1) {
                    continue;
                }
                return false;
            }
            return true;
        }
    
        /**
         * 计算两个数据流取并集 ( A ∪ B)
         *
         *   并将 A/B 标签位写到后置位置中, 1代表存在,空代表存在
         *      如A存在且B存在,则写结果为:  A,1,1
         *      如A存在但B不存在, 则写结果为: A,1,
         *      如A不存在但B存在, 则写结果为: B,,1
         *
         *    当A或B中存在多列时,以第一列为主键进行关联
         *       如A为: 111
         *         B为: 111,,1,1
         *       则合并后的结果为: 111,1,,1,1
         *
         * @return 最终写入的文件行数
         */
        private long unionTwoBufferStream(JoinFileDescriptor a,
                                          JoinFileDescriptor b,
                                          OutputStream targetOutputStream) throws IOException {
            String lineDataLeft;
            String lineDataRight;
    //        String lineDataLast = null;
            AtomicLong lineNumCounter = new AtomicLong(0);
            BufferedReader leftBuffer = a.getReader();
            BufferedReader rightBuffer = b.getReader();
            lineDataRight = rightBuffer.readLine();
            // 主键固定在第一列
            int idIndex = 1;
            String leftId = null;
            String rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
            String lastId = null;
            int cmpV;
            while ((lineDataLeft = leftBuffer.readLine()) != null) {
                // 以左表基础迭代,所以优先检查右表
                leftId = getIdColumnValueFromLineData(lineDataLeft, idIndex);
                if(lineDataRight != null
                        && (cmpV = leftId.compareTo(rightId)) >= 0) {
                    do {
                        if(rightId.equals(lastId)) {
                            lineDataRight = rightBuffer.readLine();
                            rightId = getIdColumnValueFromLineData(
                                    lineDataRight, idIndex);
                            // 合并左右数据
                            continue;
                        }
                        writeLine(targetOutputStream,
                                joinLineData(cmpV == 0 ? lineDataLeft : null,
                                        lineDataRight, a.getFieldInfo(),
                                        b.getFieldInfo()),
                                lineNumCounter);
                        lastId = rightId;
                        lineDataRight = rightBuffer.readLine();
                        rightId = getIdColumnValueFromLineData(
                                lineDataRight, idIndex);
                    } 
    
    下一篇:没有了