map方法和reduce方法都是被框架循环调用的:map方法——每行数据调用一次;reduce方法——每个key(及其对应的一组value)调用一次。只需执行一次的代码应写在setup和cleanup中。
public class Flow { /** * Map阶段计算每行数据---手机号对应网站总流量---以Key Value的形式输出 * key 手机号 * value 手机号对应网站总流量 * * Reduce阶段会获取每个手机号对应的多个站点总流量 手机号<总流量1,总流量2...> * 最终输出key 手机号 value 总流量 * * 泛型类型: * LongWritable 每行数据的偏移量 * Text 行数据 * Text 手机号 * LongWritable 总流量 */ static class FLowMapper extends Mapper<LongWritable, Text, Text, LongWritable> { Text k = new Text(); LongWritable v = new LongWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { //将Text类型转为String类型 , 进行数据切分 //行数据: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 String line = value.toString(); //按照空格进行切分 String[] split = line.split("\\s+"); //1号索引是手机号 String phone = split[1]; //上行流量 Long upFlow = Long.parseLong(split[split.length - 3]); //下行流量 Long downFlow = Long.parseLong(split[split.length - 2]); //给k和v赋值,输出 k.set(phone); v.set(upFlow + downFlow); context.write(k, v); } catch (Exception e) { //将错误行打印出来 System.out.println(value.toString()); } } } static class FlowReducer extends Reducer<Text, LongWritable, Text, LongWritable> { LongWritable v = new LongWritable(); @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { //一个手机号一个迭代器---手机号 <总流量1 , 总流量2 , 总流量3> Long totalFlow = 0L; for (LongWritable value : values) { totalFlow += value.get(); } //k是手机号---v是总流量 v.set(totalFlow); context.write(key, v); } } public static void main(String[] args) throws Exception { //获取配置对象 Configuration conf = new Configuration(); //获取Job对象 Job job = Job.getInstance(conf, "flow"); //设置实现Mapper的类和实现Reducer的类 job.setMapperClass(FLowMapper.class); job.setReducerClass(FlowReducer.class); //设置Map阶段的输出的KV数据类型 //如果Map阶段和Reducer阶段KV数据类型一致---可以省略一个不写 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置Reducer阶段也是最终输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); 
//设置文件输入输出 FileInputFormat.setInputPaths(job, new Path("D://mrdata/flow/input/flow.log")); FileOutputFormat.setOutputPath(job, new Path("D://mrdata/flow/input/out")); 提交任务,等待任务执行完毕 job.waitForCompletion(true); } }