SparkSQL编程—— DataFrame(2)
一、SparkSession二、DataFrame1.创建DataFrame①通过数据源创建DataFrame②从RDD转换创建DataFrame1.方式一:Case Class方式2.方式二:createDataFrame方式
③从Hive Table查询创建
三、使用SQL风格编程1.对DataFrame创建一个临时表2.对创建的people表进行SQL查询
四、使用DSL(Domain Specific Language)风格编程1.查看Schema2. 指定列查询① 指定一列② 指定多列③ 多种方式引用列④ expr访问列⑤ selectExpr
3.限制条件查询4.分组查询5.添加列6.删除列7.去重统计8.排序9.限制行数10.其他操作
五、where to go
一、SparkSession
SparkSession,SQLContext和HiveContext在早期的Spark版本中,SQLContext和HiveContext提供了使用DataFrame和SparkSQL的功能,并且通常在示例、文档和遗留代码中以变量名sqlContext存储。曾经在Spark 1.X版本中有两个有效的上下文类,SparkContext和SQLContext,它们各自负责不同的功能。 前者侧重于更精细地控制Spark的核心抽象,而后者侧重于Spark SQL等更高级别的工具。 在Spark 2.X版本中,社区将这两个API组合成了我们今天所使用的SparkSession。但是,这两个API仍然存在,你可以通过SparkSession访问它们。需要注意的是,你应该永远不需要使用SQLContext,并尽量避免使用SparkContextSparkSession是Spark最新的SQL查询起始点,是SQLContext和HiveContext的组合**,所以在SQLContext和HiveContext上可用的API,在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,计算实际上是由sparkContext完成的。推荐使用SparkSession。应用程序开发使用SparkSeesion代码如下:
import org
.apache
.spark
.sql
.SparkSession
object WordCount
{
def main
(args
:Array
[String]): Unit ={
val spark
= SparkSession
.builder
()
.appName
("Spark SQL basic example")
.config
("spark.some.config.option", "some-value")
.getOrCreate
()
import spark
.implicits
._
val df
= spark
.read
.json
("/usr/local/spark/examples/src/main/resources/people.json")
spark
.stop
()
}
}
如果在本地使用spark-shell操作,直接使用"spark"可以了,例如读取文件数据,创建df:spark.read.json("/usr/local/spark/examples/src/main/resources/people.json") 或者spark.read.format("json").load("xxx/people.json") people.json文件内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
二、DataFrame
1.创建DataFrame
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:①通过Spark的数据源进行创建;② 从一个存在的RDD进行转换;③还可以从Hive Table进行查询返回。
①通过数据源创建DataFrame
Spark支持一下格式的数据源:csv 、jdbc、parquet 、textFile 、json、orc 、text
val df
= spark
.read
.json
("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json")```
val df
= spark
.read
.format
("json").load
("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json")
显示结果:df.show 显示Schema:df.printSchema
②从RDD转换创建DataFrame
Spark提供了两种方式将RDD转换成DataFrames:
1.通过定义case class,使用反射推断Schema (case class方式)。 2.通过可编程接口,定义Schema,并应用到RDD上( createDataFrame方式)。
前者使用简单、代码简洁,适用于已知Schema的源数据上;后者使用较为复杂,但可以在程序运行过程中实行,适用于未知Schema的RDD上。
如果需要RDD与DF或者DS之间操作,那么都需要引入import spark.implicits._(spark不是包名,而是sparkSession对象的名称)
1.方式一:Case Class方式
首先看看不定义Case Class。 在清楚数据源结构情况下,可以手动转换:
import spark
.implicits
._
val peopleRDD
= sc
.textFile
("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json")
peopleRDD
.map
{
x
=> val para
= x
.split
(" ")
(para
(0),para
(1).trim
.toInt
)
}.toDF
("name","age")
通过样例类case class转换
import spark
.implicits
._
case class People
(name
: String, age
: Int)
val peopleRDD
= sc
.textFile
("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json")
peopleRDD
.map
{
x
=> val para
= x
.split
(" ")
(para
(0),para
(1).trim
.toInt
)
}.toDF
调用toDF后,编译器会通过隐式转换,去寻找是否有接受两个参数的样例类,然后通过样例类给RDD赋值。
2.方式二:createDataFrame方式
createDataFrame方式比较复杂,通常有3步过程:
从源RDD创建rowRDD。 ① 导入需要的类型Row
import org
.apache
.spark
.sql
.Row
②创建给定类型的rowRDD
val rowRDD
= peopleRDD
.map
{
x
=> val para
= x
.split
(",")
Row
(para
(0), para
(1).trim
.toInt
)
}
创建与rowRDD 匹配的Schema。 ①导入需要的类型
import org
.apache
.spark
.sql
.types
._
②创建与rowRDD 匹配的Schema
val structType
: StructType
= StructType
(StructField
("name", StringType
) :: StructField
("age", IntegerType
) :: Nil
)
将Schema 通过 createDataFrame 应用到 rowRDD
val dataFrame
= spark
.createDataFrame
(rowRDD
, structType
)
完成代码以及执行结果:
import org
.apache
.spark
.sql
.Row
import org
.apache
.spark
.sql
.types
._
val rowRDD
= peopleRDD
.map
{
x
=> val para
= x
.split
(",")
Row
(para
(0), para
(1).trim
.toInt
)
}
val structType
: StructType
= StructType
(StructField
("name", StringType
) :: StructField
("age", IntegerType
) :: Nil
)
val dataFrame
= spark
.createDataFrame
(rowRDD
, structType
)
③从Hive Table查询创建
todo
三、使用SQL风格编程
DataFrame创建好以后,就可以对其进一步编程,得到我们想要的结果。SparkSQL提供了两种风格的编程方式:SQL风格 和 DSL风格。开发中,推荐使用SQL风格,使用简单,将重点放在SQL业务逻辑,无需过多的关注Spark的语法。
1.对DataFrame创建一个临时表
createTempView : 创建一个当前Session范围内的临时表,Session退出后表就会失效。createGlobalTempView:创建一个全局范围的临时表。createOrReplaceTempView:创建或者替换原有的临时表(当前Session范围内)。createOrReplaceGlobalTempView:创建或者替换原有临时表(全局范围)。注意使用全局表时需要全路径global_temp访问,如:global_temp.people_info df.createOrReplaceGlobalTempView("people_info")
2.对创建的people表进行SQL查询
使用spark.sql开启查询窗口 spark.sql(select * from global_temp.people_info ).show
限制条件查询: spark.sql("select * from global_temp.people_info where age > 10").show
如果全局表不加全局路径global_temp,查询会报错,找不到people_info:
新创建另一个SparkSeesion,也能查询到全局表people_info: spark.newSession().sql("select * from global_temp.people_info").show
四、使用DSL(Domain Specific Language)风格编程
1.查看Schema
df.printSchema
2. 指定列查询
① 指定一列
df.select("name").show():查询的列用引号" "括起来
② 指定多列
df.select(df.col("name"),df.col("age")).show()或 df.select("name","age").show() 或df.select("*").show()
③ 多种方式引用列
你可以通过多种不同的方式引用列,而且这些方式可以等价互换:
import org
.apache
.spark
.sql
.functions
.{expr
, col
, column
}
df
.select
(
df
.col
("NAME"),
col
("NAME"),
column
("NAME"),
'NAME,
$
"NAME",
expr
("NAME")).show
()
需要注意的是,如果使用字符串双引号的访问方式,不能和上述方法一起使用: df.select("name",df.col("name")).show //错误的使用方式
④ expr访问列
expr是我们目前使用到的最灵活的引用方式。它能够引用一列,也可以引用对列进行操纵的字符串表达式。例如:添加列的别名。df
.select
(expr
("name as p_name")).show
()
⑤ selectExpr
因为select后跟着一系列expr是非常常见的写法,所以Spark有一个有效地描述此操作序列的接口:selectExpr,它可能是最常用的接口。这是Spark最强大的地方,我们可以利用selectExpr构建复杂表达式来创建DataFrame。添加别名 df.selectExpr("name as p_name", "name").show() 使用聚合函数 df.selectExpr("avg(age)", "count(distinct(name)) as cnt").show()
3.限制条件查询
有两种实现过滤的方式,分别是where和filter,它们可以执行相同的操作,接受相同参数类型。
df.filter("age>0").filter('age>1).filter(col("age")>2).filter($"age">20).show()df.where("age>0").where('age>1).where(col("age")>2).where($"age">20).show()
我们可能本能地想把多个过滤条件放到一个表达式中,尽管这种方式可行,但是并不总有效。 因为Spark会同时执行所有过滤操作,不管过滤条件的先后顺序,因此当你想指定多个AND过滤操作时, 只要按照先后顺序以链式的方式把这些过滤条件串联起来
4.分组查询
df.groupBy("age").count().show()
5.添加列
有两种常用的添加列的方式:字面量(literal)和 WithColumn
① 字面量(literal):有时候需要给Spark传递显式的值,它们只是一个值而非新列。这可能是一个常量值,或接下来需要比较的值。我们的方式是通过字面量(literal)传递。当你需要比较一个值是否大于一个常量或者程序创建的变量时,推荐使用字面量(literal)
import org
.apache
.spark
.sql
.functions
.lit
df
.select
(expr
("*"),lit
(1).alias
("new_one")).show
()
② withColumn: 使用WithColumn可以为DataFrame增加新列,这种方式更为正式一些。 df.select(expr("*")).withColumn("new_one",lit(1)).show()
6.删除列
我们可以通过select实现。但是也可以使用drop方法来删除列。 df.drop("name")
7.去重统计
使用distinct去重,使用count() df.select("name").distinct().count() //结果输出 res32: Long = 3
8.排序
当对DataFrame中的值进行排序时,可以使用sort和orderBy方法,两者相互等价的操作,执行的方式也一样。接收列表达式和字符串,多个列。默认设置是按升序排序,若要更明确地指定升序或是降序,则需使用asc函数和desc函数:
import org
.apache
.spark
.sql
.functions
.{desc
, asc
}
df
.sort
("name").show
()
df
.orderBy
($
"name",desc
("age")).show
()
9.限制行数
使用limit()、show()、take()限制从DataFrame中提取的内容。 df.limit(2).show() df.show(2) df.take(2)
10.其他操作
上面学习了DataFramef常用的操作,这只是冰山一角。DataFrame还至此很多其他操作,几乎支持了sql、hql所有的语法,功能十分强大。但是实际工作开发中,更多使用SQL风格编程,而不是DSL风格编程,因此不再一一列举,以后如有使用,再记录更新。
五、where to go
第三章:SparkSQL编程——DataSet(2)