MapReduce案例-统计手机号总流量

tech2022-08-23  120

map方法和reduce方法都是循环调用的 map方法---每行数据调用一次 reduce方法---每个KV调用一次 只执行一次的代码写在setup和cleanup中

统计每个手机号的总流量

数据格式: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 需求:统计每个手机号的总流量 数据为多个手机号访问不同网站消耗的流量 每个手机号对应多个网站 第二位: 手机号 倒数第二位: 下行流量 倒数第三位: 上行流量

 

public class Flow {     /**      * Map阶段计算每行数据---手机号对应网站总流量---以Key Value的形式输出      * key 手机号      * value 手机号对应网站总流量      *      * Reduce阶段会获取每个手机号对应的多个站点总流量 手机号<总流量1,总流量2...>      * 最终输出key 手机号 value 总流量      *      * 泛型类型:      * LongWritable             每行数据的偏移量      * Text                     行数据      * Text                     手机号      * LongWritable             总流量      */     static class FLowMapper extends Mapper<LongWritable, Text, Text, LongWritable> {         Text k = new Text();         LongWritable v = new LongWritable();         @Override         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             try {                 //将Text类型转为String类型 , 进行数据切分                 //行数据:     1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200                 String line = value.toString();                 //按照空格进行切分                 String[] split = line.split("\\s+");                 //1号索引是手机号                 String phone = split[1];                 //上行流量                 Long upFlow = Long.parseLong(split[split.length - 3]);                 //下行流量                 Long downFlow = Long.parseLong(split[split.length - 2]);                 //给k和v赋值,输出                 k.set(phone);                 v.set(upFlow + downFlow);                 context.write(k, v);             } catch (Exception e) {                 //将错误行打印出来                 System.out.println(value.toString());             }         }     }     static class FlowReducer extends Reducer<Text, LongWritable, Text, LongWritable> {         LongWritable v = new LongWritable();         @Override         protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {             //一个手机号一个迭代器---手机号 <总流量1 , 总流量2 , 总流量3>             Long totalFlow = 0L;             for (LongWritable value : values) {                 totalFlow += value.get();             }             //k是手机号---v是总流量             v.set(totalFlow);             context.write(key, v);         }     }     public static void main(String[] args) throws Exception {         //获取配置对象         Configuration conf = new Configuration();         //获取Job对象         Job job = Job.getInstance(conf, "flow");         //设置实现Mapper的类和实现Reducer的类         job.setMapperClass(FLowMapper.class);         job.setReducerClass(FlowReducer.class);         //设置Map阶段的输出的KV数据类型         //如果Map阶段和Reducer阶段KV数据类型一致---可以省略一个不写         job.setMapOutputKeyClass(Text.class);         job.setMapOutputValueClass(LongWritable.class);         //设置Reducer阶段也是最终输出         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(LongWritable.class);         //设置文件输入输出         FileInputFormat.setInputPaths(job, new Path("D://mrdata/flow/input/flow.log"));         FileOutputFormat.setOutputPath(job, new Path("D://mrdata/flow/input/out"));         提交任务,等待任务执行完毕         job.waitForCompletion(true);     } }

 

最新回复(0)