当前位置 博文首页 > Simon-Lau:MapReduce——客户端提交任务源码分析

    Simon-Lau:MapReduce——客户端提交任务源码分析

    作者:Simon-Lau 时间:2021-06-09 18:26

    计算向数据移动

    MR程序并不会在客户端执行任何的计算操作,它是为计算工作做好准备,例如计算出切片信息,直接影响到Map任务的并行度。

    在Driver中提交任务时,会写到这样的语句:

      boolean result = job.waitForCompletion(true);
    

    进入到waitForCompletion中:

    public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,
    ClassNotFoundException {
        if (state == JobState.DEFINE) {
           // 提交任务语句
          submit();
        }
                                     ..............
    

    继续跟进 submit():

     public void submit() throws IOException, InterruptedException, ClassNotFoundException {
         
        ensureState(JobState.DEFINE);
        setUseNewAPI();
        connect();
         
        final JobSubmitter submitter = 
            getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
          public JobStatus run() throws IOException, InterruptedException, 
          ClassNotFoundException {
              // 执行提交任务
            return submitter.submitJobInternal(Job.this, cluster);
          }
        });
                        ..............
       }
    

    上面代码可以看出,客户端经过连接集群,获得任务提交器submitter后执行了submitJobInternal(Job.this, cluster)方法,进入看(其实我只想看切片方法)

     /**
       * Internal method for submitting jobs to the system.
       * The job submission process involves:
       *   1、Checking the input and output specifications of the job.
       *   2、Computing the InputSplits for the job.
       *   3、Setup the requisite accounting information for the 
       *      DistributedCache of the job, if necessary.
       *   4、Copying the job's jar and configuration to the map-reduce system
       *      directory on the distributed file-system. 
       *   5、Submitting the job to the JobTracker and optionally
       *   monitoring it's status.
       */ 
    ..............
    // Create the splits for the job
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
          int maps = writeSplits(job, submitJobDir);
          conf.setInt(MRJobConfig.NUM_MAPS, maps);
          LOG.info("number of splits:" + maps);
    ..............
    

    从这个方法头上的注释信息可以看到,在真正执行任务之前,客户端做了这么5件事,稍微翻译一下:

    • 检查作业的输入和输出规范;
    • 计算输入切片的数量;
    • 如有必要,为作业的DistributedCache 设置必要的记帐信息;
    • 将作业的 jar 和配置复制到分布式文件系统上的 map-reduce system 目录;
    • 将作业提交给 JobTracker 并可选择监控它的状态

    可以看到执行切片的方法时writeSplits(job, submitJobDir)

    private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {
        JobConf jConf = (JobConf)job.getConfiguration();
        int maps;
        if (jConf.getUseNewMapper()) {
          maps = writeNewSplits(job, jobSubmitDir);
        } else {
          maps = writeOldSplits(jConf, jobSubmitDir);
        }
        return maps;
      }
    

    也有新旧API的区分,看新的writeNewSplits(job, jobSubmitDir)

    private <T extends InputSplit>
      int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
        ..................
            // 只看切片方法 
        List<InputSplit> splits = input.getSplits(job);
        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); 
        ..............
            // 返回值是数组的长度,也就是切片的个数,也就是mapTask的并行度
        return array.length;
      }
    

    进入切片方法,方法太长了,删除部分,留下核心业务逻辑。这个得好好说说

      public List<InputSplit> getSplits(JobContext job) throws IOException {
          
        // 如果没有指定的话,minSize = 1
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        // 如果没有指定的话,maxSize = Long.Max
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        // FileStatus这个概念来自于HDFS,存储客户端提交文件的元数据
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          // 获取到文件的路径
          Path path = file.getPath();
          // 获取到文件的长度
          long length = file.getLen();
          if (length != 0) {
            // 数据块位置数组,用于存储该文件对应的数据块的位置
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {  // 没有指定,默认是可分片的
              long blockSize = file.getBlockSize();
                // 返回默认值:切片大小 = 块大小
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
               // 获取整个文件的长度,用于计算切片的偏移量
              long bytesRemaining = length;
               // SPLIT_SLOP 的大小是1.1
               // 这个判断表达式的含义是如果剩余的块体积大大于1.1倍的切片大小,继续切片
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                  // 在这计算了一步块索引
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                  
    //-----------getBlockIndex() begin--------------------------------------------
    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
          for (int i = 0 ; i < blkLocations.length; i++) {
          // is the offset inside this block?
          if ((blkLocations[i].getOffset() <= offset) &&
              (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
              // 代码逻辑非常简单,就是返回当前offset是在哪个block里面
            return i;
          }
        }
                        ....................
    //-----------getBlockIndex() end----------------------------------------------
                            
                // 计算完成之后加入切片集合
                // 切片信息包括:路径,偏移量,切片大小,服务器节点【支撑计算向数据移动】
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
    
              // 计算剩余数据块的切片信息
              if (bytesRemaining != 0) { 
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
              }
            } else { // not splitable :不能切片,那就是一片
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          }
              ......
        // 返回切片文件的集合。根据集合中数据的个数,就可以计算出有多少个maptask
        return splits;
      }
    
    bk
    下一篇:没有了