创建 SparkSession
from pyspark.sql import SparkSession spark = SparkSession.builder.enableHiveSupport().appName('test_app').getOrCreate() sc = spark.sparkContext hc = HiveContext(sc)saveAsTable 会自动创建hive表,partitionBy指定分区字段,默认存储为 parquet 文件格式。对于从文件生成的DataFrame,字段类型也是自动转换的,有时会转换成不符合要求的类型。
需要自定义字段类型的,可以在创建DataFrame时指定类型:
from pyspark.sql.types import StringType, StructType, BooleanType, StructField schema = StructType([ StructField("vin", StringType(), True), StructField("cust_id", StringType(), True), StructField("is_maintain", BooleanType(), True), StructField("is_wash", BooleanType(), True), StructField("pt_day", StringType(), True), ] ) data = pd.read_csv('/path/to/data.csv', header=0) df = spark.createDataFrame(data, schema=schema) # 写入hive表时就是指定的数据类型了 df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day'])这种情况其实和建表语句一样就可以了 不需要开启动态分区
df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day']) 1代码:
# 建表 sql_str = "CREATE TABLE IF NOT EXISTS default.csd_test_partition (cust_id string, vin string, is_maintain boolean, is_wash boolean) partitioned by (pt_day string)" hc.sql(sql_str) # 开启动态分区 spark.sql("set hive.exec.dynamic.partition.mode = nonstrict") spark.sql("set hive.exec.dynamic.partition=true") # 指定文件格式 df.write.saveAsTable(save_table, format='Hive', mode='append', partitionBy=['pt_day'])通过临时视图创建
# 数据包含分区字段的,不要指定分区,要开启动态分区 df1.createTempView('temp_table') # 或 df1.registerTempTable('temp_table') hc.sql('insert into default.csd_test_partition select * from temp_table') # 数据不包含分区字段的,可以直接指定分区插入,可以不用开启动态分区 df2 = df1.drop('pt_day') df2.registerTempTable('temp_table') hc.sql('insert into default.csd_test_partition partition(pt_day="20190516") select * from temp_table1')mode=‘overwrite’ 模式时,会创建新的表,若表名已存在则会被删除,整个表被重写。而 mode=‘append’ 模式会在直接在原有数据增加新数据。
sql 语句插入只能先行建表,在执行插入操作。
INSERT INTO tableName PARTITION(pt=pt_value) select * from temp_table 的语句类似于 append 追加的方式。
INSERT OVERWRITE TABLE tableName PARTITION(pt=pt_value) SELECT * FROM temp_table 的语句能指定分区进行重写,而不会重写整张表。
sql 语句的方式比 .write.saveAsTable() 方法更灵活。
默认的方式将会在hive分区表中保存大量的小文件,在保存之前对 DataFrame 用 .repartition() 重新分区,这样就能控制保存的文件数量。
如:
df.repartition(5).write.saveAsTable(...) 或 df.repartition(5).registerTempTable('temp_table')以上设置一个分区只会保存 5 个数据文件。
一:API
saveAsTable 会根据表的schema匹配df的字段进行存储
insertInto,要求表的schema与df必须一致才可以
对于Hive分区表的写入,insertInto要待参数覆盖为True,这样每次会覆盖分区。注意不要使用saveAsTable!,会将全表覆盖,
正确语句,具体变化参考pyspark版本:
df.write.format("hive").insertInto("dev.dev_rep_rebate_bjcouple_partion_orc",True)
二:sql方式
将df创建为临时表,再使用spark.sql 里传hive语句insert select。。。。。。。。。