Spark学习总结一

tech2025-03-02  10

一、Spark运行架构

Spark框架核心是一个计算引擎,采用了标准的master-slave标准。其中Driver表示Master,负责管理整个集群的作业任务调度。Excutor是slave,负责实际执行任务。

二、任务提交流程

2.1 通用提交流程

2.2 Standalone模式

2.3 YARN模式

Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式,主要区别在于:Driver程序的运行节点。

2.4核心概念解释

概念解释client申请运行Job任务的设备或程序server处理客户端请求,响应结果master集群的管理者,负责处理提交到集群上的Job,类比为RMworker工作者!实际负责接受master分配的任务,运行任务,类比为NMdriver创建SparkContext的程序,称为Driver程序executor计算器,计算者。是一个进程,负责Job核心运算逻辑的运行!task计算任务! task是线程级别!在一个executor中可以同时运行多个task并行度取决于executor申请的core数application可以提交SparkJob的应用程序,在一个application中,可以调用多次行动算子,每个行动算子都是一个Job!Job一个Spark的任务,在一个Job中,Spark又会将Job划分为若干阶段(stage),在划分阶段时,会使用DAG调度器,将算子按照特征(是否shuffle)进行划分。stage阶段。一个Job的stage的数量= shuffle算子的个数+1。 只要遇到会产生shuffle的算子,就会产生新的阶段! 阶段划分的意义: 同一个阶段的每个分区的数据,可以交给1个Task进行处理!

三、程序解析

3.1 WordCount程序解析

3.1.1wordcount程序

object WordCount { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("My app") //手动创建应用的上下文 val sparkContext = new SparkContext(conf) // 进行wc的编程 // 创建RDD RDD[String]: RDD[一行内容] HadoopRDD val rdd: RDD[String] = sparkContext.textFile("input") // RDD[String]: RDD[单词] val rdd1: RDD[String] = rdd.flatMap(line => line.split(" ")) // RDD[(String, Int)]: RDD[(单词,1)] val rdd2: RDD[(String, Int)] = rdd1.map(word => (word, 1)) // reduceByKey: 将相同key对应的values进行reduce运算! val result: RDD[(String, Int)] = rdd2.reduceByKey((value1, value2) => value1 + value2) // 对结果进行输出 // println(result.collect().mkString(",")) // println(result.collect().mkString(",")) result.saveAsTextFile("output") // 运算完成后,需要关闭sparkContext sparkContext.stop() } }

3.1.2简写版

object WordCount2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("My app") //手动创建应用的上下文 val sparkContext = new SparkContext(conf) //进行wc的编程 //创建RDD RDD[String]:RDD[一行内容] val result:RDD[(String,Int)]=sparkContext.textFile("sparkday01/input") .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) //对结果进行打印输出 println(result.collect().mkString(",")) //关闭sparkContext sparkContext.stop() } }

3.1.3 数据流图解

3.2 RDD的创建

3.2.1.从集合(内存)中创建RDD(parallelize和makeRDD方法都可以)

val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) def testMakeRDD()={ val list = List(1,2,3,4,5,6) val rdd1 = sparkContext.makeRDD(list,5) val rdd2 = sparkContext.parallelize(List(1,2,3,4)}

从底层代码来讲,makeRDD方法就是parallize()方法

def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) }

3.2.2.从外部存储文件创建RDD(textFile方法)

本地文件系统,所有Hadoop支持的数据集,比如HDFS和HBase等

val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) //使用textFile方法从指定路径获取 val fileRDD: RDD[String] = sparkContext.textFile("input") fileRDD.collect().foreach(println) sparkContext.stop()

3.2.3.从其他RDD创建(转换算子)

3.4textFile分区策略源码解析

3.4.1 分区数源码追寻

def textFile( path: String, //读取文件时,传递的参数为最小分区数,但不一定就是这个分区数,取决于hadoop读取文件时的分片规则 minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }

//比较defaultParralelism的和2的最小值,默认的SparkConf中没有设置spark.default.parallelism,默认defaultParallelism=totalCores(当前本地集群可以用的总核数),目的为了最大限度并行运行,取默认值和2的最小值,defaultMinPartitions和minPartitions不是最终分区数,但是会影响最终分区数!最终分区数,还取决于切片数! def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

3.4.2 分区策略源码分析

override def getPartitions: Array[Partition] = { val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) try { // 调用输入格式(org.apache.hadoop.mapred.TextInputFormat)进行切片,切片时, minPartitions=2 val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) // 是否过滤空切片后的切片集合 val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } // 如果切的是1片,且是针对文件的切片,做特殊处理 if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) { val fileSplit = inputSplits(0).asInstanceOf[FileSplit] val path = fileSplit.getPath if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) { val codecFactory = new CompressionCodecFactory(jobConf) if (Utils.isFileSplittable(path, codecFactory)) { logWarning(s"Loading one large file ${path.toString} with only one partition, " + s"we can increase partition numbers for improving performance.") } else { logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + s"partition, because the file is compressed by unsplittable compression codec.") } } } // 分区数=过滤后的切片数 val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } catch { case e: InvalidInputException if ignoreMissingFiles => logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" + s" partitions returned from this path.", e) Array.empty[Partition] }
切片机制源码

由上述源码可知分区数=过滤后的切片数,因此查看切片方法源码

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { // 当前切片的数据的总大小 long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } // 计算 goalsize(期望每片大小),numSplits受并行度影响,如果设置了则按照设置个数来算,没设置就按照并行度和2取最小值def defaultMinPartitions: Int = math.min(defaultParallelism, 2) long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // 默认为1,调节 org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 改变minSize long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); // 切片以文件为单位切片 for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); //文件非空 if (length != 0) { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(fs, path)) { // 获取文件的块大小,块大小在上传文件时,指定,如果不指定,默认 128M long blockSize = file.getBlockSize(); // 计算片大小 一般等于 blockSize long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; // 循环切片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } // 剩余部分 <=片大小1.1倍,整体作为1片 if (bytesRemaining != 0) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { //如果文件不可切,整个文件作为1片 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { // 文件长度为0,创建一个空切片 //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits.toArray(new FileSplit[splits.size()]); } long splitSize = computeSplitSize(goalSize, minSize, blockSize); // 在处理大数据时,goalsize一般很大,(例如10T/100)一般情况下,blockSize作为片大小 return Math.max(minSize, Math.min(goalSize, blockSize)); //下面是hadoop中FileInputFormat的切片大小公式,很类似 Math.max(minSize,Math.min(maxSize,blockSize))其中minsize默认值为1,maxsize非常大

总结:从分析源码中获取的关键信息为,分区数=切片数,切片大小计算方法为max(minSize, Math.min(goalSize, blockSize));查看源码得知minSize=1,blockSize默认为128M(本地模式32M)

3.4.3 textFile分区实测解析

@Test def TestTextFile(): Unit ={ val result:RDD[(String,Int)]=sparkContext.textFile("input1",6) .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) //对结果进行打印输出 //println(result.collect().mkString(",")) result.saveAsTextFile("output") } input1中有aa.txt=351b bb.txt=349b文件goalSize=700/6=116.7b 切片大小为max(1,min(333.3b,32m))=116.7b 切片数为351/116.7=3+余数 349/116.7=2+余数,剩余部分小于1.1倍的116.7,因此放在一个分区内,总共有0-5六个partition

3.5 makeRDD分区策略源码解析

3.5.1分区数源码分析

def makeRDD[T: ClassTag]( seq: Seq[T], //numSlices: 控制集合创建几个分区! numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) }

def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism }

//抽象方法 def defaultParallelism(): Int

ctl+H找到实现类

override def defaultParallelism(): Int = backend.defaultParallelism()

点击defaultParallelism()

/** * A backend interface for scheduling systems that allows plugging in different ones under * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as * machines become available and can launch tasks on them. */ private[spark] trait SchedulerBackend { private val appId = "spark-application-" + System.currentTimeMillis def start(): Unit def stop(): Unit def reviveOffers(): Unit def defaultParallelism(): Int

ctl+h找到SchedulerBackend的实现类CoarseGrainedSchedulerBackend

// 默认defaultParallelism=totalCores(当前本地集群可以用的总核数),目的为了最大限度并行运行! override def defaultParallelism(): Int = { //点击getInt方法查看 conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) }

def getInt(key: String, defaultValue: Int): Int = catchIllegalValue(key) { getOption(key).map(_.toInt).getOrElse(defaultValue) }

默认defaultParallelism=totalCores(当前本地集群可以用的总核数),目的为了最大限度并行运行!standalone / YARN模式, totalCores是Job申请的总的核数!

3.5.2分区策略

def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { //检查分区数是否合法 if (numSlices < 1) { throw new IllegalArgumentException("Positive number of partitions required") } // Sequences need to be sliced at the same set of index positions for operations // like RDD.zip() to behave as expected /*length:6 numSlices:4 */ def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { /* (0 until numSlices) = [0,4) 0: (0,1) 1: (1,3) 2: (3,4) 3: (4,6) */ (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } ...... 对range类型进行特殊处理 // 非Ranger,按此方法处理 case _ => //Array(1,2,3,4,5,6) val array = seq.toArray // To prevent O(n^2) operations for List etc // 返回 {(0,1),(1,3),(3,4),(4,6)} // 1,(2,3),4,(5,6) positions(array.length, numSlices) .map { case (start, end) => array.slice(start, end).toSeq // 1,(2,3),4,(5,6) }.toSeq }

3.5.3 makeRD创建实例分析

class RDDTest { //本地集群总核数取决于local[N] local:1核local[2]: 2核 local[*] : 所有核 val sparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("My app")) //由于本机cpu为4核,makeRDD方法没有设置并行度,按照local[*]参数来配置分区,因此该例计算文件个数为4个 @Test def testMakeRDD()={ val list = List(1,2,3,4,5,6) val rdd1 = sparkContext.makeRDD(list) rdd1.saveAsTextFile("output") } } partition-00000 1 partition-00001 2,3 partition-00002 4 partition-00003 5,6
最新回复(0)