Spark中textFile源码分析

tech2023-02-04  94

文章目录

textfile源码解析1. 分区策略源码2 最小分区数3. 文件切片(分区)规则4. 数据存储规则案例一:案例二:案例三:案例四:

textfile源码解析

textfile从文件系统中读取文件,基于读取的数据,创建HadoopRDD!

object Spark02_RDD_File { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("File - RDD") val sc = new SparkContext(sparkConf) // TODO Spark - 从磁盘(File)中创建RDD // path : 读取文件(目录)的路径 // path可以设定相对路径,如果是IDEA,那么相对路径的位置从项目的根开始查找。 //如果是单元测试,文件相对路径是相对module // path路径根据环境的不同自动发生改变。 // Spark读取文件时,默认采用的是Hadoop读取文件的规则 // 默认是一行一行的读取文件内容 // 如果路径指向的为文件目录。那么这个目录中的文本文件都会被读取 //val fileRDD: RDD[String] = sc.textFile("input") // 读取指定的文件 //val fileRDD: RDD[String] = sc.textFile("input/word.txt") // 文件路径可以采用通配符 val fileRDD: RDD[String] = sc.textFile("input/word*.txt") // 文件路径还可以指向第三方存储系统:HDFS //val fileRDD: RDD[String] = sc.textFile("input/word*.txt") println(fileRDD.collect().mkString(",")) sc.stop() } }

1. 分区策略源码

override def getPartitions: Array[Partition] = { val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) try { // 调用输入格式(org.apache.hadoop.mapred.TextInputFormat)进行切片,切片时, minPartitions=2 val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) // 是否过滤空切片后的切片集合 val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } // 如果切的是1片,且是针对文件的切片,做特殊处理 if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) { val fileSplit = inputSplits(0).asInstanceOf[FileSplit] val path = fileSplit.getPath if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) { val codecFactory = new CompressionCodecFactory(jobConf) if (Utils.isFileSplittable(path, codecFactory)) { logWarning(s"Loading one large file ${path.toString} with only one partition, " + s"we can increase partition numbers for improving performance.") } else { logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + s"partition, because the file is compressed by unsplittable compression codec.") } } } // 分区数=过滤后的切片数 val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } catch { case e: InvalidInputException if ignoreMissingFiles => logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" + s" partitions returned from this path.", e) Array.empty[Partition] } public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { // 当前切片的数据的总大小 long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); //file.getLen()获取文件的字节数,若有多个文件则累加 // 计算 goalsize(期望每片大小) long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);//是0的话给1,否则为本身值 //为什么实际分区数量可能大于最小分区数minPartitions呢? 假如文件总字节数能够整除minPartitions,则实际分区数为minPartitions,否则规则如下: 假如总字节10byte,minPartitions为210/2=5,能整除,实际两个分区,每个分区5字节数据 假如总字节11byte,minPartitions为211/2=511/5>0.1,则产生一个新的分区存放1字节数据,实际分区数是3 // minSize默认为1,调节 org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 改变minSize SPLIT_MINSIZE就是"mapreduce.input.fileinputformat.split.minsize long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);//minSplitSize默认为1 // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); // 切片以文件为单位切片 for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(fs, path)) { // 获取文件的块大小,块大小在上传文件时,指定,如果不指定,默认 128M long blockSize = file.getBlockSize(); // 计算片大小 一般等于 blockSize long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; // 循环切片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } // 剩余部分 <=片大小1.1倍,整体作为1片 if (bytesRemaining != 0) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { //如果文件不可切,整个文件作为1片 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { // 文件长度为0,创建一个空切片 //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits.toArray(new FileSplit[splits.size()]); } long splitSize = computeSplitSize(goalSize, minSize, blockSize); // 在处理大数据时,一般情况下,blockSize作为片大小 return Math.max(minSize, Math.min(goalSize, blockSize));

2 最小分区数

def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) } //调用的是hadoop中的【TextInputFormat】,也就是说按行读取

defaultMinPartitions: 没设置最小分区数的话,则最小分区数为这个默认值

// 使用defaultParallelism(当前环境默认的核数totalCores) 和 2取最小 def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

defaultMinPartitions和minPartitions 不是最终 分区数,但是会影响最终分区数!

最终分区数,取决于切片数!

3. 文件切片(分区)规则

(即分多少个区,调用saveAsTextFile时每个区会生成一个文件。而且分区是以文件为单位的,读取多个文件时,与读取单个文件规则不太一样)

假如文件总字节10byte,minPartitions为2: 10/2=5,能整除,实际两个分区,每个分区5字节数据 假如总字节11byte,minPartitions为2: 11/2=5 余1,1/5>0.1,则产生一个新的分区存放1字节数据,实际分区数是3

4. 数据存储规则

数据存储并不是按字节总数平均分配到不同的分区的,而是数据按行读取之后,考虑数据的偏移量(offset),规则如下:

案例一:

目录input下有一个文件input1,内容如下:

总字节数为12个字节 (回车换行占用两个字节)

执行一下代码:

@Test def test3 = { val sparkConf = new SparkConf().setMaster("local").setAppName("test") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.textFile("input",3) rdd1.saveAsTextFile("output") sparkContext.stop() }

查看output文件夹发现三个文件:part-00000,part-00001,part-00002

part-00000文件内容:1,2,3

part-00001文件内容:4,5,6

part-00002文件内容为空白。 那么为什么会这样呢?

①首先设置了最小分区数为3,而文件input1总字节数为12,12/3==4,即每个分区4个字节。input文件内偏移量是这样的: (我用@@表示回车符,占用两个字节,偏移量从0开始,每个字节增1)

1,2,3@@ = > 0,1,2,3,4,5,6

4,5,6 = > 7,8,9,10,11

那么既然每个分区4个字节,第一个分区读取从[0,4],第二个分区读取偏移量[4,8],第三个分区读取偏移量[8,12],第一个分区的偏移量全是第一行数据的,所以1,2,3读取到第一个分区。

第二个分区中偏移量8是第二行数据中的逗号,而textFile是按行读取,所以第二行数据全部读取到第二个分区。第三个分区数据为空

案例二:

目录input下有一个文件input1,内容为: 123456789 (总字节数为9),执行如下代码:

@Test def test3 = { val sparkConf = new SparkConf().setMaster("local").setAppName("test") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.textFile("input",3) rdd1.saveAsTextFile("output") sparkContext.stop() } }

查看output文件夹发现三个文件:part-00000,part-00001,part-00002

part-00000文件内容: 123456789 其余两个文件内容为空

不难理解,三个分区总共9个字节,每个分区3个字节,但是数据全在一行,第一个分区按偏移量读取数据的时候把一行数据全部读取,所以其他两个分区内容为空。

案例三:

目录input下有一个文件input1,内容为:

总字节数为16(三个回车符占6个字节),执行如下代码:

@Test def test3 = { val sparkConf = new SparkConf().setMaster("local").setAppName("test") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.textFile("input",3) rdd1.saveAsTextFile("output") sparkContext.stop() } }

查看output文件夹发现三个文件:part-00000,part-00001,part-00002,part-00003

这里设置的最小分区数为3,为什么最终分区数为4呢?是这样的,总字节数16/3=5余1,每个分区5个字节还余了1,1/5=0.2大于0.1,所以增加一个分区储存这一个字节。

文件内容如下:

part-00000:

part-00001:

part-00002:

part-00003:内容空白

读取过程:

part-00000:偏移量[0,5], part-00001:偏移量[5,10], part-00002:偏移量[10,15], part-00003:偏移量15

input1文件中偏移量如下:

input1文件中偏移量如下:

1@@ => 0,1,2

234@@ => 3,4,5,6,7

567@@ => 8,9,10,11,12

890 => 13,14,15

part-00000读到偏移量5,偏移量5对应第二行数据的数字4,所以第一行和第二行数据读到part-00000

part-00001从5读到偏移量10,而第二行数据已经读到第一个分区了,所以第三行数据到part-00001

part-00002从10读到偏移量15,而第三行数据已经读到第二个分区了,所以第四行数据到part-00002

part-00003读取偏移量15,而第四行数据已经读到第三个分区了,所以part-00003没数据。

案例四:

目录input下有两个文件input1,input2, input1内容为:123456, input2内容为:7890

两个文件总字节数为10,执行如下代码:

@Test def test3 = { val sparkConf = new SparkConf().setMaster("local").setAppName("test") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.textFile("input",3) rdd1.saveAsTextFile("output") sparkContext.stop() } }

结果发现,output文件夹下有四个分区文件 part-00000,part-00001,part-00002,part-00003

因为10/3 = 3余1,所以四个分区。文件内容如下:

part-00000:123456

part-00001:内容为空

part-00002:7890

part-00003:内容为空。

现在分析一下原因:

input1的偏移量:

123456 => 0,1,2,3,4,5

input2的偏移量:

7890 => 0,1,2,3

按10/3=3余1算的话分为4个分区,每个分区3个字节,有一个分区是1个字节,所以按文件分别切分。

①首先从input1文件读取,偏移量[0,3]范围读取到第一个分区,input1的123456在一行,全部读取到第一个分区part-00000,然后偏移量[3,6]范围读取到第二个分区,因为第一行数据已经被读取到了第一个分区,所以第二个分区内容为空。

②从input2文件读取时,偏移量[0,3]范围读取到第三个分区,input2的7890在一行,全部读取到第三个分区part-00002,所以第四个分区读取数据时已经没数据了,所以part-00003内容为空。

最新回复(0)