部分内容已更新,请见:https://editor.csdn.net/md/?articleId=108562743
1. 基本使用说明
Spark是一个计算框架,其最简单的对数据的处理方法是对DataFrame,虽然功能有限,但是对于非特大的数据量,也基本上够用了,故此处记录DataFrame的处理方法;
创建DataFrame的通用前提及方法;
创建SparkSession的实例;由SparkSession的实例去创建DataFrame; 操作DataFrame的两种方式;
类pandas方法;类SQL方法; 简单的性能优化;写好脚本后,如何提交;
用spark-submit提交的方法;
2. 创建DataFrame的通用前提及方法
必须先创建SparkSession,再通过SparkSession来得到一个或多个DataFrame。主要原因如下:
Spark是一个集群计算框架,其中需要用到通信,故使用通用的名称Session来代表含有DataFrame的应用程序。获取SparkSession的实例之后,才能过其拥有的方法来创建DataFrame,同时SparkSession也会通过创建各种Schedule来管理对DataFrame的操作;
创建SparkSession实例 的最简示例:
from pyspark
.sql
import SparkSession
spark
= SparkSession
.builder
.appName
("app_name").getOrCreate
()
SparkSession实例创建DataFrame 的示例:
从文件获取: df
= spark
.read
.format("csv").option
("header", "true").schema
(STRUCTED_TYPE
).load
("file://file_path")
从数据库获取:见下方示例;
特别说明**.schema(STRUCTED_TYPE)**:
因为SparkSession实例创建DataFrame时,并无法从数据源中获得DataFrame各列的数据类型,而在对DataFrame处理时必然需要类型,所以需要专门为DataFrame各列赋予类型,有以下两种方式:
.option("inferSchema", "true"):Spark根据DataFrame的第一行数据来推断各列的数据类型,因为存在推断错误的可能,不推荐;.schema(STRUCTED_TYPE):要求人工先设定好各列的数据类型,且该类型是Spark的数据类型,再用.schema()传入,具体数据类型见《spark权威指南》P58;
来源:
http://spark.apache.org/docs/latest/api/python/index.html
2.1. Spark从数据库获取数据
要求先下载对应的mysql-connect.jar(这个是mysql的驱动,Spark需要有这个驱动才能连接mysql),然后将其放入jar文件夹中;在spark-env.sh中增加export EXTRA_SPARK_CLASSPATH=spark路径/jars
from pyspark
.sql
import SparkSession
from pyspark
.sql
import SQLContext
spark
= SparkSession
.builder
.appName
('connect_to_mysql')\
.config
('spark.executor.extraClassPath', 'spark路径/*')\
.config
('spark.driver.extraClassPath', 'spark路径/*')\
.getOrCreate
()
url
= 'jdbc:mysql://IP:端口/数据库名?&useSSL=false'
properties
= {'user': 'username', 'password': 'password'}
table
= "(select * from 数据库名.表 limit 1) temp"
df
= spark
.read
.jdbc
(url
=url
, table
=table
, properties
=properties
)
df
.show
()
+-------+----------+----------+
|user_id|start_date| end_date|
+-------+----------+----------+
| A|2019-01-03|2020-01-03|
+-------+----------+----------+
2.2. Spark写入数据库
df
.write
.mode
("append").jdbc
(url
, 'customer_alive_count', properties
=properties
)
3. 操作DataFrame的两种方式
前提说明:
使用类SQL方法时,需要先将df注册成表或者视图:df.createOrReplaceTempView(view_name); select功能:
类pandas方法:df.selectExpr("col(column_1) + 1, column_2")类SQL方法:spark.sql("select column_1 + 1, column_2 from view_name") distinct功能:
类pandas方法:df.select("column_1").distinct()类SQL方法:spark.sql("select distinct column_1 from view_name") limit功能:
类pandas方法:df.select("column_1").limit(n)类SQL方法:spark.sql("select column_1 from view_name limit n") 排序功能:
类pandas方法:
df.sort(expr("column_1 asc"))df.orderBy(expr("column_1 asc"))df.orderBy(asc("column_1")) 类SQL方法:spark.sql("select * from view_name order by column_1 asc") 过滤功能:
类pandas方法:df.where("column_1 < 1").where("column_2 < 2")或df.filter(col("column_1") < 1).filter(col("column_2") < 2)类SQL方法:spark.sql("select * from view_name where column_1 < 1 and .filter(col("column_2") < 2)") like功能:
类pandas方法:df.where("column_1 like '%s'")类SQL方法:spark.sql("select * from view_name where column_1 like '%s' join功能:
类pandas方法:df1.join(df2, df1.column1==df2.column2, 'inner')类SQL方法:spark.sql("select * from view_1 inner join view_2 on column_1") 行拼接功能:
类pandas方法:df1.select('column_1', 'column_2').unionAll(df2.select('column_1', 'column_2'))类SQL方法:spark.sql("select column_1, column_2 from view_1 union/union all select column_1, column_2 from view_2") 列分组功能:
类pandas方法:df.groupBy("column_1, column_2")类SQL方法:spark.sql("select column_1 from view group by column_1, column_2") 聚合运算及having功能:
类pandas方法:df.groupBy("column_1").sum("column_2").where("sum(column_2) > 20")类SQL方法:spark.sql("select column_2 from view group by column_1 having sum(column_2) > 20") 分组集功能:
类pandas方法:
.rollup(column_1, column_2):相当于将 按column_1聚合得到的结果 跟 按column_2聚合得到的结果 union起来;column_1也可以用(column_1, column_3)来代替,做到按 column_1-column_3聚合得到结果;rollup_df = df.rollup(列1,列2).agg(sum(列3)).show().cube(column_1, column_2):比.rollup(column_1, column_2)更进阶,在.rollup()的基础上,还多出来一个结果,即按column_1-column_2分类的结果; 类SQL方法:group by ... grouping sets (column_1, column_2),作用同.rollup(); 字段更名:
类pandas方法:df.withColumnRenamed("old_colname", "new_colname")类SQL方法:select old_colname as new_colname from xxx
4. 简单的性能优化
4.1. 间接性能优化
使用的语言:
简单说明:scala和java性能优于python,主要原因在于数据的序列化和反序列化;详细说明:如果用python写用户自定义函数UDF,当Spark调用该UDF,需要将Spark中的数据先序列化python支持的数据类型,再由python去对该数据进行处理;python处理完该数据后,Spark需要对python获得的数据结果进行反序列化得到Spark支持的数据类型;另一角度:如果未调用UDF,数据全部在Spark内部,则使用python跟使用scala的性能一样; 调度:
设置--max-executor-cores来设置程序所用的最大执行核心数量;设置spark.scheduler.mode为FAIR来公平调度; 表分区/分桶:
存储时按照进行过滤的字段进行分区(非连续型字段)或分桶(连续型字段);
4.2. 直接性能优化
并行度:最好是为每个Core安排2~3个任务;
spark.default.parallelism:设置任务并行度;spark.sql.shuffle.partitions:设置shuffle后分区数量,避免因为shuffle后默认的分区数过多导致调度次数过多或过少,或导致调度浪费,或导致计算资源未充分利用; 过滤优化:尽可能将过滤提前,较少后面处理的数据量(此处使用DataFrame API时无须考虑,Spark引擎会自行优化);重分区和合并:
重分区.repartition(分区列,分区数):shuffle的时候将数据按照经常过滤的字段进行分区;合并.coalesce(分区数):减少分区的量; 缓存:
.cache():因为Spark的转换惰性特性,所以当某部分数据会被多个结果数据所依赖时,需手动将其缓存,较少每个结果数据都需要去计算得到该部分数据;
5. spark-submit提交任务
当用python写好一个spark应用程序application时,有两种运行方式:
激活python虚拟环境,然后运行application,此种方式要求在代码里面写好SparkConf(SparkSession的配置)的各种配置;使用spark-submit提交任务:
简单示例:./bin/spark-submit --master local ./examples/src/main/python/pi.py具体参数见:《spark权威指南》P270;
6. 问题汇总
读取txt文件:
spark.read.format("csv").option().load(txt_path)spark.read.format("text").option().load(txt_path) 以下两个句子功能相同:
SQL实现:
spark
.sql
("""select a.City, f.origin, sum(f.delay) as Delays from flightperformance f
join airports a
on a.IATA = f.origin
where a.State = "WA"
group by a.City, f.origin
order by sum(f.delay) desc
""").show
()
DataFrame实现:
from pyspark
.sql
.functions
import *
flightperf
.join
(
airports
.filter(col
("State")=="WA"),
flightperf
.origin
==airports
.IATA
,
"inner"
).groupBy
("City", "origin").sum("delay").select
(
"City", "origin", "sum(delay)"
).withColumnRenamed
("sum(delay)", "Delays").orderBy
(
desc
("sum(delay)")).show
()
展示df的数据有三种:
.collect():以list形式返回所有行;.take(n):必填n,以list形式返回n行;.show(n):以图表的形式展示n行,可无n,无n则默认20行; 判断两个DataFrame内的数据是否相同(不考虑位置,只考虑元素):
通过做差集:df1.subtract(df2).count()结果为0即为相同;顺便提到交集:df1.intersect(df2)顺便提到并集:df1.union(df2)