Spark学习笔记

tech2025-12-05  5

概述

Apache Spark是一个分布式计算框架。

支持的数据存储

HIVECassandraHBaseParquestAmazon S3MySQLHDFSKafakaJSON

体系结构

Driver:解析用户代码,在worker节点上创建多个executor;Executor:运行在worker节点上的JVM,用于提供运行任务的硬件资源;

集群管理

支持的调度器有:

Standalone Scheduler:Spark自己的调度器。当在没有Hadoop安装的集群上运行Spark程序时,这个调度器是一个选择;YARN:Hadoop的默认调度器。它对于批处理任务进行了优化。Mesos:使用和Linux内核同样的原理创建的,它没有绑定到Hadoop上,YARN对于批处理任务进行了优化,而Mesos比较适合流处理任务;Kubernetes:是一个容器的编排框架,可以提供在一个物理集群上运行多个版本的Spark和共享命名空间的能力。

Hadoop和Spark的区别

Hadoop主要由以下组件组成:

Hive和PigMapReduceYARNHDFS

HDFS是存储层。

MapReduce是一个编程模型,主要由四个阶段组成:Map、Sort、Shuffle和Reduce。

Hadoop和Spark的一个区别是MR是和数据的格式紧耦合的,而Spark提供了一个RDD的概念,类似一个分布式数据的容器;另一个不同是Spark利用内存,它可以把数据缓存到内存中,来避免磁盘IO,而MR则包含多次磁盘IO。

Spark的执行模式

Master可以和executor运行在同一个本地机器上;通过提供host和port运行在一台指定的机器上;

Spark核心元素

Spark SQLSpark StreamingMachine LearningGraphXSpark Core

Spark Core

定义了基础组件,如:RDD和DataFrame等;定义了操作基础组件的API;定义了共享或分布式变量,如:广播变量和聚集器;定义了所有的基本功能,如:任务管理、内存管理和基础的IO功能等;

Spark SQL

使开发者可以操作结构化,或有语义结构的数据如:Hive tables、MySQL tables、Parquet文件、AVRO文件、JSON文件、CSV文件等。Spark SQL可以用SQL语句来操作数据

Spark Streaming

用于处理实时的数据流。

Spark Machine Learning

Spark MLlib和ML是spark用于机器学习的两个包,它们提供了:

内置的机器学习算法:分类、回归、聚类等;提供了一些,如:pipeline、vector创建等的特性

Spark GraphX

用于处理图数据。图包含点和边。边用于定义点的关系。GraphX使用RDD用于计算、创建点和边;GraphFrame是一个外部的包,它使用DataFrame来定义点和边。

基础

RDD

是Spark程序的基本创建单元。一个RDD表现为一个只读的、分布在多台机器上的数据的集合。RDD的全称是Resilient Distributed Datasets:

Resilient:RDD可以在发生问题时,重新创建自己。每个RDD会记录一些它父RDD和它自己如何创建的信息。Distributed:一个RDD可以把它的数据集分布到一组机器上,然后这些机器处理自己负责的部分数据。Dataset:就是数据的集合。

Resilient metadata

RDD本身会包含一些metadata,用于帮助spark在发生问题时,重建一个RDD分区。而且在执行操作时,对优化提供支持。metadata提供了如下信息:

parent RDDS;用于从parent RDDS计算出RDD分区的函数;分区的推荐位置;分区信息;

RDD的三个重要特征

依赖(指定如何构建需要的输入)分区(基于分区,可以分割工作进行并行计算)计算函数

使用RDD

创建

RDD可以通过四种方式进行创建:

Parallelize a collection:使用既有的集合,然后使用Spark把集合分布到集群上,用于并行处理。可以通过parallelize方法进行集合的分布: val numRDD = spark.sparkContext.parallelize(1 to 100) 从外部数据集创建: val filePath = "/Users/yangxiaochen/temp/a/03/sampleFile.log" val logRDD = spark.sparkContext.textFile(filePath) 基于其它RDD进行创建: val wholeNumRDD = spark.sparkContext.parallelize(1 to 10) val evenNumRDD = wholeNumRDD.filter(_ % 2 == 0) 从一个DataFrame或DataSet进行创建:

不建议使用,因为DF和DS就是在RDD之上的抽象

val rangeDF = spark.range(1, 5) val rangeRDD = rangeDF.rdd

转换和动作

转换

转换有两种类型narrow transformation和wide transformation。

Narrow Transformation

在已经分区的数据基础上进行transformation,这样就不会涉及到数据的移动,而且新的RDD和它的父RDD有着同样的分区数。如:map()、flatMap()、filter()、union()、mapPartitions()等

Wide transformation

此种转换涉及到了分区间的数据流动。如:groupByKey()、reduceByKey()、sortby()、join()、distince()、subtract()和intersect()等。因为涉及到了数据流动,所以此类操作比较浪费计算资源。

Action

只有在Action需要在RDD上执行时,在RDD上指定的Transformation才会被执行,Action不会创建一个新的RDD,它用于以下方面:

向Driver返回一个最终的结果向外部存储写入最终结果在RDD的每个元素上面执行一些操作
collect

返回RDD的所有元素;

count

返回RDD中元素的数量;

take

返回RDD中指定数目的元素;

top

返回RDD中尾部N个元素;

first

返回RDD中的第一个元素;

countByValue

返回RDD中每个元素出现的次数;

reduce

组合RDD元素,形成汇总信息;

saveAsTextFile

把结果存储到外部存储;

foreach

在RDD的每个元素上面执行一个函数;

Caching

把数据缓存到内存中,是Spark的一个主要的特性。可以把大的数据集缓存到内存或硬盘上。在两个主要场景中可以使用缓存:

多次使用同样的RDD;避免多次恢复包含大量计算的RDD;

Checkpointing

当Spark会话结束时,缓存的RDD的生命周期也就结束了。如果想在程序间使用同样的RDD,则需要使用Checkpointing,这个操作可以把RDD内容存储到磁盘上

分区

分区决定应用程序的并行程度。正确的设置分区,可以显著的提高spark jobs的性能,可以通过两种方式控制RDD的分区:

repartition和coalesce

用于对已经存在的RDD的分区进行变更。repartition用于增减分区的数目。coalescs只是用来减少分区数目。在大部分情况下,coalescs不会涉及到数据的流动、而repartition会涉及到,所以通常来说,coalescs的性能要比repartition好

partitionBy

所有涉及到数据流动的操作,都会接收一个额外的参数:并行度。这个参数允许用户为新产生的RDD指定分区数。

缺点

RDD代码比较难理解RDD不能被Spark优化RDD在非JVM的环境中比较慢

DataFrame和Dataset

DataFrame

DataFrame是RDD的抽象,以行和列的方式组织分布式数据。DataFrame的特性包括:

DF可以处理不同格式和不同存储介质中的数据;DF可以处理KB到PB级的数据;可以使用Spark-SQL查询优化器来处理数据;支持多种语言;

创建

val salesDF = spark.read.option("sep", "\t").option("header", "true").csv(filePath) salesDF.show

操作和相关的功能

printSchema

以树形结构显示DF的映射关系

select

从DF中选择一些列

filter

基于某些条件,从DF中过滤出一些行。

salesDF.filter($"id" < 50).show(50)
groupBy

基于某些列,把行形成组,然后应用一些汇总函数,如:count()、agv()等

salesDF.groupBy("ip").count().show

在DF上运行SQL

DF允许在数据上直接运行SQL,为了运行SQL,我们需要在DF上创建临时view,这些view可以分为local和global view。

salesDF.createOrReplaceTempView("sales") val sqlDF = spark.sql("select * from sales") sqlDF.show

临时View只在创建它的session中有效,如果希望view在多个session间有效,需要使用Global Temporary Views。

Dataset

Dataset是强类型的对象集合。在Dataset上执行的操作包括Transformation和Action:

Transformation:用于产生新的dataset,包括:Map、FlatMap、Filter、Select和Aggregate等;action:对dataset进行计算,返回转换的结果,包括:count、show和save等; import org.apache.spark.sql.types._ import org.apache.spark.sql.Encoders case class Sales (id: Int, firstname: String,lastname: String,address: String,city: String,state: String,zip: String,ip: String,product_id: String,date_of_purchase: String) val salesDs = spark.read.option("sep", "\t").option("header", "true").csv(filePath).withColumn("id", 'id.cast(IntegerType)).as[Sales]

DS和DF的区别

使用DS时,需要定义一个和列对应的类型;如果解释的类型和实际类型不符时,需要使用withColumn进行类型转换;

如何处理类型不匹配

Permissive:这是mode的缺省值,如何数据类型不匹配,则数据域的值为null;DROPMALFORMED:当数据类型不匹配时,会跳过相应的记录;FAILFAST:当数据类型发生不匹配时,会取消进一步的处理;

Encoders

通过import spark.implicits._可以引入Encoders。

Encoder的主要特性:

快速串行化;支持语义结构化的数据;

Spark SQL

Spark SQL是通过SchemaRDD对数据进行的一种抽象,允许通过schema对dataset进行定义,然后使用sql进行查询。

Spark metastore

为了存储数据库、表名和schema,Spark会在启动spark sql的相同位置创建一个metastore.db的缺省数据库。spark也可以利用Hive metastore。

SQL语句

数据库

创建:create database if not exists mydb

显示数据库列表:show databases;

查看数据库详情:describe database [extended] mydb;

切换数据库:use mydb;

删除数据库:drop database if exists mydb [restrict|cascade];

restrict:删除非空数据库,抛出异常;

cascade:同时删除所有的表;

Table和View

创建表

create [external] table [if not exists] [mydb.]mytable [(col_name1:col_type1)] --[partitioned by (col_name2:col_type2)] [row format row_format] [stored as file_format] [location path] [tblproperties (key1=val1,key2=val2,...)] [as select_statement]

创建视图

create [or replace] view mydb.myview [(col1_name, col2_name)] [tblproperties (key1=val1,...)] as select ... 显示表的信息 describe mydb.mytable 重命名 alter tableview mydb.mytable rename to mydb.mytable1 设置属性 alter table|view mydb.mytable set tblproperties(k=v,...) 删除属性 alter table|view mydb.mytable unset tblproperties if exists (k1,k2,...) 删除表 drop table mydb.mytable 清空表 truncate table mytable

加载数据

从本地文件加载数据:load data local inapt ‘local_path’ into table mytable;从hdfs加载数据:load data inpth ‘hdfs_path’ into table mytable;把数据加载到表的一个分区上:load data [local] inpath ‘path’ into table mytable partition(part_col1_name=val1)

Streaming,Machine Learning 和Graph Analysis

Spark Streaming

Spark Stream支持的数据源:

KafkaFlumeKinesisZeroMQTwitter

Machine Learning

MLlib:使用RDD进行机器学习;ML:使用DF进行机器学习;

ML

ml的pipeline api提供了两个主要的核心特性:

Transformer:产生新的DF;Estimator:使用DF来产生一个新的Transformation;

一个pipeline就是这些Transformer和Estimator的集合。例如:

package demo import org.apache.spark.ml.Pipeline import org.apache.spark.sql.SparkSession import org.apache.spark.ml.feature.VectorIndexer import org.apache.spark.ml.regression.DecisionTreeRegressor object Demo { def main(args:Array[String]): Unit = { val spark = SparkSession.builder().appName("ML").getOrCreate() val dataPath = "/Users/yangxiaochen/temp/a/Chapter07/ml/dt.data" val dataDF = spark.read.format("libsvm").load(dataPath) val Array(trainingDF, testDataDF) = dataDF.randomSplit(Array(0.7, 0.3)) val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(dataDF) val dtModel = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures") val pipeLine = new Pipeline().setStages(Array(featureIndexer, dtModel)) val model = pipeLine.fit(trainingDF) val predictionsDF = model.transform(testDataDF) predictionsDF.select("prediction", "label", "features").show(20) } }

处理图

一个graph是由一组点和边组成。一个点是一个对象,边定义了两个点之间的关系。图可以分为有向图和无向图。在Spark的图lib中使用的是一种叫做property graph的结构。在这个结构中,每个点都是一个被赋予了一些属性的对象,用于提供一些信息。边也是被赋予了一些属性,如:权重、关系等的对象。为了处理图,Spark提供了两个库:

GraphX

是一个底层的API,用于处理RDD。

GraphFrames

用于处理DataFrames。这个库是一个外部的包,需要从https://spark-packages.org/package/graphframes/graphframes进行下载。

图算法
PageRanktriplet countconnected componentsbreadth-first search

优化

衡量Job性能的两个要素:运行时间和存储空间;

集群级优化

内存

内存主要用于三个目的:

RDD存储Shuffle和汇总的存储用户代码

默认情况下:RDD占60%;Shuff和汇总的存储占20%;用户代码占20%;

在缓存数据时,要仔细原则存储的级别:MEMORY_ONLY会把数据保持在内存里,在这种情况下,如果RDD比较大,会引起分区的重新计算。

硬盘

Spark使用硬盘临时存储shuffle数据。在独立运行和结合Mesos使用时,这个位置是由SPARK_LOCAL_DIRS变量进行配置的。在YARN情况下,Spark会继承YARN的配置。

CPU核数

计算公式如下:

总核数:每台核数 * 台数

为YARN等保留的核数:1 * 台数

executor的数目:(总核数 - 为YARN等保留的核数)/ 台数 (需要保留一个用于application master)

在Spark driver执行过程中,可以启用dynamic executor分配,这样可以提高资源的利用率。这个特性可以通过spark.dynamicAllocation.*的相关属性进行配置。也可以通过driver-memory配置分配给driver的内存。

Project Tungsten

Project Tungsten主要用是于优化的,在以前分布式计算的瓶颈是硬盘IO和网络带宽。但是近些年CPU和内存变成了新的瓶颈。Project Tungsten主要为spark程序在CPU和内存方面提供优化,优化主要体现在以下几个方面:

内存管理:减少jvm对象,减少内存消耗和gc;二进制处理:以二进制的方式,而非JVM对象的方式处理数据;Code generation

可以通过spark.sql.tungsten.enabled属性控制是否启动Project Tungsten。

应用程序优化

语言选择

如果在DF层面上进行操作,则语言的选择关系不大;如果需要操作RDD,则应该选择Scala或Java。

结构化 vs 非结构化API

计算量很大的情况下应该优先使用DF,如果需要对程序有更好的控制,则应选择RDD。

文件格式的选择

文件格式的选择原则:

Binary vs textSplittable vs non-splittableColumn vs Row based

Binary会提高存储和网络传输的效率。Spark推荐使用Parquet和ORC格式存储数据。

RDD优化

选择正确的transformation

一个主要的优化方式是避免data shuffle。

DF和DS的优化

Catalyst optimizer

为结构化API提供性能优化。

Spark体系结构和应用程序执行流

DAG Constructor

DAG Constructor的职责是把一个JOB分解为一组stage。

Stage

stage是一组不需要移动数据就可以一起执行的task的集合。可以在DF或RDD上调用explain方法来查看stage信息。

Task

一个task就是一个分区的数据和一组transformation的集合。task有两种类型:

Shuffle Map task:它执行transformation,创建新的数据分区;Result task:用于返回结果;

Task scheduler

职责是基于可用的核数、数据位置,结合executor来调度tasks。可以通过配置spark.task.cpus属性来指定task scheduler的行为,spark提供了两种策略:

FIFO:默认的调度策略。FAIR:这个策略会给所有的task以相等的资源;

应用程序执行模式

Local mode:在同一台机器上运行driver和executor,如: spark-submit --master local example.py Client mode:driver运行在job提交的节点上,executor运行在集群节点上,如: spark-submit --master yarn --deploy-mode client --num-executor 3 --executor-memory 2g --total-executor-cores 1 example.py Cluster mode :与client mode相似,但driver运行在集群节点上,如: spark-submit --master yarn --deploy-mode cluster --num-executor 3 --executor-memory 2g --total-executor-cores 1 example.py

–master的可用选项

Master URL含义local以一个线程的方式,本地运行spark(没有并行)local[K]以K个线程的方式,本地运行sparklocal[K,F]以K个线程的方式,本地运行spark,最大失败数为Flocal[*]以与本地逻辑核数相同的数目,本地运行sparklocal[*,F]以与本地逻辑核数相同的数目,本地运行spark,最大失败数为Fspark://HOST:PORT连接一个独立运行的spark集群,port是master配置的要使用的端口号,默认是7077spark://HOST1:PORT,HOST2:PORT连接一个zookeeper托管的独立运行的spark集群mesos://Host:Port连接一个Mesos集群。port缺省是5050yarn连接到一个yarn集群,集群地址应通过HADOOP_CONF_DIR或YARN_CONF_DIR环境变量指定K8s://host:port连接一个kubernetes集群。

应用程序监控

Spark UI:默认端口4040,可以通过spark.ui.port属性进行修改;应用程序日志:可以使用日志库(logging in python或log4j in scala/java)来记录应用程序日志;外部监控方案:可以使用Graphite、Ganglia、ELK或Prometheus。

资料

Hortonworks和Cloudera提供了Spark sandboxDatabricks提供了MLFlow用于快速开发ML model, https://databricks.com/mlflow
最新回复(0)