Developing a Second Hadoop Program in IDEA: TQ


Introduction to TQ

The data and the requirement are as follows: each input record is a timestamp and a temperature reading separated by a tab (e.g. `1949-10-01 14:21:02` and `34c`), and for every month of every year the job must output the two days with the highest temperatures.
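The original table is only sketched here: the first two records below are the ones quoted in the mapper's comments, and the third is an invented line so the top-two output has something to pick (fields are tab-separated):

```
1949-10-01 14:21:02	34c
1949-10-01 19:21:02	38c
1949-10-02 08:30:00	36c
```

For this input the job would emit, for 1949-10, the two hottest distinct days: `1949-10-1	38` and `1949-10-2	36`.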

Building on the Java project from the previous post (the Hadoop jars are already imported), write the project code:

MyTQ.java (main class)

```java
package com.hpe.hadoop.mr.tq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author achu
 * @create 2020-09-03 14:28
 */
public class MyTQ {
    public static void main(String[] args) throws Exception {
        // Load the configuration files
        Configuration conf = new Configuration(true);
        // Create a job
        Job job = Job.getInstance(conf);
        // Set the class used to locate the jar at runtime
        job.setJarByClass(MyTQ.class);
        // Set the input and output paths
        Path input = new Path(args[0]);
        FileInputFormat.addInputPath(job, input);
        Path output = new Path(args[1]);
        // Get the file system and clear the output path if it already exists
        FileSystem fs = output.getFileSystem(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        /** MapTask
         * 1. map turns each input record into a key/value pair; the key is a custom type (year, month, day, temperature)
         * 2. map output goes into a circular buffer; before entering the buffer each record is assigned a partition
         * 3. a sort comparator orders records by year, month, and temperature
         * 4. the reduce-side shuffle pulls the intermediate results from all maps, then groups them
         * 5. the map side needs a custom partitioner
         */
        job.setMapperClass(TMapper.class);
        // Set the map output key and value types
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class); // the original called setOutputValueClass here; the map output value type is what is meant
        // Set the partitioner
        job.setPartitionerClass(TPartitioner.class);
        // Set the sort comparator
        job.setSortComparatorClass(TSorter.class);
        /** ReduceTask: set the grouping comparator */
        job.setGroupingComparatorClass(TGroupComparator.class);
        job.setReducerClass(TReduce.class);
        // Set the number of reducers
        job.setNumReduceTasks(2);
        // Submit the job
        job.waitForCompletion(true);
    }
}
```

TMapper.java

```java
package com.hpe.hadoop.mr.tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * @author achu
 * @create 2020-09-03 16:09
 * 1. Override the map method
 * 2. value holds one line of input; key holds the byte offset of that line
 * 3. Split each line, extract the year, month, and day, strip the trailing 'c'
 *    from the temperature, and store these fields in a TQ object
 */
public class TMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {
    // Output key
    TQ mkey = new TQ();
    // Output value
    IntWritable mval = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1949-10-01 14:21:02	34c
        // 1949-10-01 19:21:02	38c
        try {
            // Split the line on tabs
            String[] strs = StringUtils.split(value.toString(), '\t');
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            // Parse the date string into a Date object
            Date date = sdf.parse(strs[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            // Populate the output key
            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar.MONTH is 0-based
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            int wd = Integer.parseInt(strs[1].substring(0, strs[1].lastIndexOf('c')));
            mkey.setWd(wd);
            mval.set(wd);
            context.write(mkey, mval);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
```
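To see what the mapper extracts from one record, here is a standalone sketch that replays the same parsing steps (the sample line comes from the comments above; this class is illustrative only and is not part of the job):

```java
package com.hpe.hadoop.mr.tq;

import java.text.SimpleDateFormat;
import java.util.Calendar;

// Illustrative only: replays the mapper's parsing on a single record.
public class ParseDemo {
    public static void main(String[] args) throws Exception {
        String line = "1949-10-01 14:21:02\t34c";
        String[] strs = line.split("\t");
        Calendar cal = Calendar.getInstance();
        // parse() matches the "yyyy-MM-dd" prefix and ignores the time part
        cal.setTime(new SimpleDateFormat("yyyy-MM-dd").parse(strs[0]));
        int wd = Integer.parseInt(strs[1].substring(0, strs[1].lastIndexOf('c')));
        // Prints: 1949 10 1 34  (Calendar.MONTH is 0-based, hence the +1)
        System.out.println(cal.get(Calendar.YEAR) + " " + (cal.get(Calendar.MONTH) + 1)
                + " " + cal.get(Calendar.DAY_OF_MONTH) + " " + wd);
    }
}
```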
TQ.java

```java
package com.hpe.hadoop.mr.tq;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author achu
 * @create 2020-09-03 14:52
 * A custom key type needs serialization, deserialization, and comparison,
 * so it implements the WritableComparable interface.
 */
public class TQ implements WritableComparable<TQ> {
    private int year;  // year
    private int month; // month
    private int day;   // day
    private int wd;    // temperature

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public int getDay() { return day; }
    public void setDay(int day) { this.day = day; }
    public int getWd() { return wd; }
    public void setWd(int wd) { this.wd = wd; }

    // Serialization: write the fields to the byte stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
    }

    // Deserialization: read the fields back in the same order and assign them
    @Override
    public void readFields(DataInput in) throws IOException {
        this.setYear(in.readInt());
        this.setMonth(in.readInt());
        this.setDay(in.readInt());
        this.setWd(in.readInt());
    }

    /**
     * Comparison: returns 0 if equal, < 0 if this < that, > 0 if this > that.
     * Compares year, then month, then day (temperature ordering is handled by TSorter).
     */
    @Override
    public int compareTo(TQ that) {
        // Compare years
        int c1 = Integer.compare(this.getYear(), that.getYear());
        if (c1 == 0) {
            // Compare months
            int c2 = Integer.compare(this.getMonth(), that.getMonth());
            if (c2 == 0) {
                // Compare days
                return Integer.compare(this.getDay(), that.getDay());
            }
            return c2;
        }
        return c1;
    }
}
```

TPartitioner.java

```java
package com.hpe.hadoop.mr.tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author achu
 * @create 2020-09-03 17:09
 */
public class TPartitioner extends Partitioner<TQ, IntWritable> {
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        // Called for every key/value pair a map emits; all records of a year go to the same reducer
        return key.getYear() % numPartitions;
    }
}
```

TSorter.java

```java
package com.hpe.hadoop.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author achu
 * @create 2020-09-03 18:49
 */
public class TSorter extends WritableComparator {
    public TSorter() {
        super(TQ.class, true);
    }

    TQ t1 = null;
    TQ t2 = null;

    // Sort by year and month ascending, then by temperature descending
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        t1 = (TQ) a;
        t2 = (TQ) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
            if (c2 == 0) {
                // Negate to get descending order on temperature
                return -Integer.compare(t1.getWd(), t2.getWd());
            }
            return c2;
        }
        return c1;
    }
}
```
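A quick standalone check of the sort order (illustrative only, not part of the job): two records from the same month compare by temperature, hotter first:

```java
package com.hpe.hadoop.mr.tq;

// Illustrative only: shows TSorter ranking the hotter record first within a month.
public class SorterDemo {
    static TQ tq(int year, int month, int day, int wd) {
        TQ t = new TQ();
        t.setYear(year);
        t.setMonth(month);
        t.setDay(day);
        t.setWd(wd);
        return t;
    }

    public static void main(String[] args) {
        TQ cooler = tq(1949, 10, 1, 34);
        TQ hotter = tq(1949, 10, 2, 38);
        // Positive result: the 34c record sorts after the 38c record
        System.out.println(new TSorter().compare(cooler, hotter)); // 1
    }
}
```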
TGroupComparator.java

```java
package com.hpe.hadoop.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author achu
 * @create 2020-09-03 19:08
 * Grouping comparator
 */
public class TGroupComparator extends WritableComparator {
    public TGroupComparator() {
        super(TQ.class, true);
    }

    TQ t1 = null;
    TQ t2 = null;

    // Group by year and month only, so one reduce() call sees a whole month
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        t1 = (TQ) a;
        t2 = (TQ) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            return Integer.compare(t1.getMonth(), t2.getMonth());
        }
        return c1;
    }
}
```

TReduce.java

```java
package com.hpe.hadoop.mr.tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author achu
 * @create 2020-09-03 19:18
 */
public class TReduce extends Reducer<TQ, IntWritable, Text, IntWritable> {
    // Output key and value
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int flag = 0;
        int day = 0;
        // Within one group (a year-month) the records arrive sorted by temperature, descending.
        // Iterating the values also advances the key, so key always reflects the current record.
        for (IntWritable value : values) {
            if (flag == 0) {
                // First record: the hottest day of the month
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rval.set(key.getWd());
                context.write(rkey, rval);
                day = key.getDay();
                flag++;
            }
            if (flag != 0 && day != key.getDay()) {
                // First record from a different day: the second-hottest day
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rval.set(key.getWd());
                context.write(rkey, rval);
                break;
            }
        }
    }
}
```
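A standalone check of the grouping behavior (illustrative only): two records from different days of the same month compare as equal, so at reduce time they fall into one group and the top-two logic above can pick the two hottest days:

```java
package com.hpe.hadoop.mr.tq;

// Illustrative only: shows TGroupComparator treating same-month records as one group.
public class GroupDemo {
    public static void main(String[] args) {
        TQ a = new TQ();
        a.setYear(1949); a.setMonth(10); a.setDay(1); a.setWd(34);
        TQ b = new TQ();
        b.setYear(1949); b.setMonth(10); b.setDay(2); b.setWd(38);
        // 0: same year and month, so both records reach one reduce() call
        System.out.println(new TGroupComparator().compare(a, b)); // 0
    }
}
```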

Package the jar as in the previous post and run it on the cluster.
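A sketch of the submit-and-inspect commands, assuming the jar was exported as tq.jar and the input was uploaded to /data/tq/input (both names are placeholders, not from the original post):

```shell
hadoop jar tq.jar com.hpe.hadoop.mr.tq.MyTQ /data/tq/input /data/tq/output
# setNumReduceTasks(2) yields two output files; years are split by year % 2
hdfs dfs -cat /data/tq/output/part-r-00000 /data/tq/output/part-r-00001
```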

Addendum

If you hit a java.lang.RuntimeException: java.io.EOFException, refer to the post on java.io.EOFException; with a custom Writable like TQ this usually means write() and readFields() read and write mismatched fields.
