Building the Data Warehouse


The business data lives in MySQL. Sqoop is used to read it from MySQL into Hive tables.

1. Run the MySQL script

/*==============================================================*/
/* DBMS name:      MySQL 5.0                                    */
/* Created on:     2018/11/23 1:09:10                           */
/*==============================================================*/

CREATE DATABASE IF NOT EXISTS sales_source DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

USE sales_source;

DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS product;
DROP TABLE IF EXISTS sales_order;

/*==============================================================*/
/* Table: customer                                              */
/*==============================================================*/
CREATE TABLE customer
(
    customer_number         INT(11) NOT NULL AUTO_INCREMENT,
    customer_name           VARCHAR(128) NOT NULL,
    customer_street_address VARCHAR(256) NOT NULL,
    customer_zip_code       INT(11) NOT NULL,
    customer_city           VARCHAR(32) NOT NULL,
    customer_state          VARCHAR(32) NOT NULL,
    PRIMARY KEY (customer_number)
);

/*==============================================================*/
/* Table: product                                               */
/*==============================================================*/
CREATE TABLE product
(
    product_code     INT(11) NOT NULL AUTO_INCREMENT,
    product_name     VARCHAR(128) NOT NULL,
    product_category VARCHAR(256) NOT NULL,
    PRIMARY KEY (product_code)
);

/*==============================================================*/
/* Table: sales_order                                           */
/*==============================================================*/
CREATE TABLE sales_order
(
    order_number    INT(11) NOT NULL AUTO_INCREMENT,
    customer_number INT(11) NOT NULL,
    product_code    INT(11) NOT NULL,
    order_date      DATETIME NOT NULL,
    entry_date      DATETIME NOT NULL,
    order_amount    DECIMAL(18,2) NOT NULL,
    PRIMARY KEY (order_number)
);
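
Assuming the DDL above is saved to a file (the name sales_source.sql is just a placeholder, not from the original post), it can be run and checked from the shell, for example:

# run the DDL script, then confirm the three tables exist
mysql -u root -p < sales_source.sql
mysql -u root -p -e "USE sales_source; SHOW TABLES;"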

2. Insert data into the MySQL tables

/*==============================================================*/
/* insert data                                                  */
/*==============================================================*/
INSERT INTO customer
( customer_name, customer_street_address, customer_zip_code, customer_city, customer_state )
VALUES
  ('Big Customers', '7500 Louise Dr.', '17050', 'Mechanicsburg', 'PA'),
  ('Small Stores', '2500 Woodland St.', '17055', 'Pittsburgh', 'PA'),
  ('Medium Retailers', '1111 Ritter Rd.', '17055', 'Pittsburgh', 'PA'),
  ('Good Companies', '9500 Scott St.', '17050', 'Mechanicsburg', 'PA'),
  ('Wonderful Shops', '3333 Rossmoyne Rd.', '17050', 'Mechanicsburg', 'PA'),
  ('Loyal Clients', '7070 Ritter Rd.', '17055', 'Pittsburgh', 'PA');

INSERT INTO product(product_name, product_category)
VALUES
  ('Hard Disk', 'Storage'),
  ('Floppy Drive', 'Storage'),
  ('lcd panel', 'monitor');

DROP PROCEDURE IF EXISTS usp_generate_order_data;

DELIMITER //
CREATE PROCEDURE usp_generate_order_data()
BEGIN
    DROP TABLE IF EXISTS tmp_sales_order;
    CREATE TABLE tmp_sales_order AS SELECT * FROM sales_order WHERE 1=0;
    SET @start_date := UNIX_TIMESTAMP('2018-1-1');
    SET @end_date := UNIX_TIMESTAMP('2018-11-23');
    SET @i := 1;
    WHILE @i <= 100000 DO
        SET @customer_number := FLOOR(1 + RAND() * 6);
        SET @product_code := FLOOR(1 + RAND() * 3);
        SET @order_date := FROM_UNIXTIME(@start_date + RAND() * (@end_date - @start_date));
        SET @amount := FLOOR(1000 + RAND() * 9000);
        INSERT INTO tmp_sales_order VALUES (@i, @customer_number, @product_code, @order_date, @order_date, @amount);
        SET @i := @i + 1;
    END WHILE;
    TRUNCATE TABLE sales_order;
    INSERT INTO sales_order
    SELECT NULL, customer_number, product_code, order_date, entry_date, order_amount
    FROM tmp_sales_order;
    COMMIT;
    DROP TABLE tmp_sales_order;
END //
DELIMITER ;

CALL usp_generate_order_data();
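
A quick sanity check after the procedure finishes: the loop above inserts 100,000 rows with order dates between 2018-01-01 and 2018-11-23, so the following query should reflect that.

-- expect 100000 rows, dates within 2018-01-01 .. 2018-11-23
SELECT COUNT(*), MIN(order_date), MAX(order_date) FROM sales_order;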

3. Create the RDS-layer tables in Hive

create database sales_rds;
USE sales_rds;

DROP TABLE IF EXISTS sales_rds.customer;
DROP TABLE IF EXISTS sales_rds.product;
DROP TABLE IF EXISTS sales_rds.sales_order;

CREATE TABLE sales_rds.customer
(
    customer_number         INT,
    customer_name           VARCHAR(128),
    customer_street_address VARCHAR(256),
    customer_zip_code       INT,
    customer_city           VARCHAR(32),
    customer_state          VARCHAR(32)
);

CREATE TABLE sales_rds.product
(
    product_code     INT,
    product_name     VARCHAR(128),
    product_category VARCHAR(256)
);

CREATE TABLE sales_rds.sales_order
(
    order_number    INT,
    customer_number INT,
    product_code    INT,
    order_date      TIMESTAMP,
    entry_date      TIMESTAMP,
    order_amount    DECIMAL(18,2)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

4. Create the DW-layer tables in Hive

create database sales_dw;
use sales_dw;

create table dim_product
(
    product_sk       int,
    product_code     int,
    product_name     varchar(128),
    product_category varchar(256),
    version          varchar(32),
    effective_date   date,
    expiry_date      date
)
clustered by (product_sk) into 8 buckets
stored as orc
tblproperties('transactional'='true');

create table dim_customer
(
    customer_sk             int,
    customer_number         int,
    customer_name           varchar(128),
    customer_street_address varchar(256),
    customer_zip_code       int,
    customer_city           varchar(32),
    customer_state          varchar(32),
    version                 varchar(32),
    effective_date          date,
    expiry_date             date
)
clustered by (customer_sk) into 8 buckets
stored as orc
tblproperties('transactional'='true');

create table dim_date
(
    date_sk    int,
    date       date,
    month      tinyint,
    month_name varchar(16),
    quarter    tinyint,
    year       int
)
row format delimited fields terminated by ','
stored as textfile;

create table dim_order
(
    order_sk       int,
    order_number   int,
    version        varchar(32),
    effective_date date,
    expiry_date    date
)
clustered by (order_sk) into 8 buckets
stored as orc
tblproperties('transactional'='true');

create table fact_sales_order
(
    order_sk      int,
    customer_sk   int,
    product_sk    int,
    order_date_sk int,
    order_amount  decimal(18,2)
)
partitioned by (order_date string)
clustered by (order_sk) into 8 buckets
stored as orc
tblproperties('transactional'='true');
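
The bucketed ORC tables marked transactional='true' only work when Hive's ACID support is enabled. The settings below are not from the original post; they are a minimal sketch of what typically has to be set (in hive-site.xml or per session), and the exact values depend on your cluster and Hive version.

-- enable ACID support for the transactional, bucketed ORC tables above
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.enforce.bucketing=true;                 -- only needed on Hive 1.x; always enforced in 2.x+
set hive.exec.dynamic.partition.mode=nonstrict;  -- needed later if the partitioned fact table is loaded with dynamic partitions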

5. Use Sqoop to import the MySQL data into the Hive RDS-layer tables

Full extraction of the customer table

sqoop import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password root \
--table customer \
--hive-import \
--hive-table sales_rds.customer \
--hive-overwrite \
--target-dir temp

Full import of the product table

sqoop import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password root \
--table product \
--hive-import \
--hive-table sales_rds.product \
--hive-overwrite \
--target-dir temp
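
A quick check that the two full loads landed; the source inserts above created 6 customers and 3 products, so the counts in Hive should match.

select count(*) from sales_rds.customer;   -- expect 6
select count(*) from sales_rds.product;    -- expect 3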

Incremental extraction of sales_order

sqoop import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password root \
--table sales_order \
--hive-import \
--hive-table sales_rds.sales_order \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--check-column entry_date \
--incremental append \
--last-value '1900-1-1'

Incremental extraction of sales_order with a Sqoop job

sqoop job \
--create myjob \
-- import \
--connect jdbc:mysql://localhost:3306/sales_source \
--username root \
--password root \
--table sales_order \
--hive-import \
--hive-table sales_rds.sales_order \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--check-column entry_date \
--incremental append \
--last-value '1900-1-1'

# list the saved jobs
sqoop job --list

# run the job
sqoop job --exec myjob

# delete the job
sqoop job --delete myjob
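
The advantage of a Sqoop job is that it keeps the incremental state in the Sqoop metastore and updates it after every run, so --last-value does not have to be maintained by hand. The stored definition, including the saved last imported entry_date, can be inspected; a small example (not from the original post):

# print the stored job definition; the saved incremental last-value appears in the output
sqoop job --show myjob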

6. Load data into the DW layer

Load the dim_product table

from
(
    select
        row_number() over (order by sp.product_code) product_sk,
        sp.product_code,
        sp.product_name,
        sp.product_category,
        '1.0',
        '2018-01-01',
        '2050-01-01'
    from sales_rds.product sp
) tmp
insert into sales_dw.dim_product
select *;

Load the dim_customer table

from
(
    select
        row_number() over (order by sc.customer_number) customer_sk,
        sc.customer_number,
        sc.customer_name,
        sc.customer_street_address,
        sc.customer_zip_code,
        sc.customer_city,
        sc.customer_state,
        '1.0',
        '2018-01-01',
        '2050-01-01'
    from sales_rds.customer sc
) tmp
insert into sales_dw.dim_customer
select *;

Load the dim_order table

from
(
    select
        row_number() over (order by so.order_number) order_sk,
        order_number,
        '1.0',
        '2018-01-01',
        '2050-01-01'
    from sales_rds.sales_order so
) tmp
insert into sales_dw.dim_order
select *;
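
Row counts in the dimension tables should mirror the RDS sources loaded earlier; a quick check:

select count(*) from sales_dw.dim_product;   -- expect 3
select count(*) from sales_dw.dim_customer;  -- expect 6
select count(*) from sales_dw.dim_order;     -- expect 100000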

Write the date-generation script

#!/bin/bash
# start date
date1=$1
# end date
date2=$2
# current date being generated
tmpdate=`date -d "$date1" +%F`
# start timestamp (seconds)
startSec=`date -d "$date1" +%s`
# end timestamp (seconds)
endSec=`date -d "$date2" +%s`
# loop start value
min=1
# loop end value
max=`expr \( $endSec - $startSec \) / 60 / 60 / 24`
while [ $min -le $max ]
do
    # month
    month=`date -d "$tmpdate" +%m`
    # month name
    month_name=`date -d "$tmpdate" +%B`
    # year
    year=`date -d "$tmpdate" +%Y`
    # quarter
    quarter=`expr \( $month - 1 \) \/ 3 + 1`
    # append one row to the output file
    echo ${min}","${tmpdate}","${month}","${month_name}","${quarter}","${year} >> ./dim_date.csv
    # next date
    tmpdate=`date -d "+$min day $date1" +%F`
    # next timestamp
    startSec=`date -d "+$min day $date1" +%s`
    # increment the counter
    min=`expr $min + 1`
done
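
The post stops before the generated CSV is loaded, so the rest is only a sketch, not from the original: run the script over the range covered by the generated orders (the file name dim_date.sh is a placeholder), load the CSV into dim_date, and then fill fact_sales_order by joining the RDS orders to the dimensions on their natural keys. Every dimension row here is version 1.0, so SCD handling is skipped; column types around the date join may need adjusting on your Hive version.

# generate the dates and load them into dim_date
# (dim_date.sh is a placeholder name for the script above)
bash dim_date.sh 2018-01-01 2018-11-23
hive -e "load data local inpath './dim_date.csv' into table sales_dw.dim_date;"

-- sketch of the fact load: dynamic partitioning on order_date, surrogate keys looked up from the dimensions
set hive.exec.dynamic.partition.mode=nonstrict;

insert into sales_dw.fact_sales_order partition (order_date)
select  ord.order_sk,
        cus.customer_sk,
        prd.product_sk,
        dat.date_sk,
        so.order_amount,
        to_date(so.order_date) as order_date
from sales_rds.sales_order so
join sales_dw.dim_order    ord on so.order_number    = ord.order_number
join sales_dw.dim_customer cus on so.customer_number = cus.customer_number
join sales_dw.dim_product  prd on so.product_code    = prd.product_code
join sales_dw.dim_date     dat on to_date(so.order_date) = dat.`date`;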