➢ 初始化装载
USE sales_dw;

-- Empty every target table so the initial (full) load starts from a clean slate.
TRUNCATE TABLE dim_customer;
TRUNCATE TABLE dim_product;
TRUNCATE TABLE dim_order;
TRUNCATE TABLE fact_sales_order;

-- Load the customer dimension (Hive multi-insert FROM ... INSERT syntax).
-- Surrogate key = row_number over customer_number; SCD bookkeeping columns
-- (version, effective_date, expiry_date) are seeded with literals.
-- NOTE(review): version is seeded as the string '1.0' here but the periodic
-- load inserts integer 1 and does t1.version + 1 arithmetic — confirm the
-- column type and normalize. Likewise '2018-1-1'/'2050-1-1' differ from the
-- '2050-01-01' format the periodic load compares against — confirm whether
-- effective_date/expiry_date are DATE or STRING and normalize the format.
from ( select row_number() over(order by sp.customer_number) customer_sk,
              sp.customer_number,
              sp.customer_name,
              sp.customer_street_address,
              sp.customer_zip_code,
              sp.customer_city,
              sp.customer_state,
              '1.0',
              '2018-1-1',
              '2050-1-1'
       from sales_rds.customer sp ) tmp
insert into sales_dw.dim_customer select *;

-- Load the product dimension (same surrogate-key / SCD seeding pattern).
from ( select row_number() over(order by sp.product_code) product_sk,
              sp.product_code,
              sp.product_name,
              sp.product_category,
              '1.0',
              '2018-1-1',
              '2050-1-1'
       from sales_rds.product sp ) tmp
insert into sales_dw.dim_product select *;

-- Load the order dimension.
from ( select row_number() over(order by sp.order_number) order_sk,
              sp.order_number,
              '1.0',
              '2018-1-1',
              '2050-1-1'
       from sales_rds.sales_order sp ) tmp
insert into sales_dw.dim_order select *;

-- Load the sales-order fact table, dynamically partitioned by order month.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=10000;
set hive.exec.max.dynamic.partitions.pernode=10000;
from ( select b.order_sk,
              c.customer_sk,
              d.product_sk,
              e.date_sk order_date_sk,
              a.order_amount,
              substr(a.order_date,1,7) order_date -- partition key: 'yyyy-MM'
       from sales_rds.sales_order a
       join sales_dw.dim_order b on a.order_number=b.order_number
       join sales_dw.dim_customer c on a.customer_number=c.customer_number
       join sales_dw.dim_product d on a.product_code=d.product_code
       join sales_dw.dim_date e on date(a.order_date)=e.date ) temp
insert into table sales_dw.fact_sales_order partition(order_date)
select order_sk,customer_sk,product_sk,order_date_sk,order_amount,order_date;

#!/bin/bash
# Create (recreating if it exists) the Sqoop incremental-import saved job.
# Check column is order_number; initial --last-value is 0, so the first
# execution of the job imports every row.
# NOTE(review): plaintext DB password on the command line — prefer
# --password-file or --password-alias.
sqoop job --delete rds_incremental_import_job
sqoop job --create rds_incremental_import_job \
-- \
import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password ok \
--table sales_order \
--hive-import \
--hive-table sales_rds.sales_order \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--incremental append \
--check-column order_number \
--last-value 0

# First extraction: full (overwrite) import of customer and product into RDS.
# NOTE(review): both imports reuse --target-dir temp; the sleep 2 between them
# looks like a workaround for staging-dir cleanup — verify, or use distinct dirs.
sqoop import --connect jdbc:mysql://localhost:3306/sales_source \
--username root --password ok --table customer --hive-import --hive-table sales_rds.customer --hive-overwrite --target-dir temp
sleep 2
sqoop import --connect jdbc:mysql://localhost:3306/sales_source --username root --password ok --table product --hive-import --hive-table sales_rds.product --hive-overwrite --target-dir temp

-- Empty the RDS sales_order table before the first incremental run
-- (table name is fully qualified, so the sales_dw default DB is harmless).
beeline -u jdbc:hive2://hadoop01:10000/sales_dw -e "TRUNCATE TABLE sales_rds.sales_order"

# Run the incremental import; last-value is still 0, so all rows come over.
sqoop job --exec rds_incremental_import_job

# Run the initial-load SQL above (saved as init_dw_etl.sql) via Spark SQL.
spark-sql --master yarn-client -f init_dw_etl.sql
➢ 定期装载
涉及到update操纵需要更改hive-site.xml文件,添加支持事务操作,添加如下属性
<!-- Hive ACID transaction support (required for the UPDATE/DELETE below). -->
<property> <name>hive.optimize.sort.dynamic.partition</name> <value>false</value> </property>
<property> <name>hive.support.concurrency</name> <value>true</value> </property>
<property> <name>hive.enforce.bucketing</name> <value>true</value> </property>
<property> <name>hive.exec.dynamic.partition.mode</name> <value>nonstrict</value> </property>
<property> <name>hive.txn.manager</name> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value> </property>
<property> <name>hive.compactor.initiator.on</name> <value>true</value> </property>
<property> <name>hive.compactor.worker.threads</name> <value>1</value> </property>
<!-- Warning: the following property must NOT be added:
<property> <name>hive.in.test</name> <value>true</value> </property>
-->

-- Set the SCD effective/expiry dates for this run: changes take effect today,
-- superseded versions expire yesterday, and 2050-01-01 is the "open" sentinel.
use sales_dw;
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2050-01-01' AS DATE);

-- Advance the CDC window for this run.
-- NOTE(review): the window predicates below read
--   entry_date >= end_time AND entry_date < start_time
-- which is only consistent if cdc_time's FIRST column is end_time (so this
-- overwrite sets end_time = previous end, start_time = today). Verify the
-- cdc_time DDL — the column naming appears counterintuitive.
INSERT overwrite TABLE sales_rds.cdc_time SELECT end_time, ${hivevar:cur_date} FROM sales_rds.cdc_time;

-- customer dimension, SCD2 on customer_street_address:
-- expire (expiry = yesterday) current rows whose customer was deleted at the
-- source or whose street address changed.
UPDATE dim_customer
SET expiry_date = ${hivevar:pre_date}
WHERE dim_customer.customer_sk IN(SELECT a.customer_sk
                                  FROM (SELECT customer_sk,
                                               customer_number,
                                               customer_street_address
                                        FROM dim_customer
                                        WHERE expiry_date = ${hivevar:max_date}) a
                                  LEFT JOIN sales_rds.customer b
                                  ON a.customer_number = b.customer_number
                                  WHERE b.customer_number IS NULL
                                     OR a.customer_street_address <> b.customer_street_address);

-- product dimension, SCD2 on product_name/product_category:
-- expire rows for products deleted at the source or with changed attributes.
UPDATE dim_product
SET expiry_date = ${hivevar:pre_date}
WHERE dim_product.product_sk IN(SELECT a.product_sk
                                FROM(SELECT product_sk,
                                            product_code,
                                            product_name,
                                            product_category
                                     FROM dim_product
                                     WHERE expiry_date = ${hivevar:max_date}) a
                                LEFT JOIN sales_rds.product b
                                ON a.product_code = b.product_code
                                WHERE b.product_code IS NULL
                                   OR (a.product_name <> b.product_name
                                       OR a.product_category <> b.product_category));

-- Insert the new address version for changed customers; the anti-join on t3
-- skips customers that still have an unexpired current row.
INSERT INTO dim_customer
SELECT row_number() over (ORDER BY t1.customer_number) + t2.sk_max, -- next surrogate keys
       t1.customer_number,
       t1.customer_name,
       t1.customer_street_address,
       t1.customer_zip_code,
       t1.customer_city,
       t1.customer_state,
       t1.version,
       t1.effective_date,
       t1.expiry_date
FROM(SELECT t2.customer_number customer_number,
            t2.customer_name customer_name,
            t2.customer_street_address customer_street_address,
            t2.customer_zip_code,
            t2.customer_city,
            t2.customer_state,
            t1.version + 1 `version`,            -- bump SCD2 version
            ${hivevar:pre_date} effective_date,
            ${hivevar:max_date} expiry_date
     FROM dim_customer t1
     INNER JOIN sales_rds.customer t2
     ON t1.customer_number = t2.customer_number
     AND t1.expiry_date = ${hivevar:pre_date}    -- rows just expired above
     LEFT JOIN dim_customer t3
     ON t1.customer_number = t3.customer_number
     AND t3.expiry_date = ${hivevar:max_date}
     WHERE t1.customer_street_address <> t2.customer_street_address
     AND t3.customer_sk IS NULL ) t1
CROSS JOIN(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM dim_customer) t2;

-- SCD1 on customer_name (overwrite in place). Hive-style workaround instead
-- of a direct UPDATE: stage changed rows in a temp table, delete the old
-- rows, then re-insert them with the new name.
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT a.customer_sk,
       a.customer_number,
       b.customer_name,            -- the new name from the source
       a.customer_street_address,
       a.customer_zip_code,
       a.customer_city,
       a.customer_state,
       a.version,
       a.effective_date,
       a.expiry_date
FROM dim_customer a
JOIN sales_rds.customer b ON a.customer_number = b.customer_number
where a.customer_name != b.customer_name;
-- Delete the stale rows...
DELETE FROM dim_customer WHERE dim_customer.customer_sk IN (SELECT customer_sk FROM tmp);
-- ...and re-insert them with the updated name (surrogate keys preserved).
INSERT INTO dim_customer SELECT * FROM tmp;

-- Brand-new customers: version 1, effective yesterday, open expiry.
INSERT INTO dim_customer
SELECT row_number() over (ORDER BY t1.customer_number) + t2.sk_max,
       t1.customer_number,
       t1.customer_name,
       t1.customer_street_address,
       t1.customer_zip_code,
       t1.customer_city,
       t1.customer_state,
       1,
       ${hivevar:pre_date},
       ${hivevar:max_date}
FROM( SELECT t1.*
      FROM sales_rds.customer t1
      LEFT JOIN dim_customer t2 ON t1.customer_number = t2.customer_number
      WHERE t2.customer_sk IS NULL) t1
CROSS JOIN(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM dim_customer) t2;

-- SCD2 new-version rows for products whose name/category changed
-- (mirror of the customer SCD2 insert above).
INSERT INTO dim_product
SELECT row_number() over (ORDER BY t1.product_code) + t2.sk_max,
       t1.product_code,
       t1.product_name,
       t1.product_category,
       t1.version,
       t1.effective_date,
       t1.expiry_date
FROM( SELECT t2.product_code product_code,
             t2.product_name product_name,
             t2.product_category product_category,
             t1.version + 1 `version`,
             ${hivevar:pre_date} effective_date,
             ${hivevar:max_date} expiry_date
      FROM dim_product t1
      INNER JOIN sales_rds.product t2
      ON t1.product_code = t2.product_code
      AND t1.expiry_date = ${hivevar:pre_date}
      LEFT JOIN dim_product t3
      ON t1.product_code = t3.product_code
      AND t3.expiry_date = ${hivevar:max_date}
      WHERE(t1.product_name <> t2.product_name
            OR t1.product_category <> t2.product_category)
      AND t3.product_sk IS NULL ) t1
CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_max FROM dim_product) t2;

-- Brand-new products.
INSERT INTO dim_product
SELECT row_number() over (ORDER BY t1.product_code) + t2.sk_max,
       t1.product_code,
       t1.product_name,
       t1.product_category,
       1,
       ${hivevar:pre_date},
       ${hivevar:max_date}
FROM( SELECT t1.*
      FROM sales_rds.product t1
      LEFT JOIN dim_product t2 ON t1.product_code = t2.product_code
      WHERE t2.product_sk IS NULL ) t1
CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_max FROM dim_product) t2;

-- order dimension: append orders whose entry_date falls in the CDC window
-- (see the cdc_time column-naming NOTE above for why end/start look swapped).
INSERT INTO dim_order
SELECT row_number() over (ORDER BY t1.order_number) + t2.sk_max,
       t1.order_number,
       t1.version,
       t1.effective_date,
       t1.expiry_date
FROM( SELECT order_number order_number,
             1 `version`,
             order_date effective_date,
             '2050-01-01' expiry_date
      FROM sales_rds.sales_order, sales_rds.cdc_time
      WHERE entry_date >= end_time AND entry_date < start_time ) t1
CROSS JOIN( SELECT COALESCE(MAX(order_sk),0) sk_max FROM dim_order) t2;

-- Fact load for the CDC window; dimension lookups are constrained to the
-- version that was current on the order date (half-open [effective, expiry)).
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=10000;
set hive.exec.max.dynamic.partitions.pernode=10000;
from ( select b.order_sk,
              c.customer_sk,
              d.product_sk,
              e.date_sk order_date_sk,
              a.order_amount,
              substr(a.order_date,1,7) order_date -- partition key: 'yyyy-MM'
       from sales_rds.sales_order a
       join sales_dw.dim_order b on a.order_number=b.order_number
       join sales_dw.dim_customer c on a.customer_number=c.customer_number
       join sales_dw.dim_product d on a.product_code=d.product_code
       join sales_dw.dim_date e on date(a.order_date)=e.date,
       sales_rds.cdc_time f
       where a.order_date >= c.effective_date AND a.order_date < c.expiry_date
       AND a.order_date >= d.effective_date AND a.order_date < d.expiry_date
       AND a.entry_date >= f.end_time AND a.entry_date < f.start_time ) temp
insert into table sales_dw.fact_sales_order partition(order_date)
select order_sk,customer_sk,product_sk,order_date_sk,order_amount,order_date;

-- Close the CDC window: both timestamps become start_time so the next run's
-- first overwrite can open a fresh [previous-run, today) window.
INSERT overwrite TABLE sales_rds.cdc_time SELECT start_time,start_time FROM sales_rds.cdc_time;

#!/bin/bash
# Daily driver script.
# Full (overwrite) re-pull of the customer and product tables into RDS.
# NOTE(review): same plaintext-password and shared --target-dir concerns as
# the initial-load script.
sqoop import --connect jdbc:mysql://localhost:3306/sales_source --username root \
--password ok --table customer --hive-import --hive-table sales_rds.customer --hive-overwrite --target-dir temp
sleep 2
sqoop import --connect jdbc:mysql://localhost:3306/sales_source --username root --password \
ok --table product --hive-import --hive-table sales_rds.product --hive-overwrite --target-dir temp
# Run the incremental sales_order import (picks up where last-value left off).
sqoop job --exec rds_incremental_import_job
# Run the periodic-load SQL above via the Hive CLI.
hive -f schedule_daily_etl.sql
# spark-sql does not support Hive transactions, so the line below must NOT be
# used for this script:
#spark-sql --master yarn-client -f schedule_daily_etl.sql
crontab -e 定时任务执行如下:
crontab（在 crontab -e 中添加每日定时执行上述脚本的条目；原文此处条目内容缺失）