The business data is stored in MySQL; Sqoop is used to read it from MySQL and load it into Hive tables.
1. Run the MySQL script
CREATE DATABASE IF NOT EXISTS sales_source
  DEFAULT CHARSET utf8
  COLLATE utf8_general_ci;

USE sales_source;

DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS product;
DROP TABLE IF EXISTS sales_order;

CREATE TABLE customer
(
  customer_number         INT(11)      NOT NULL AUTO_INCREMENT,
  customer_name           VARCHAR(128) NOT NULL,
  customer_street_address VARCHAR(256) NOT NULL,
  customer_zip_code       INT(11)      NOT NULL,
  customer_city           VARCHAR(32)  NOT NULL,
  customer_state          VARCHAR(32)  NOT NULL,
  PRIMARY KEY (customer_number)
);

CREATE TABLE product
(
  product_code     INT(11)      NOT NULL AUTO_INCREMENT,
  product_name     VARCHAR(128) NOT NULL,
  product_category VARCHAR(256) NOT NULL,
  PRIMARY KEY (product_code)
);

CREATE TABLE sales_order
(
  order_number    INT(11)       NOT NULL AUTO_INCREMENT,
  customer_number INT(11)       NOT NULL,
  product_code    INT(11)       NOT NULL,
  order_date      DATETIME      NOT NULL,
  entry_date      DATETIME      NOT NULL,
  order_amount    DECIMAL(18,2) NOT NULL,
  PRIMARY KEY (order_number)
);
2. Insert data into the MySQL tables
INSERT INTO customer
( customer_name
, customer_street_address
, customer_zip_code
, customer_city
, customer_state
)
VALUES
  ('Big Customers',    '7500 Louise Dr.',    '17050', 'Mechanicsburg', 'PA')
, ('Small Stores',     '2500 Woodland St.',  '17055', 'Pittsburgh',    'PA')
, ('Medium Retailers', '1111 Ritter Rd.',    '17055', 'Pittsburgh',    'PA')
, ('Good Companies',   '9500 Scott St.',     '17050', 'Mechanicsburg', 'PA')
, ('Wonderful Shops',  '3333 Rossmoyne Rd.', '17050', 'Mechanicsburg', 'PA')
, ('Loyal Clients',    '7070 Ritter Rd.',    '17055', 'Pittsburgh',    'PA')
;

INSERT INTO product
( product_name
, product_category
)
VALUES
  ('Hard Disk',    'Storage')
, ('Floppy Drive', 'Storage')
, ('lcd panel',    'monitor')
;
DROP PROCEDURE IF EXISTS usp_generate_order_data;

-- The procedure body contains semicolons, so the client delimiter must be
-- changed before CREATE PROCEDURE and restored afterwards.
DELIMITER //
CREATE PROCEDURE usp_generate_order_data()
BEGIN
    -- empty copy of sales_order used as a staging table
    DROP TABLE IF EXISTS tmp_sales_order;
    CREATE TABLE tmp_sales_order AS SELECT * FROM sales_order WHERE 1=0;

    SET @start_date := UNIX_TIMESTAMP('2018-1-1');
    SET @end_date   := UNIX_TIMESTAMP('2018-11-23');
    SET @i          := 1;

    -- generate 100,000 random orders between @start_date and @end_date
    WHILE @i <= 100000 DO
        SET @customer_number := FLOOR(1 + RAND() * 6);
        SET @product_code    := FLOOR(1 + RAND() * 3);
        SET @order_date      := FROM_UNIXTIME(@start_date + RAND() * (@end_date - @start_date));
        SET @amount          := FLOOR(1000 + RAND() * 9000);
        INSERT INTO tmp_sales_order
        VALUES (@i, @customer_number, @product_code, @order_date, @order_date, @amount);
        SET @i := @i + 1;
    END WHILE;

    TRUNCATE TABLE sales_order;
    INSERT INTO sales_order
    SELECT NULL, customer_number, product_code, order_date, entry_date, order_amount
    FROM tmp_sales_order;
    COMMIT;
    DROP TABLE tmp_sales_order;
END //
DELIMITER ;

CALL usp_generate_order_data();
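A quick check that the procedure produced the expected volume (the loop above inserts 100,000 rows; run in the MySQL client):
SELECT COUNT(*) FROM sales_order;   -- expected: 100000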
3. Create the RDS-layer tables in Hive
create database sales_rds;
USE sales_rds;

DROP TABLE IF EXISTS sales_rds.customer;
DROP TABLE IF EXISTS sales_rds.product;
DROP TABLE IF EXISTS sales_rds.sales_order;
CREATE TABLE sales_rds.customer
(
  customer_number         INT,
  customer_name           VARCHAR(128),
  customer_street_address VARCHAR(256),
  customer_zip_code       INT,
  customer_city           VARCHAR(32),
  customer_state          VARCHAR(32)
);

CREATE TABLE sales_rds.product
(
  product_code     INT,
  product_name     VARCHAR(128),
  product_category VARCHAR(256)
);

CREATE TABLE sales_rds.sales_order
(
  order_number    INT,
  customer_number INT,
  product_code    INT,
  order_date      TIMESTAMP,
  entry_date      TIMESTAMP,
  order_amount    DECIMAL(18,2)
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
4. Create the DW-layer tables in Hive
create database sales_dw;
use sales_dw;

create table dim_product
(
  product_sk       int,
  product_code     int,
  product_name     varchar(128),
  product_category varchar(256),
  version          varchar(32),
  effective_date   date,
  expiry_date      date
)
clustered by (product_sk) into 8 buckets
stored as orc
tblproperties ('transactional'='true');

create table dim_customer
(
  customer_sk             int,
  customer_number         int,
  customer_name           varchar(128),
  customer_street_address varchar(256),
  customer_zip_code       int,
  customer_city           varchar(32),
  customer_state          varchar(32),
  version                 varchar(32),
  effective_date          date,
  expiry_date             date
)
clustered by (customer_sk) into 8 buckets
stored as orc
tblproperties ('transactional'='true');

create table dim_date
(
  date_sk    int,
  date       date,
  month      tinyint,
  month_name varchar(16),
  quarter    tinyint,
  year       int
)
row format delimited
  fields terminated by ','
stored as textfile;

create table dim_order
(
  order_sk       int,
  order_number   int,
  version        varchar(32),
  effective_date date,
  expiry_date    date
)
clustered by (order_sk) into 8 buckets
stored as orc
tblproperties ('transactional'='true');

create table fact_sales_order
(
  order_sk      int,
  customer_sk   int,
  product_sk    int,
  order_date_sk int,
  order_amount  decimal(18,2)
)
partitioned by (order_date string)
clustered by (order_sk) into 8 buckets
stored as orc
tblproperties ('transactional'='true');
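The bucketed ORC tables above are declared transactional, which only works when Hive ACID support is enabled before writing to them. A minimal sketch of the settings, assuming a Hive 1.x/2.x-style setup (the property names are standard Hive configuration keys; the values shown are typical for a test cluster and may need adjusting):
-- enable ACID transactions for the bucketed ORC tables
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;
set hive.enforce.bucketing=true;                 -- Hive 1.x only; always on in Hive 2+
set hive.exec.dynamic.partition.mode=nonstrict;  -- needed when loading the partitioned fact table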
5. Use Sqoop to import the MySQL data into the Hive RDS-layer tables
Full extraction of the customer table
sqoop import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password root \
--table customer \
--hive-import \
--hive-table sales_rds.customer \
--hive-overwrite \
--target-dir temp
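A quick sanity check that the full import landed in the staging table (a sketch; run in Hive):
select count(*) from sales_rds.customer;   -- should match the 6 rows inserted in step 2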
Full import of the product table
sqoop import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password root \
--table product \
--hive-import \
--hive-table sales_rds.product \
--hive-overwrite \
--target-dir temp
Incremental extraction of sales_order
sqoop import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password root \
--table sales_order \
--hive-import \
--hive-table sales_rds.sales_order \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--check-column entry_date \
--incremental append \
--last-value '1900-1-1'
Incremental extraction of sales_order with a saved Sqoop job
sqoop job \
--create myjob \
-- import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password root \
--table sales_order \
--hive-import \
--hive-table sales_rds.sales_order \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--check-column entry_date \
--incremental append \
--last-value '1900-1-1'
sqoop job --list
sqoop job --exec myjob
sqoop job --delete myjob
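The saved job records the incremental state (the last entry_date value it has seen), so each --exec only appends rows that are new since the previous run; the stored definition, including that recorded last value, can be inspected with:
sqoop job --show myjob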
6. Load data into the DW layer
Load the dim_product table
from
(
  select
    row_number() over (order by sp.product_code) product_sk,
    sp.product_code,
    sp.product_name,
    sp.product_category,
    '1.0',
    '2018-1-1',
    '2050-1-1'
  from sales_rds.product sp
) tmp
insert into sales_dw.dim_product
select *;
Load the dim_customer table
from
(
  select
    row_number() over (order by sc.customer_number) customer_sk,
    sc.customer_number,
    sc.customer_name,
    sc.customer_street_address,
    sc.customer_zip_code,
    sc.customer_city,
    sc.customer_state,
    '1.0',
    '2018-1-1',
    '2050-1-1'
  from sales_rds.customer sc
) tmp
insert into sales_dw.dim_customer
select *;
Load the dim_order table
from
(
  select
    row_number() over (order by so.order_number) order_sk,
    order_number,
    '1.0',
    '2018-1-1',
    '2050-1-1'
  from sales_rds.sales_order so
) tmp
insert into sales_dw.dim_order
select *;
Write the script that generates the date data
#!/bin/bash
# Generates one comma-separated row per day starting at $1, covering the range
# up to $2, in the column order of sales_dw.dim_date:
# date_sk,date,month,month_name,quarter,year
date1=$1
date2=$2
tmpdate=`date -d "$date1" +%F`
startSec=`date -d "$date1" +%s`
endSec=`date -d "$date2" +%s`
min=1
max=`expr \( $endSec - $startSec \) / 60 / 60 / 24`
while [ $min -le $max ]
do
    month=`date -d "$tmpdate" +%m`
    month_name=`date -d "$tmpdate" +%B`
    year=`date -d "$tmpdate" +%Y`
    quarter=`expr \( $month - 1 \) / 3 + 1`
    echo "${min},${tmpdate},${month},${month_name},${quarter},${year}" >> ./dim_date.csv
    tmpdate=`date -d "+$min day $date1" +%F`
    startSec=`date -d "+$min day $date1" +%s`
    min=`expr $min + 1`
done
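The script emits its rows in the same comma-delimited layout as sales_dw.dim_date, so the CSV can be loaded straight into that table. A hedged usage sketch (the script name generate_dim_date.sh and the local ./dim_date.csv path are assumptions, not from the original):
# generate the same date range used by the order-generation procedure
./generate_dim_date.sh 2018-1-1 2018-11-23
Then, in Hive:
load data local inpath './dim_date.csv' overwrite into table sales_dw.dim_date;
With every dimension populated, the fact_sales_order table created in step 4 can also be filled. The insert below is only a sketch of one way to do it, based on the table definitions above (the join keys and the to_date() conversion are assumptions, not part of the original flow):
-- dynamic partitioning must be enabled for the partitioned fact table
set hive.exec.dynamic.partition.mode=nonstrict;
insert into sales_dw.fact_sales_order partition (order_date)
select b.order_sk,
       c.customer_sk,
       d.product_sk,
       e.date_sk,
       a.order_amount,
       to_date(a.order_date) as order_date
from sales_rds.sales_order a
join sales_dw.dim_order    b on a.order_number    = b.order_number
join sales_dw.dim_customer c on a.customer_number = c.customer_number
join sales_dw.dim_product  d on a.product_code    = d.product_code
join sales_dw.dim_date     e on e.`date`          = to_date(a.order_date);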