textfile从文件系统中读取文件,基于读取的数据,创建HadoopRDD!
object Spark02_RDD_File { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("File - RDD") val sc = new SparkContext(sparkConf) // TODO Spark - 从磁盘(File)中创建RDD // path : 读取文件(目录)的路径 // path可以设定相对路径,如果是IDEA,那么相对路径的位置从项目的根开始查找。 //如果是单元测试,文件相对路径是相对module // path路径根据环境的不同自动发生改变。 // Spark读取文件时,默认采用的是Hadoop读取文件的规则 // 默认是一行一行的读取文件内容 // 如果路径指向的为文件目录。那么这个目录中的文本文件都会被读取 //val fileRDD: RDD[String] = sc.textFile("input") // 读取指定的文件 //val fileRDD: RDD[String] = sc.textFile("input/word.txt") // 文件路径可以采用通配符 val fileRDD: RDD[String] = sc.textFile("input/word*.txt") // 文件路径还可以指向第三方存储系统:HDFS //val fileRDD: RDD[String] = sc.textFile("input/word*.txt") println(fileRDD.collect().mkString(",")) sc.stop() } }defaultMinPartitions: 没设置最小分区数的话,则最小分区数为这个默认值
// 使用defaultParallelism(当前环境默认的核数totalCores) 和 2取最小 def defaultMinPartitions: Int = math.min(defaultParallelism, 2)defaultMinPartitions和minPartitions 不是最终 分区数,但是会影响最终分区数!
最终分区数,取决于切片数!
(即分多少个区,调用saveAsTextFile时每个区会生成一个文件。而且分区是以文件为单位的,读取多个文件时,与读取单个文件规则不太一样)
假如文件总字节10byte,minPartitions为2: 10/2=5,能整除,实际两个分区,每个分区5字节数据 假如总字节11byte,minPartitions为2: 11/2=5 余1,1/5>0.1,则产生一个新的分区存放1字节数据,实际分区数是3目录input下有一个文件input1,内容如下:
总字节数为12个字节 (回车换行占用两个字节)
执行一下代码:
@Test def test3 = { val sparkConf = new SparkConf().setMaster("local").setAppName("test") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.textFile("input",3) rdd1.saveAsTextFile("output") sparkContext.stop() }查看output文件夹发现三个文件:part-00000,part-00001,part-00002
part-00000文件内容:1,2,3
part-00001文件内容:4,5,6
part-00002文件内容为空白。 那么为什么会这样呢?
①首先设置了最小分区数为3,而文件input1总字节数为12,12/3==4,即每个分区4个字节。input文件内偏移量是这样的: (我用@@表示回车符,占用两个字节,偏移量从0开始,每个字节增1)
1,2,3@@ = > 0,1,2,3,4,5,6
4,5,6 = > 7,8,9,10,11
那么既然每个分区4个字节,第一个分区读取从[0,4],第二个分区读取偏移量[4,8],第三个分区读取偏移量[8,12],第一个分区的偏移量全是第一行数据的,所以1,2,3读取到第一个分区。
第二个分区中偏移量8是第二行数据中的逗号,而textFile是按行读取,所以第二行数据全部读取到第二个分区。第三个分区数据为空
目录input下有一个文件input1,内容为: 123456789 (总字节数为9),执行如下代码:
@Test def test3 = { val sparkConf = new SparkConf().setMaster("local").setAppName("test") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.textFile("input",3) rdd1.saveAsTextFile("output") sparkContext.stop() } }查看output文件夹发现三个文件:part-00000,part-00001,part-00002
part-00000文件内容: 123456789 其余两个文件内容为空
不难理解,三个分区总共9个字节,每个分区3个字节,但是数据全在一行,第一个分区按偏移量读取数据的时候把一行数据全部读取,所以其他两个分区内容为空。
目录input下有一个文件input1,内容为:
总字节数为16(三个回车符占6个字节),执行如下代码:
@Test def test3 = { val sparkConf = new SparkConf().setMaster("local").setAppName("test") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.textFile("input",3) rdd1.saveAsTextFile("output") sparkContext.stop() } }查看output文件夹发现三个文件:part-00000,part-00001,part-00002,part-00003
这里设置的最小分区数为3,为什么最终分区数为4呢?是这样的,总字节数16/3=5余1,每个分区5个字节还余了1,1/5=0.2大于0.1,所以增加一个分区储存这一个字节。
文件内容如下:
part-00000:
part-00001:
part-00002:
part-00003:内容空白
读取过程:
part-00000:偏移量[0,5], part-00001:偏移量[5,10], part-00002:偏移量[10,15], part-00003:偏移量15
input1文件中偏移量如下:
input1文件中偏移量如下:
1@@ => 0,1,2
234@@ => 3,4,5,6,7
567@@ => 8,9,10,11,12
890 => 13,14,15
part-00000读到偏移量5,偏移量5对应第二行数据的数字4,所以第一行和第二行数据读到part-00000
part-00001从5读到偏移量10,而第二行数据已经读到第一个分区了,所以第三行数据到part-00001
part-00002从10读到偏移量15,而第三行数据已经读到第二个分区了,所以第四行数据到part-00002
part-00003读取偏移量15,而第四行数据已经读到第三个分区了,所以part-00003没数据。
目录input下有两个文件input1,input2, input1内容为:123456, input2内容为:7890
两个文件总字节数为10,执行如下代码:
@Test def test3 = { val sparkConf = new SparkConf().setMaster("local").setAppName("test") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.textFile("input",3) rdd1.saveAsTextFile("output") sparkContext.stop() } }结果发现,output文件夹下有四个分区文件 part-00000,part-00001,part-00002,part-00003
因为10/3 = 3余1,所以四个分区。文件内容如下:
part-00000:123456
part-00001:内容为空
part-00002:7890
part-00003:内容为空。
现在分析一下原因:
input1的偏移量:
123456 => 0,1,2,3,4,5
input2的偏移量:
7890 => 0,1,2,3
按10/3=3余1算的话分为4个分区,每个分区3个字节,有一个分区是1个字节,所以按文件分别切分。
①首先从input1文件读取,偏移量[0,3]范围读取到第一个分区,input1的123456在一行,全部读取到第一个分区part-00000,然后偏移量[3,6]范围读取到第二个分区,因为第一行数据已经被读取到了第一个分区,所以第二个分区内容为空。
②从input2文件读取时,偏移量[0,3]范围读取到第三个分区,input2的7890在一行,全部读取到第三个分区part-00002,所以第四个分区读取数据时已经没数据了,所以part-00003内容为空。