一、Linux 常用高级命令:ps进程,rpm安装,netstat端口,find,df磁盘,iotop磁盘读写,top内存,tar,df -h磁盘
查看系统中所有进程:ps -aux查看子父进程之间的关系:ps -ef列出所有安装包:rpm -qa卸载多个安装包:rpm -qa|grep -i mariadb mysql查看该进程网络信息:netstat -anp|grep 进程号查看网络端口号占用情况:netstat -nlp|grep 端口号根据名称查找文件:find xiyou/ -name *.txt根据用户查找文件:find / -user atguigu根据大小查找文件:find /home -size +204800查看磁盘存储情况:df -h查看磁盘IO读写:iotop查看高输出程序:iotop -o查看负载:top压缩多个文件:tar -zcvf 111.tar.gz a.txt b.txt解压到指定目录:tar -zxvf 222.tar.gz -C /opt二、Shell
常用工具:awk、sed、cut、sort单双引区别:最外层单引号 字符串,最外层双引号 解析变量脚本: (1) 集群启动,分发脚本 启停: #!/bin/bash case $1 in “start”){ for i in hadoop102 hadoop103 hadoop104 do ssh $i “绝对路径” done };; “stop”){ ssh $i “ps -ef | grep xxxname | grep -v grep | awk’{print $2}’ | xargs kill” };; esac (2) 数仓与mysql的导入导出:hdfs_to_mysql.sh sqoop默认4个map并行导入数据 (3) 数仓层级内部的导入:hdfs_to_ods_db.sh等三、Hadoop a) 入门
常用端口号: hadoop3.x:9870/hdfs 8088/mr 19888/历史服务器 8020/外部访问集群 hadoop2.x:50070 8088 19888 9000配置文件: hadoop3.x:core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml workers hadoop2.x:core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml slaves hadoop/yarn/mapred3个env 配置/etc/profile.d/my_env.sh则不需要在env.sh配置环境变量hadoop2和3区别: (1) jdk依赖7/8 (2) 端口hdfs50070/9870,外部访问9000/8020 (3) slaves/worker (4) 高可用单个standbyNN/多个standbyNN (5) 引入纠删码,降低副本磁盘占用 (6) 重写了shell脚本 (7) 仅支持NN间的数据均衡/还支持NN内多磁盘的数据均衡b) HDFS
HDFS读写流程(画图) (1) 读:client向NN请求下载,返回元数据,client通过元数据信息找到DN节点中的block下载(可能存在多个节点) (2) 写:client向NN请求上载,响应可以上载文件,请求上载第一个block,返回DN节点群,client和DN群依次建立通道并依次应答,client按packet上传block最近节点,DN间依次传输备份 如果有DN建立连接失败,则剩下节点重新建立通道,传输完成后DN向NN汇报,不足的副本会重新备份HDFS小文件【重点】: 缺点:耗namenode内存(1个文件元数据占150byte;128g服务器存9亿文件块)、多个maptask(默认每个文件一个切片) 解决:har归档(打包)、CombineTextInputFormat(小文件放一起切)、JVM重用(非小文件场景不要开启,因为任务完成才释放占用的task卡槽)副本数默认3块大小:本地:32、hadoop1.x:64、hadoop2.x:128、生产环境:128/256;hive:256c) MapReduce
Shuffle及优化 概念:map方法之后,reduce方法之前,混洗的过程 过程:map方法-环形缓冲区(一侧数据一侧索引)-80%反向-getpartition方法获取分区-对key的索引按照字典顺序快排-多次溢写排序-溢写文件按分区归并排序 -按分区写入reduce阶段内存-内存不足存磁盘-归并、排序-按key分组进入reduce方法 (1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中 (2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 (3)多个溢出文件会被合并成大的溢出文件 (4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序 (5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据 (6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序) (7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法) 优化: (1)getpartition方法可自定义分区 (2)扩大环形缓冲区和反向百分比 (3)多次溢写的文件可进行Combine(不影响业务逻辑前提下;一次10-20个) (4)压缩(map前/map后/reduce后;文件大选支持切片的格式) (5)修改reduce每次拿多少个map的数据(默认5个,加内存改10-20) (6)NodeManager内存优化:生产环境128g,NM默认8g分配100g 对于单任务:默认8g,128m数据=1g内存 1g数据=8g内存;maptask内存根据切块大小,128m数据=1g内存;reducetask内存根据map过来的数据,128m数据=1g内存压缩选择:map前:大文件要支持切片:lzo/bzip2;map后:snappy/lzo;reduce后:看存储目标要快还是小参数配置解决数据倾斜: (1)提前在map进行combine,减少传输的数据量 (2)导致数据倾斜的key,key增加随机数、增加Reducer并行度、自定义分区d) Yarn
Yarn工作机制(画图) yarnRunner客户端向RM申请Application-返回hdfs路径-客户端上传(jar xml 切片信息)并申请AM-RM初始化Task放入队列 -NM领取Task并创建Container-AM下载job资源到Container本地-AM向RM申请maptask容器-RM创建容器-AM发送启动脚本到各个NM的Container并实时监控 -MapTask生成数据并申请启动对应分区数的ReduceTask资源 -reduceTask阶段结束后AM向RM注销资源调度器 (1) 默认调度器:Apache:容量调度器(消耗资源小);CDH:公平调度器(消耗资源大) (2) 调度器特点:FIFO:单队列等待,先进先出,生产环境不用 容量调度器:支持多队列;可以抢占其他队列空闲资源,优先保证先进入的任务资源 公平调度器:支持多队列;可以抢占其他队列空闲资源,每个任务公平享有队列资源,并发度高 (3) 生产环境选择:并发度高选公平调度器,并发度低选容量调度器 (4) 默认容量调度器:1个default调度器 (5) 生产环境配置多少任务队列:按框架分:hive/spark/flink 按业务分:灵活、安全、可配置(优先级等)四、Zookeeper
选举机制:半数机制(奇数台),算法(今日头条)常用命令:ls查看当前znode的子节点,create普通创建,get获得节点的值生产环境zk安装数量:10服务器 安装zk 3台 20服务器 安装zk 5台 50服务器 安装zk 7台 100服务器 安装zk 11台 台数多可靠性高,但存在通讯延迟五、Flume【重点】 a) source/channel/sink组成、事务
taildir source:断点续传、多目录 (1) 开始版本:apache1.7 cdh1.6 (2) 没有taildir想实现断点续传:自定义 (3) taildir挂了:会导致数据重复,不会导致数据丢失 (4) 数据重复: 1)自定义source,实现事务(效率低) 2)下一级处理(离线:hive/spark,实时:sparkstreaming/flink):group by、开窗(在窗口内部只取一条) 3)不处理 (5) 递归遍历文件夹,然后读取文件:taildir不支持,需自定义channel file channel:基于磁盘,可靠性高,传输效率低 容量:100万event 优化:配置多目录(多个磁盘) memory channel:基于内存,可靠性差,传输效率高 容量:100event kafka channel:基于磁盘(数据存在kafka),可靠性高,传输效率高于memory channel + kafka sink 开始版本:flume1.6,但有bug(parseAsFlumeEvent配置false无效,Event带header),需要ETL数据清洗,1.7解决bug 在生产环境选择: (1)下一级是kafka,选择kafka channel (2)传输普通的日志,对可靠性要求不高,选择memory channel (3)传输和钱相关的,对可靠性要求高,选择file channelhdfs sink:主要注意小文件 参数配置:时间(1小时) or 大小(128m)、event个数(0)事务:Source到Channel是Put事务,Channel到Sink是Take事务b) 拦截器、选择器、监控器
拦截器 (1) ETL拦截器:校验json完整性 (2) 自定义拦截器步骤: 1)定义类实现interceptor接口 2)重写4个方法(初始化、关闭、单event、多event) + 静态内部类Builder 3)打包、上传flume/lib、在配置文件里面关联全类名$Builder (3) 拦截器可以不用;对速度要求高,下一级处理(离线:hive的dwd层,实时:sparkstreaming/flink);对速度要求不高,在前面加拦截器过滤选择器 replicating(默认):把接收的数据发往下一级所有通道 multiplexing:选择性发往对应通道(start event),如根据业务划分:启动、页面、故障、action监控器:ganglia 优化: (1)提高内存:flume_env.sh中-Xmx/-Xms相同(4-6g) (2)同时增加日志服务器和flume台数。日志服务器配置(属于javaee,服务器配置比较低:16g/32/64g内存,8t磁盘)c) 优化 file channel:配置多目录(多个磁盘) hdfs sink:时间(1小时) or 大小(128m)、event个数(0) 监控器:调整内存 d) flume挂了: sink无影响,channel到sink有事务,source到channel有事务 memory channel会丢,但默认最多只丢失100event,file/kafka channel基于磁盘不会丢 taildir source会数据重复,需要在后级清洗
六、Kafka【重点】
基本信息 (1) 组成:producer生产者、broker、consumer消费者、ZK(存储了brokerId、consumer等信息) (2) 安装台数:2(生产者峰值速率副本/100)+1,一般是3台 (3) 压测:测试生产者峰值生产速率,自带脚本kafka-producer-perf-test.sh (4) 副本:2/3个,2个的居多 副本多好处坏处:可靠性高,但增加了网络传输 (5) kafka监控:eagle (6) 数据量: 100万日活,1人次100条埋点 => 100万100条=1亿条 1条日志1k => 总大小:1亿条1k=100g kafka中平均每秒多少条:1亿条/(243600)=1150条/秒 kafka中平均每秒多少兆:1m/s 峰值:30m/s,不要超过50m/s(因为3台kafka,根据公式算峰值速度不超过50m/s) (7) 数据默认保存7天,生产环境3天就够(因为是当天就消费了,而且日志服务器通常保存30天) (8) 磁盘空间预留30%:100g2个副本3天/0.7 (9) 分区设置:一般3-10个分区 先设置一个分区,目标吞吐量Tt/min(生产者Tp,消费者Tc) 如:Tt=100m/s,tp=20m/s,tc=40m/s => 100/20=5个分区(消费要一个分区对应一个CPU) (10) ISR队列:leader挂了,isr队列里的都有条件当leader 进入队列条件:老版本:延迟时间、延迟条数;新版本:延时时间 (11) 分区分配策略 1)Range平均(默认策略),容易数据倾斜 2)RoundRobin hash随机打散,轮询 3)Sticky 最优平均 Range和RoundRobin: Range:Range是对每个Topic而言的(即一个Topic一个Topic分)。 先对同一个Topic的分区按序号排序,并对消费者按字母顺序排序,然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如除不尽,则前几个消费者线程将会多消费一个分区。 例如:有10个分区,两个消费者(C1,C2),3个消费者线程,10/3=3而且除不尽。 C1-0 将消费 0, 1, 2, 3 分区 C2-0 将消费 4, 5, 6 分区 C2-1 将消费 7, 8, 9 分区 RoundRobin:前提:同一个Consumer Group里面的所有消费者的num.streams(消费者消费线程数)必须相等,且每个消费者订阅的主题必须相同。 先将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode排序,最后轮询发给每一个消费线程。 (12) kafka topic个数:日志类型个数,满足下一级所有消费者Kafka挂了 短期:flume的channel会继续缓存新数据 长期:日志服务器可以保存30天数据丢了 设置ack: 0 只发送 可靠性最差 传输效率最快 1 leader应答 可靠性一般 传输效率一般 -1 leader+follower应答 可靠性最高 传输效率最差 生产环境:不选择0,最多的是1,因为大多情况传输普通日志,准确性要求高的场景选择-1,如金融重复了 (1) 事务 + 幂等性(pid+Sequence Number,pid重启会变化,可以通过Transaction ID绑定原来pid) + ack=-1 可靠性越高,效率越低,生产环境用的少 (2) 下一级处理:dwd层用sparkstreaming/flink处理积压了 kafka自身:加分区(增加并发度),同时增加消费者CPU 提高flume消费速率:batchsize 1000条/s => 2000条/s优化 (1) 2副本、保存3天、加大副本间传输时间 (2) 默认生产不压缩,可以设置消费者支持的压缩格式 (3) kafka内存:bin/kafka-server-start.sh中export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"设置4-6g内存 (4) 增加kafka台数其他 (1) 为什么能够高效读写: 1)分布式集群 分区 2)顺序读写600m/s,随机读写100m/s 3)零拷贝 (2) Kafka传输一条2M日志会发生什么问题? 卡死,不能生产也不能消费数据。默认1M,需要修改两个配置 (3) Kafka过期数据清理七、Hive
组成 (1) 左侧:元数据(mysql存储) (2) 右侧:客户端 编译器、解析器、优化器、执行器 计算引擎:mr(大数据,周/月)、tez(小数据,测试)、spark(周期定时任务) HDFS与mysql的区别 hivemysql数据量大小速度大快小快操作查询增删改查 内部表外部表区别 (1) 删除情况: 内部表:原始数据、元数据全删除 外部表:只删除元数据 (2) 生产环境绝大多数是外部表,自己使用的临时表是内部表4个by (1) order by:全局排序,生产环境很少使用,只在一个reduce中容易数据倾斜 (2) sort by:排序 (3) distribute by:分区,通常是分区+排序结合使用 (4) cluster by:等于distrbute by+sort by且分区和排序字段相同,只能升序 (5) group by:和distribute by类似,都是按key值划分数据,都使用reduce操作,distribute by按照key列把数据分散到不同的reduce,而group by把相同key的数据聚集到一起,后续必须是聚合操作函数 (1) 系统函数 1)date_add、date_sub:加减日期 2)next_day:下周一 3)last_day:当月最后一天 4)collect_set:聚合函数,分组中的某列转为一个Set(去重),区别于collect_list 5)date_format:日期格式 6)current_date:当前日期 7)get_json_object:解析json 8)NVL(表达式1, 表达式2),表达式1为空值返回表达式2的值,否则返回表达式1的值,要求类型一致 (2) 自定义函数 UDF一进一出:继承UDF,重写evaluate方法 UDTF一进多出:继承GenericUD,重写初始化(输入参数个数、类型,输出值类型、名称)、关闭、process方法 打包=>上传到HDFS路径=>创建永久函数 自定义函数优点: 1)自定义Log打印日志,出错或者数据异常,方便调试 2)可以调用第三方jar包 (3) 窗口函数 1)RANK():排序相同时会重复,总数不会变(1,2,2,4) DENSE_RANK():排序相同时会重复,总数会减少(1,2,2,3) ROW_NUMBER():会根据顺序计算(1,2,3,4) 2)手写topN优化 (1) 默认打开mapjoin (2) 行列过滤:先join后where改为 先where后join (3) 合理设置map个数和reduce个数 公式:min(0,max(块大小, long的最大值)),128m数据=>1g内存 (4) 小文件 1)combinehiveinputformart减少切片个数,则减少maptask个数 2)jvm重用 3)merge:开启一个mr任务合并小文件(小于16m合并为256m) maponly任务,此功能默认打开;mr任务,此功能需要手动打开 (5) 在map阶段开启combiner (6) 采用压缩 (7) 列式存储 idnameage1zs192li203wu21行式存储:1 zs 19 2 li 20 3 wu 21 列式存储:1 2 3 zs li wu 19 20 21 (8) 分区技术:按天分区 (9) 合理选择引擎:mr tez spark 7. 数据倾斜 (1) join字段类型不同 解决:select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string) (2) 空值会全部进入key为空的reduce中,加随机数或删除解决 (3) 解决方案: 1)group by优于distinct,hadoop3.x底层对distinct进行了优化 2)mapjoin,小表缓存,小表join大表 3)开启数据倾斜时负载均衡 8. 其他 (1) 空值:hive中\N,mysql中null (2) 建表分隔符是\001或\t(旧版)时,和hive默认字段分隔符冲突,会导致存储数据错位而报类型错误。 可以修改分隔符或者预先进行转义 (3) mysql存储元数据,可以用keepalived实现HA (4) union结果集去重;union all不对结果集去重,效率高
八、Sqoop
用sqoop遇到哪些问题? (1) 空值: hivemysql\Nnull(2) 一致性问题:–staging-table选项,多个事务将数据存入临时表,然后一个事务一次性写入 2. 每天晚上几点执行:00:30分 3. 每天导多少数据?执行多久? 100万日活:10万订单,没人订单数据10条,每条1k => 10万*10条=1g数据 40分钟-50分钟 4. 每天订单、评价、支付数据? 1g数据/30张表 = 34m,下单是平均值的2-5倍 => 100m 5. 执行参数 –connect mysql地址 –root –000000 –target-dir hdfs://hadoop102:8020/user/ –delete-target-dir –map –compress –compress codec –null –分隔符 6. 并行化及并行化时发生数据倾斜 并行度:num-mappers 由于split-by的默认切分策略不均匀,导致数据分割倾斜,可以手动创建临时的理想字段,并指定按照此字段分割数据 ROWNUM()生成均匀分布的字段,–split-by指定这个字段 7. sqoop不能直接导出parquet表 mysql不支持: (1) 可以先复制到临时表转成txt格式 (2) ads不建parquet表
九、Azkaban
挂了任务挂了 自动重试、手动重跑 挂了通知:邮件、电话(www.oneatler.com)执行时间:00:30每天跑多少任务? 100-200,平时100,节假日、活动200