Tracing the Hadoop File-Write create() Code Path

tech · 2022-10-25

Server-Side Code Analysis

The NameNodeRpcServer.create method: all RPC requests to the NameNode are handled by NameNodeRpcServer.

1. Parameter Setup and Checks

Check the NameNode's startup state:

```java
checkNNStartup();

private void checkNNStartup() throws IOException {
  if (!this.nn.isStarted()) {
    String message = NameNode.composeNotStartedMessage(this.nn.getRole());
    throw new RetriableException(message);
  }
}
```

Get the client's IP address:

```java
String clientMachine = getClientMachine();

private static String getClientMachine() {
  String clientMachine = Server.getRemoteAddress();
  if (clientMachine == null) { // not an RPC client
    clientMachine = "";
  }
  return clientMachine;
}
```

Validate the length and depth of the source path: the maximum length is MAX_PATH_LENGTH = 8000 characters, and the maximum depth is MAX_PATH_DEPTH = 1000 levels.

```java
if (!checkPathLength(src)) {
  throw new IOException("create: Pathname too long.  Limit "
      + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}

private boolean checkPathLength(String src) {
  Path srcPath = new Path(src);
  return (src.length() <= MAX_PATH_LENGTH &&
      srcPath.depth() <= MAX_PATH_DEPTH);
}
```
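The same check can be sketched without any Hadoop classes. The limits below are the values quoted above; `depth` here simply counts non-empty "/"-separated components, a simplification of `Path.depth()`:

```java
public class PathLimitCheck {
    static final int MAX_PATH_LENGTH = 8000;
    static final int MAX_PATH_DEPTH = 1000;

    // Counts "/"-separated components, mimicking Path.depth().
    static int depth(String src) {
        int d = 0;
        for (String c : src.split("/")) {
            if (!c.isEmpty()) d++;
        }
        return d;
    }

    static boolean checkPathLength(String src) {
        return src.length() <= MAX_PATH_LENGTH && depth(src) <= MAX_PATH_DEPTH;
    }
}
```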

Verify that the given operation category is allowed in the current state. This mainly matters for HA-enabled clusters:

```java
namesystem.checkOperation(OperationCategory.WRITE);

public void checkOperation(OperationCategory op) throws StandbyException {
  if (haContext != null) { // null in some unit tests
    haContext.checkOperation(op);
  }
}
```

Sequence diagram for the code above:

2. Creating the File in the Namespace

First, check whether the retry cache already holds the result of the same RPC call; if it does, return the cached result directly.

```java
CacheEntryWithPayload cacheEntry =
    RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
  return (HdfsFileStatus) cacheEntry.getPayload();
}
```
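The idea behind the retry cache can be illustrated with a minimal map-based sketch: results of non-idempotent RPCs are remembered per (clientId, callId), so a retried call returns the original result instead of re-executing. The class and method names here are hypothetical; Hadoop's real RetryCache additionally handles entry expiry and waiting for in-progress calls, which this sketch omits:

```java
import java.util.HashMap;
import java.util.Map;

public class MiniRetryCache<V> {
    private final Map<String, V> cache = new HashMap<>();

    private static String key(String clientId, int callId) {
        return clientId + "#" + callId;
    }

    /** Returns the cached payload for a retried call, or null on first call. */
    public synchronized V lookup(String clientId, int callId) {
        return cache.get(key(clientId, callId));
    }

    /** Records the result of a successfully completed call. */
    public synchronized void complete(String clientId, int callId, V payload) {
        cache.put(key(clientId, callId), payload);
    }
}
```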

If the result is not in the cache, FSNamesystem.startFile is called to create the file in the namespace; on success, the resulting HdfsFileStatus object is returned to the client. FSNamesystem.startFile simply delegates to FSNamesystem.startFileInt.

```java
status = namesystem.startFile(src, perm, clientName, clientMachine,
    flag.get(), createParent, replication, blockSize, supportedVersions,
    ecPolicyName, cacheEntry != null);
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
return status;
```

3. Detailed Flow Analysis

How the HdfsFileStatus is created:

The FSNamesystem.startFileInt method

First, validate that src is a legal path:

```java
if (!DFSUtil.isValidName(src) ||
    FSDirectory.isExactReservedName(src) ||
    (FSDirectory.isReservedName(src)
        && !FSDirectory.isReservedRawName(src)
        && !FSDirectory.isReservedInodesName(src))) {
  throw new InvalidPathException(src);
}

public static boolean isValidName(String src) {
  // Path must be absolute: src must start with '/'.
  if (!src.startsWith(Path.SEPARATOR)) {
    return false;
  }
  // Check for "..", ".", ":" and "/"
  String[] components = StringUtils.split(src, '/');
  for (int i = 0; i < components.length; i++) {
    String element = components[i];
    if (element.equals(".") ||
        (element.contains(":")) ||
        (element.contains("/"))) {
      return false;
    }
    // ".." is allowed in paths starting with /.reserved/.inodes
    if (element.equals("..")) {
      if (components.length > 4
          && components[1].equals(".reserved")
          && components[2].equals(".inodes")) {
        continue;
      }
      return false;
    }
    // The string may start or end with a '/', but must not contain "//"
    // in the middle.
    if (element.isEmpty() && i != components.length - 1 && i != 0) {
      return false;
    }
  }
  return true;
}

/*
 * CHECK_RESERVED_FILE_NAMES: true
 * DOT_RESERVED_PATH_PREFIX: "/.reserved"
 */
// Is src exactly "/.reserved"?
public static boolean isExactReservedName(String src) {
  return CHECK_RESERVED_FILE_NAMES && src.equals(DOT_RESERVED_PATH_PREFIX);
}

// Does src start with "/.reserved/"?
public static boolean isReservedName(String src) {
  return src.startsWith(DOT_RESERVED_PATH_PREFIX + Path.SEPARATOR);
}

// Does src start with "/.reserved/raw"?
static boolean isReservedRawName(String src) {
  return src.startsWith(DOT_RESERVED_PATH_PREFIX + Path.SEPARATOR
      + RAW_STRING);
}

// Does src start with "/.reserved/.inodes"?
static boolean isReservedInodesName(String src) {
  return src.startsWith(DOT_RESERVED_PATH_PREFIX + Path.SEPARATOR
      + DOT_INODES_STRING);
}
```
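The validation logic can be exercised standalone with a direct transcription of the checks above, replacing `Path.SEPARATOR` with the literal "/" and using `String.split` with a negative limit so that empty components are kept (Hadoop's own `StringUtils.split` behaves similarly). This is a sketch, not the real `DFSUtil` class:

```java
public class PathNameCheck {
    // Mirrors DFSUtil.isValidName: absolute, no ".", ":" or "//" components,
    // and ".." only allowed under /.reserved/.inodes.
    static boolean isValidName(String src) {
        if (!src.startsWith("/")) {
            return false;                          // must be absolute
        }
        String[] components = src.split("/", -1);  // keep empty components
        for (int i = 0; i < components.length; i++) {
            String element = components[i];
            if (element.equals(".") || element.contains(":")) {
                return false;
            }
            if (element.equals("..")) {
                // ".." is only allowed in paths under /.reserved/.inodes
                if (components.length > 4 && components[1].equals(".reserved")
                        && components[2].equals(".inodes")) {
                    continue;
                }
                return false;
            }
            // No "//" in the middle: an empty component is only allowed
            // at the start or the end.
            if (element.isEmpty() && i != 0 && i != components.length - 1) {
                return false;
            }
        }
        return true;
    }
}
```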

Sequence diagram for the code above:

Next, check the redundancy scheme: erasure coding or replication. The two are mutually exclusive:

```java
boolean shouldReplicate = flag.contains(CreateFlag.SHOULD_REPLICATE);
if (shouldReplicate &&
    (!org.apache.commons.lang.StringUtils.isEmpty(ecPolicyName))) {
  throw new HadoopIllegalArgumentException("SHOULD_REPLICATE flag and " +
      "ecPolicyName are exclusive parameters. Set both is not allowed!");
}
```

Verify that write operations are allowed in the current state, and perform the permission check:

```java
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
// Reentrant read/write lock, built on
// java.util.concurrent.locks.ReentrantReadWriteLock.
// Take the write lock.
writeLock();
```

Check whether the NameNode is in safe mode:

```java
checkNameNodeSafeMode("Cannot create file" + src);

void checkNameNodeSafeMode(String errorMsg)
    throws RetriableException, SafeModeException {
  if (isInSafeMode()) {
    SafeModeException se = newSafemodeException(errorMsg);
    if (haEnabled && haContext != null
        && haContext.getState().getServiceState() == HAServiceState.ACTIVE
        && isInStartupSafeMode()) {
      throw new RetriableException(se);
    } else {
      throw se;
    }
  }
}
```

Sequence diagram:

Build the INodesInPath object:

```java
iip = FSDirWriteFileOp.resolvePathForStartFile(dir, pc, src, flag,
    createParent);
```

Parameters:

  dir          [FSDirectory] the namespace tree
  pc           [FSPermissionChecker] dir.getPermissionChecker()
  src          [String] the path supplied by the client
  flag         [EnumSetWritable] CREATE((short) 0x01), OVERWRITE((short) 0x02), APPEND((short) 0x04)...
  createParent [boolean]

Check that the replication factor is within the allowed range:

```java
if (shouldReplicate ||
    (org.apache.commons.lang.StringUtils.isEmpty(ecPolicyName) &&
        !FSDirErasureCodingOp.hasErasureCodingPolicy(this, iip))) {
  blockManager.verifyReplication(src, replication, clientMachine);
}

public void verifyReplication(String src, short replication,
    String clientName) throws IOException {
  String err = null;
  if (replication > maxReplication) {
    err = " exceeds maximum of " + maxReplication;
  } else if (replication < minReplication) {
    err = " is less than the required minimum of " + minReplication;
  }
  if (err != null) {
    throw new IOException("Requested replication factor of " + replication
        + err + " for " + src
        + (clientName == null ? "" : ", clientName=" + clientName));
  }
}
```
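The range check itself is simple and can be reproduced standalone. The limits below are illustrative placeholders; in a real cluster they come from `dfs.replication.max` and `dfs.namenode.replication.min`:

```java
public class ReplicationCheck {
    // Example limits; real values come from configuration.
    static final short MAX_REPLICATION = 512;
    static final short MIN_REPLICATION = 1;

    static void verifyReplication(String src, short replication) {
        String err = null;
        if (replication > MAX_REPLICATION) {
            err = " exceeds maximum of " + MAX_REPLICATION;
        } else if (replication < MIN_REPLICATION) {
            err = " is less than the required minimum of " + MIN_REPLICATION;
        }
        if (err != null) {
            throw new IllegalArgumentException(
                "Requested replication factor of " + replication + err
                    + " for " + src);
        }
    }
}
```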

Check the block size:

```java
if (blockSize < minBlockSize) {
  throw new IOException("Specified block size is less than configured" +
      " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY +
      "): " + blockSize + " < " + minBlockSize);
}
```

Obtain the file encryption info (not examined in detail here):

```java
// provider = DFSUtil.createKeyProviderCryptoExtension(conf);
if (!iip.isRaw() && provider != null) {
  EncryptionKeyInfo ezInfo = FSDirEncryptionZoneOp.getEncryptionKeyInfo(
      this, iip, supportedVersions);
  // If the path has an encryption zone, the lock was released while
  // generating the EDEK. Re-resolve the path to ensure the namesystem
  // and/or EZ has not mutated.
  if (ezInfo != null) {
    checkOperation(OperationCategory.WRITE);
    iip = FSDirWriteFileOp.resolvePathForStartFile(
        dir, pc, iip.getPath(), flag, createParent);
    feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
        dir, iip, ezInfo);
  }
}
```

Call FSDirWriteFileOp.startFile to create the file in the namespace:

```java
dir.writeLock();
...
stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
    clientMachine, flag, createParent, replication, blockSize, feInfo,
    toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
...
dir.writeUnlock();
...
return stat;
```

The FSDirWriteFileOp.startFile method: create a new file or overwrite an existing one.

Check the flags:

```java
// Overwrite an existing file?
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
// Use the in-memory (lazy-persist) storage policy?
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
```

Get the file path and the namespace directory tree:

```java
final String src = iip.getPath();
FSDirectory fsd = fsn.getFSDirectory();
```

If the file already exists:

```java
if (iip.getLastINode() != null) {
  // For an overwrite, delete the existing file first.
  if (overwrite) {
    List<INode> toRemoveINodes = new ChunkedArrayList<>();
    List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
    long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
        toRemoveINodes, toRemoveUCFiles, now());
    if (ret >= 0) {
      iip = INodesInPath.replace(iip, iip.length() - 1, null);
      FSDirDeleteOp.incrDeletedFileCount(ret);
      fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
    }
  } else {
    // Not an overwrite: report that the file already exists.
    // If the lease soft-limit time has expired, recover the lease.
    fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
        src, holder, clientMachine, false);
    throw new FileAlreadyExistsException(src + " for client " +
        clientMachine + " already exists");
  }
}

static long delete(FSDirectory fsd, INodesInPath iip,
    BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes,
    List<Long> removedUCFiles, long mtime) throws IOException {
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " +
        iip.getPath());
  }
  long filesRemoved = -1;
  FSNamesystem fsn = fsd.getFSNamesystem();
  fsd.writeLock();
  try {
    // deleteAllowed: returns false if iip is null or is the root
    // directory, true otherwise.
    if (deleteAllowed(iip)) {
      List<INodeDirectory> snapshottableDirs = new ArrayList<>();
      FSDirSnapshotOp.checkSnapshot(fsd, iip, snapshottableDirs);
      ReclaimContext context = new ReclaimContext(
          fsd.getBlockStoragePolicySuite(), collectedBlocks, removedINodes,
          removedUCFiles);
      if (unprotectedDelete(fsd, iip, context, mtime)) {
        filesRemoved = context.quotaDelta().getNsDelta();
      }
      fsd.updateReplicationFactor(context.collectedBlocks()
          .toUpdateReplicationInfo());
      fsn.removeSnapshottableDirs(snapshottableDirs);
      fsd.updateCount(iip, context.quotaDelta(), false);
    }
  } finally {
    fsd.writeUnlock();
  }
  return filesRemoved;
}
```
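The existence handling above boils down to simple semantics: with OVERWRITE the old entry is removed first; without it, an existing path is an error. This hypothetical map-based sketch captures just that decision, omitting lease recovery, block cleanup, and locking from the real code:

```java
import java.util.HashMap;
import java.util.Map;

public class MiniCreateSemantics {
    private final Map<String, String> files = new HashMap<>();

    void create(String src, boolean overwrite) {
        if (files.containsKey(src)) {
            if (overwrite) {
                files.remove(src); // delete the existing file first
            } else {
                throw new IllegalStateException(src + " already exists");
            }
        }
        files.put(src, "under-construction");
    }

    boolean exists(String src) {
        return files.containsKey(src);
    }
}
```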

Check whether the filesystem object count exceeds the limit:

```java
fsn.checkFsObjectLimit();

void checkFsObjectLimit() throws IOException {
  if (maxFsObjects != 0 &&
      maxFsObjects <= dir.totalInodes() + getBlocksTotal()) {
    throw new IOException("Exceeded the configured number of objects " +
        maxFsObjects + " in the filesystem.");
  }
}
```
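Note the check counts inodes and blocks together, and a limit of 0 disables it. The predicate can be isolated as a pure function (a sketch; the real limit comes from `dfs.namenode.max.objects`):

```java
public class FsObjectLimit {
    // maxFsObjects == 0 disables the check.
    static boolean exceedsLimit(long maxFsObjects, long totalInodes,
            long totalBlocks) {
        return maxFsObjects != 0 && maxFsObjects <= totalInodes + totalBlocks;
    }
}
```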

Create any missing ancestor directories:

```java
INodesInPath parent =
    FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
```

Directory creation flow: createAncestorDirectories checks whether iip contains directories that do not exist yet; if so, it calls createSingleDirectory for each missing directory in turn, and createSingleDirectory in turn calls unprotectedMkdir to perform the actual creation.

```java
private static INodesInPath unprotectedMkdir(FSDirectory fsd, long inodeId,
    INodesInPath parent, byte[] name, PermissionStatus permission,
    List<AclEntry> aclEntries, long timestamp)
    throws QuotaExceededException, AclException, FileAlreadyExistsException {
  assert fsd.hasWriteLock();
  // The parent node must exist...
  assert parent.getLastINode() != null;
  // ...and must be a directory.
  if (!parent.getLastINode().isDirectory()) {
    throw new FileAlreadyExistsException("Parent path is not a directory: " +
        parent.getPath() + " " + DFSUtil.bytes2String(name));
  }
  // Create the directory inode.
  final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
      timestamp);
  // Add the new directory to the namespace tree.
  INodesInPath iip =
      fsd.addLastINode(parent, dir, permission.getPermission(), true);
  if (iip != null && aclEntries != null) {
    AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
  }
  return iip;
}
```
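The ancestor-creation loop can be sketched over a toy map-based tree: walk the parent components of the target path in order and create each missing directory node. The class names are hypothetical; the real implementation works on INodesInPath under the FSDirectory write lock:

```java
import java.util.HashMap;
import java.util.Map;

public class MiniMkdirs {
    static class DirNode {
        final Map<String, DirNode> children = new HashMap<>();
    }

    /** Creates all missing parent directories of path; returns the deepest parent. */
    static DirNode createAncestors(DirNode root, String path) {
        String[] components = path.split("/");
        DirNode cur = root;
        // Skip the empty leading component and the final (file) component.
        for (int i = 1; i < components.length - 1; i++) {
            cur = cur.children.computeIfAbsent(components[i],
                k -> new DirNode());
        }
        return cur;
    }
}
```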

Create the file in the namespace:

```java
if (parent != null) {
  iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
      replication, blockSize, holder, clientMachine, shouldReplicate,
      ecPolicyName);
  newNode = iip != null ? iip.getLastINode().asFile() : null;
}

private static INodesInPath addFile(
    FSDirectory fsd, INodesInPath existing, byte[] localName,
    PermissionStatus permissions, short replication, long preferredBlockSize,
    String clientName, String clientMachine, boolean shouldReplicate,
    String ecPolicyName) throws IOException {
  // The parent directory must not be null.
  Preconditions.checkNotNull(existing);
  long modTime = now();
  INodesInPath newiip;
  fsd.writeLock();
  try {
    ...
    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
        modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
        blockType);
    newNode.setLocalName(localName);
    newNode.toUnderConstruction(clientName, clientMachine);
    // Add the file to the namespace tree.
    newiip = fsd.addINode(existing, newNode, permissions.getPermission());
  }
  ...
  return newiip;
}

public final void addToInodeMap(INode inode) {
  if (inode instanceof INodeWithAdditionalFields) {
    // The inode ultimately lands in the INodeMap.
    inodeMap.put(inode);
    if (!inode.isSymlink()) {
      final XAttrFeature xaf = inode.getXAttrFeature();
      addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
    }
  }
}
```

Set the storage policy:

```java
setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
```

Write the edit log entry:

```java
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);

// logOpenFile calls logEdit.
void logEdit(final FSEditLogOp op) {
  boolean needsSync = false;
  synchronized (this) {
    assert isOpenForWrite() : "bad state: " + state;
    // Wait if an automatic sync is scheduled.
    waitIfAutoSyncScheduled();
    // Check whether it is time to schedule an automatic sync.
    needsSync = doEditTransaction(op);
    if (needsSync) {
      isAutoSyncScheduled = true;
    }
  }
  // Sync the log if an automatic sync is required.
  if (needsSync) {
    logSync();
  }
}
```

Return the result:

```java
return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
```

How the INodesInPath Object Is Built

INodesInPath creation flow:

The FSDirWriteFileOp.resolvePathForStartFile method:

It calls FSDirectory.resolvePath to create the INodesInPath object:

```java
INodesInPath iip = dir.resolvePath(pc, src, DirOp.CREATE);
// Followed by several validations; each throws the corresponding
// exception on failure.
...
```

FSDirectory.resolvePath

Split src into byte arrays:

```java
...
byte[][] components = INode.getPathComponents(src);
...
// Call INodesInPath.resolve to create the INodesInPath object.
INodesInPath iip = INodesInPath.resolve(rootDir, components, isRaw);
...
```

The INodesInPath.resolve method:

Iterate over the components to build the INodesInPath:

```java
// Start at the root directory.
INode curNode = startingDir;
int count = 0;
int inodeNum = 0;
INode[] inodes = new INode[components.length];
boolean isSnapshot = false;
int snapshotId = CURRENT_STATE_ID;
while (count < components.length && curNode != null) {
  // ...
  else {
    // Normal case, also used when resolving a file/dir under a snapshot
    // root. getChild uses binary search to locate childName in the
    // namespace; it returns null if the child does not exist.
    curNode = dir.getChild(childName,
        isSnapshot ? snapshotId : CURRENT_STATE_ID);
  }
}
return new INodesInPath(inodes, components, isRaw, isSnapshot, snapshotId);
```
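The resolve loop above can be sketched over a toy tree: split the path into components, then walk from the root, recording the node found (or null) at each step. Snapshot and ".reserved" handling are omitted, and the class names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class MiniResolve {
    static class Node {
        final Map<String, Node> children = new HashMap<>();
    }

    /** One entry per path component; entries are null past the first missing node. */
    static Node[] resolve(Node root, String path) {
        String[] parts = path.substring(1).split("/"); // drop the leading "/"
        Node[] nodes = new Node[parts.length];
        Node cur = root;
        for (int i = 0; i < parts.length && cur != null; i++) {
            cur = cur.children.get(parts[i]); // null if the component is missing
            nodes[i] = cur;
        }
        return nodes;
    }
}
```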