Apache Spark是一个分布式计算框架。
支持的调度器有:
Standalone Scheduler:Spark自己的调度器。当在没有Hadoop安装的集群上运行Spark程序时,这个调度器是一个选择;YARN:Hadoop的默认调度器。它对于批处理任务进行了优化。Mesos:使用和Linux内核同样的原理创建的,它没有绑定到Hadoop上,YARN对于批处理任务进行了优化,而Mesos比较适合流处理任务;Kubernetes:是一个容器的编排框架,可以提供在一个物理集群上运行多个版本的Spark和共享命名空间的能力。Hadoop主要由以下组件组成:
Hive和PigMapReduceYARNHDFSHDFS是存储层。
MapReduce是一个编程模型,主要由四个阶段组成:Map、Sort、Shuffle和Reduce。
Hadoop和Spark的一个区别是MR是和数据的格式紧耦合的,而Spark提供了一个RDD的概念,类似一个分布式数据的容器;另一个不同是Spark利用内存,它可以把数据缓存到内存中,来避免磁盘IO,而MR则包含多次磁盘IO。
使开发者可以操作结构化,或有语义结构的数据如:Hive tables、MySQL tables、Parquet文件、AVRO文件、JSON文件、CSV文件等。Spark SQL可以用SQL语句来操作数据
用于处理实时的数据流。
Spark MLlib和ML是spark用于机器学习的两个包,它们提供了:
内置的机器学习算法:分类、回归、聚类等;提供了一些,如:pipeline、vector创建等的特性用于处理图数据。图包含点和边。边用于定义点的关系。GraphX使用RDD用于计算、创建点和边;GraphFrame是一个外部的包,它使用DataFrame来定义点和边。
是Spark程序的基本创建单元。一个RDD表现为一个只读的、分布在多台机器上的数据的集合。RDD的全称是Resilient Distributed Datasets:
Resilient:RDD可以在发生问题时,重新创建自己。每个RDD会记录一些它父RDD和它自己如何创建的信息。Distributed:一个RDD可以把它的数据集分布到一组机器上,然后这些机器处理自己负责的部分数据。Dataset:就是数据的集合。RDD本身会包含一些metadata,用于帮助spark在发生问题时,重建一个RDD分区。而且在执行操作时,对优化提供支持。metadata提供了如下信息:
parent RDDS;用于从parent RDDS计算出RDD分区的函数;分区的推荐位置;分区信息;RDD可以通过四种方式进行创建:
Parallelize a collection:使用既有的集合,然后使用Spark把集合分布到集群上,用于并行处理。可以通过parallelize方法进行集合的分布: val numRDD = spark.sparkContext.parallelize(1 to 100) 从外部数据集创建: val filePath = "/Users/yangxiaochen/temp/a/03/sampleFile.log" val logRDD = spark.sparkContext.textFile(filePath) 基于其它RDD进行创建: val wholeNumRDD = spark.sparkContext.parallelize(1 to 10) val evenNumRDD = wholeNumRDD.filter(_ % 2 == 0) 从一个DataFrame或DataSet进行创建:不建议使用,因为DF和DS就是在RDD之上的抽象
val rangeDF = spark.range(1, 5) val rangeRDD = rangeDF.rdd转换有两种类型narrow transformation和wide transformation。
在已经分区的数据基础上进行transformation,这样就不会涉及到数据的移动,而且新的RDD和它的父RDD有着同样的分区数。如:map()、flatMap()、filter()、union()、mapPartitions()等
此种转换涉及到了分区间的数据流动。如:groupByKey()、reduceByKey()、sortby()、join()、distince()、subtract()和intersect()等。因为涉及到了数据流动,所以此类操作比较浪费计算资源。
只有在Action需要在RDD上执行时,在RDD上指定的Transformation才会被执行,Action不会创建一个新的RDD,它用于以下方面:
向Driver返回一个最终的结果向外部存储写入最终结果在RDD的每个元素上面执行一些操作返回RDD的所有元素;
返回RDD中元素的数量;
返回RDD中指定数目的元素;
返回RDD中尾部N个元素;
返回RDD中的第一个元素;
返回RDD中每个元素出现的次数;
组合RDD元素,形成汇总信息;
把结果存储到外部存储;
在RDD的每个元素上面执行一个函数;
把数据缓存到内存中,是Spark的一个主要的特性。可以把大的数据集缓存到内存或硬盘上。在两个主要场景中可以使用缓存:
多次使用同样的RDD;避免多次恢复包含大量计算的RDD;当Spark会话结束时,缓存的RDD的生命周期也就结束了。如果想在程序间使用同样的RDD,则需要使用Checkpointing,这个操作可以把RDD内容存储到磁盘上
分区决定应用程序的并行程度。正确的设置分区,可以显著的提高spark jobs的性能,可以通过两种方式控制RDD的分区:
repartition和coalesce
用于对已经存在的RDD的分区进行变更。repartition用于增减分区的数目。coalescs只是用来减少分区数目。在大部分情况下,coalescs不会涉及到数据的流动、而repartition会涉及到,所以通常来说,coalescs的性能要比repartition好
partitionBy
所有涉及到数据流动的操作,都会接收一个额外的参数:并行度。这个参数允许用户为新产生的RDD指定分区数。
DataFrame是RDD的抽象,以行和列的方式组织分布式数据。DataFrame的特性包括:
DF可以处理不同格式和不同存储介质中的数据;DF可以处理KB到PB级的数据;可以使用Spark-SQL查询优化器来处理数据;支持多种语言;以树形结构显示DF的映射关系
从DF中选择一些列
基于某些条件,从DF中过滤出一些行。
salesDF.filter($"id" < 50).show(50)基于某些列,把行形成组,然后应用一些汇总函数,如:count()、agv()等
salesDF.groupBy("ip").count().showDF允许在数据上直接运行SQL,为了运行SQL,我们需要在DF上创建临时view,这些view可以分为local和global view。
salesDF.createOrReplaceTempView("sales") val sqlDF = spark.sql("select * from sales") sqlDF.show临时View只在创建它的session中有效,如果希望view在多个session间有效,需要使用Global Temporary Views。
Dataset是强类型的对象集合。在Dataset上执行的操作包括Transformation和Action:
Transformation:用于产生新的dataset,包括:Map、FlatMap、Filter、Select和Aggregate等;action:对dataset进行计算,返回转换的结果,包括:count、show和save等; import org.apache.spark.sql.types._ import org.apache.spark.sql.Encoders case class Sales (id: Int, firstname: String,lastname: String,address: String,city: String,state: String,zip: String,ip: String,product_id: String,date_of_purchase: String) val salesDs = spark.read.option("sep", "\t").option("header", "true").csv(filePath).withColumn("id", 'id.cast(IntegerType)).as[Sales]通过import spark.implicits._可以引入Encoders。
Encoder的主要特性:
快速串行化;支持语义结构化的数据;Spark SQL是通过SchemaRDD对数据进行的一种抽象,允许通过schema对dataset进行定义,然后使用sql进行查询。
为了存储数据库、表名和schema,Spark会在启动spark sql的相同位置创建一个metastore.db的缺省数据库。spark也可以利用Hive metastore。
创建:create database if not exists mydb
显示数据库列表:show databases;
查看数据库详情:describe database [extended] mydb;
切换数据库:use mydb;
删除数据库:drop database if exists mydb [restrict|cascade];
restrict:删除非空数据库,抛出异常;
cascade:同时删除所有的表;
创建表
create [external] table [if not exists] [mydb.]mytable [(col_name1:col_type1)] --[partitioned by (col_name2:col_type2)] [row format row_format] [stored as file_format] [location path] [tblproperties (key1=val1,key2=val2,...)] [as select_statement]创建视图
create [or replace] view mydb.myview [(col1_name, col2_name)] [tblproperties (key1=val1,...)] as select ... 显示表的信息 describe mydb.mytable 重命名 alter table|view mydb.mytable rename to mydb.mytable1 设置属性 alter table|view mydb.mytable set tblproperties(k=v,...) 删除属性 alter table|view mydb.mytable unset tblproperties if exists (k1,k2,...) 删除表 drop table mydb.mytable 清空表 truncate table mytableSpark Stream支持的数据源:
KafkaFlumeKinesisZeroMQTwitterml的pipeline api提供了两个主要的核心特性:
Transformer:产生新的DF;Estimator:使用DF来产生一个新的Transformation;一个pipeline就是这些Transformer和Estimator的集合。例如:
package demo import org.apache.spark.ml.Pipeline import org.apache.spark.sql.SparkSession import org.apache.spark.ml.feature.VectorIndexer import org.apache.spark.ml.regression.DecisionTreeRegressor object Demo { def main(args:Array[String]): Unit = { val spark = SparkSession.builder().appName("ML").getOrCreate() val dataPath = "/Users/yangxiaochen/temp/a/Chapter07/ml/dt.data" val dataDF = spark.read.format("libsvm").load(dataPath) val Array(trainingDF, testDataDF) = dataDF.randomSplit(Array(0.7, 0.3)) val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(dataDF) val dtModel = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures") val pipeLine = new Pipeline().setStages(Array(featureIndexer, dtModel)) val model = pipeLine.fit(trainingDF) val predictionsDF = model.transform(testDataDF) predictionsDF.select("prediction", "label", "features").show(20) } }一个graph是由一组点和边组成。一个点是一个对象,边定义了两个点之间的关系。图可以分为有向图和无向图。在Spark的图lib中使用的是一种叫做property graph的结构。在这个结构中,每个点都是一个被赋予了一些属性的对象,用于提供一些信息。边也是被赋予了一些属性,如:权重、关系等的对象。为了处理图,Spark提供了两个库:
是一个底层的API,用于处理RDD。
用于处理DataFrames。这个库是一个外部的包,需要从https://spark-packages.org/package/graphframes/graphframes进行下载。
衡量Job性能的两个要素:运行时间和存储空间;
内存主要用于三个目的:
RDD存储Shuffle和汇总的存储用户代码默认情况下:RDD占60%;Shuff和汇总的存储占20%;用户代码占20%;
在缓存数据时,要仔细原则存储的级别:MEMORY_ONLY会把数据保持在内存里,在这种情况下,如果RDD比较大,会引起分区的重新计算。
Spark使用硬盘临时存储shuffle数据。在独立运行和结合Mesos使用时,这个位置是由SPARK_LOCAL_DIRS变量进行配置的。在YARN情况下,Spark会继承YARN的配置。
计算公式如下:
总核数:每台核数 * 台数
为YARN等保留的核数:1 * 台数
executor的数目:(总核数 - 为YARN等保留的核数)/ 台数 (需要保留一个用于application master)
在Spark driver执行过程中,可以启用dynamic executor分配,这样可以提高资源的利用率。这个特性可以通过spark.dynamicAllocation.*的相关属性进行配置。也可以通过driver-memory配置分配给driver的内存。
Project Tungsten主要用是于优化的,在以前分布式计算的瓶颈是硬盘IO和网络带宽。但是近些年CPU和内存变成了新的瓶颈。Project Tungsten主要为spark程序在CPU和内存方面提供优化,优化主要体现在以下几个方面:
内存管理:减少jvm对象,减少内存消耗和gc;二进制处理:以二进制的方式,而非JVM对象的方式处理数据;Code generation可以通过spark.sql.tungsten.enabled属性控制是否启动Project Tungsten。
如果在DF层面上进行操作,则语言的选择关系不大;如果需要操作RDD,则应该选择Scala或Java。
计算量很大的情况下应该优先使用DF,如果需要对程序有更好的控制,则应选择RDD。
文件格式的选择原则:
Binary vs textSplittable vs non-splittableColumn vs Row basedBinary会提高存储和网络传输的效率。Spark推荐使用Parquet和ORC格式存储数据。
一个主要的优化方式是避免data shuffle。
为结构化API提供性能优化。
DAG Constructor的职责是把一个JOB分解为一组stage。
stage是一组不需要移动数据就可以一起执行的task的集合。可以在DF或RDD上调用explain方法来查看stage信息。
一个task就是一个分区的数据和一组transformation的集合。task有两种类型:
Shuffle Map task:它执行transformation,创建新的数据分区;Result task:用于返回结果;职责是基于可用的核数、数据位置,结合executor来调度tasks。可以通过配置spark.task.cpus属性来指定task scheduler的行为,spark提供了两种策略:
FIFO:默认的调度策略。FAIR:这个策略会给所有的task以相等的资源;