Spark基本使用-DataFrame

tech2024-06-23  101

部分内容已更新,请见:https://editor.csdn.net/md/?articleId=108562743

1. 基本使用说明

Spark是一个计算框架,其最简单的对数据的处理方法是对DataFrame,虽然功能有限,但是对于非特大的数据量,也基本上够用了,故此处记录DataFrame的处理方法;

创建DataFrame的通用前提及方法; 创建SparkSession的实例;由SparkSession的实例去创建DataFrame; 操作DataFrame的两种方式; 类pandas方法;类SQL方法; 简单的性能优化;写好脚本后,如何提交; 用spark-submit提交的方法;

2. 创建DataFrame的通用前提及方法

必须先创建SparkSession,再通过SparkSession来得到一个或多个DataFrame。主要原因如下:

Spark是一个集群计算框架,其中需要用到通信,故使用通用的名称Session来代表含有DataFrame的应用程序。获取SparkSession的实例之后,才能过其拥有的方法来创建DataFrame,同时SparkSession也会通过创建各种Schedule来管理对DataFrame的操作;

创建SparkSession实例 的最简示例:

from pyspark.sql import SparkSession spark = SparkSession.builder.appName("app_name").getOrCreate()

SparkSession实例创建DataFrame 的示例:

从文件获取: df = spark.read.format("csv").option("header", "true").schema(STRUCTED_TYPE).load("file://file_path") 从数据库获取:见下方示例;

特别说明**.schema(STRUCTED_TYPE)**:

因为SparkSession实例创建DataFrame时,并无法从数据源中获得DataFrame各列的数据类型,而在对DataFrame处理时必然需要类型,所以需要专门为DataFrame各列赋予类型,有以下两种方式: .option("inferSchema", "true"):Spark根据DataFrame的第一行数据来推断各列的数据类型,因为存在推断错误的可能,不推荐;.schema(STRUCTED_TYPE):要求人工先设定好各列的数据类型,且该类型是Spark的数据类型,再用.schema()传入,具体数据类型见《spark权威指南》P58;

来源:

http://spark.apache.org/docs/latest/api/python/index.html

2.1. Spark从数据库获取数据

要求先下载对应的mysql-connect.jar(这个是mysql的驱动,Spark需要有这个驱动才能连接mysql),然后将其放入jar文件夹中;在spark-env.sh中增加export EXTRA_SPARK_CLASSPATH=spark路径/jars from pyspark.sql import SparkSession from pyspark.sql import SQLContext # 连接spark spark = SparkSession.builder.appName('connect_to_mysql')\ .config('spark.executor.extraClassPath', 'spark路径/*')\ .config('spark.driver.extraClassPath', 'spark路径/*')\ .getOrCreate() # 连接mysql(jdbc:mysql://IP:端口/数据库?&useSSL=false) url = 'jdbc:mysql://IP:端口/数据库名?&useSSL=false' properties = {'user': 'username', 'password': 'password'} # 查看需数据(注意必须写成:(mysql语句) temp,才可以,修改为其它形式则不行) table = "(select * from 数据库名.表 limit 1) temp" # 写sql # sql_test = 'select * from test_hjp limit 1' df = spark.read.jdbc(url=url, table=table, properties=properties) df.show() +-------+----------+----------+ |user_id|start_date| end_date| +-------+----------+----------+ | A|2019-01-03|2020-01-03| +-------+----------+----------+

2.2. Spark写入数据库

df.write.mode("append").jdbc(url, 'customer_alive_count', properties=properties)

3. 操作DataFrame的两种方式

前提说明: 使用类SQL方法时,需要先将df注册成表或者视图:df.createOrReplaceTempView(view_name); select功能: 类pandas方法:df.selectExpr("col(column_1) + 1, column_2")类SQL方法:spark.sql("select column_1 + 1, column_2 from view_name") distinct功能: 类pandas方法:df.select("column_1").distinct()类SQL方法:spark.sql("select distinct column_1 from view_name") limit功能: 类pandas方法:df.select("column_1").limit(n)类SQL方法:spark.sql("select column_1 from view_name limit n") 排序功能: 类pandas方法: df.sort(expr("column_1 asc"))df.orderBy(expr("column_1 asc"))df.orderBy(asc("column_1")) 类SQL方法:spark.sql("select * from view_name order by column_1 asc") 过滤功能: 类pandas方法:df.where("column_1 < 1").where("column_2 < 2")或df.filter(col("column_1") < 1).filter(col("column_2") < 2)类SQL方法:spark.sql("select * from view_name where column_1 < 1 and .filter(col("column_2") < 2)") like功能: 类pandas方法:df.where("column_1 like '%s'")类SQL方法:spark.sql("select * from view_name where column_1 like '%s' join功能: 类pandas方法:df1.join(df2, df1.column1==df2.column2, 'inner')类SQL方法:spark.sql("select * from view_1 inner join view_2 on column_1") 行拼接功能: 类pandas方法:df1.select('column_1', 'column_2').unionAll(df2.select('column_1', 'column_2'))类SQL方法:spark.sql("select column_1, column_2 from view_1 union/union all select column_1, column_2 from view_2") 列分组功能: 类pandas方法:df.groupBy("column_1, column_2")类SQL方法:spark.sql("select column_1 from view group by column_1, column_2") 聚合运算及having功能: 类pandas方法:df.groupBy("column_1").sum("column_2").where("sum(column_2) > 20")类SQL方法:spark.sql("select column_2 from view group by column_1 having sum(column_2) > 20") 分组集功能: 类pandas方法: .rollup(column_1, column_2):相当于将 按column_1聚合得到的结果 跟 按column_2聚合得到的结果 union起来;column_1也可以用(column_1, column_3)来代替,做到按 column_1-column_3聚合得到结果;rollup_df = df.rollup(列1,列2).agg(sum(列3)).show().cube(column_1, column_2):比.rollup(column_1, column_2)更进阶,在.rollup()的基础上,还多出来一个结果,即按column_1-column_2分类的结果; 类SQL方法:group by ... grouping sets (column_1, column_2),作用同.rollup(); 字段更名: 类pandas方法:df.withColumnRenamed("old_colname", "new_colname")类SQL方法:select old_colname as new_colname from xxx

4. 简单的性能优化

4.1. 间接性能优化

使用的语言: 简单说明:scala和java性能优于python,主要原因在于数据的序列化和反序列化;详细说明:如果用python写用户自定义函数UDF,当Spark调用该UDF,需要将Spark中的数据先序列化python支持的数据类型,再由python去对该数据进行处理;python处理完该数据后,Spark需要对python获得的数据结果进行反序列化得到Spark支持的数据类型;另一角度:如果未调用UDF,数据全部在Spark内部,则使用python跟使用scala的性能一样; 调度: 设置--max-executor-cores来设置程序所用的最大执行核心数量;设置spark.scheduler.mode为FAIR来公平调度; 表分区/分桶: 存储时按照进行过滤的字段进行分区(非连续型字段)或分桶(连续型字段);

4.2. 直接性能优化

并行度:最好是为每个Core安排2~3个任务; spark.default.parallelism:设置任务并行度;spark.sql.shuffle.partitions:设置shuffle后分区数量,避免因为shuffle后默认的分区数过多导致调度次数过多或过少,或导致调度浪费,或导致计算资源未充分利用; 过滤优化:尽可能将过滤提前,较少后面处理的数据量(此处使用DataFrame API时无须考虑,Spark引擎会自行优化);重分区和合并: 重分区.repartition(分区列,分区数):shuffle的时候将数据按照经常过滤的字段进行分区;合并.coalesce(分区数):减少分区的量; 缓存: .cache():因为Spark的转换惰性特性,所以当某部分数据会被多个结果数据所依赖时,需手动将其缓存,较少每个结果数据都需要去计算得到该部分数据;

5. spark-submit提交任务

当用python写好一个spark应用程序application时,有两种运行方式: 激活python虚拟环境,然后运行application,此种方式要求在代码里面写好SparkConf(SparkSession的配置)的各种配置;使用spark-submit提交任务: 简单示例:./bin/spark-submit --master local ./examples/src/main/python/pi.py具体参数见:《spark权威指南》P270;

6. 问题汇总

读取txt文件: spark.read.format("csv").option().load(txt_path)spark.read.format("text").option().load(txt_path) 以下两个句子功能相同: SQL实现: spark.sql("""select a.City, f.origin, sum(f.delay) as Delays from flightperformance f join airports a on a.IATA = f.origin where a.State = "WA" group by a.City, f.origin order by sum(f.delay) desc """).show() DataFrame实现: from pyspark.sql.functions import * flightperf.join( airports.filter(col("State")=="WA"), flightperf.origin==airports.IATA, "inner" ).groupBy("City", "origin").sum("delay").select( "City", "origin", "sum(delay)" ).withColumnRenamed("sum(delay)", "Delays").orderBy( desc("sum(delay)")).show() 展示df的数据有三种: .collect():以list形式返回所有行;.take(n):必填n,以list形式返回n行;.show(n):以图表的形式展示n行,可无n,无n则默认20行; 判断两个DataFrame内的数据是否相同(不考虑位置,只考虑元素): 通过做差集:df1.subtract(df2).count()结果为0即为相同;顺便提到交集:df1.intersect(df2)顺便提到并集:df1.union(df2)
最新回复(0)