微小型数仓案例整理

tech2023-07-09  103

一、MySQL导入数据

数仓建立在 Hive 上,分为两层:ODS 层(sales_rds 库)和 DW 层(sales_dw 库)。存储格式方面,DW 层的日期维度表使用 textfile,其余表使用 orc。 源数据放在 MySQL 中,可使用命令导入 sql 文件。 建 MySQL 表语句:

-- Source (OLTP) schema in MySQL: database sales_source with the three tables
-- customer, product and sales_order. The tables are dropped first, so the
-- script can be re-run; doing so discards any existing rows.
CREATE DATABASE IF NOT EXISTS sales_source
    DEFAULT CHARSET utf8
    COLLATE utf8_general_ci;

USE sales_source;

DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS product;
DROP TABLE IF EXISTS sales_order;

-- ---------------------------------------------------------------
-- Table: customer
-- ---------------------------------------------------------------
CREATE TABLE customer (
    customer_number         INT(11)      NOT NULL AUTO_INCREMENT,
    customer_name           VARCHAR(128) NOT NULL,
    customer_street_address VARCHAR(256) NOT NULL,
    customer_zip_code       INT(11)      NOT NULL,
    customer_city           VARCHAR(32)  NOT NULL,
    customer_state          VARCHAR(32)  NOT NULL,
    PRIMARY KEY (customer_number)
);

-- ---------------------------------------------------------------
-- Table: product
-- ---------------------------------------------------------------
CREATE TABLE product (
    product_code     INT(11)      NOT NULL AUTO_INCREMENT,
    product_name     VARCHAR(128) NOT NULL,
    product_category VARCHAR(256) NOT NULL,
    PRIMARY KEY (product_code)
);

-- ---------------------------------------------------------------
-- Table: sales_order
-- ---------------------------------------------------------------
CREATE TABLE sales_order (
    order_number    INT(11)       NOT NULL AUTO_INCREMENT,
    customer_number INT(11)       NOT NULL,
    product_code    INT(11)       NOT NULL,
    order_date      DATETIME      NOT NULL,
    entry_date      DATETIME      NOT NULL,
    order_amount    DECIMAL(18,2) NOT NULL,
    PRIMARY KEY (order_number)
);

生成mysql表数据代码:

-- Seed the two small reference tables. customer_zip_code is an INT column,
-- so the zip codes are written as numeric literals rather than strings.
INSERT INTO customer (
    customer_name,
    customer_street_address,
    customer_zip_code,
    customer_city,
    customer_state
) VALUES
    ('Big Customers',    '7500 Louise Dr.',    17050, 'Mechanicsburg', 'PA'),
    ('Small Stores',     '2500 Woodland St.',  17055, 'Pittsburgh',    'PA'),
    ('Medium Retailers', '1111 Ritter Rd.',    17055, 'Pittsburgh',    'PA'),
    ('Good Companies',   '9500 Scott St.',     17050, 'Mechanicsburg', 'PA'),
    ('Wonderful Shops',  '3333 Rossmoyne Rd.', 17050, 'Mechanicsburg', 'PA'),
    ('Loyal Clients',    '7070 Ritter Rd.',    17055, 'Pittsburgh',    'PA');

INSERT INTO product (product_name, product_category) VALUES
    ('Hard Disk',    'Storage'),
    ('Floppy Drive', 'Storage'),
    ('lcd panel',    'monitor');

DROP PROCEDURE IF EXISTS usp_generate_order_data;

-- Switch the client delimiter so the ';' inside the procedure body does not
-- end the CREATE PROCEDURE statement early.
DELIMITER //

-- usp_generate_order_data: refills sales_order with 100000 random orders.
--   * customer_number is drawn from 1..6 and product_code from 1..3,
--     matching the rows seeded above.
--   * order_date (also used as entry_date) is a random instant in
--     [2018-01-01, 2018-11-23).
--   * order_amount is a whole number in 1000..9999.
-- Rows are staged in tmp_sales_order, then copied into the truncated
-- sales_order so that order_number is re-assigned by AUTO_INCREMENT.
CREATE PROCEDURE usp_generate_order_data()
BEGIN
    DROP TABLE IF EXISTS tmp_sales_order;

    -- WHERE 1=0 copies the column layout of sales_order without any rows.
    CREATE TABLE tmp_sales_order AS
    SELECT * FROM sales_order WHERE 1=0;

    SET @start_date := UNIX_TIMESTAMP('2018-1-1');
    SET @end_date   := UNIX_TIMESTAMP('2018-11-23');
    SET @i := 1;

    WHILE @i <= 100000 DO
        SET @customer_number := FLOOR(1 + RAND() * 6);
        SET @product_code    := FLOOR(1 + RAND() * 3);
        SET @order_date      := FROM_UNIXTIME(@start_date + RAND() * (@end_date - @start_date));
        SET @amount          := FLOOR(1000 + RAND() * 9000);

        INSERT INTO tmp_sales_order (
            order_number,
            customer_number,
            product_code,
            order_date,
            entry_date,
            order_amount
        ) VALUES (
            @i,
            @customer_number,
            @product_code,
            @order_date,
            @order_date,
            @amount
        );

        SET @i := @i + 1;
    END WHILE;

    TRUNCATE TABLE sales_order;

    -- order_number is passed as NULL so AUTO_INCREMENT assigns fresh keys.
    INSERT INTO sales_order (
        order_number,
        customer_number,
        product_code,
        order_date,
        entry_date,
        order_amount
    )
    SELECT
        NULL,
        customer_number,
        product_code,
        order_date,
        entry_date,
        order_amount
    FROM tmp_sales_order;

    COMMIT;

    DROP TABLE tmp_sales_order;
END //

-- Restore the default delimiter; without this the ';'-terminated CALL below
-- is not recognised as a complete statement by the mysql client.
DELIMITER ;

CALL usp_generate_order_data();

建立hive数据库和表 建立sales_rds数据库和表:

-- ODS layer in Hive: database sales_rds mirrors the three MySQL source tables.
-- The database is created if missing so that USE cannot fail on a fresh
-- metastore, and the DROP statements target sales_rds (the database the
-- tables are created in) so the script can be re-run.
CREATE DATABASE IF NOT EXISTS sales_rds;

USE sales_rds;

DROP TABLE IF EXISTS sales_rds.customer;
DROP TABLE IF EXISTS sales_rds.product;
DROP TABLE IF EXISTS sales_rds.sales_order;

-- ---------------------------------------------------------------
-- Table: customer
-- ---------------------------------------------------------------
CREATE TABLE sales_rds.customer (
    customer_number         INT,
    customer_name           VARCHAR(128),
    customer_street_address VARCHAR(256),
    customer_zip_code       INT,
    customer_city           VARCHAR(32),
    customer_state          VARCHAR(32)
);

-- ---------------------------------------------------------------
-- Table: product
-- ---------------------------------------------------------------
CREATE TABLE sales_rds.product (
    product_code     INT,
    product_name     VARCHAR(128),
    product_category VARCHAR(256)
);

-- ---------------------------------------------------------------
-- Table: sales_order
-- Tab-delimited text; whatever loads this table must write the same
-- field and line delimiters.
-- ---------------------------------------------------------------
CREATE TABLE sales_rds.sales_order (
    order_number    INT,
    customer_number INT,
    product_code    INT,
    order_date      TIMESTAMP,
    entry_date      TIMESTAMP,
    order_amount    DECIMAL(18,2)
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

建立sales_dw数据库和表:

-- DW layer in Hive: database sales_dw holds four dimension tables and one
-- fact table. All tables except dim_date are bucketed ORC tables flagged as
-- transactional; dim_date is comma-delimited text so a generated CSV file
-- can be loaded into it directly.
-- NOTE(review): transactional tables presumably need ACID enabled on the
-- Hive side (transaction manager / concurrency settings) - confirm for the
-- target cluster.
create database if not exists sales_dw;

use sales_dw;

-- ---------------------------------------------------------------
-- Table: dim_product
-- ---------------------------------------------------------------
create table dim_product (
    product_sk       int,
    product_code     int,
    product_name     varchar(128),
    product_category varchar(256),
    version          varchar(32),
    effective_date   date,
    expiry_date      date
)
clustered by (product_sk) into 8 buckets
stored as orc
tblproperties ('transactional'='true');

-- ---------------------------------------------------------------
-- Table: dim_customer
-- ---------------------------------------------------------------
create table dim_customer (
    customer_sk             int,
    customer_number         int,
    customer_name           varchar(128),
    customer_street_address varchar(256),
    customer_zip_code       int,
    customer_city           varchar(32),
    customer_state          varchar(32),
    version                 varchar(32),
    effective_date          date,
    expiry_date             date
)
clustered by (customer_sk) into 8 buckets
stored as orc
tblproperties ('transactional'='true');

-- ---------------------------------------------------------------
-- Table: dim_date
-- `date` is a reserved keyword in HiveQL, so the column name is quoted
-- with backticks; the column is still called date.
-- ---------------------------------------------------------------
create table dim_date (
    date_sk    int,
    `date`     date,
    month      tinyint,
    month_name varchar(16),
    quarter    tinyint,
    year       int
)
row format delimited
    fields terminated by ','
stored as textfile;

-- ---------------------------------------------------------------
-- Table: dim_order
-- ---------------------------------------------------------------
create table dim_order (
    order_sk       int,
    order_number   int,
    version        varchar(32),
    effective_date date,
    expiry_date    date
)
clustered by (order_sk) into 8 buckets
stored as orc
tblproperties ('transactional'='true');

-- ---------------------------------------------------------------
-- Table: fact_sales_order
-- Partitioned by the order day, stored as a string.
-- ---------------------------------------------------------------
create table fact_sales_order (
    order_sk      int,
    customer_sk   int,
    product_sk    int,
    order_date_sk int,
    order_amount  decimal(18,2)
)
partitioned by (order_date string)
clustered by (order_sk) into 8 buckets
stored as orc
tblproperties ('transactional'='true');

二、数据导入到hive中

数据模型图 使用sqoop把mysql数据导入到hive中,在hive建立数据库;

# Sqoop jobs that copy the MySQL source tables into the Hive ODS database
# sales_rds.
#
# Connection settings are defined once. The original commands used two
# different passwords for the same account ("root" for customer, "123" for
# the others); they are unified here and can be overridden through the
# MYSQL_PASSWORD environment variable.
# NOTE(review): a password on the command line is visible in the process
# list; consider Sqoop's -P or --password-file outside a sandbox.
MYSQL_URL='jdbc:mysql://192.168.255.11:3306/sales_source'
MYSQL_USER='root'
MYSQL_PASSWORD="${MYSQL_PASSWORD:-123}"

# Full extract of customer: overwrites sales_rds.customer on every run.
# --delete-target-dir clears the staging directory first so a leftover
# "temp" directory from an earlier job cannot make the import fail.
sqoop import \
    --connect "$MYSQL_URL" \
    --username "$MYSQL_USER" \
    --password "$MYSQL_PASSWORD" \
    --table customer \
    --hive-import \
    --hive-table sales_rds.customer \
    --hive-overwrite \
    --delete-target-dir \
    --target-dir temp

# Full extract of product: same pattern as customer.
sqoop import \
    --connect "$MYSQL_URL" \
    --username "$MYSQL_USER" \
    --password "$MYSQL_PASSWORD" \
    --table product \
    --hive-import \
    --hive-table sales_rds.product \
    --hive-overwrite \
    --delete-target-dir \
    --target-dir temp

# Incremental extract of sales_order: appends rows whose entry_date is later
# than --last-value. '1900-1-1' makes the first run pull everything; later
# runs should pass the last value reported by the previous import.
# The delimiters match the ROW FORMAT of sales_rds.sales_order.
sqoop import \
    --connect "$MYSQL_URL" \
    --username "$MYSQL_USER" \
    --password "$MYSQL_PASSWORD" \
    --table sales_order \
    --hive-import \
    --hive-table sales_rds.sales_order \
    --fields-terminated-by '\t' \
    --lines-terminated-by '\n' \
    --check-column entry_date \
    --incremental append \
    --last-value '1900-1-1'

这是rds层数据,再把rds层数据加载到dw层中。

-- Initial load of the three versioned dimensions from the ODS layer.
-- Each row gets a surrogate key from row_number() ordered by its business
-- key, version '1.0', and a validity window of 2018-01-01 .. 2050-01-01.
-- The select lists name every column in target-table order (no select *),
-- and the dates are explicit DATE casts of ISO literals rather than the
-- strings '2018-1-1' / '2050-1-1'.

-- Load dim_product
insert into table sales_dw.dim_product
select
    row_number() over (order by sp.product_code) as product_sk,
    sp.product_code,
    sp.product_name,
    sp.product_category,
    '1.0'                      as version,
    cast('2018-01-01' as date) as effective_date,
    cast('2050-01-01' as date) as expiry_date
from sales_rds.product sp;

-- Load dim_customer
insert into table sales_dw.dim_customer
select
    row_number() over (order by sc.customer_number) as customer_sk,
    sc.customer_number,
    sc.customer_name,
    sc.customer_street_address,
    sc.customer_zip_code,
    sc.customer_city,
    sc.customer_state,
    '1.0'                      as version,
    cast('2018-01-01' as date) as effective_date,
    cast('2050-01-01' as date) as expiry_date
from sales_rds.customer sc;

-- Load dim_order
insert into table sales_dw.dim_order
select
    row_number() over (order by so.order_number) as order_sk,
    so.order_number,
    '1.0'                      as version,
    cast('2018-01-01' as date) as effective_date,
    cast('2050-01-01' as date) as expiry_date
from sales_rds.sales_order so;

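加载完之后,sales_dw 中的 dim_product、dim_customer 和 dim_order 都已有数据,只有 dim_date 和 fact_sales_order 还没有数据。 数据模型如下: 向 dim_date 表中放入数据,这里由脚本自动生成:先给脚本赋予执行权限,再运行脚本:./脚本名.sh 起始日期 终止日期。脚本会把结果追加写入当前目录下的 dim_date.csv,之后还需要把该文件加载到 sales_dw.dim_date 表中(例如使用 load data local inpath)。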

#!/bin/bash
# Generate rows for the dim_date dimension as CSV.
#
# Usage: <script> START_DATE END_DATE
#   START_DATE  first calendar day to emit (any format GNU date accepts)
#   END_DATE    day after the last one to emit (the end date is exclusive)
#
# One line per day is appended to ./dim_date.csv in the form
#   date_sk,date,month,month_name,quarter,year
# where date_sk counts up from 1. Because the file is appended to, delete it
# before re-running if a clean file is wanted.
set -eu

if [ "$#" -ne 2 ]; then
    echo "Usage: $0 START_DATE END_DATE" >&2
    exit 1
fi

# Work in UTC so every day is exactly 86400 seconds long (a DST change inside
# the range would otherwise skew the day count), and pin the locale so that
# %B always yields English month names.
export TZ=UTC
export LC_ALL=C

# Start date
date1=$1
# End date (exclusive)
date2=$2

# Current day, normalised to YYYY-MM-DD
tmpdate=$(date -d "$date1" +%F)
# Range boundaries as epoch seconds
startSec=$(date -d "$date1" +%s)
endSec=$(date -d "$date2" +%s)

# Loop counter; it doubles as the surrogate key date_sk
min=1
# Number of whole days between the two dates
max=$(( (endSec - startSec) / 86400 ))

while [ "$min" -le "$max" ]
do
    # Month number, zero-padded (01..12)
    month=$(date -d "$tmpdate" +%m)
    # Full month name
    month_name=$(date -d "$tmpdate" +%B)
    # Four-digit year
    year=$(date -d "$tmpdate" +%Y)
    # Quarter 1..4; 10# forces base 10 so "08" and "09" are not read as octal
    quarter=$(( (10#$month - 1) / 3 + 1 ))

    # Append one CSV row
    printf '%s,%s,%s,%s,%s,%s\n' \
        "$min" "$tmpdate" "$month" "$month_name" "$quarter" "$year" >> ./dim_date.csv

    # Day for the next iteration: start date plus $min days
    tmpdate=$(date -d "+$min day $date1" +%F)
    min=$(( min + 1 ))
done

接下来由 dim_product、dim_customer、dim_order 和 dim_date 向 fact_sales_order 表中插入数据,这里用 SQL 语句插入,注意对应好列名。 在向 fact_sales_order 导入数据之前,需要先设置动态分区相关参数:

-- Session settings for an insert that uses dynamic partitions.

-- Turn dynamic partitioning on.
set hive.exec.dynamic.partition=true;
-- nonstrict: every partition column may be dynamic (no static one required).
set hive.exec.dynamic.partition.mode=nonstrict;
-- Upper bound on dynamic partitions created by one statement in total ...
set hive.exec.max.dynamic.partitions=10000;
-- ... and per mapper/reducer node.
set hive.exec.max.dynamic.partitions.pernode=10000;

由于我个人的 Linux 配置问题,还需要关闭自动 map join 转换,并修改 reduce task 的个数:

-- Environment-specific tuning: disable automatic map-join conversion and fix
-- the number of reduce tasks at 24.
set hive.auto.convert.join=false;
set mapred.reduce.tasks=24;

-- Load fact_sales_order from the ODS orders by looking up the surrogate key
-- of each dimension through its business key. The last select column feeds
-- the dynamic partition order_date (the order day as a yyyy-MM-dd string).
--   * `date` is a reserved keyword in HiveQL, so it is quoted.
--   * The timestamp is cast explicitly instead of relying on implicit
--     conversion inside substr() and in the date comparison.
-- All joins are inner joins, so an order with no matching row in any of the
-- dimensions (including dim_date) is not loaded.
insert into table sales_dw.fact_sales_order partition (order_date)
select
    b.order_sk,
    c.customer_sk,
    d.product_sk,
    e.date_sk                                   as order_date_sk,
    a.order_amount,
    substr(cast(a.order_date as string), 1, 10) as order_date
from sales_rds.sales_order a
join sales_dw.dim_order b
    on a.order_number = b.order_number
join sales_dw.dim_customer c
    on a.customer_number = c.customer_number
join sales_dw.dim_product d
    on a.product_code = d.product_code
join sales_dw.dim_date e
    on cast(a.order_date as date) = e.`date`;

这里的表就已经全部完成,数仓建立完成。后面可以通过SQL代码解决问题需求。 例:

-- Example report: order counts and order amounts per customer, product and
-- day for 2018-10-19 .. 2018-10-20, relative to the reference day 2018-10-20.
--   one_* : orders placed on the reference day itself (datediff = 0)
--   two_* : orders placed on the reference day or the day before (datediff <= 1)
-- Fix: two_order_cnt_amount previously summed the constant 1, which made it
-- a second order count; it now sums fso.order_amount.
-- The "one" columns both use "= 0"; with the WHERE filter below this matches
-- the "<= 0" the amount column used before.
-- `date` is a reserved keyword in HiveQL, so it is quoted.
-- NOTE(review): dd.`date` is part of the GROUP BY, so each output row covers
-- a single day; drop the date columns from the select list and GROUP BY if
-- one combined two-day row per customer/product is wanted.
select
    c.customer_sk,
    c.customer_number,
    c.customer_name,
    c.customer_street_address,
    c.customer_zip_code,
    c.customer_city,
    c.customer_state,
    p.product_sk,
    p.product_code,
    p.product_name,
    p.product_category,
    dd.date_sk,
    dd.`date`,
    dd.month,
    dd.month_name,
    dd.quarter,
    dd.year,
    sum(case when datediff('2018-10-20', dd.`date`) = 0  then 1 else 0 end)                as one_order_cnt,
    sum(case when datediff('2018-10-20', dd.`date`) <= 1 then 1 else 0 end)                as two_order_cnt,
    sum(case when datediff('2018-10-20', dd.`date`) = 0  then fso.order_amount else 0 end) as one_order_cnt_amount,
    sum(case when datediff('2018-10-20', dd.`date`) <= 1 then fso.order_amount else 0 end) as two_order_cnt_amount
from sales_dw.fact_sales_order fso
join sales_dw.dim_customer c
    on fso.customer_sk = c.customer_sk
join sales_dw.dim_product p
    on fso.product_sk = p.product_sk
join sales_dw.dim_date dd
    on fso.order_date_sk = dd.date_sk
where dd.`date` >= '2018-10-19'
  and dd.`date` <= '2018-10-20'
group by
    c.customer_sk,
    c.customer_number,
    c.customer_name,
    c.customer_street_address,
    c.customer_zip_code,
    c.customer_city,
    c.customer_state,
    p.product_sk,
    p.product_code,
    p.product_name,
    p.product_category,
    dd.date_sk,
    dd.`date`,
    dd.month,
    dd.month_name,
    dd.quarter,
    dd.year;
最新回复(0)