DataNode Startup Source Code Analysis

陈铭
2023-01-25

How It Works

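  1. After it starts, a DataNode registers itself with the NameNode.
  2. Once registration succeeds, the DN reports all block information on the node every 6 hours (dfs.blockreport.intervalMsec, default 21600000 ms).
  3. It also sends a heartbeat every 3 s (dfs.heartbeat.interval).
  4. If no heartbeat arrives for more than 10 min 30 s, the NN treats the DN as dead and removes it.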
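The 10 min 30 s figure is not configured directly. As far as I know, DatanodeManager derives it from two settings: `dfs.namenode.heartbeat.recheck-interval` (default 300000 ms) and `dfs.heartbeat.interval` (default 3 s). The formula is expire = 2 × recheck + 10 × heartbeat. The sketch below shows that arithmetic and the "last heartbeat older than the window" check. The class and method names are hypothetical.

```java
// Hypothetical helper; mirrors the expiry arithmetic, not the actual DatanodeManager code.
class HeartbeatExpiry {
    // Default of dfs.namenode.heartbeat.recheck-interval, in milliseconds (5 minutes).
    static final long DEFAULT_RECHECK_INTERVAL_MS = 5 * 60 * 1000L;
    // Default of dfs.heartbeat.interval, in seconds.
    static final long DEFAULT_HEARTBEAT_INTERVAL_S = 3L;

    // expire = 2 * recheckInterval + 10 * heartbeatInterval (heartbeat converted to ms)
    static long expireIntervalMs(long recheckIntervalMs, long heartbeatIntervalS) {
        return 2 * recheckIntervalMs + 10 * 1000 * heartbeatIntervalS;
    }

    // A node counts as dead when its last heartbeat is older than the expiry window.
    static boolean isDead(long lastUpdateMonotonicMs, long nowMonotonicMs, long expireMs) {
        return lastUpdateMonotonicMs < nowMonotonicMs - expireMs;
    }

    public static void main(String[] args) {
        long expire = expireIntervalMs(DEFAULT_RECHECK_INTERVAL_MS, DEFAULT_HEARTBEAT_INTERVAL_S);
        System.out.println(expire + " ms = " + expire / 60000 + " min " + (expire % 60000) / 1000 + " s");
        // prints "630000 ms = 10 min 30 s"
    }
}
```

Because the window depends on the recheck interval, lowering `dfs.namenode.heartbeat.recheck-interval` shortens dead-node detection much more than changing the heartbeat interval does.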

Environment Setup

Create a Maven project with the following pom dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs-client</artifactId>
            <version>3.3.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

Prerequisite Source Code

A DN is started by the main method of DataNode.java, as follows:

1. main: the DataNode's main entry point
  public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }
    // The core logic is here
    secureMain(args, null);
  }
  
2. secureMain(args, null): let's step into it
  public static void secureMain(String args[], SecureResources resources) {
    int errorCode = 0;
    try {
      ...
      // Create the DataNode
      DataNode datanode = createDataNode(args, null, resources);
      ...
    } catch (Throwable e) {
      LOG.error("Exception in secureMain", e);
      terminate(1, e);
    } finally {
      LOG.warn("Exiting Datanode");
      terminate(errorCode);
    }
  }

3. createDataNode(args, null, resources): create the DN
  @VisibleForTesting
  @InterfaceAudience.Private
  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    // Create the DN instance
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // Start the DN daemon
      dn.runDatanodeDaemon();
    }
    return dn;
  }
  
4. instantiateDataNode(args, conf, resources): create the DN instance
  public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    ...
    return makeInstance(dataLocations, conf, resources);
  }
  
  // Now look at makeInstance(dataLocations, conf, resources), called above
  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    ...
    return new DataNode(conf, locations, storageLocationChecker, resources);
  }
  
  // Now the parameterized constructor called above
  DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final StorageLocationChecker storageLocationChecker,
           final SecureResources resources) throws IOException {
    ...
    try {
      hostName = getHostName(conf);
      LOG.info("Configured hostname is {}", hostName);
      // The DN is started while the instance is being constructed
      startDataNode(dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }
    ...
  }
  
5. startDataNode(dataDirs, resources): start the DN
  void startDataNode(List<StorageLocation> dataDirectories,
                     SecureResources resources
                     ) throws IOException {
    ...
    // Create the data storage object
    storage = new DataStorage();
    
    // global DN settings
    registerMXBean();
    // Initialize the DataXceiverServer
    // DataXceiverServer is the service the DN uses to receive data sent by clients and other DNs
    initDataXceiver();
    // Start the HTTP server
    startInfoServer();
    ...
    // Initialize the RPC server
    initIpcServer();
    ...
    // Create the BlockPoolManager
    blockPoolManager = new BlockPoolManager(this);
    // Register with the NN and start sending the DN's heartbeats
    blockPoolManager.refreshNamenodes(getConf());
    ...
  }
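The call chain above can be summarized with a toy model that only records which step runs when. Everything here is a hypothetical stand-in: the real methods take configuration, storage locations and security resources, and they do far more work. The point the model makes is ordering. startDataNode(), and with it refreshNamenodes(), runs inside the constructor, before runDatanodeDaemon() is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the DataNode startup chain; each method only records its name.
class ToyDataNode {
    private final List<String> trace;

    private ToyDataNode(List<String> trace) {
        this.trace = trace;
        trace.add("DataNode(...)");
        startDataNode();                     // the constructor starts the DN
    }

    private void startDataNode() {
        trace.add("new DataStorage()");
        trace.add("initDataXceiver()");
        trace.add("startInfoServer()");
        trace.add("initIpcServer()");
        trace.add("new BlockPoolManager()");
        trace.add("refreshNamenodes()");     // registration/heartbeat threads are kicked off here
    }

    static ToyDataNode createDataNode(List<String> trace) {
        trace.add("createDataNode()");
        ToyDataNode dn = instantiateDataNode(trace);
        dn.runDatanodeDaemon();              // only after the instance is fully built
        return dn;
    }

    static ToyDataNode instantiateDataNode(List<String> trace) {
        trace.add("instantiateDataNode()");
        return makeInstance(trace);
    }

    static ToyDataNode makeInstance(List<String> trace) {
        trace.add("makeInstance()");
        return new ToyDataNode(trace);
    }

    void runDatanodeDaemon() {
        trace.add("runDatanodeDaemon()");
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        trace.add("main() -> secureMain()");
        createDataNode(trace);
        System.out.println(String.join("\n", trace));
    }
}
```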

Initializing the HTTP Server

Starting from step 5 of the prerequisite source code, let's see how the HTTP server is initialized.

1. startInfoServer(): initialize the HTTP server
  private void startInfoServer()
    throws IOException {
    // SecureDataNodeStarter will bind the privileged port to the channel if
    // the DN is started by JSVC, pass it along.
    ServerSocketChannel httpServerChannel = secureResources != null ?
        secureResources.getHttpServerChannel() : null;
    // Create the HTTP server
    httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
    // Start the HTTP server
    httpServer.start();
    if (httpServer.getHttpAddress() != null) {
      infoPort = httpServer.getHttpAddress().getPort();
    }
    if (httpServer.getHttpsAddress() != null) {
      infoSecurePort = httpServer.getHttpsAddress().getPort();
    }
  }
  
2. The DatanodeHttpServer constructor
  public DatanodeHttpServer(final Configuration conf,
        final DataNode datanode,
        final ServerSocketChannel externalHttpChannel)
        throws IOException {
    ...
    // Build the HTTP server instance
    // This binds the host and port
    HttpServer2.Builder builder = new HttpServer2.Builder()
        .setName("datanode")
        .setConf(confForInfoServer)
        .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
        .hostName(getHostnameForSpnegoPrincipal(confForInfoServer))
        .addEndpoint(URI.create("http://localhost:" + proxyPort))
        .setFindPort(true);
    ...
  }
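`setFindPort(true)` asks HttpServer2 to keep trying the next port number when the requested one is already taken. The sketch below shows that behavior with plain `java.net.ServerSocket`. `bindWithFindPort` and its attempt limit are assumptions for illustration only; this is not the HttpServer2 code.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical illustration of "find port" binding; not the HttpServer2 implementation.
class FindPortSketch {
    // Try startPort first; on BindException move on to the next port number.
    static ServerSocket bindWithFindPort(int startPort, int maxAttempts) throws IOException {
        int port = startPort;
        for (int attempt = 0; attempt < maxAttempts && port <= 65535; attempt++, port++) {
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
                return socket;                   // caller owns and must close the socket
            } catch (BindException e) {
                socket.close();                  // port taken, try the next one
            } catch (IOException e) {
                socket.close();                  // any other failure is not retried
                throw e;
            }
        }
        throw new BindException("no free port starting at " + startPort);
    }

    public static void main(String[] args) throws IOException {
        // Occupy an ephemeral port, then request that same port with find-port enabled.
        try (ServerSocket busy = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
             ServerSocket bound = bindWithFindPort(busy.getLocalPort(), 20)) {
            System.out.println("requested " + busy.getLocalPort() + ", bound " + bound.getLocalPort());
        }
    }
}
```

The trade-off is convenience versus predictability. A server started this way may end up on a different port than configured, so the real code reads the port back after start(). startInfoServer() does exactly that with `httpServer.getHttpAddress().getPort()`.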

Initializing the DN's RPC Server

Starting from step 5 of the prerequisite source code, let's see how the RPC server is initialized.

1. initIpcServer(): initialize the DN's RPC server
  private void initIpcServer() throws IOException {
    ...
    // Build the RPC server
    // Again, this binds the IP and port
    ipcServer = new RPC.Builder(getConf())
        .setProtocol(ClientDatanodeProtocolPB.class)
        .setInstance(service)
        .setBindAddress(ipcAddr.getHostName())
        .setPort(ipcAddr.getPort())
        .setNumHandlers(
            getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
                DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
        .setSecretManager(blockPoolTokenSecretManager).build();
    ...
  }
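`setNumHandlers(...)` sets how many handler threads execute incoming calls. For the DN the value comes from `dfs.datanode.handler.count` (default 10). The sketch below shows the core idea: one shared call queue drained by N worker threads, built only with `java.util.concurrent`. `MiniHandlerPool` is hypothetical. The real `org.apache.hadoop.ipc.Server` also has listener, reader and responder threads, which are omitted here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of an RPC server's handler pool: N threads draining one call queue.
class MiniHandlerPool {
    private final BlockingQueue<Runnable> callQueue = new LinkedBlockingQueue<>();
    private final Thread[] handlers;

    MiniHandlerPool(int numHandlers) {
        handlers = new Thread[numHandlers];
        for (int i = 0; i < numHandlers; i++) {
            handlers[i] = new Thread(() -> {
                try {
                    while (true) {
                        callQueue.take().run();  // block until a call arrives, then execute it
                    }
                } catch (InterruptedException e) {
                    // interrupted by stop(): leave the handler loop
                }
            }, "handler-" + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }
    }

    void enqueue(Runnable call) {
        callQueue.add(call);
    }

    void stop() {
        for (Thread t : handlers) {
            t.interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniHandlerPool pool = new MiniHandlerPool(10);  // 10 = default dfs.datanode.handler.count
        AtomicInteger done = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            pool.enqueue(() -> { done.incrementAndGet(); latch.countDown(); });
        }
        latch.await(5, TimeUnit.SECONDS);
        pool.stop();
        System.out.println("handled " + done.get() + " calls");
    }
}
```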

Registering the DN with the NN

Starting from step 5 of the prerequisite source code, let's see how the DN registers with the NN.

1. blockPoolManager.refreshNamenodes(getConf()): the DN registers with the NN
  void refreshNamenodes(Configuration conf)
      throws IOException {
    ...
    // Start registration
    synchronized (refreshNamenodesLock) {
      doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
    }
  }
  
2. doRefreshNamenodes(newAddressMap, newLifelineAddressMap): let's step into it
  private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap,
      Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
      throws IOException {
    ...
    synchronized (this) {
      if (!toAdd.isEmpty()) {
        ...
        for (String nsToAdd : toAdd) {
          // Create the service used to register the DN (one per nameservice)
          BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
              lifelineAddrs);
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      // Start the services used to register the DN
      startAll();
    }
    ...
  }

3. createBPOS(nsToAdd, nnIds, addrs, lifelineAddrs): create the service used to register the DN
  protected BPOfferService createBPOS(
      final String nameserviceId,
      List<String> nnIds,
      List<InetSocketAddress> nnAddrs,
      List<InetSocketAddress> lifelineNnAddrs) {
    // Essentially this just creates a new BPOfferService instance
    return new BPOfferService(nameserviceId, nnIds, nnAddrs, lifelineNnAddrs,
        dn);
  }
  
4. startAll(): start the services used to register the DN
  synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              // Iterate over offerServices and start each one
              for (BPOfferService bpos : offerServices) {
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }
  
  // Now bpos.start(), called above
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }
  
  // One level deeper: actor.start()
  void start() {
    if ((bpThread != null) && (bpThread.isAlive())) {
      //Thread is started already
      return;
    }
    bpThread = new Thread(this);
    bpThread.setDaemon(true); // needed for JUnit testing

    if (lifelineSender != null) {
      lifelineSender.start();
    }
    // The actor runs on its own thread (BPServiceActor is a Runnable)
    bpThread.start();
  }
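Steps 2 to 4 build a two-level structure. There is one BPOfferService per nameservice (block pool). Each one holds one BPServiceActor per NameNode address, for example both NNs of an HA pair, and every actor runs on its own daemon thread. The sketch below models only that shape. The nested classes are hypothetical stand-ins, and the hostnames in `main` are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BlockPoolSketch {
    // Stand-in for BPServiceActor: one per NameNode, each on its own daemon thread.
    static class Actor implements Runnable {
        final String nnAddr;
        private final CountDownLatch started;
        private Thread thread;

        Actor(String nnAddr, CountDownLatch started) {
            this.nnAddr = nnAddr;
            this.started = started;
        }

        void start() {
            if (thread != null && thread.isAlive()) {
                return;                          // already running, same guard as actor.start()
            }
            thread = new Thread(this, "actor-" + nnAddr);
            thread.setDaemon(true);
            thread.start();
        }

        @Override
        public void run() {
            // The real actor would handshake with nnAddr here, then loop on heartbeats.
            started.countDown();
        }
    }

    // Stand-in for BPOfferService: one per nameservice, holding one actor per NN address.
    static class OfferService {
        final List<Actor> actors = new ArrayList<>();

        OfferService(List<String> nnAddrs, CountDownLatch started) {
            for (String addr : nnAddrs) {
                actors.add(new Actor(addr, started));
            }
        }

        void start() {
            for (Actor a : actors) {
                a.start();
            }
        }
    }

    static List<OfferService> createAll(Map<String, List<String>> addrMap, CountDownLatch started) {
        List<OfferService> services = new ArrayList<>();
        for (List<String> nnAddrs : addrMap.values()) {
            services.add(new OfferService(nnAddrs, started));
        }
        return services;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<String>> addrMap = new LinkedHashMap<>();
        addrMap.put("ns1", Arrays.asList("nn1:8020", "nn2:8020"));  // hypothetical HA pair
        addrMap.put("ns2", Arrays.asList("nn3:8020"));
        CountDownLatch started = new CountDownLatch(3);
        List<OfferService> services = createAll(addrMap, started);
        for (OfferService s : services) {
            s.start();
        }
        boolean ok = started.await(5, TimeUnit.SECONDS);
        System.out.println(services.size() + " offer services, all actors ran: " + ok);
    }
}
```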
  
5. bpThread.start(): look for this thread's run method (BPServiceActor.run)
  @Override
  public void run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        // init stuff
        try {
          // Register with the NN
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenode's of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            // Registration failed; retry after 5 s
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }

      runningState = RunningState.RUNNING;
      if (initialRegistrationComplete != null) {
        initialRegistrationComplete.countDown();
      }

      while (shouldRun()) {
        try {
          // Send heartbeats
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }
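run() has a two-phase shape. It retries the handshake every 5 s until it succeeds or retrying stops making sense, and only then does it loop on offerService(). The sketch below isolates the first phase. The `Handshake` interface, the `maxAttempts` cap and the configurable sleep are hypothetical. The real code decides whether to keep going through shouldRetryInit(), not an attempt counter.

```java
import java.io.IOException;

class RetryInitSketch {
    // Hypothetical stand-in for connectToNNAndHandshake().
    interface Handshake {
        void connect() throws IOException;
    }

    enum RunningState { RUNNING, FAILED }

    // Retry until the handshake succeeds or attempts run out; returns the resulting state.
    static RunningState initWithRetry(Handshake hs, int maxAttempts, long sleepMs)
            throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                hs.connect();
                return RunningState.RUNNING;     // handshake done, heartbeats can start
            } catch (IOException ioe) {
                if (attempt >= maxAttempts) {
                    return RunningState.FAILED;  // give up, like the "Exiting." branch
                }
                Thread.sleep(sleepMs);           // the DN sleeps 5000 ms at this point
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Fails twice (NN not reachable yet), then succeeds.
        Handshake flaky = () -> {
            if (++calls[0] < 3) {
                throw new IOException("NN not ready");
            }
        };
        RunningState state = initWithRetry(flaky, 5, 10);
        System.out.println(state + " after " + calls[0] + " attempts");
        // prints "RUNNING after 3 attempts"
    }
}
```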
  
6. connectToNNAndHandshake(): register with the NN
  private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    // Obtain the RPC client proxy for the NN
    bpNamenode = dn.connectToNN(nnAddr);
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
    bpos.verifyAndSetNamespaceInfo(this, nsInfo);
    this.bpThread.setName(formatThreadName("heartbeating", nnAddr));
    // Register
    register(nsInfo);
  }
  
7. dn.connectToNN(nnAddr): obtain the RPC client proxy used to register with the NN
  DatanodeProtocolClientSideTranslatorPB connectToNN(
      InetSocketAddress nnAddr) throws IOException {
    return new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf());
  }
  
  // Now the constructor invoked in the return statement above
  public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
      Configuration conf) throws IOException {
    RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
        ProtobufRpcEngine2.class);
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    // Create a client-side proxy to the NN's RPC server (a dynamic proxy, i.e. reflection)
    rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
  }
  
  private static DatanodeProtocolPB createNamenode(
      InetSocketAddress nameNodeAddr, Configuration conf,
      UserGroupInformation ugi) throws IOException {
    // Created via reflection (a dynamic proxy)
    return RPC.getProxy(DatanodeProtocolPB.class,
        RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
        conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
  }
  
8. register(nsInfo): register with the NN
  void register(NamespaceInfo nsInfo) throws IOException {
    // Create the registration info
    DatanodeRegistration newBpRegistration = bpos.createRegistration();

    LOG.info(this + " beginning handshake with NN");

    while (shouldRun()) {
      try {
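        // Send the registration info to the NN. Essentially the DN calls an interface
        // method and the NN runs the registration logic (RPC, similar to Dubbo).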
        newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
        newBpRegistration.setNamespaceInfo(nsInfo);
        bpRegistration = newBpRegistration;
        break;
      } catch(EOFException e) {  // namenode might have just restarted
        LOG.info("Problem connecting to server: " + nnAddr + " :"
            + e.getLocalizedMessage());
      } catch(SocketTimeoutException e) {  // namenode is busy
        LOG.info("Problem connecting to server: " + nnAddr);
      } catch(RemoteException e) {
        LOG.warn("RemoteException in register", e);
        throw e;
      } catch(IOException e) {
        LOG.warn("Problem connecting to server: " + nnAddr);
      }
      // Try again in a second
      sleepAndLogInterrupts(1000, "connecting to server");
    }
    ...
  }
  
9. bpNamenode.registerDatanode(newBpRegistration): send the registration info
  @Override
  public DatanodeRegistration registerDatanode(DatanodeRegistration registration
      ) throws IOException {
    RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
        .newBuilder().setRegistration(PBHelper.convert(registration));
    RegisterDatanodeResponseProto resp;
    try {
      // Register the DN
      resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
    } catch (ServiceException se) {
      throw ProtobufHelper.getRemoteException(se);
    }
    return PBHelper.convert(resp.getRegistration());
  }
  
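  // Now rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build()), called above.
  // This is an interface method; on the NN side the call arrives at
  // NameNodeRpcServer.registerDatanode, which delegates to this method in FSNamesystem.java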
  void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
    writeLock();
    try {
      // The real registration logic is here
      blockManager.registerDatanode(nodeReg);
    } finally {
      writeUnlock("registerDatanode");
    }
  }
  
10. blockManager.registerDatanode(nodeReg): let's step into it
  public void registerDatanode(DatanodeRegistration nodeReg)
      throws IOException {
    assert namesystem.hasWriteLock();
    datanodeManager.registerDatanode(nodeReg);
    bmSafeMode.checkSafeMode();
  }
  
  // Now datanodeManager.registerDatanode(nodeReg), called above
  public void registerDatanode(DatanodeRegistration nodeReg)
      throws DisallowedDatanodeException, UnresolvedTopologyException {
      ...
        // Register the DN
        addDatanode(nodeDescr);
        blockManager.getBlockReportLeaseManager().register(nodeDescr);
        // also treat the registration message as a heartbeat
        // no need to update its timestamp
        // because it is done when the descriptor is created
        // Add the DN to the heartbeat manager
        heartbeatManager.addDatanode(nodeDescr);
        heartbeatManager.updateDnStat(nodeDescr);
        incrementVersionCount(nodeReg.getSoftwareVersion());
        startAdminOperationIfNecessary(nodeDescr);
        success = true;
      ...
  }
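On the NN side, registration comes down to two things: remembering the node, and treating the registration as its first heartbeat (the node starts being tracked by the heartbeat manager). The toy registry below captures only that. `ToyDatanodeRegistry` and its fields are hypothetical, and its re-registration branch is a simplification. The real DatanodeManager also resolves network topology, checks host include/exclude lists, handles re-registration of a known node, and more.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical NN-side registry; a heavy simplification of DatanodeManager + HeartbeatManager.
class ToyDatanodeRegistry {
    static final class Node {
        final String uuid;
        long lastUpdateMonotonicMs;

        Node(String uuid, long nowMs) {
            this.uuid = uuid;
            this.lastUpdateMonotonicMs = nowMs;  // timestamp set when the descriptor is created
        }
    }

    private final Map<String, Node> datanodeMap = new HashMap<>();   // all registered nodes
    private final Map<String, Node> heartbeating = new HashMap<>();  // nodes tracked for heartbeats

    // Register a node; the registration itself is treated as a heartbeat.
    synchronized void register(String uuid, long nowMs) {
        Node node = datanodeMap.get(uuid);
        if (node == null) {
            node = new Node(uuid, nowMs);
            datanodeMap.put(uuid, node);
        } else {
            node.lastUpdateMonotonicMs = nowMs;  // re-registration refreshes the timestamp
        }
        heartbeating.put(uuid, node);
    }

    synchronized int registeredCount() { return datanodeMap.size(); }

    synchronized int heartbeatingCount() { return heartbeating.size(); }

    synchronized long lastUpdate(String uuid) { return datanodeMap.get(uuid).lastUpdateMonotonicMs; }

    public static void main(String[] args) {
        ToyDatanodeRegistry registry = new ToyDatanodeRegistry();
        registry.register("dn-1", 1_000);
        registry.register("dn-2", 1_500);
        registry.register("dn-1", 2_000);        // dn-1 registers again, e.g. after a restart
        System.out.println(registry.registeredCount() + " nodes, dn-1 last update at "
                + registry.lastUpdate("dn-1"));
        // prints "2 nodes, dn-1 last update at 2000"
    }
}
```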

向NN发送心跳

Heartbeating starts at step 5 of the DN registration section, where run() calls offerService().

1. offerService(): send heartbeats
  private void offerService() throws Exception {
    while (shouldRun()) {
      try {
        ...
        if (sendHeartbeat) {
          ...
          if (!dn.areHeartbeatsDisabledForTests()) {
            // Send the heartbeat
            resp = sendHeartBeat(requestBlockReportLease);
            assert resp != null;
            if (resp.getFullBlockReportLeaseId() != 0) {
              if (fullBlockReportLeaseId != 0) {
                LOG.warn(nnAddr + " sent back a full block report lease " +
                        "ID of 0x" +
                        Long.toHexString(resp.getFullBlockReportLeaseId()) +
                        ", but we already have a lease ID of 0x" +
                        Long.toHexString(fullBlockReportLeaseId) + ". " +
                        "Overwriting old lease ID.");
              }
              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
            }
            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime,
                getRpcMetricSuffix());
          }
          ...
        }
        ...
      }
      ...
    }
    ...
  }
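On each pass of its loop, offerService() asks its `scheduler` whether a heartbeat or a full block report is due. A heartbeat comes due every dfs.heartbeat.interval (3 s by default) and a full block report every dfs.blockreport.intervalMsec (6 h by default). The sketch below shows only that due-time bookkeeping, with the clock passed in so it can be simulated. `ToyScheduler` is hypothetical and only loosely modeled on the scheduler BPServiceActor uses. Making the first block report due immediately is a simplification: the real DN can delay it (dfs.blockreport.initialDelay).

```java
// Hypothetical due-time scheduler; loosely modeled on BPServiceActor's scheduler.
class ToyScheduler {
    private final long heartbeatIntervalMs;
    private final long blockReportIntervalMs;
    private long nextHeartbeatTime;
    private long nextBlockReportTime;

    ToyScheduler(long heartbeatIntervalMs, long blockReportIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.blockReportIntervalMs = blockReportIntervalMs;
        this.nextHeartbeatTime = nowMs;          // first heartbeat due immediately
        this.nextBlockReportTime = nowMs;        // simplification: first report due immediately
    }

    boolean isHeartbeatDue(long nowMs) { return nowMs >= nextHeartbeatTime; }

    void scheduleNextHeartbeat(long nowMs) { nextHeartbeatTime = nowMs + heartbeatIntervalMs; }

    boolean isBlockReportDue(long nowMs) { return nowMs >= nextBlockReportTime; }

    void scheduleNextBlockReport(long nowMs) { nextBlockReportTime = nowMs + blockReportIntervalMs; }

    // Simulate the offerService() loop over spanMs, ticking every stepMs; returns {heartbeats, reports}.
    static int[] simulate(long spanMs, long stepMs) {
        ToyScheduler s = new ToyScheduler(3_000L, 6 * 60 * 60 * 1000L, 0L);
        int heartbeats = 0;
        int blockReports = 0;
        for (long now = 0; now < spanMs; now += stepMs) {
            if (s.isHeartbeatDue(now)) {
                heartbeats++;
                s.scheduleNextHeartbeat(now);
            }
            if (s.isBlockReportDue(now)) {
                blockReports++;
                s.scheduleNextBlockReport(now);
            }
        }
        return new int[] {heartbeats, blockReports};
    }

    public static void main(String[] args) {
        int[] counts = simulate(60_000L, 100L);  // one simulated minute, 100 ms ticks
        System.out.println("heartbeats=" + counts[0] + " blockReports=" + counts[1]);
        // prints "heartbeats=20 blockReports=1"
    }
}
```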
  
2. sendHeartBeat(requestBlockReportLease)
  HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
      throws IOException {
    ...
    // Send it to the NN through the NN's RPC client proxy
    HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getActiveTransferThreadCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease,
        slowPeers,
        slowDisks);
    ...
  }
  
3. bpNamenode.sendHeartbeat(...): over RPC, this ends up calling the sendHeartbeat method in NameNodeRpcServer.java
  public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
      int xmitsInProgress, int xceiverCount,
      int failedVolumes, VolumeFailureSummary volumeFailureSummary,
      boolean requestFullBlockReportLease,
      @Nonnull SlowPeerReports slowPeers,
      @Nonnull SlowDiskReports slowDisks)
          throws IOException {
    checkNNStartup();
    verifyRequest(nodeReg);
    // Handle the heartbeat sent by the DN
    return namesystem.handleHeartbeat(nodeReg, report,
        dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
        failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
        slowPeers, slowDisks);
  }
  
4. namesystem.handleHeartbeat(...): handle the heartbeat sent by the DN
  HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, long cacheCapacity, long cacheUsed,
      int xceiverCount, int xmitsInProgress, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary,
      boolean requestFullBlockReportLease,
      @Nonnull SlowPeerReports slowPeers,
      @Nonnull SlowDiskReports slowDisks)
          throws IOException {
    readLock();
    try {
      //get datanode commands
      final int maxTransfer = blockManager.getMaxReplicationStreams()
          - xmitsInProgress;
      // Process the heartbeat sent by the DN and collect commands for it
      DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
          nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
          slowPeers, slowDisks);
      long blockReportLeaseId = 0;
      if (requestFullBlockReportLease) {
        blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
      }

      //create ha status
      final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
          haContext.getState().getServiceState(),
          getFSImage().getCorrectLastAppliedOrWrittenTxId());
      // Respond to the DN's heartbeat
      return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
          blockReportLeaseId);
    } finally {
      readUnlock("handleHeartbeat");
    }
  }
  
5. blockManager.getDatanodeManager().handleHeartbeat(...): see how the DN's heartbeat is handled
  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, final String blockPoolId,
      long cacheCapacity, long cacheUsed, int xceiverCount, 
      int maxTransfers, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary,
      @Nonnull SlowPeerReports slowPeers,
      @Nonnull SlowDiskReports slowDisks) throws IOException {
    ...
    // Update the node's information from the DN's heartbeat
    heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
        cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
    ...
  }
  
6. updateHeartbeat(...): update the node's information
  synchronized void updateHeartbeat(final DatanodeDescriptor node,
      StorageReport[] reports, long cacheCapacity, long cacheUsed,
      int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) {
    stats.subtract(node);
    blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
        xceiverCount, failedVolumes, volumeFailureSummary);
    stats.add(node);
  }
  
  // Now blockManager.updateHeartbeat(...), called above
  void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
      long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) {

    for (StorageReport report: reports) {
      providedStorageMap.updateStorage(node, report.getStorage());
    }
    node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount,
        failedVolumes, volumeFailureSummary);
  }
  
  // Now node.updateHeartbeat(...), called above
  void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
      long cacheUsed, int xceiverCount, int volFailures,
      VolumeFailureSummary volumeFailureSummary) {
    updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
        volFailures, volumeFailureSummary);
    heartbeatedSinceRegistration = true;
  }
  
  // Now updateHeartbeatState(...), called above
  void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
      long cacheUsed, int xceiverCount, int volFailures,
      VolumeFailureSummary volumeFailureSummary) {
    // Update storage statistics
    updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
        volFailures, volumeFailureSummary);
    // Update the last heartbeat time
    setLastUpdate(Time.now());
    setLastUpdateMonotonic(Time.monotonicNow());
    rollBlocksScheduled(getLastUpdateMonotonic());
  }
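HeartbeatManager.updateHeartbeat() keeps cluster-wide totals consistent with a simple pattern, all under one lock: subtract the node's old contribution (stats.subtract), update the node, then add its new contribution back (stats.add). Along the way the node's last-update timestamp is refreshed, and the dead-node check later compares against that timestamp. The sketch below shows the pattern. `HeartbeatStatsSketch`, `ToyNode` and their fields are hypothetical, much-reduced versions of the real stats object and DatanodeDescriptor.

```java
// Hypothetical sketch of the subtract -> update -> add pattern for keeping cluster totals consistent.
class HeartbeatStatsSketch {
    // Per-node values a heartbeat refreshes (a tiny subset of DatanodeDescriptor's state).
    static final class ToyNode {
        long capacity;
        long used;
        long lastUpdateMonotonicMs;
    }

    // Cluster-wide totals over all nodes that have sent heartbeats.
    private long totalCapacity;
    private long totalUsed;

    synchronized void updateHeartbeat(ToyNode node, long capacity, long used, long nowMonotonicMs) {
        // 1. remove the node's old contribution from the totals
        totalCapacity -= node.capacity;
        totalUsed -= node.used;
        // 2. update the node itself, including its last heartbeat time
        node.capacity = capacity;
        node.used = used;
        node.lastUpdateMonotonicMs = nowMonotonicMs;
        // 3. add the node's new contribution back
        totalCapacity += node.capacity;
        totalUsed += node.used;
    }

    synchronized long totalCapacity() { return totalCapacity; }

    synchronized long totalUsed() { return totalUsed; }

    public static void main(String[] args) {
        HeartbeatStatsSketch stats = new HeartbeatStatsSketch();
        ToyNode a = new ToyNode();
        ToyNode b = new ToyNode();
        stats.updateHeartbeat(a, 100, 10, 1_000);
        stats.updateHeartbeat(b, 200, 50, 1_000);
        stats.updateHeartbeat(a, 100, 40, 4_000);  // a's usage grew between heartbeats
        System.out.println("capacity=" + stats.totalCapacity() + " used=" + stats.totalUsed());
        // prints "capacity=300 used=90"
    }
}
```

Subtracting before updating means the totals never double-count a node, and no full recomputation over all nodes is needed on each heartbeat.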