Spark框架的核心是一个计算引擎,采用了标准的master-slave结构。其中Driver表示master,负责管理整个集群的作业任务调度;Executor是slave,负责实际执行任务。
Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式,主要区别在于:Driver程序的运行节点。
从底层代码来讲,makeRDD方法就是parallelize()方法
def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) }
从外部存储(文件)创建RDD,数据来源包括:本地文件系统,所有Hadoop支持的数据集,比如HDFS和HBase等
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
//使用textFile方法从指定路径获取
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
由上述源码可知分区数=过滤后的切片数,因此接下来查看切片方法getSplits的源码
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { // 当前切片的数据的总大小 long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } // 计算 goalsize(期望每片大小),numSplits受并行度影响,如果设置了则按照设置个数来算,没设置就按照并行度和2取最小值def defaultMinPartitions: Int = math.min(defaultParallelism, 2) long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // 默认为1,调节 org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 改变minSize long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); // 切片以文件为单位切片 for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); //文件非空 if (length != 0) { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(fs, path)) { // 获取文件的块大小,块大小在上传文件时,指定,如果不指定,默认 128M long blockSize = file.getBlockSize(); // 计算片大小 一般等于 blockSize long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; // 循环切片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } // 剩余部分 <=片大小1.1倍,整体作为1片 if (bytesRemaining != 0) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, 
splitHosts[0], splitHosts[1])); } } else { //如果文件不可切,整个文件作为1片 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { // 文件长度为0,创建一个空切片 //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits.toArray(new FileSplit[splits.size()]); }
其中片大小的计算方法computeSplitSize如下:
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
// 在处理大数据时,goalSize一般很大(例如10T/100),一般情况下,blockSize作为片大小
return Math.max(minSize, Math.min(goalSize, blockSize));
//下面是hadoop中FileInputFormat的切片大小公式,很类似
Math.max(minSize,Math.min(maxSize,blockSize))
其中minSize默认值为1,maxSize非常大。
总结:从分析源码中获取的关键信息为,分区数=切片数,切片大小计算方法为Math.max(minSize, Math.min(goalSize, blockSize));查看源码得知minSize=1,blockSize默认为128M(本地模式32M)
按Ctrl+H找到实现类
override def defaultParallelism(): Int = backend.defaultParallelism()
点击defaultParallelism()
/**
 * A backend interface for scheduling systems that allows plugging in different ones under
 * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
 * machines become available and can launch tasks on them.
 */
private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int
按Ctrl+H找到SchedulerBackend的实现类CoarseGrainedSchedulerBackend
// 未设置spark.default.parallelism时,默认defaultParallelism=max(totalCores, 2),目的是为了最大限度并行运行!
override def defaultParallelism(): Int = {
  //点击getInt方法查看
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
def getInt(key: String, defaultValue: Int): Int = catchIllegalValue(key) {
  getOption(key).map(_.toInt).getOrElse(defaultValue)
}
由上述源码可知:如果配置了spark.default.parallelism,则defaultParallelism取配置值;否则默认defaultParallelism=max(totalCores, 2)(totalCores为当前可以用的总核数),目的是为了最大限度并行运行!standalone / YARN模式下,totalCores是Job申请的总的核数!