1.一堆待处理的文本
2.客户端读取它,submit()提交任务,获取切片文件,配置信息 ,如果是集群模式还是得有必要的jar包
3.job提交信息
4.MR appmaster(项目经理,job的老大负责读取切片信息),计算出MapTask的数量(切了两个片,给你起两个MapTask)
5.有了MapTask,启动之后先调用InputFormat组件去读数(hadoop默认的TextInputFormat,TextInputFormat 是调用LineRecorderReader来读数)
6.读到的kv传给Mapper里面的map方法
7.Context.write(k,v)这个数据从map方法写出去之后就到了shuffle这个阶段
8.shuffle首先进入环形缓冲区(底层是数组,两面用,找一个节点两边写,右边存的是真正的k,v数据,左边存的是元数据,例如索引)环形缓冲区默认大小是100M,80%后反向,也就不会再向里面写了,它会做两件事:第一件事在剩余的20%再找一个节点反向写(还是右边数据,左边元数据)在它反向事还做了一件事,那就是向磁盘溢写文件,溢写前面80%的文件,并且在这里面还要排序 (一面前面80%向外溢写到磁盘,一面那剩下的20%反向写)
8.在环形缓冲区里分区排序 (这里面排序算法用的快排 (之所以用快排是数据在环形缓冲区里并且是用数组存放,所以有条件快排,这个快排排的是索引,提高效率))它是分区内排序(区内有序),分区外无序
9.多个溢写文件,不是一个分区一个文件,一个MapTask可能在溢写时溢出多个溢写文件 ,每个文件区内有序。第一次排序使用快排,在内存里排。
10.多个溢写文件归并为一个大文件,区内有序,归并的前提是区内得有序。数据全放内存放不下,所以要用归并排序。第二次排序mapTask结束
11.所有MapTesk任务完成后,启动相应的reduceTask,MRappMaster告知RedeceTask处理数据的范围
12.有几个分区,起几个ReduceTask
13.ReduceTask主动向MapTask拷贝的,不是MapTask推过来的,拷过来的数据是分区内有序,所以再进行一次合并文件,归并排序。
14.按照key相同分组一次读一组用指针指向key,相同放一组。
15.Reduce(K,V)Context.Write写出去之后写给OutputFormat,默认的OutputFormat是TextOutputFormat调用RecordWriter写到磁盘上
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
map阶段产生的
reduce是拷贝的过来的数据
Shuffle的大致流程为: 1.Maptask会不断收集我们的map()方法输出的kv对,放到内存缓冲区中, 2.当缓冲区达到饱和的时候(默认占比为0.8)就会溢出到磁盘中, 3.如果map的输出结果很多,则会有多个溢出文件,多个溢出文件会被合并成一个大的溢出文件,在文件溢出、合并的过程中,都要调用partitoner进行分组和针对key进行排序(默认是按照Key的hash值对Partitoner个数取模), 4.之后reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据,reducetask会将这些文件再进行合并(归并排序)。 5.合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出每一个键值对的Group,调用UDF函数(用户自定义的方法))