现有用户点击行为数据文件,每天产生会上传到 hdfs 目录,按天区分目录。 现在需要每天凌晨两点定时导入 Hive 表指定分区中,并统计出今日活跃用户数插入指标表中。
日志文件 clicklog用户点击行为数据,三个字段是用户 id, 点击时间,访问页面
userId click_time index uid1 2020-06-21 12:10:10 a.html uid2 2020-06-21 12:15:10 b.html uid1 2020-06-21 13:10:10 c.html uid1 2020-06-21 15:10:10 d.html uid2 2020-06-21 18:10:10 e.html hdfs 目录会以日期划分文件,例如: /user_clicks/20200621/clicklog.dat /user_clicks/20200622/clicklog.dat /user_clicks/20200623/clicklog.dat ... 开发需求 开发一个 import.job 每日从 hdfs 对应日期目录下同步数据到该表指定分区。(日期格式同上或者自定义)开发一个 analysis.job 依赖 import.job 执行,统计出每日活跃用户(一个用户出现多次算作一次)数并插入 user_info 表中。user_clicks.txt
uid1 2020-09-02 12:10:10 a.html uid2 2020-09-02 12:15:10 b.html uid1 2020-09-02 13:10:10 c.html uid1 2020-09-02 15:10:10 d.html uid2 2020-09-02 18:10:10 e.html uid3 2020-09-02 12:10:10 a.html uid4 2020-09-02 12:15:10 b.html uid5 2020-09-02 13:10:10 c.html uid6 2020-09-02 15:10:10 d.html uid7 2020-09-02 18:10:10 e.html 将文件上传到 HDFS # 在 Hadoop 创建文件夹 hdfs dfs -mkdir -p /user_clicks/20200902/ hdfs dfs -ls /user_clicks/20200902 # 因为测试,今天日期为 20200902 hdfs dfs -put user_clicks.txt /user_clicks/20200902开发思路:
写 job 脚本写 shell 脚本写 hive sql上传至 azkaban调度设置,调度首先在 hive 中,创建表
创建 import.job type=command command=sh import.sh创建 import.sh,如下
#!/bin/sh echo 'import data from hdfs。。。' currDate=`date +%Y%m%d` echo "现在时间:'$currDate'" /opt/lagou/servers/hive-2.3.7/bin/hive -e "USE default;LOAD DATA INPATH '/user_clicks/$currDate/*' OVERWRITE INTO TABLE user_clicks PARTITION (dt='$currDate');" 创建 analysis.job type=command dependencies=import command=sh analysis.sh创建 analysis.sh,如下:
#!/bin/sh echo 'analysis user click。。。' currDate=`date +%Y-%m-%d` echo "现在时间:'$currDate'" /opt/lagou/servers/hive-2.3.7/bin/hive -e "USE default;INSERT INTO TABLE user_info SELECT COUNT(DISTINCT id) active_num, TO_DATE(click_time) `date` FROM user_clicks WHERE TO_DATE(click_time) = '$currDate' GROUP BY TO_DATE(click_time);"格式化 sql , 如下:
只插入当天统计数据
INSERT INTO TABLE user_info SELECT COUNT(DISTINCT id), TO_DATE(click_time) FROM user_clicks WHERE TO_DATE(click_time) = '$currDate' GROUP BY TO_DATE(click_time); # 实例 INSERT INTO TABLE user_info SELECT COUNT(DISTINCT id), TO_DATE(click_time) FROM user_clicks WHERE TO_DATE(click_time) = '2020-09-02' GROUP BY TO_DATE(click_time); 打包,上传至 azkaban zip -r user.zip * drwxrwxr-x 2 donald donald 4096 Sep 3 01:15 ./ drwxrwxr-x 3 donald donald 4096 Sep 2 19:17 ../ -rw-rw-r-- 1 donald donald 55 Sep 2 19:37 analysis.job -rw-rw-r-- 1 donald donald 276 Sep 3 01:15 analysis.sh -rw-rw-r-- 1 donald donald 33 Sep 2 19:17 import.job -rw-rw-r-- 1 donald donald 270 Sep 3 01:15 import.sh -rw-r--r-- 1 donald donald 1134 Sep 3 01:15 user.zip 调度如图:
定时调度每天两点进行调度
# cron 表达式如下: 0 0 2 * * ? *如图:
截图如下:
hive 查询结果 hive (default)> select * from user_info; OK user_info.active_num user_info.date 7 2020-09-03 Time taken: 0.132 seconds, Fetched: 1 row(s) hive (default)> select * from user_clicks; OK user_clicks.id user_clicks.click_time user_clicks.index user_clicks.dt uid1 2020-09-03 12:10:10 a.html 20200903 uid2 2020-09-03 12:15:10 b.html 20200903 uid1 2020-09-03 13:10:10 c.html 20200903 uid1 2020-09-03 15:10:10 d.html 20200903 uid2 2020-09-03 18:10:10 e.html 20200903 uid3 2020-09-03 12:10:10 a.html 20200903 uid4 2020-09-03 12:15:10 b.html 20200903 uid5 2020-09-03 13:10:10 c.html 20200903 uid6 2020-09-03 15:10:10 d.html 20200903 uid7 2020-09-03 18:10:10 e.html 20200903 Time taken: 0.15 seconds, Fetched: 10 row(s)