一、创建RDD时的默认并行数与分区数
1、从外部存储(文件)创建RDD
例如:local模式下
val lineRDD: RDD[String] = sc.textFile("./aa,txt")
默认并行数:def defaultMinPartitions: Int = math.min(totalCores, 2)
totalCores:任务运行的总核数
分区数有两种情况:
a,从本地文件file:///生成的rdd,操作时如果没有指定分区数,则默认分区数规则为:
(按照官网的描述,本地file的分片规则,应该按照hdfs的block大小划分,但实测的结果是固定按照32M来分片,可能是bug,不过不影响使用,因为spark能用所有hadoop接口支持的存储系统,所以spark textFile使用hadoop接口访问本地文件时和访问hdfs还是有区别的)
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
b,从hdfs分布式文件系统hdfs://生成的rdd,操作时如果没有指定分区数,则默认分区数规则为:
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
源码如下:
2、从集合(内存)中创建RDD
例如:Yarn、Standalone模式下
val valueRDD: RDD[String] = sc.makeRDD(Array("a", "b", "c", "d"))
默认并行度:def defaultParallelism: Int = math.max(totalCoreCount.get(), 2))
totalCoreCount:任务运行的总核数
分区数:
这种方式下,如果在parallelize操作时没有指定分区数,则
rdd的分区数 = sc.defaultParallelism
二、各种模式的默认并行数
1、本地模式
本地模式(不会启动executor,由SparkSubmit进程生成指定数量的线程数来并发):
spark-shell spark.default.parallelism = 1
spark-shell --master local[N] spark.default.parallelism = N (使用N个核)
spark-shell --master local spark.default.parallelism = 1
2、伪集群模式
x为本机上启动的executor数,y为每个executor使用的core数,z为每个 executor使用的内存
spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y
3、 mesos 细粒度模式
Mesos fine grained mode spark.default.parallelism = 8
4、yarn模式、standalone模式
spark.default.parallelism = max(所有executor使用的core总数, 2)