Spark指南——第三章:SparkSQL编程—— DataFrame(2)

tech2023-03-03  106

SparkSQL编程—— DataFrame(2)

一、SparkSession二、DataFrame1.创建DataFrame①通过数据源创建DataFrame②从RDD转换创建DataFrame1.方式一:Case Class方式2.方式二:createDataFrame方式 ③从Hive Table查询创建 三、使用SQL风格编程1.对DataFrame创建一个临时表2.对创建的people表进行SQL查询 四、使用DSL(Domain Specific Language)风格编程1.查看Schema2. 指定列查询① 指定一列② 指定多列③ 多种方式引用列④ expr访问列⑤ selectExpr 3.限制条件查询4.分组查询5.添加列6.删除列7.去重统计8.排序9.限制行数10.其他操作 五、where to go

一、SparkSession

SparkSession,SQLContext和HiveContext在早期的Spark版本中,SQLContext和HiveContext提供了使用DataFrame和SparkSQL的功能,并且通常在示例、文档和遗留代码中以变量名sqlContext存储。曾经在Spark 1.X版本中有两个有效的上下文类,SparkContext和SQLContext,它们各自负责不同的功能。 前者侧重于更精细地控制Spark的核心抽象,而后者侧重于Spark SQL等更高级别的工具。 在Spark 2.X版本中,社区将这两个API组合成了我们今天所使用的SparkSession。但是,这两个API仍然存在,你可以通过SparkSession访问它们。需要注意的是,你应该永远不需要使用SQLContext,并尽量避免使用SparkContextSparkSession是Spark最新的SQL查询起始点,是SQLContext和HiveContext的组合**,所以在SQLContext和HiveContext上可用的API,在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,计算实际上是由sparkContext完成的。推荐使用SparkSession。应用程序开发使用SparkSeesion代码如下:import org.apache.spark.sql.SparkSession object WordCount{ def main(args:Array[String]): Unit ={ val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._ //重要 //read file,create df val df = spark.read.json("/usr/local/spark/examples/src/main/resources/people.json") //val df = spark.read.format("json").load("path") // do some other logic action //stop spark spark.stop() } } 如果在本地使用spark-shell操作,直接使用"spark"可以了,例如读取文件数据,创建df:spark.read.json("/usr/local/spark/examples/src/main/resources/people.json") 或者spark.read.format("json").load("xxx/people.json") people.json文件内容如下:{"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}

二、DataFrame

1.创建DataFrame

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:①通过Spark的数据源进行创建;② 从一个存在的RDD进行转换;③还可以从Hive Table进行查询返回。

①通过数据源创建DataFrame

Spark支持一下格式的数据源:csv 、jdbc、parquet 、textFile 、json、orc 、textval df = spark.read.json("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json")``` val df = spark.read.format("json").load("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json") 显示结果:df.show 显示Schema:df.printSchema

②从RDD转换创建DataFrame

Spark提供了两种方式将RDD转换成DataFrames:

1.通过定义case class,使用反射推断Schema (case class方式)。 2.通过可编程接口,定义Schema,并应用到RDD上( createDataFrame方式)。

前者使用简单、代码简洁,适用于已知Schema的源数据上;后者使用较为复杂,但可以在程序运行过程中实行,适用于未知Schema的RDD上。

如果需要RDD与DF或者DS之间操作,那么都需要引入import spark.implicits._(spark不是包名,而是sparkSession对象的名称)

1.方式一:Case Class方式

首先看看不定义Case Class。 在清楚数据源结构情况下,可以手动转换:

import spark.implicits._ val peopleRDD = sc.textFile("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json") peopleRDD.map{ x => val para = x.split(" ") (para(0),para(1).trim.toInt) // para(0)-->"name", para(1)-->"age" }.toDF("name","age") // RDD ---toDF---> DF

通过样例类case class转换

import spark.implicits._ case class People(name: String, age: Int) val peopleRDD = sc.textFile("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json") peopleRDD.map{ x => val para = x.split(" ") (para(0),para(1).trim.toInt) }.toDF

调用toDF后,编译器会通过隐式转换,去寻找是否有接受两个参数的样例类,然后通过样例类给RDD赋值。

2.方式二:createDataFrame方式
createDataFrame方式比较复杂,通常有3步过程: 从源RDD创建rowRDD。 ① 导入需要的类型Rowimport org.apache.spark.sql.Row ②创建给定类型的rowRDDval rowRDD = peopleRDD.map{ x => val para = x.split(",") Row(para(0), para(1).trim.toInt) } 创建与rowRDD 匹配的Schema。 ①导入需要的类型import org.apache.spark.sql.types._ ②创建与rowRDD 匹配的Schemaval structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil) 将Schema 通过 createDataFrame 应用到 rowRDDval dataFrame = spark.createDataFrame(rowRDD, structType) 完成代码以及执行结果:import org.apache.spark.sql.Row import org.apache.spark.sql.types._ val rowRDD = peopleRDD.map{ x => val para = x.split(",") Row(para(0), para(1).trim.toInt) } val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil) /* 需要使用List 或者 Array存储 代码等价于 StructType( List( StructField("name", StringType), StructField("age", IntegerType) ) ) 或者 StructType( Array( StructField("name", StringType), StructField("age", IntegerType) ) ) */ val dataFrame = spark.createDataFrame(rowRDD, structType)

③从Hive Table查询创建

todo

三、使用SQL风格编程

DataFrame创建好以后,就可以对其进一步编程,得到我们想要的结果。SparkSQL提供了两种风格的编程方式:SQL风格 和 DSL风格。开发中,推荐使用SQL风格,使用简单,将重点放在SQL业务逻辑,无需过多的关注Spark的语法。

1.对DataFrame创建一个临时表
createTempView : 创建一个当前Session范围内的临时表,Session退出后表就会失效。createGlobalTempView:创建一个全局范围的临时表。createOrReplaceTempView:创建或者替换原有的临时表(当前Session范围内)。createOrReplaceGlobalTempView:创建或者替换原有临时表(全局范围)。注意使用全局表时需要全路径global_temp访问,如:global_temp.people_info df.createOrReplaceGlobalTempView("people_info")
2.对创建的people表进行SQL查询

使用spark.sql开启查询窗口 spark.sql(select * from global_temp.people_info ).show

限制条件查询: spark.sql("select * from global_temp.people_info where age > 10").show

如果全局表不加全局路径global_temp,查询会报错,找不到people_info:

新创建另一个SparkSeesion,也能查询到全局表people_info: spark.newSession().sql("select * from global_temp.people_info").show

四、使用DSL(Domain Specific Language)风格编程

1.查看Schema

df.printSchema

2. 指定列查询

① 指定一列
df.select("name").show():查询的列用引号" "括起来
② 指定多列

df.select(df.col("name"),df.col("age")).show()或 df.select("name","age").show() 或df.select("*").show()

③ 多种方式引用列
你可以通过多种不同的方式引用列,而且这些方式可以等价互换:import org.apache.spark.sql.functions.{expr, col, column} df.select( df.col("NAME"), col("NAME"), column("NAME"), 'NAME, //不能有空格 $"NAME", expr("NAME")).show() 需要注意的是,如果使用字符串双引号的访问方式,不能和上述方法一起使用: df.select("name",df.col("name")).show //错误的使用方式
④ expr访问列
expr是我们目前使用到的最灵活的引用方式。它能够引用一列,也可以引用对列进行操纵的字符串表达式。例如:添加列的别名。df.select(expr("name as p_name")).show()
⑤ selectExpr
因为select后跟着一系列expr是非常常见的写法,所以Spark有一个有效地描述此操作序列的接口:selectExpr,它可能是最常用的接口。这是Spark最强大的地方,我们可以利用selectExpr构建复杂表达式来创建DataFrame。添加别名 df.selectExpr("name as p_name", "name").show() 使用聚合函数 df.selectExpr("avg(age)", "count(distinct(name)) as cnt").show()

3.限制条件查询

有两种实现过滤的方式,分别是where和filter,它们可以执行相同的操作,接受相同参数类型。

df.filter("age>0").filter('age>1).filter(col("age")>2).filter($"age">20).show()df.where("age>0").where('age>1).where(col("age")>2).where($"age">20).show()

我们可能本能地想把多个过滤条件放到一个表达式中,尽管这种方式可行,但是并不总有效。 因为Spark会同时执行所有过滤操作,不管过滤条件的先后顺序,因此当你想指定多个AND过滤操作时, 只要按照先后顺序以链式的方式把这些过滤条件串联起来

4.分组查询

df.groupBy("age").count().show()

5.添加列

有两种常用的添加列的方式:字面量(literal)和 WithColumn

① 字面量(literal):有时候需要给Spark传递显式的值,它们只是一个值而非新列。这可能是一个常量值,或接下来需要比较的值。我们的方式是通过字面量(literal)传递。当你需要比较一个值是否大于一个常量或者程序创建的变量时,推荐使用字面量(literal)import org.apache.spark.sql.functions.lit df.select(expr("*"),lit(1).alias("new_one")).show() ② withColumn: 使用WithColumn可以为DataFrame增加新列,这种方式更为正式一些。 df.select(expr("*")).withColumn("new_one",lit(1)).show()

6.删除列

我们可以通过select实现。但是也可以使用drop方法来删除列。 df.drop("name")

7.去重统计

使用distinct去重,使用count() df.select("name").distinct().count() //结果输出 res32: Long = 3

8.排序

当对DataFrame中的值进行排序时,可以使用sort和orderBy方法,两者相互等价的操作,执行的方式也一样。接收列表达式和字符串,多个列。默认设置是按升序排序,若要更明确地指定升序或是降序,则需使用asc函数和desc函数:

import org.apache.spark.sql.functions.{desc, asc} df.sort("name").show() df.orderBy($"name",desc("age")).show()

9.限制行数

使用limit()、show()、take()限制从DataFrame中提取的内容。 df.limit(2).show() df.show(2) df.take(2)

10.其他操作

上面学习了DataFramef常用的操作,这只是冰山一角。DataFrame还至此很多其他操作,几乎支持了sql、hql所有的语法,功能十分强大。但是实际工作开发中,更多使用SQL风格编程,而不是DSL风格编程,因此不再一一列举,以后如有使用,再记录更新。

五、where to go

第三章:SparkSQL编程——DataSet(2)

最新回复(0)