标签搜索

目 录CONTENT

文章目录

NameNode启动源码解析

陈铭
2023-01-25 / 0 评论 / 1 点赞 / 161 阅读 / 2,476 字 / 正在检测是否收录...

环境准备

创建一个Maven项目,引入Hadoop 3.3.2的相关依赖,pom文件依赖如下:

    <dependencies>
        <!-- All artifacts are pinned to Hadoop 3.3.2, the version walked through below. -->
        <!-- Hadoop client libraries. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.2</version>
        </dependency>
        <!-- HDFS module (presumably where the NameNode classes read below live). -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.2</version>
        </dependency>
        <!-- HDFS client module; "provided" scope: on the compile classpath but not packaged. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs-client</artifactId>
            <version>3.3.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

前置逻辑

NameNode(NN)的启动入口是NameNode.java中的main方法,如下所示:

1. main(),NameNode启动主入口
  /**
   * NameNode process entry point.
   *
   * If argv is a help request (DFSUtil.parseHelpArgument returns true), exits
   * with status 0. Otherwise logs the startup message, creates the NameNode
   * and, if one was returned, joins it. Any Throwable is logged and the
   * process is terminated with status 1.
   */
  public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      // Core step: create the NameNode. createNameNode() may return null,
      // hence the null check below.
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        // Presumably blocks until the NameNode's services stop.
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
  }
  
2. createNameNode(argv, null),创建NN
  /**
   * Creates a NameNode for the given command-line arguments.
   * Only the default branch of the startup-option switch is shown in this
   * excerpt: it initializes the "NameNode" metrics system and constructs a
   * regular NameNode.
   */
  public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
    ...
    switch (startOpt) {
    ...
    default:
      DefaultMetricsSystem.initialize("NameNode");
      // Regular startup: just construct a new NameNode instance.
      return new NameNode(conf);
    }
  }
  
3. new NameNode(conf),看看怎么创建的
  /** Creates a NameNode with the default NAMENODE role. */
  public NameNode(Configuration conf) throws IOException {
    // Delegate to the two-argument constructor.
    this(conf, NamenodeRole.NAMENODE);
  }

  /**
   * Main constructor (excerpt): initializes the generic configuration keys,
   * runs initialize(), then prepares and enters {@code state} (presumably the
   * initial HA state), holding the HA context write lock while entering it.
   * On IOException or HadoopIllegalArgumentException it calls
   * stopAtException() and rethrows the exception.
   */
  protected NameNode(Configuration conf, NamenodeRole role)
      throws IOException {
    ...
    try {
      initializeGenericKeys(conf, nsId, namenodeId);
      // The bulk of the startup work happens in initialize().
      initialize(getConf());
      state.prepareToEnterState(haContext);
      try {
        haContext.writeLock();
        state.enterState(haContext);
      } finally {
        haContext.writeUnlock();
      }
    } catch (IOException e) {
      this.stopAtException(e);
      throw e;
    } catch (HadoopIllegalArgumentException e) {
      this.stopAtException(e);
      throw e;
    }
    ...
  }

启动9870端口服务

从前置逻辑步骤3中的initialize方法开始,看看HDFS的Web端(默认9870端口)是怎么启动起来的

1. initialize(getConf()),初始化方法
  /** NameNode initialization (excerpt: only the HTTP server step is shown). */
  protected void initialize(Configuration conf) throws IOException {
    ...
    if (NamenodeRole.NAMENODE == role) {
      // Start the web (HTTP) server here only for the NAMENODE role; other
      // roles start it later in startCommonServices().
      startHttpServer(conf);
    }
  }

2. startHttpServer(conf),进来看看
  /**
   * Creates the NameNode HTTP server bound to the address returned by
   * getHttpServerBindAddress(conf), starts it, and attaches the startup
   * progress tracker to it.
   */
  private void startHttpServer(final Configuration conf) throws IOException {
    // Bind to the configured (or default) host and port, as resolved by
    // getHttpServerBindAddress().
    httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
    // Start the server.
    httpServer.start();
    httpServer.setStartupProgress(startupProgress);
  }
  
3. getHttpServerBindAddress(conf),获取默认端口和ip
  /** Returns the address the HTTP server binds to (post-processing elided). */
  protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
    InetSocketAddress bindAddress = getHttpServerAddress(conf);
    ...
    return bindAddress;
  }
  
  // Called by getHttpServerBindAddress(conf) above.
  protected InetSocketAddress getHttpServerAddress(Configuration conf) {
    return getHttpAddress(conf);
  }
  
  // Called by getHttpServerAddress(conf) above.
  public static InetSocketAddress getHttpAddress(Configuration conf) {
    // DFS_NAMENODE_HTTP_ADDRESS_KEY is "dfs.namenode.http-address".
    // DFS_NAMENODE_HTTP_ADDRESS_DEFAULT is "0.0.0.0:9870".
    // The key is normally set in hdfs-site.xml; a configured value
    // overrides this default.
    return  NetUtils.createSocketAddr(
        conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
  }

4. httpServer.start(),开启服务器
  // NameNodeHttpServer.start() (excerpt).
  void start() throws IOException {
    ...
    // Register the NameNode servlets on the underlying HTTP server.
    setupServlets(httpServer);
    ...
  }

5. setupServlets(httpServer),注册内部servlet,即NN的HTTP服务处理启动进度查询、fsck、fsimage传输、Active状态查询、网络拓扑查询等请求的逻辑
  /**
   * Registers the NameNode's internal servlets on the given HTTP server:
   * startup progress, fsck, image transfer ("imagetransfer"), NameNode
   * active-state check, and network topology.
   * NOTE(review): the trailing {@code true} argument presumably requires
   * authentication for that servlet; confirm against HttpServer2.
   */
  private static void setupServlets(HttpServer2 httpServer) {
    httpServer.addInternalServlet("startupProgress",
        StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
    httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
        true);
    httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
        ImageServlet.class, true);
    httpServer.addInternalServlet(IsNameNodeActiveServlet.SERVLET_NAME,
        IsNameNodeActiveServlet.PATH_SPEC,
        IsNameNodeActiveServlet.class);
    httpServer.addInternalServlet(NetworkTopologyServlet.SERVLET_NAME,
        NetworkTopologyServlet.PATH_SPEC, NetworkTopologyServlet.class);
  }

加载镜像文件和编辑日志

还是回到前置逻辑步骤3的initialize方法,此时web端服务已经开启,现在要加载镜像文件和编辑日志

1. initialize(getConf()),初始化方法
  /** NameNode initialization (excerpt): HTTP server, then namesystem load. */
  protected void initialize(Configuration conf) throws IOException {
    // Start the HDFS web (HTTP) server (NAMENODE role only).
    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }
    // Load the fsimage and edit log.
    loadNamesystem(conf);
    ...
  }
  
2. loadNamesystem(conf),加载镜像文件和编辑日志
  // Builds this NameNode's FSNamesystem from the on-disk metadata.
  protected void loadNamesystem(Configuration conf) throws IOException {
    this.namesystem = FSNamesystem.loadFromDisk(conf);
  }
  
  static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
    ...
    // Create the FSImage over the configured local namespace (fsimage)
    // directories and edits directories, obtained from getNamespaceDirs()
    // and getNamespaceEditsDirs(). NOTE(review): constructing FSImage only
    // sets up these directories; the actual load of the image and edits
    // presumably happens in the elided code below.
    FSImage fsImage = new FSImage(conf,
        FSNamesystem.getNamespaceDirs(conf),
        FSNamesystem.getNamespaceEditsDirs(conf));
    ...
  }

初始化NN的RPC服务端

还是回到前置逻辑步骤3的initialize方法,此时文件也加载好了,现在要初始化NN的RPC服务端

1. initialize(getConf()),初始化方法
  /**
   * NameNode initialization (excerpt): starts the HTTP server, loads the
   * namesystem, then creates the RPC server.
   */
  protected void initialize(Configuration conf) throws IOException {
    // Start the HDFS web (HTTP) server (NAMENODE role only).
    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }
    // Load the fsimage and edit log.
    loadNamesystem(conf);
    ...
    // Create (but do not yet start) the RPC server.
    rpcServer = createRpcServer(conf);
    ...
  }
  
2. createRpcServer(conf),创建RPC服务端
  // Factory method for the NameNode RPC server.
  protected NameNodeRpcServer createRpcServer(Configuration conf)
      throws IOException {
    return new NameNodeRpcServer(conf, this);
  }
  // The RPC server is constructed here; it is only started later, via
  // rpcServer.start() in NameNode.startCommonServices().
  public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
    ...
      // Service RPC server: ClientNamenodeProtocolPB backed by
      // clientNNPbService, bound to bindHost and the service RPC port, with
      // serviceHandlerCount handlers and the namesystem's delegation token
      // secret manager.
      serviceRpcServer = new RPC.Builder(conf)
          .setProtocol(
              org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
          .setInstance(clientNNPbService)
          .setBindAddress(bindHost)
          .setPort(serviceRpcAddr.getPort())
          .setNumHandlers(serviceHandlerCount)
          .setVerbose(false)
          .setSecretManager(namesystem.getDelegationTokenSecretManager())
          .build();
    ...
  }

NN启动资源检查

还是回到前置逻辑步骤3的initialize方法,此时RPC服务端也创建好了,现在要启动资源检查

1. initialize(getConf()),初始化方法
  /**
   * NameNode initialization (excerpt): starts the HTTP server, loads the
   * namesystem, creates the RPC server, then starts the common services.
   */
  protected void initialize(Configuration conf) throws IOException {
    // Start the HDFS web (HTTP) server (NAMENODE role only).
    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }
    // Load the fsimage and edit log.
    loadNamesystem(conf);
    ...
    // Create the RPC server.
    rpcServer = createRpcServer(conf);
    ...
    // Start the common services, including the resource check.
    startCommonServices(conf);
    ...
  }
  
2. startCommonServices(conf),启动公共服务(包括资源检查,以及真正启动RPC服务端)
  /**
   * Starts the services common to all NameNode roles (excerpt): the
   * namesystem's common services (including the resource check), the
   * NameNode MXBean, the HTTP server for non-NAMENODE roles, and finally the
   * RPC server.
   */
  private void startCommonServices(Configuration conf) throws IOException {
    // Namesystem services, including the resource check.
    namesystem.startCommonServices(conf, haContext);
    registerNNSMXBean();
    // initialize() only starts the HTTP server for the NAMENODE role, so
    // other roles start it here.
    if (NamenodeRole.NAMENODE != role) {
      startHttpServer(conf);
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
      if (levelDBAliasMapServer != null) {
        httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
      }
    }
    // The RPC server was only created earlier; it is started here.
    rpcServer.start();
    ...
  }
  
3. startCommonServices(conf, haContext)
  /**
   * FSNamesystem.startCommonServices (excerpt). Registers the
   * FSNamesystemState MBean, then, under the namesystem write lock, creates
   * the resource checker, runs the resource check and activates the
   * BlockManager.
   */
  void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
    this.registerMBean(); // register the MBean for the FSNamesystemState
    writeLock();
    this.haContext = haContext;
    try {
      // Create the resource checker.
      nnResourceChecker = new NameNodeResourceChecker(conf);
      // The actual check: verify that the monitored metadata volumes still
      // have enough free disk space (threshold: dfs.namenode.resource.du.reserved,
      // 100 MB by default).
      checkAvailableResources();
      ...
      // Begin the SAFEMODE phase of startup-progress tracking (prog is
      // presumably the StartupProgress tracker). This does not itself enter
      // safe mode; that is done by bmSafeMode.activate() inside
      // blockManager.activate().
      prog.beginPhase(Phase.SAFEMODE);
      ...
      // Activate block management, which starts the DataNode heartbeat
      // (timeout) checks.
      blockManager.activate(conf, completeBlocksTotal);
      ...
    } finally {
      writeUnlock("startCommonServices");
    }
    ...
  }
  
4. new NameNodeResourceChecker(conf),看看检查器
  /**
   * Creates a checker over the NameNode's local metadata volumes (excerpt):
   * reads the reserved-space threshold and registers each local edits
   * directory for checking.
   */
  public NameNodeResourceChecker(Configuration conf) throws IOException {
    this.conf = conf;
    volumes = new HashMap<String, CheckedVolume>();
    // DFS_NAMENODE_DU_RESERVED_KEY is "dfs.namenode.resource.du.reserved";
    // DFS_NAMENODE_DU_RESERVED_DEFAULT is 100 MB.
    // Presumably the minimum free space each checked volume must keep.
    duReserved = conf.getLongBytes(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
        DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
    ...
    // Register every local edits directory for checking; the boolean flags
    // whether it is one of the required edits directories.
    for (URI editsDirToCheck : localEditDirs) {
      addDirToCheck(editsDirToCheck,
          FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
              editsDirToCheck));
    }
    ...
  }
  
5. checkAvailableResources(),返回步骤3,看看检查的逻辑
  /**
   * Runs the disk-space check, stores the result in hasResourcesAvailable,
   * and records how long the check took in the NameNode metrics.
   *
   * @throws IllegalStateException if the resource checker is not initialized
   */
  void checkAvailableResources() {
    long resourceCheckTime = monotonicNow();
    Preconditions.checkState(nnResourceChecker != null,
        "nnResourceChecker not initialized");
    // true if enough disk space is available, false otherwise.
    hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
    resourceCheckTime = monotonicNow() - resourceCheckTime;
    NameNode.getNameNodeMetrics().addResourceCheckTime(resourceCheckTime);
  }

NN对心跳超时判断

从NN启动资源检查步骤3中的blockManager.activate开始,Debug跟踪NN对DataNode心跳超时的判断逻辑

1. startCommonServices(conf, haContext)
  /**
   * FSNamesystem.startCommonServices (excerpt). Registers the
   * FSNamesystemState MBean, then, under the namesystem write lock, creates
   * the resource checker, runs the resource check and activates the
   * BlockManager (which starts the heartbeat checks).
   */
  void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
    this.registerMBean(); // register the MBean for the FSNamesystemState
    writeLock();
    this.haContext = haContext;
    try {
      // Create the resource checker.
      nnResourceChecker = new NameNodeResourceChecker(conf);
      // The actual check: verify that the monitored metadata volumes still
      // have enough free disk space (threshold: dfs.namenode.resource.du.reserved,
      // 100 MB by default).
      checkAvailableResources();
      ...
      // Begin the SAFEMODE phase of startup-progress tracking; safe mode
      // itself is entered later, inside blockManager.activate().
      prog.beginPhase(Phase.SAFEMODE);
      ...
      // Activate block management: this is where the DataNode heartbeat
      // (timeout) checks are started.
      blockManager.activate(conf, completeBlocksTotal);
      ...
    } finally {
      writeUnlock("startCommonServices");
    }
    ...
  }
  
2. blockManager.activate(conf, completeBlocksTotal),开启心跳超时判断
  // BlockManager.activate() (excerpt): starts pending reconstruction and
  // activates the DatanodeManager.
  public void activate(Configuration conf, long blockTotal) {
    pendingReconstruction.start();
    datanodeManager.activate(conf);
    ...
  }
  
  // DatanodeManager.activate(conf), called above.
  void activate(final Configuration conf) {
    datanodeAdminManager.activate(conf);
    // Start heartbeat checking.
    heartbeatManager.activate();
  }
  
  // HeartbeatManager.activate(), called above: starts the heartbeat thread.
  void activate() {
    heartbeatThread.start();
  }
  
3. heartbeatThread.start(),heartbeatThread是一个后台线程,启动后由它在后台循环检查心跳,因此这里直接看它所执行的run方法
    /**
     * Heartbeat monitor loop. While the namesystem is running, calls
     * heartbeatCheck() once more than heartbeatRecheckInterval has elapsed
     * since lastHeartbeatCheck. Exceptions are logged and the loop continues.
     */
    @Override
    public void run() {
      while(namesystem.isRunning()) {
        restartHeartbeatStopWatch();
        try {
          final long now = Time.monotonicNow();
          // Re-check only after the recheck interval has elapsed.
          if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
            // Check the DataNode heartbeats.
            heartbeatCheck();
            ...
          }
        } catch (Exception e) {
          LOG.error("Exception while checking heartbeat", e);
        }
        ...
      }
    }

4. heartbeatCheck()
  /** Scans the DataNodes for expired heartbeats (excerpt). */
  @VisibleForTesting
  void heartbeatCheck() {
    ...
    // isDatanodeDead() decides whether this DataNode is considered dead.
    if (dead == null && dm.isDatanodeDead(d)) {
      ...
    }
    ...
  }
  
  // isDatanodeDead(d), called above on the DatanodeManager (dm).
  boolean isDatanodeDead(DatanodeDescriptor node) {
    // monotonicNow(): current monotonic time.
    // node.getLastUpdateMonotonic(): monotonic time of the node's last heartbeat.
    // The node is dead if its last heartbeat is older than heartbeatExpireInterval.
    return (node.getLastUpdateMonotonic() <
            (monotonicNow() - heartbeatExpireInterval));
  }

5. heartbeatExpireInterval,心跳超时时长:DN距上次心跳超过这个时长即被判定为死亡。这个值由DatanodeManager的setHeartbeatInterval方法设置
  /**
   * Sets the heartbeat interval (in seconds) and the recheck interval, and
   * derives heartbeatExpireInterval and blockInvalidateLimit from them.
   */
  private void setHeartbeatInterval(long intervalSeconds,
      int recheckInterval) {
    this.heartbeatIntervalSeconds = intervalSeconds;
    this.heartbeatRecheckInterval = recheckInterval;
    // Expiry interval in milliseconds: 2 * recheckInterval (ms) plus
    // 10 heartbeat intervals (10 * 1000 * intervalSeconds ms).
    // With the defaults (recheckInterval = 5 min, intervalSeconds = 3 s) this
    // is 2 * 5 min + 10 * 3 s = 10 min 30 s: a DN not heard from for that
    // long is considered dead.
    this.heartbeatExpireInterval = 2L * recheckInterval + 10 * 1000
        * intervalSeconds;
    // Raise blockInvalidateLimit to at least 20 * intervalSeconds.
    this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
        blockInvalidateLimit);
  }

安全模式

安全模式是NN已经启动、但启动流程尚未完全结束时所处的一个阶段(需要等待DataNode汇报足够多的块)。从NN启动资源检查步骤3中的startCommonServices方法开始进行Debug

1. startCommonServices(conf, haContext)
  /**
   * FSNamesystem.startCommonServices (excerpt, safe-mode view). Registers the
   * FSNamesystemState MBean, then, under the namesystem write lock, runs the
   * resource check, computes the number of complete blocks and activates the
   * BlockManager, which starts heartbeat checking and safe mode.
   */
  void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
    this.registerMBean(); // register the MBean for the FSNamesystemState
    writeLock();
    this.haContext = haContext;
    try {
      // Create the resource checker.
      nnResourceChecker = new NameNodeResourceChecker(conf);
      // The actual check: verify that the monitored metadata volumes still
      // have enough free disk space (threshold: dfs.namenode.resource.du.reserved,
      // 100 MB by default).
      checkAvailableResources();
      ...
      // Begin the SAFEMODE phase of startup-progress tracking.
      prog.beginPhase(Phase.SAFEMODE);
      // Complete blocks = all blocks minus blocks under construction.
      long completeBlocksTotal = getCompleteBlocksTotal();
      // Expect completeBlocksTotal blocks to be reported during this phase.
      prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
          completeBlocksTotal);
      // Activate block management: starts the heartbeat (timeout) checks
      // and activates safe mode.
      blockManager.activate(conf, completeBlocksTotal);
      ...
    } finally {
      writeUnlock("startCommonServices");
    }
    ...
  }
  
2. getCompleteBlocksTotal(),获取所有可以正常使用的block
  /**
   * Returns the number of complete blocks: the total block count minus the
   * blocks under construction reported by the lease manager. Computed under
   * the namesystem read lock.
   */
  public long getCompleteBlocksTotal() {
    // Calculate number of blocks under construction
    long numUCBlocks = 0;
    readLock();
    try {
      // Number of under-construction blocks, as tracked by the lease manager.
      numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
      // Complete blocks = all blocks - blocks under construction.
      return getBlocksTotal() - numUCBlocks;
    } finally {
      readUnlock("getCompleteBlocksTotal");
    }
  }
  
3. blockManager.activate(conf, completeBlocksTotal),安全模式和心跳检测都是在这里开启的
  // BlockManager.activate() (excerpt): starts heartbeat checking via the
  // DatanodeManager, then activates block-level safe mode.
  public void activate(Configuration conf, long blockTotal) {
    pendingReconstruction.start();
    // Heartbeat checking is started here.
    datanodeManager.activate(conf);
    ...
    // Activate safe mode for the expected number of blocks.
    bmSafeMode.activate(blockTotal);
  }

4. bmSafeMode.activate(blockTotal),开启安全模式
  /**
   * Activates block-manager safe mode for {@code total} expected blocks.
   * If the block/DataNode thresholds are already met, safe mode is left
   * immediately. Otherwise status becomes PENDING_THRESHOLD, the replication
   * queues are initialized if necessary, and "Safe mode ON" is reported.
   * Assertions expect the namesystem write lock to be held and status OFF.
   */
  void activate(long total) {
    assert namesystem.hasWriteLock();
    assert status == BMSafeModeStatus.OFF;

    startTime = monotonicNow();
    // Record the block total and derive the block thresholds from it.
    setBlockTotal(total);
    // Check whether the reported blocks and live DataNodes already satisfy
    // the safe-mode exit thresholds.
    if (areThresholdsMet()) {
      boolean exitResult = leaveSafeMode(false);
      Preconditions.checkState(exitResult, "Failed to leave safe mode.");
    } else {
      // enter safe mode
      status = BMSafeModeStatus.PENDING_THRESHOLD;
      initializeReplQueuesIfNecessary();
      reportStatus("STATE* Safe mode ON.", true);
      lastStatusReport = monotonicNow();
    }
  }
  
5. setBlockTotal(total),设置块总数,并据此计算退出安全模式所需的块数阈值(是否满足阈值在下一步的areThresholdsMet中判断)
  /**
   * Records the total number of complete blocks and derives from it the
   * safe-mode block threshold and the replication-queue threshold.
   * Does not itself check whether the thresholds are met.
   */
  void setBlockTotal(long total) {
    assert namesystem.hasWriteLock();
    synchronized (this) {
      this.blockTotal = total;
      // Block threshold = total * threshold, e.g. 1000 complete blocks * 0.999 = 999
      // (threshold is 0.999f by default).
      this.blockThreshold = (long) (total * threshold);
    }
    this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
  }
6. areThresholdsMet(),判断DataNode节点和块信息是否达到退出安全模式标准
  /**
   * Returns true if the safe-mode exit thresholds are met: blockSafe has
   * reached blockThreshold and, when datanodeThreshold > 0, the number of
   * live DataNodes has reached datanodeThreshold.
   */
  private boolean areThresholdsMet() {
    assert namesystem.hasWriteLock();
    // Calculating the number of live datanodes is time-consuming
    // in large clusters. Skip it when datanodeThreshold is zero.
    // We need to evaluate getNumLiveDataNodes only when
    // (blockSafe >= blockThreshold) is true and hence moving evaluation
    // of datanodeNum conditional to isBlockThresholdMet as well
    synchronized (this) {
      // Block condition: safe blocks >= block threshold.
      boolean isBlockThresholdMet = (blockSafe >= blockThreshold);
      boolean isDatanodeThresholdMet = true;
      if (isBlockThresholdMet && datanodeThreshold > 0) {
        int datanodeNum = blockManager.getDatanodeManager().
                getNumLiveDataNodes();
        // DataNode condition: live DataNodes >= DataNode threshold.
        isDatanodeThresholdMet = (datanodeNum >= datanodeThreshold);
      }
      return isBlockThresholdMet && isDatanodeThresholdMet;
    }
  }
1

评论区