工作机制
- DataNode启动后会向NameNode注册自己
- 注册成功后,DN会每隔6小时上报当前节点的所有块信息
- 并且每隔3s进行心跳
- 如果超过10min30s没有心跳,NN会把这个DN当成死亡节点,去除掉
环境准备
创建一个Maven项目,pom文件依赖如下
<!-- Hadoop 3.3.2 artifacts used by this walkthrough project -->
<dependencies>
<!-- hadoop-client artifact -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.2</version>
</dependency>
<!-- hadoop-hdfs artifact (presumably where the DataNode/NameNode classes shown below live; verify) -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.2</version>
</dependency>
<!-- hadoop-hdfs-client artifact; "provided" scope keeps it on the compile classpath only -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
前置源码
一个DN的启动是由DataNode.java的main方法启动的,如下所示:
1. main,DataNode启动主入口
/**
 * Command-line entry point of the DataNode.
 *
 * <p>If the arguments are recognised as a help request the process exits
 * with status 0; otherwise start-up is delegated to {@code secureMain}.
 *
 * @param args command-line arguments
 */
public static void main(String args[]) {
  final boolean helpRequested =
      DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true);
  if (helpRequested) {
    System.exit(0);
  }
  // The core start-up logic lives in secureMain.
  secureMain(args, null);
}
2. secureMain(args, null),进来看看
/**
 * Creates the DataNode and terminates the process when start-up fails or
 * the method exits. (Excerpt: elided code is marked with three dots.)
 *
 * @param args command-line arguments, forwarded to createDataNode
 * @param resources secure resources, forwarded to createDataNode
 */
public static void secureMain(String args[], SecureResources resources) {
int errorCode = 0;
try {
...
// Create the DataNode (configuration argument is passed as null here)
DataNode datanode = createDataNode(args, null, resources);
...
} catch (Throwable e) {
// Any failure is logged and the process is terminated with status 1
LOG.error("Exception in secureMain", e);
terminate(1, e);
} finally {
// Always log the exit and terminate with errorCode
LOG.warn("Exiting Datanode");
terminate(errorCode);
}
}
3. createDataNode(args, null, resources),创建DN
/**
 * Instantiates a DataNode and, when an instance was produced, starts its
 * daemon.
 *
 * @param args command-line arguments
 * @param conf configuration forwarded to {@code instantiateDataNode}
 * @param resources secure resources forwarded to {@code instantiateDataNode}
 * @return the DataNode, or {@code null} if no instance was created
 * @throws IOException propagated from instantiation
 */
@VisibleForTesting
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  final DataNode instance = instantiateDataNode(args, conf, resources);
  if (instance == null) {
    // Nothing was instantiated, so there is nothing to start.
    return null;
  }
  // Start the DataNode daemon.
  instance.runDatanodeDaemon();
  return instance;
}
4. instantiateDataNode(args, conf, resources),创建DN实例
/**
 * Builds a DataNode instance by delegating to makeInstance.
 * (Excerpt: the steps that prepare dataLocations are elided.)
 *
 * @throws IOException declared by the method; source not visible in this excerpt
 */
public static DataNode instantiateDataNode(String args [], Configuration conf,
SecureResources resources) throws IOException {
...
return makeInstance(dataLocations, conf, resources);
}
// 看看上述的makeInstance(dataLocations, conf, resources)
/**
 * Constructs the DataNode through its four-argument constructor.
 * (Excerpt: the code that derives locations and storageLocationChecker is elided.)
 */
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
Configuration conf, SecureResources resources) throws IOException {
...
return new DataNode(conf, locations, storageLocationChecker, resources);
}
// 看看上述的有参构造
/**
 * Constructor excerpt: resolves the configured host name and starts the
 * DataNode as part of construction. If starting throws an IOException the
 * node is shut down and the exception is rethrown.
 */
DataNode(final Configuration conf,
final List<StorageLocation> dataDirs,
final StorageLocationChecker storageLocationChecker,
final SecureResources resources) throws IOException {
...
try {
hostName = getHostName(conf);
LOG.info("Configured hostname is {}", hostName);
// The DataNode is started while the instance is being constructed
startDataNode(dataDirs, resources);
} catch (IOException ie) {
// Release whatever was started before propagating the failure
shutdown();
throw ie;
}
...
}
5. startDataNode(dataDirs, resources),开启DN
/**
 * Start-up sequence excerpt. In the visible order it creates the storage
 * object, registers the MXBean, initialises the DataXceiver, starts the
 * info (HTTP) server, initialises the IPC server, and finally creates the
 * BlockPoolManager and refreshes its NameNodes.
 */
void startDataNode(List<StorageLocation> dataDirectories,
SecureResources resources
) throws IOException {
...
// Create the data storage object
storage = new DataStorage();
// global DN settings
registerMXBean();
// Initialise the DataXceiver.
// Author's note: DataXceiverServer is the service the DN uses to receive
// data sent by clients and by other DNs.
initDataXceiver();
// Start the HTTP server
startInfoServer();
...
// Initialise the RPC service
initIpcServer();
...
// Create the BlockPoolManager
blockPoolManager = new BlockPoolManager(this);
// Register with the NN and start sending the DN heartbeats
blockPoolManager.refreshNamenodes(getConf());
...
}
初始化HTTP服务
那就从前置源码的步骤5看看HTTP服务怎么初始化的
1. startInfoServer(),初始化HTTP服务
/**
 * Creates and starts the DataNode HTTP server, then records the HTTP and
 * HTTPS ports for whichever addresses the server reports.
 *
 * @throws IOException propagated from server creation or start
 */
private void startInfoServer()
    throws IOException {
  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC; in that case hand the channel over.
  ServerSocketChannel preboundChannel = null;
  if (secureResources != null) {
    preboundChannel = secureResources.getHttpServerChannel();
  }

  // Build the HTTP server and bring it up.
  httpServer = new DatanodeHttpServer(getConf(), this, preboundChannel);
  httpServer.start();

  // Remember the ports actually in use, when the endpoints exist.
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}
2. new DatanodeHttpServer(getConf(), this, httpServerChannel),创建Http服务器
/**
 * Constructor excerpt: configures the HttpServer2 builder used by the
 * DataNode HTTP server. (Surrounding code is elided.)
 */
public DatanodeHttpServer(final Configuration conf,
final DataNode datanode,
final ServerSocketChannel externalHttpChannel)
throws IOException {
...
// Build the HTTP server instance.
// The endpoint is http://localhost:<proxyPort>; setFindPort(true) is enabled.
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("datanode")
.setConf(confForInfoServer)
.setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
.hostName(getHostnameForSpnegoPrincipal(confForInfoServer))
.addEndpoint(URI.create("http://localhost:" + proxyPort))
.setFindPort(true);
...
}
初始化DN的RPC服务端
那就从前置源码的步骤5看看RPC服务怎么初始化的
1. initIpcServer(),初始化DN的RPC服务端
/**
 * Excerpt: builds the DataNode's RPC server for ClientDatanodeProtocolPB.
 * (Creation of service and ipcAddr is elided.)
 */
private void initIpcServer() throws IOException {
...
// Build the RPC server.
// As with the HTTP server, this is where the bind address and port are set;
// the handler count is read from configuration with a default fallback.
ipcServer = new RPC.Builder(getConf())
.setProtocol(ClientDatanodeProtocolPB.class)
.setInstance(service)
.setBindAddress(ipcAddr.getHostName())
.setPort(ipcAddr.getPort())
.setNumHandlers(
getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
.setSecretManager(blockPoolTokenSecretManager).build();
...
}
DN向NN注册
那就从前置源码的步骤5看看DN怎么向NN注册的
1. blockPoolManager.refreshNamenodes(getConf()),DN向NN注册
/**
 * Excerpt: refreshes the set of NameNodes while holding
 * refreshNamenodesLock. (Computation of the two address maps is elided.)
 */
void refreshNamenodes(Configuration conf)
throws IOException {
...
// Start the registration work, serialised on refreshNamenodesLock
synchronized (refreshNamenodesLock) {
doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
}
}
2. doRefreshNamenodes(newAddressMap, newLifelineAddressMap),进来看看
/**
 * Excerpt: for every nameservice in toAdd, creates a BPOfferService,
 * records it in bpByNameserviceId and offerServices, then starts all
 * services. (Computation of toAdd, nnIds, addrs and lifelineAddrs is elided.)
 */
private void doRefreshNamenodes(
Map<String, Map<String, InetSocketAddress>> addrMap,
Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
throws IOException {
...
synchronized (this) {
if (!toAdd.isEmpty()) {
...
for (String nsToAdd : toAdd) {
// Create the service object needed to register this DN
BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
lifelineAddrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
}
// Start the services needed to register this DN
startAll();
}
...
}
3. createBPOS(nsToAdd, nnIds, addrs, lifelineAddrs),创建注册DN所需的服务
/**
 * Factory for the per-nameservice offer service; it simply constructs a
 * new {@code BPOfferService} bound to this manager's DataNode.
 *
 * @param nameserviceId id of the nameservice
 * @param nnIds NameNode ids
 * @param nnAddrs NameNode addresses
 * @param lifelineNnAddrs NameNode lifeline addresses
 * @return the newly created offer service
 */
protected BPOfferService createBPOS(
    final String nameserviceId,
    List<String> nnIds,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  final BPOfferService offerService = new BPOfferService(
      nameserviceId, nnIds, nnAddrs, lifelineNnAddrs, dn);
  return offerService;
}
4. startAll(),开启注册DN所需的服务
/**
 * Starts every registered offer service as the login user.
 *
 * @throws IOException if obtaining the login user or running the action
 *     fails, or if the action is interrupted
 */
synchronized void startAll() throws IOException {
  // Action executed under the login user's identity: start each service.
  final PrivilegedExceptionAction<Object> startEveryService =
      new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          for (BPOfferService service : offerServices) {
            service.start();
          }
          return null;
        }
      };

  try {
    UserGroupInformation.getLoginUser().doAs(startEveryService);
  } catch (InterruptedException ex) {
    // Only the interrupt's cause is attached to the IOException.
    // NOTE(review): the InterruptedException itself is dropped and the
    // interrupt flag is not restored here.
    final IOException wrapped = new IOException();
    wrapped.initCause(ex.getCause());
    throw wrapped;
  }
}
// 看看上述的bpos.start()
/** Starts every service actor held in {@code bpServices}. */
void start() {
  for (BPServiceActor serviceActor : bpServices) {
    serviceActor.start();
  }
}
// 继续进入,看看actor.start()
/**
 * Starts this actor on its own daemon thread. Does nothing when the
 * thread already exists and is alive. The lifeline sender, when present,
 * is started before the actor thread.
 */
void start() {
  final boolean alreadyRunning = bpThread != null && bpThread.isAlive();
  if (alreadyRunning) {
    // Thread is started already
    return;
  }

  bpThread = new Thread(this);
  bpThread.setDaemon(true); // needed for JUnit testing

  if (lifelineSender != null) {
    lifelineSender.start();
  }

  // The service runs on this thread; see run().
  bpThread.start();
}
5. bpThread.start(),寻找这个线程类的run方法
/**
 * Thread body of the actor. First loops until the handshake/registration
 * with the NameNode succeeds (retrying every 5 s while shouldRetryInit()
 * allows it), then repeatedly calls offerService() while shouldRun() is
 * true. cleanUp() is always invoked on exit.
 */
@Override
public void run() {
LOG.info(this + " starting to offer service");
try {
while (true) {
// init stuff
try {
// Connect to the NN and register
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
// Registration failed; retry after 5 s
sleepAndLogInterrupts(5000, "initializing");
} else {
// No retry allowed: mark as FAILED and leave the thread
runningState = RunningState.FAILED;
LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
}
runningState = RunningState.RUNNING;
// Signal completion of the initial registration if a latch is set
if (initialRegistrationComplete != null) {
initialRegistrationComplete.countDown();
}
while (shouldRun()) {
try {
// Run the service loop; heartbeats are sent from offerService()
offerService();
} catch (Exception ex) {
// Log, wait 5 s, then try offering service again
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
runningState = RunningState.EXITED;
} catch (Throwable ex) {
LOG.warn("Unexpected exception in block pool " + this, ex);
runningState = RunningState.FAILED;
} finally {
LOG.warn("Ending block pool service for: " + this);
cleanUp();
}
}
6. connectToNNAndHandshake(),向NN注册
/**
 * Connects to the NameNode, verifies the namespace information, renames
 * the actor thread and finally registers with the NameNode.
 *
 * @throws IOException if any step of the handshake fails
 */
private void connectToNNAndHandshake() throws IOException {
  // Obtain the RPC proxy used to talk to the NameNode.
  bpNamenode = dn.connectToNN(nnAddr);

  final NamespaceInfo namespaceInfo = retrieveNamespaceInfo();
  bpos.verifyAndSetNamespaceInfo(this, namespaceInfo);

  this.bpThread.setName(
      formatThreadName("heartbeating", nnAddr));

  // Register this DataNode with the NameNode.
  register(namespaceInfo);
}
7. dn.connectToNN(nnAddr),获取注册NN的RPC客户端对象
/**
 * Creates the client-side translator used to call the NameNode at the
 * given address.
 *
 * @param nnAddr NameNode address
 * @return a new translator built from {@code nnAddr} and this node's configuration
 * @throws IOException propagated from the translator constructor
 */
DatanodeProtocolClientSideTranslatorPB connectToNN(
    InetSocketAddress nnAddr) throws IOException {
  final DatanodeProtocolClientSideTranslatorPB translator =
      new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf());
  return translator;
}
// 看看上述return时的有参构造
/**
 * Sets the protocol engine for {@code DatanodeProtocolPB} and obtains the
 * RPC proxy to the NameNode on behalf of the current user.
 *
 * @param nameNodeAddr NameNode address
 * @param conf configuration
 * @throws IOException if the current user or the proxy cannot be obtained
 */
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
    Configuration conf) throws IOException {
  RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
      ProtobufRpcEngine2.class);
  // Obtain the RPC proxy (client side) through which the NN is called.
  rpcProxy = createNamenode(
      nameNodeAddr, conf, UserGroupInformation.getCurrentUser());
}
/**
 * Obtains an RPC proxy for {@code DatanodeProtocolPB} bound to the given
 * NameNode address.
 *
 * @param nameNodeAddr NameNode address
 * @param conf configuration
 * @param ugi user on whose behalf the proxy is created
 * @return the proxy returned by {@code RPC.getProxy}
 * @throws IOException propagated from {@code RPC.getProxy}
 */
private static DatanodeProtocolPB createNamenode(
    InetSocketAddress nameNodeAddr, Configuration conf,
    UserGroupInformation ugi) throws IOException {
  final long protocolVersion =
      RPC.getProtocolVersion(DatanodeProtocolPB.class);
  return RPC.getProxy(
      DatanodeProtocolPB.class,
      protocolVersion,
      nameNodeAddr,
      ugi,
      conf,
      NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
}
8. register(nsInfo),向NN注册
/**
 * Excerpt: registers this DataNode with the NameNode, retrying once per
 * second while shouldRun() is true. A RemoteException is logged and
 * rethrown; EOF, socket-timeout and other IOExceptions are logged and
 * retried. (Code after the loop is elided.)
 */
void register(NamespaceInfo nsInfo) throws IOException {
// Build the registration information
DatanodeRegistration newBpRegistration = bpos.createRegistration();
LOG.info(this + " beginning handshake with NN");
while (shouldRun()) {
try {
// Send the registration to the NN. This is a remote call: the DN invokes
// the interface method and the NN executes the registration logic
// (the author compares this to Dubbo).
newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
newBpRegistration.setNamespaceInfo(nsInfo);
bpRegistration = newBpRegistration;
break;
} catch(EOFException e) { // namenode might have just restarted
LOG.info("Problem connecting to server: " + nnAddr + " :"
+ e.getLocalizedMessage());
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
} catch(RemoteException e) {
LOG.warn("RemoteException in register", e);
throw e;
} catch(IOException e) {
LOG.warn("Problem connecting to server: " + nnAddr);
}
// Try again in a second
sleepAndLogInterrupts(1000, "connecting to server");
}
...
}
9. bpNamenode.registerDatanode(newBpRegistration),发送注册信息
/**
 * Sends the registration to the NameNode through the RPC proxy and
 * converts the response back into a {@code DatanodeRegistration}.
 *
 * @param registration registration to send
 * @return the registration contained in the NameNode's response
 * @throws IOException the remote exception extracted from a {@code ServiceException}
 */
@Override
public DatanodeRegistration registerDatanode(DatanodeRegistration registration
    ) throws IOException {
  final RegisterDatanodeRequestProto.Builder requestBuilder =
      RegisterDatanodeRequestProto.newBuilder();
  requestBuilder.setRegistration(PBHelper.convert(registration));

  final RegisterDatanodeResponseProto response;
  try {
    // Remote call that performs the registration.
    response = rpcProxy.registerDatanode(NULL_CONTROLLER,
        requestBuilder.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
  return PBHelper.convert(response.getRegistration());
}
// 看看上述的rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build())
// 这是个接口方法,咱们直接去找它的实现,在FSNamesystem.java
/**
 * Registers the DataNode while holding the write lock; the lock is
 * released in the finally block regardless of the outcome.
 */
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
writeLock();
try {
// The actual registration logic is delegated to the block manager
blockManager.registerDatanode(nodeReg);
} finally {
writeUnlock("registerDatanode");
}
}
10. blockManager.registerDatanode(nodeReg),进来看看
/**
 * Delegates registration to the datanode manager and then re-checks safe
 * mode. Asserts that the namesystem write lock is held.
 */
public void registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
assert namesystem.hasWriteLock();
datanodeManager.registerDatanode(nodeReg);
bmSafeMode.checkSafeMode();
}
// 看看上述的datanodeManager.registerDatanode(nodeReg)
/**
 * Excerpt: adds the new node descriptor, registers it with the block
 * report lease manager and the heartbeat manager, and updates related
 * bookkeeping. (Creation of nodeDescr and surrounding code is elided.)
 */
public void registerDatanode(DatanodeRegistration nodeReg)
throws DisallowedDatanodeException, UnresolvedTopologyException {
...
// Register the DN
addDatanode(nodeDescr);
blockManager.getBlockReportLeaseManager().register(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because its is done when the descriptor is created
// Put the DN under heartbeat management
heartbeatManager.addDatanode(nodeDescr);
heartbeatManager.updateDnStat(nodeDescr);
incrementVersionCount(nodeReg.getSoftwareVersion());
startAdminOperationIfNecessary(nodeDescr);
success = true;
...
}
向NN发送心跳
向NN发送心跳是从「DN向NN注册」步骤5的run方法开始的
1. offerService(),发送心跳
/**
 * Excerpt of the main service loop. While shouldRun() is true and a
 * heartbeat is due, it sends a heartbeat (unless heartbeats are disabled
 * for tests), records any full block report lease id from the response,
 * and updates the heartbeat metric. (Much of the loop body is elided.)
 */
private void offerService() throws Exception {
while (shouldRun()) {
try {
...
if (sendHeartbeat) {
...
if (!dn.areHeartbeatsDisabledForTests()) {
// Send the heartbeat
resp = sendHeartBeat(requestBlockReportLease);
assert resp != null;
// A non-zero id in the response is a new full block report lease;
// warn if it overwrites a lease we already hold.
if (resp.getFullBlockReportLeaseId() != 0) {
if (fullBlockReportLeaseId != 0) {
LOG.warn(nnAddr + " sent back a full block report lease " +
"ID of 0x" +
Long.toHexString(resp.getFullBlockReportLeaseId()) +
", but we already have a lease ID of 0x" +
Long.toHexString(fullBlockReportLeaseId) + ". " +
"Overwriting old lease ID.");
}
fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
}
// Record the elapsed time since startTime as the heartbeat metric
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime,
getRpcMetricSuffix());
}
...
}
...
}
...
}
...
}
2. sendHeartBeat(requestBlockReportLease)
/**
 * Excerpt: sends one heartbeat to the NameNode through bpNamenode,
 * carrying storage reports, cache capacity/usage, transfer counters,
 * volume failure data and slow peer/disk reports. (Preparation of the
 * arguments and the code after the call are elided.)
 */
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
throws IOException {
...
// Send to the NN through the NN RPC client
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getActiveTransferThreadCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
slowDisks);
...
}
3. bpNamenode.sendHeartbeat(...),它其实会调用NameNodeRpcServer.java的sendHeartbeat方法
/**
 * Server-side handling of a DataNode heartbeat: checks that the NameNode
 * has started, verifies the request, then hands the heartbeat to the
 * namesystem.
 *
 * @return the response produced by {@code namesystem.handleHeartbeat}
 * @throws IOException propagated from the checks or from the namesystem
 */
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks)
    throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);

  // Process the heartbeat. Note that xceiverCount is passed before
  // xmitsInProgress, the reverse of this method's own parameter order.
  final HeartbeatResponse response = namesystem.handleHeartbeat(
      nodeReg, report,
      dnCacheCapacity, dnCacheUsed,
      xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary,
      requestFullBlockReportLease,
      slowPeers, slowDisks);
  return response;
}
4. namesystem.handleHeartbeat(...),处理DN发送的心跳
/**
 * Handles a DataNode heartbeat under the read lock: lets the datanode
 * manager process it and collect commands, optionally requests a full
 * block report lease, and builds the response including the HA status.
 *
 * @return the heartbeat response sent back to the DataNode
 * @throws IOException propagated from heartbeat processing
 */
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks)
    throws IOException {
  readLock();
  try {
    // Transfers still allowed: the configured maximum minus those in progress.
    final int maxTransfer =
        blockManager.getMaxReplicationStreams() - xmitsInProgress;

    // Process the heartbeat and collect the commands for the DataNode.
    final DatanodeCommand[] commands =
        blockManager.getDatanodeManager().handleHeartbeat(
            nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
            xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
            slowPeers, slowDisks);

    // A lease id of 0 means no full block report lease was requested.
    final long blockReportLeaseId = requestFullBlockReportLease
        ? blockManager.requestBlockReportLeaseId(nodeReg)
        : 0;

    // Build the HA status carried in the response.
    final NNHAStatusHeartbeat haStatus = new NNHAStatusHeartbeat(
        haContext.getState().getServiceState(),
        getFSImage().getCorrectLastAppliedOrWrittenTxId());

    // Respond to the DataNode's heartbeat.
    return new HeartbeatResponse(commands, haStatus, rollingUpgradeInfo,
        blockReportLeaseId);
  } finally {
    readUnlock("handleHeartbeat");
  }
}
5. blockManager.getDatanodeManager().handleHeartbeat(...),看看怎么处理DN发送过来的心跳
/**
 * Excerpt: updates the heartbeat manager with the information carried by
 * the DataNode's heartbeat. (Lookup of nodeinfo and construction of the
 * returned commands are elided.)
 */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
...
// Update the recorded information from the heartbeat the DN sent
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
...
}
6. updateHeartbeat(...),更新信息
/**
 * Applies a heartbeat to the given node. The node is subtracted from the
 * stats before the update and added back afterwards.
 */
synchronized void updateHeartbeat(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
// Remove the node's current contribution from the stats
stats.subtract(node);
blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
// Add the node back after it has been updated
stats.add(node);
}
// 看看上述的blockManager.updateHeartbeat(...)
/**
 * Updates the provided storage map for every storage report, then
 * forwards the heartbeat data to the node descriptor.
 */
void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
    long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {
  // Refresh the provided-storage entry of each reported storage.
  for (int i = 0; i < reports.length; i++) {
    providedStorageMap.updateStorage(node, reports[i].getStorage());
  }
  // Let the descriptor record the heartbeat itself.
  node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount,
      failedVolumes, volumeFailureSummary);
}
// 看看上述的node.updateHeartbeat(...)
/**
 * Updates the heartbeat state from the reported values and marks that a
 * heartbeat has been received since registration.
 */
void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures, volumeFailureSummary);
// Flag that at least one heartbeat has arrived since registration
heartbeatedSinceRegistration = true;
}
// 看看上述的 updateHeartbeatState(...)
/**
 * Updates the storage statistics from the heartbeat and then refreshes
 * the last-update timestamps (wall-clock and monotonic).
 */
void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
// Update the storage statistics
updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures, volumeFailureSummary);
// Update the heartbeat timestamps
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());
// Uses the monotonic timestamp that was just stored
rollBlocksScheduled(getLastUpdateMonotonic());
}
评论区