最近感觉工作一直在做一些重复性的东西,自我提升太小,想要做一些自我提升,想做一些大数据相关的学习,但是发现这个是个系统比较庞大的东西,涉及到的东西多,自学的话不知从何下手,而且大部分的资料都比较零散,不系统,感觉还是需要一个系统的课程。正好看到拉钩推出的大数据训练营,看了下课程内容比较适合自己,有系统的讲解也有几个项目的实践,现在也学了两个月开始做第一个项目实践了,感觉讲师讲的深入浅出,既学会了相关的技术,理解相关技术的原理,使用场景,相同类型不同框架的优缺点。课后也有讲师在线答疑,能及时的解决学习中的疑惑,定期的直播课进行一下知识点的整理回顾和在线答疑。其实课程内容还是很丰富的,大概每天要花两三个小时在学习上面才跟的上学习的进度,由于我自己时间相对充裕些,没有被班主任催过进度哈哈哈。
在拉勾教育大数据训练营的学习中,关于离线数仓的实践
数据仓库(Data Warehouse)是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合,用于支持管理决策。
假设我要对用户会员活跃度做分析
分析的数据源:用户登陆日志
计算指标是:每日,每周活跃会员总数(当然一个系统不会只有一个指标,一定是N多个指标形成一个指标体系,而且为了方便展示,这是最基础的一个指标)
技术方案选型
hadoop框架选型
Apache / 第三方发行版(CDH / HDP / Fusion Insight)
软件选型
数据采集:DataX、Flume、Sqoop、Logstash、Kafka
数据存储:HDFS、HBase
数据计算:Hive、MapReduce、Tez、Spark、Flink
调度系统:Airflow、azkaban、Oozie
元数据管理:Atlas
数据质量管理:Griffin
即席查询:Impala、Kylin、ClickHouse、Presto、Druid
其他:MySQL
服务器选型
选择物理机还是云主机
集群规模的估算
主要考虑存储量,存储量主要是日志,计算出日志总量,通常要给磁盘预留20-30%的空间,数据仓库应用有1-2倍的数据膨胀
逻辑架构
对于业务数据,我们可以通过datax从mysql放到ODS进入数仓
对于日志,我们可以通过flume放到hdfs再转入ODS进入数仓
数仓包括:ODS,DWD,DWS,ADS还有元数据管理任务调度数据质量监控等,最后ADS数据可以通过datax放入mysql方便以后进行数据可视化,或者让impala做即席查询
开发物理环境
命名规范
数据库命名:ods / dwd / dws/ dim / temp / ads
数仓各层对应数据库:eg:ods层 -> ods_{业务线|业务项目}
表命名(数据库表命名规则):eg:命名规则:ods{业务线|业务项目}[数据来源类型]_{业务}
分层的原因:清晰的数据结构;将复杂的问题简单化;减少重复开发;屏蔽原始数据的异常;数据血缘的追踪
ODS(数据准备区):数据仓库源头系统的数据表通常会 原封不动的存储一份,数据来源主要包括:业务数据库;埋点日志;其他数据源。从第三方购买的数据、或是网络爬虫抓取的数据;
DW(数据仓库层)
DWD(细节数据层):以业务过程作为建模驱动,基于每个具体的业务过程特点,构建细粒度的 明细层事实表。
DWS(服务数据层):基于DWD的基础数据,整合 汇总成分析某一个主题域的服务数据。以分析的主题为建模驱动,基于上层的 应用和产品的指标需求,构建公共粒度的汇总指标事实表;
公共维度层(DIM):基于维度建模理念思想,建立一致性维度;
TMP层:临时层,存放计算过程中临时产生的数据;
ADS(应用数据层):基于DW数据,整合汇总成主题域的服务数据,用于提供后续的业务查询等。
数据采集:日志文件 => Flume => HDFS
Flume:编写我们的flume配置文件使用taildir监控我们的日志文件,将日志文件的内容传输到我们的hdfs中
新建flume-log2hdfs.conf
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /opt/lagoudw/logs/start/.*log a1.sources.r1.headers.f1.logtype = start a1.sources.r1.filegroups.f2 = /opt/lagoudw/logs/event/.*log a1.sources.r1.headers.f2.logtype = event # 自定义拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/ a1.sinks.k1.hdfs.filePrefix = startlog a1.sinks.k1.hdfs.fileType = DataStream # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 # 使用本地时间 #a1.sinks.k1.hdfs.useLocalTimeStamp = true # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 启动我们的flume就可以采集日志数据了 flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf --conf-file /opt/lagoudw/conf/flume-log2hdfs.conf -name a1 - Dflume.roog.logger=INFO,console检测数据是否同步成功
hdfs dfs -ls /user/data/logs/start
创建我们的ods表,ods_start_log,并将hdfs的数据加载到我们的ods表中,这里没有对数据做任何特殊的处理,我们只对是将日志数据加载到了hive里面方便我们后面的处理
alter table ods.ods_start_log add partition(dt='2020-07-21');但是这里会有一个问题,那就是我们不能每天手动的去加载数据,所以我们编写脚本文件
到时候通过调度系统定时的处理我们的加载数据的需求
#!/bin/bash APP=ODS hive=/opt/lagou/servers/hive-2.3.7/bin/hive # 可以输入日期;如果未输入日期取昨天的时间 if [ -n "$1" ] then do_date=$1 else do_date=`date -d "-1 day" +%F` fi # 定义要执行的SQL sql=" alter table "$APP".ods_start_log add partition(dt='$do_date'); " $hive -e "$sql"检查数据
我们创建dwd相关的表 dwd_start_log,通过json数据解析ods的数据,丢弃无用数据(数据清洗),保留有效信息,并将数据展开,形成每日启动明细表。
with tmp as( select split(str, ' ')[7] line from ods.ods_start_log where dt='$do_date' ) insert overwrite table dwd.dwd_start_log partition(dt='$do_date') select get_json_object(line, '$.attr.device_id'), get_json_object(line, '$.attr.area'), get_json_object(line, '$.attr.uid'), get_json_object(line, '$.attr.app_v'), get_json_object(line, '$.attr.event_type'), get_json_object(line, '$.attr.os_type'), get_json_object(line, '$.attr.channel'), get_json_object(line, '$.attr.language'), get_json_object(line, '$.attr.brand'), get_json_object(line, '$.app_active.json.entry'), get_json_object(line, '$.app_active.json.action'), get_json_object(line, '$.app_active.json.error_code') from tmp;同样我们也可以通过编写脚本来实现,之后任务的定时调度
这一步之后我们得到的数据已经有了基本数据的样子
我们创建dws.dws_member_start_day用于存储会员日启动汇总,dws.dws_member_start_week每周会员日启动汇总
通过一下hive sql可以查询得到结果,并将结果插入到dws.dws_member_start_day中
select device_id, concat_ws('|', collect_set(uid)), concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)), concat_ws('|', collect_set(brand)) from dwd.dwd_start_log where dt='$do_date' group by device_id;通过一下hive sql可以查询得到结果,并将结果插入到dws.dws_member_start_week中
select device_id, concat_ws('|', collect_set(uid)), concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)), concat_ws('|', collect_set(brand)), date_add(next_day('$do_date', 'mo'), -7) from dws.dws_member_start_day where dt >= date_add(next_day('$do_date', 'mo'), -7) and dt <= '$do_date' group by device_id;同样我们也可以通过编写脚本来实现,之后任务的定时调度
我们创建ads.ads_member_active_count用于存储日,周活跃用户总数
通过统计函数得到我们想要的统计结果
with tmp as( select 'day' datelabel, count(*) cnt, dt from dws.dws_member_start_day where dt='$do_date' group by dt union all select 'week' datelabel, count(*) cnt, dt from dws.dws_member_start_week where dt='$do_date' group by dt ) select sum(case when datelabel='day' then cnt end) as day_count, sum(case when datelabel='week' then cnt end) as week_count from tmp group by dt;同样我们也可以通过编写脚本来实现,之后任务的定时调度
#!/bin/bash hive=/opt/lagou/servers/hive-2.3.7/bin/hive # 可以输入日期;如果未输入日期取昨天的时间 if [ -n "$1" ] then do_date=$1 else do_date=`date -d "-1 day" +%F` fi # 定义要执行的SQL sql=" with tmp as( select 'day' datelabel, count(*) cnt, dt from dws.dws_member_start_day where dt='$do_date' group by dt union all select 'week' datelabel, count(*) cnt, dt from dws.dws_member_start_week where dt='$do_date' group by dt ) insert overwrite table ads.ads_member_active_count partition(dt='$do_date') select sum(case when datelabel='day' then cnt end) as day_count, sum(case when datelabel='week' then cnt end) as week_count from tmp group by dt; " $hive -e "$sql"
1.在mysql中建立对应的表结构
-- MySQL 建表 -- 活跃会员数 create database dwads; drop table if exists dwads.ads_member_consecutive3day_count; create table dwads.ads_member_consecutive3day_count( `dt` varchar(10) COMMENT '统计日期', `day_count` int COMMENT '当日会员数量', `week_count` int COMMENT '当周会员数量', primary key (dt) );2.编写datax的脚本
{ "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/user/hive/warehouse/ads.db/ads_member_consecutive3day_count/dt=$do_date/*", "defaultFS": "hdfs://linux169:9000", "column" : [ { "type": "string", "value": "$do_date" }, { "index": 0, "type": "string" }, { "index": 1, "type": "string" } ],"fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "replace", "username": "root", "password": "XXXXX", "column": [ "dt","day_count","week_count" ], "preSql": [ "" ], "connection": [ { "jdbcUrl":"jdbc:mysql://linux168:3306/dwads?useUnicode=true&characterEncoding=utf-8", "table": [ "ads_member_consecutive3day_count" ] } ] } } } ] } }3.执行脚本
python datax.py -p "-Ddo_date=2020-07-24" /opt/lagoudw/script/member_active/ads_membercount_to_mysql.json
以上是一个基本的列子,其实里面有很多技术框架是可替换的,比如日志的采集不一定要用flume,数据的计算可以选择tez spark等不同的技术来提升计算的性能,但是建立数仓的整体流程是类似的,我们首先要进行需求分析,确认我们的指标体系和模型,然后选择合适自己系统的技术框架,开始对数据进行采集处理分析最后将我们的数据同步到其他系统去用于之后的数据可视化,报表生成等