当前位置 博文首页 > 堅持╅信念★:Hadoop源码:namenode格式化和启动过程实现

    堅持╅信念★:Hadoop源码:namenode格式化和启动过程实现

    作者:堅持╅信念★ 时间:2021-01-18 14:04

    基于源码源码分析hadoop namenode格式化和启动过程实现 (According to the source code analysis hadoop namenode formatting and startup process implementation.)

    基于源码源码分析hadoop namenode格式化和启动过程实现 (According to the source code analysis hadoop namenode formatting and startup process implementation.)

    Namenode 管理hdfs元数据和RPC服务响应客户端,初次使用时需格式化;元数据存储在edits和fsimage文件,其中 fsimage 保存最新的元数据信息,edits 保存自最新的元数据信息后的变化;edits 存储的信息会在下一次启动 namenode 时与 fsimage 合并产生新的 fsimage 文件。我们先以1.0版本源码为例介绍,后期再补充新版本的扩展和改变,由于篇幅原因,部分代码被去除。

    1 namenode 格式化

    初次启动namenode时,hadoop要求必须格式化namenode;

    hadoop namenode -format
    

    格式化会删除namenode image和edits文件,再创建新的 image 和 edits;

    public static void format(File dir, Configuration conf)
      throws IOException {
        File image = new File(dir, "image");
        File edits = new File(dir, "edits");
    
        if (!((!image.exists() || FileUtil.fullyDelete(image, conf)) &&
              (!edits.exists() || edits.delete()) &&image.mkdirs())) {
    
          throw new IOException("Unable to format: "+dir);
        }
    }
    

    所以已经生产使用的集群,谨慎使用。

    2 namenode初始化和启动

    Configuration conf = new Configuration();
    NameNode namenode = new NameNode(conf);
    namenode.join();
    

    namenode初始化和启动过程中四个重要的操作:

    1、根据(fs.default.name(旧版本参数)/fs.defaultFS(新版本参数))配置的主机名和端口号创建套接字地址,这个地址就是 namenode 文件系统元数据地址; 2、加载 namenode 元数据;首先将现有的 image 数据以层级方式加载到内存列表,加载完成后存放到活动列表中;

    TreeSet activeBlocks = new TreeSet();
    DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
    try {
        int numFiles = in.readInt();
        for (int i = 0; i < numFiles; i++) {
            UTF8 name = new UTF8();
            name.readFields(in);
            int numBlocks = in.readInt();
            if (numBlocks == 0) {
                unprotectedAddFile(name, null);
            } else {
                Block blocks[] = new Block[numBlocks];
                for (int j = 0; j < numBlocks; j++) {
                    blocks[j] = new Block();
                    blocks[j].readFields(in);
                }
                unprotectedAddFile(name, blocks);
            }
        }
    } finally {
        in.close();
    }
    

    对于存储了大量数据的集群,元数据加载到内存会比较耗时,总的元数据数 / 加载完成后存放到活动列表 的数量 * 100% 就是我们在前端界面看到的namenode启动进度。image 加载完成后,继续加载最新的 edits ,并将 edits 合并到 image 产生新的 image 文件,并创建一个新的 edits 文件记录系统运行时的变化。

    public FSDirectory(File dir) throws IOException {
        File fullimage = new File(dir, "image");
        if (! fullimage.exists()) {
          throw new IOException("NameNode not formatted: " + dir);
        }
        File edits = new File(dir, "edits");
        if (loadFSImage(fullimage, edits)) {
            saveFSImage(fullimage, edits);
        }
    
        synchronized (this) {
            this.ready = true;
            this.notifyAll();
            this.editlog = new DataOutputStream(new FileOutputStream(edits));
        }
    }
    

    3、创建datanode节点和namenode心跳监控、创建租凭管理监控(可以理解为锁管理,比如释放锁);

    this.hbthread = new Daemon(new HeartbeatMonitor());
    this.lmthread = new Daemon(new LeaseMonitor());
    hbthread.start();
    lmthread.start();
    

    namenode通过间隔的检查与datanode心跳来判断datanode的存活;租凭管理监控通俗的理解就是当某个操作(比如:写数据)超出namenode规定的最大操作时间时,这个租凭管理监控进程就会认为该操作过期了,将它释放清除。

    while ((sortedLeases.size() > 0) &&((top = (Lease) sortedLeases.first()) != null)) {
        if (top.expired()) {
            top.releaseLocks();
            leases.remove(top.holder);
            LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size());
            if (!sortedLeases.remove(top)) {
                LOG.info("Unknown failure trying to remove " + top + " from lease set.");
            }
        } else {
            break;
        }
    }
    

    4、为namenode创建RPC服务器(线程池),这个线程池的作用是用来处理客户端的远程过程调用及集群守护进程的调用

    public NameNode(Configuration conf) throws IOException {
        this(getDir(conf),DataNode.createSocketAddr(conf.get("fs.default.name", "local")).getPort(), conf);
    }
    
    public NameNode(File dir, int port, Configuration conf) throws IOException {
        this.namesystem = new FSNamesystem(dir, conf);
        this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
        this.server = RPC.getServer(this, port, handlerCount, false, conf);
        this.server.start();
    }
    

    通过dfs.namenode.handler.count 参数设置线程池大小,默认是10;

    public synchronized void start() throws IOException {
      Listener listener = new Listener();
      listener.start();
    
      for (int i = 0; i < handlerCount; i++) {
        Handler handler = new Handler(i);
        handler.start();
      }
    }
    

    在生产环境中,该参数的设置需要多方面考虑和商榷,对于大集群,来自不同DataNode的并发心跳以及客户端并发的元数据操作可能巨大,如果线程池太小,总是忙碌状态,客户端连接NameNode的时候总是超时或者连接被拒绝,这意味着需要更大的池来处理;线程池太大,会引发调用延迟和增加主机负载等问题;

    this.server = RPC.getServer(this, port, handlerCount, false, conf);
    this.server.start();
    

    到此RPC服务器(线程池)构建并启动,整个namenode启动完成,RPC服务器(线程池)进入监听响应状态;

    210116 222524 Server listener on port 9000: starting
    210116 222524 Server handler 0 on 9000: starting
    210116 222524 Server handler 1 on 9000: starting
    210116 222524 Server handler 2 on 9000: starting
    210116 222524 Server handler 3 on 9000: starting
    210116 222524 Server handler 4 on 9000: starting
    210116 222524 Server handler 5 on 9000: starting
    210116 222524 Server handler 6 on 9000: starting
    210116 222524 Server handler 7 on 9000: starting
    210116 222524 Server handler 8 on 9000: starting
    210116 222524 Server handler 9 on 9000: starting
    

    3 总结

    Namenode 格式化就是删除已经存在的edits和fsimage文件,创建新的;Namenode启动时创建套接字地址、加载 namenode 元数据、创建监控进程(datanode节点和namenode心跳监控、租凭管理监控)、创建RPC服务器(线程池)。

    参考文献

    • https://hadoop.apache.org/docs/r1.0.4 - hadoop documenation