环境准备
创建一个Maven项目,pom文件依赖如下
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
前置逻辑
一个NN是由NameNode.java的main方法启动的,如下所示:
1. main(),NameNode启动主入口
public static void main(String argv[]) throws Exception {
if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
System.exit(0);
}
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
// 核心逻辑是这个,创建一个NN
NameNode namenode = createNameNode(argv, null);
if (namenode != null) {
namenode.join();
}
} catch (Throwable e) {
LOG.error("Failed to start namenode.", e);
terminate(1, e);
}
}
2. createNameNode(argv, null),创建NN
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
...
switch (startOpt) {
...
default:
DefaultMetricsSystem.initialize("NameNode");
// 创建的逻辑则是new一个对象
return new NameNode(conf);
}
}
3. new NameNode(conf),看看怎么创建的
public NameNode(Configuration conf) throws IOException {
// 调用了另一个有参构造
this(conf, NamenodeRole.NAMENODE);
}
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
...
try {
initializeGenericKeys(conf, nsId, namenodeId);
// 创建的主要逻辑就在这个initialize方法中
initialize(getConf());
state.prepareToEnterState(haContext);
try {
haContext.writeLock();
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}
} catch (IOException e) {
this.stopAtException(e);
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stopAtException(e);
throw e;
}
...
}
启动9870端口服务
从前置逻辑步骤3的initialize方法开始看,看看HDFS的web端是怎么启动起来的
1. initialize(getConf()),初始化方法
protected void initialize(Configuration conf) throws IOException {
...
if (NamenodeRole.NAMENODE == role) {
// 开启一个web服务器
startHttpServer(conf);
}
}
2. startHttpServer(conf),进来看看
private void startHttpServer(final Configuration conf) throws IOException {
// 绑定默认的端口和ip
// 默认端口和ip由getHttpServerBindAddress方法获取
httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
// 开启服务器
httpServer.start();
httpServer.setStartupProgress(startupProgress);
}
3. getHttpServerBindAddress(conf),获取默认端口和ip
protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
InetSocketAddress bindAddress = getHttpServerAddress(conf);
...
return bindAddress;
}
// 进入上述的getHttpServerAddress(conf)
protected InetSocketAddress getHttpServerAddress(Configuration conf) {
return getHttpAddress(conf);
}
// 进入上述的getHttpAddress(conf)
public static InetSocketAddress getHttpAddress(Configuration conf) {
// DFS_NAMENODE_HTTP_ADDRESS_KEY等于dfs.namenode.http-address
// DFS_NAMENODE_HTTP_ADDRESS_DEFAULT等于0.0.0.0:9870
// 这个其实就是hdfs-site.xml的一段配置,xml的配置可以覆盖此处的默认值
return NetUtils.createSocketAddr(
conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}
4. httpServer.start(),开启服务器
void start() throws IOException {
...
// 这里面做了绑定servlet的动作
setupServlets(httpServer);
...
}
5. setupServlets(httpServer),绑定servlet,其实就是web页面点击时各种跳转逻辑
private static void setupServlets(HttpServer2 httpServer) {
httpServer.addInternalServlet("startupProgress",
StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
true);
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
httpServer.addInternalServlet(IsNameNodeActiveServlet.SERVLET_NAME,
IsNameNodeActiveServlet.PATH_SPEC,
IsNameNodeActiveServlet.class);
httpServer.addInternalServlet(NetworkTopologyServlet.SERVLET_NAME,
NetworkTopologyServlet.PATH_SPEC, NetworkTopologyServlet.class);
}
加载镜像文件和编辑日志
还是回到前置逻辑步骤3的initialize方法,此时web端服务已经开启,现在要加载镜像文件和编辑日志
1. initialize(getConf()),初始化方法
protected void initialize(Configuration conf) throws IOException {
// 开启HDFS的web端服务
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
// 加载镜像文件和编辑日志
loadNamesystem(conf);
...
}
2. loadNamesystem(conf),加载镜像文件和编辑日志
protected void loadNamesystem(Configuration conf) throws IOException {
this.namesystem = FSNamesystem.loadFromDisk(conf);
}
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
...
// 这里就读取了本地的镜像文件和编辑日志
// 即getNamespaceDirs和getNamespaceEditsDirs方法
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
...
}
初始化NN的RPC服务端
还是回到前置逻辑步骤3的initialize方法,此时文件也加载好了,现在要初始化NN的RPC服务端
1. initialize(getConf()),初始化方法
protected void initialize(Configuration conf) throws IOException {
// 开启HDFS的web端服务
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
// 加载镜像文件和编辑日志
loadNamesystem(conf);
...
// 创建RPC服务端
rpcServer = createRpcServer(conf);
...
}
2. createRpcServer(conf),创建RPC服务端
protected NameNodeRpcServer createRpcServer(Configuration conf)
throws IOException {
return new NameNodeRpcServer(conf, this);
}
// 这里创建了RPC服务端
public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
...
serviceRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(serviceRpcAddr.getPort())
.setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
...
}
NN启动资源检查
还是回到前置逻辑步骤3的initialize方法,此时RPC服务端也创建好了,现在要启动资源检查
1. initialize(getConf()),初始化方法
protected void initialize(Configuration conf) throws IOException {
// 开启HDFS的web端服务
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
// 加载镜像文件和编辑日志
loadNamesystem(conf);
...
// 创建RPC服务端
rpcServer = createRpcServer(conf);
...
// 启动资源检查
startCommonServices(conf);
...
}
2. startCommonServices(conf),启动资源检查
private void startCommonServices(Configuration conf) throws IOException {
// 检查资源
namesystem.startCommonServices(conf, haContext);
registerNNSMXBean();
if (NamenodeRole.NAMENODE != role) {
startHttpServer(conf);
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
if (levelDBAliasMapServer != null) {
httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
}
}
// 上述的RPC服务端在这里启动的,之前仅仅是创建
rpcServer.start();
...
}
3. startCommonServices(conf, haContext)
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
this.haContext = haContext;
try {
// 创建检查器
nnResourceChecker = new NameNodeResourceChecker(conf);
// 真正检查的逻辑在这
// 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
checkAvailableResources();
...
// 开启安全模式
prog.beginPhase(Phase.SAFEMODE);
...
// NN对心跳超时判断
blockManager.activate(conf, completeBlocksTotal);
...
} finally {
writeUnlock("startCommonServices");
}
...
}
4. new NameNodeResourceChecker(conf),看看检查器
public NameNodeResourceChecker(Configuration conf) throws IOException {
this.conf = conf;
volumes = new HashMap<String, CheckedVolume>();
// DFS_NAMENODE_DU_RESERVED_KEY等于dfs.namenode.resource.du.reserved
// DFS_NAMENODE_DU_RESERVED_DEFAULT等于100MB
// 这里定义了NN中元数据储存空间默认值
duReserved = conf.getLongBytes(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
...
// 对所有路径进行资源检查
for (URI editsDirToCheck : localEditDirs) {
addDirToCheck(editsDirToCheck,
FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
editsDirToCheck));
}
...
}
5. checkAvailableResources(),返回步骤3,看看检查的逻辑
void checkAvailableResources() {
long resourceCheckTime = monotonicNow();
Preconditions.checkState(nnResourceChecker != null,
"nnResourceChecker not initialized");
// 判断资源是否足够,不够返回false
hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
resourceCheckTime = monotonicNow() - resourceCheckTime;
NameNode.getNameNodeMetrics().addResourceCheckTime(resourceCheckTime);
}
NN对心跳超时判断
从NN启动资源检查步骤3的blockManager.activate开始,对NN的心跳超时判断逻辑进行Debug
1. startCommonServices(conf, haContext)
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
this.haContext = haContext;
try {
// 创建检查器
nnResourceChecker = new NameNodeResourceChecker(conf);
// 真正检查的逻辑在这
// 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
checkAvailableResources();
...
// 开启安全模式
prog.beginPhase(Phase.SAFEMODE);
...
// NN对心跳超时判断
blockManager.activate(conf, completeBlocksTotal);
...
} finally {
writeUnlock("startCommonServices");
}
...
}
2. blockManager.activate(conf, completeBlocksTotal),进行心跳超时判断
public void activate(Configuration conf, long blockTotal) {
pendingReconstruction.start();
datanodeManager.activate(conf);
...
}
// 进入上述的datanodeManager.activate(conf)
void activate(final Configuration conf) {
datanodeAdminManager.activate(conf);
// 开始检查心跳
heartbeatManager.activate();
}
// 进入上述的heartbeatManager.activate()
void activate() {
heartbeatThread.start();
}
3. heartbeatThread.start(),start方法,这是个线程类,用多线程去检查心跳,因此这里看重写的run方法
@Override
public void run() {
while(namesystem.isRunning()) {
restartHeartbeatStopWatch();
try {
final long now = Time.monotonicNow();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
// 检查心跳
heartbeatCheck();
...
}
} catch (Exception e) {
LOG.error("Exception while checking heartbeat", e);
}
...
}
}
4. heartbeatCheck()
@VisibleForTesting
void heartbeatCheck() {
...
// isDatanodeDead方法判断DataNode有没有挂掉
if (dead == null && dm.isDatanodeDead(d)) {
...
}
...
}
// 看看上述的isDatanodeDead(d)
boolean isDatanodeDead(DatanodeDescriptor node) {
// monotonicNow()现在系统时刻
// node.getLastUpdateMonotonic()上次心跳的时刻
return (node.getLastUpdateMonotonic() <
(monotonicNow() - heartbeatExpireInterval));
}
5. heartbeatExpireInterval,心跳超时时间间隔(距上次心跳超过该时长,DN即被判定为死亡)。这个值会被DatanodeManager设置,即DatanodeManager的setHeartbeatInterval方法
private void setHeartbeatInterval(long intervalSeconds,
int recheckInterval) {
this.heartbeatIntervalSeconds = intervalSeconds;
this.heartbeatRecheckInterval = recheckInterval;
// recheckInterval默认是5min(300000ms),intervalSeconds默认是3s(1000*intervalSeconds即3s对应的毫秒数)
// heartbeatExpireInterval是10min30s,即DN在10min30s后仍无法联系上,则会被判断为死亡
this.heartbeatExpireInterval = 2L * recheckInterval + 10 * 1000
* intervalSeconds;
this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
blockInvalidateLimit);
}
安全模式
安全模式是NN在启动过程中、启动尚未完全结束时所处的一个阶段。从NN启动资源检查步骤3的blockManager.activate开始进行Debug
1. startCommonServices(conf, haContext)
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
this.haContext = haContext;
try {
// 创建检查器
nnResourceChecker = new NameNodeResourceChecker(conf);
// 真正检查的逻辑在这
// 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
checkAvailableResources();
...
// 开始进入安全模式
prog.beginPhase(Phase.SAFEMODE);
// 获取所有可以正常使用的block
long completeBlocksTotal = getCompleteBlocksTotal();
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
completeBlocksTotal);
// 启动块服务,NN对心跳超时判断
blockManager.activate(conf, completeBlocksTotal);
...
} finally {
writeUnlock("startCommonServices");
}
...
}
2. getCompleteBlocksTotal(),获取所有可以正常使用的block
public long getCompleteBlocksTotal() {
// Calculate number of blocks under construction
long numUCBlocks = 0;
readLock();
try {
// 获取正在构建的block
numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
// 返回所有可用的块(即 所有的块 - 正在构建的块)
return getBlocksTotal() - numUCBlocks;
} finally {
readUnlock("getCompleteBlocksTotal");
}
}
3. blockManager.activate(conf, completeBlocksTotal),安全模式和心跳检测都在这里进行的
public void activate(Configuration conf, long blockTotal) {
pendingReconstruction.start();
// 心跳检查在这里开启
datanodeManager.activate(conf);
...
// 开启安全模式
bmSafeMode.activate(blockTotal);
}
4. bmSafeMode.activate(blockTotal),开启安全模式
void activate(long total) {
assert namesystem.hasWriteLock();
assert status == BMSafeModeStatus.OFF;
startTime = monotonicNow();
// 设置块数量并判断是否满足块个数的阈值
setBlockTotal(total);
// 判断DataNode节点和块信息是否达到退出安全模式标准
if (areThresholdsMet()) {
boolean exitResult = leaveSafeMode(false);
Preconditions.checkState(exitResult, "Failed to leave safe mode.");
} else {
// enter safe mode
status = BMSafeModeStatus.PENDING_THRESHOLD;
initializeReplQueuesIfNecessary();
reportStatus("STATE* Safe mode ON.", true);
lastStatusReport = monotonicNow();
}
}
5. setBlockTotal(total),设置块数量并判断是否满足块个数的阈值
void setBlockTotal(long total) {
assert namesystem.hasWriteLock();
synchronized (this) {
this.blockTotal = total;
// 计算阈值:例如:1000个正常的块 * 0.999 = 999
// this.threshold=0.999f
this.blockThreshold = (long) (total * threshold);
}
this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}
6. areThresholdsMet(),判断DataNode节点和块信息是否达到退出安全模式标准
private boolean areThresholdsMet() {
assert namesystem.hasWriteLock();
// Calculating the number of live datanodes is time-consuming
// in large clusters. Skip it when datanodeThreshold is zero.
// We need to evaluate getNumLiveDataNodes only when
// (blockSafe >= blockThreshold) is true and hence moving evaluation
// of datanodeNum conditional to isBlockThresholdMet as well
synchronized (this) {
// 正常启动的块数 >= 块的最小阈值
boolean isBlockThresholdMet = (blockSafe >= blockThreshold);
boolean isDatanodeThresholdMet = true;
if (isBlockThresholdMet && datanodeThreshold > 0) {
int datanodeNum = blockManager.getDatanodeManager().
getNumLiveDataNodes();
// 正常启动的DN数量 >= DN的最小阈值
isDatanodeThresholdMet = (datanodeNum >= datanodeThreshold);
}
return isBlockThresholdMet && isDatanodeThresholdMet;
}
}
评论区