TQ介绍
数据、要求如下表:
在上一篇博文的Java工程基础上(已经导入jar),编写项目代码:
MyTQ.java主类
package com
.hpe
.hadoop
.mr
.tq
;
import org
.apache
.hadoop
.conf
.Configuration
;
import org
.apache
.hadoop
.fs
.FileSystem
;
import org
.apache
.hadoop
.fs
.Path
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.mapreduce
.Job
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.FileInputFormat
;
import org
.apache
.hadoop
.mapreduce
.lib
.output
.FileOutputFormat
;
public class MyTQ {
public static void main(String
[] args
) throws Exception
{
Configuration conf
= new Configuration(true);
Job job
= Job
.getInstance(conf
);
job
.setJarByClass(MyTQ
.class);
Path input
= new Path(args
[0]);
FileInputFormat
.addInputPath(job
,input
);
Path output
= new Path(args
[1]);
FileSystem fs
= output
.getFileSystem(conf
);
if(fs
.exists(output
)){
fs
.delete(output
,true);
}
FileOutputFormat
.setOutputPath(job
,output
);
job
.setMapperClass(TMapper
.class);
job
.setMapOutputKeyClass(TQ
.class);
job
.setOutputValueClass(IntWritable
.class);
job
.setPartitionerClass(TPartitioner
.class);
job
.setSortComparatorClass(TSorter
.class);
job
.setGroupingComparatorClass(TGroupComparator
.class);
job
.setReducerClass(TReduce
.class);
job
.setNumReduceTasks(2);
job
.waitForCompletion(true);
}
}
TMapper.java
package com
.hpe
.hadoop
.mr
.tq
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Mapper
;
import org
.apache
.hadoop
.util
.StringUtils
;
import java
.io
.IOException
;
import java
.text
.ParseException
;
import java
.text
.SimpleDateFormat
;
import java
.util
.Calendar
;
import java
.util
.Date
;
/**
 * Mapper that turns one tab-separated input line into a ({@link TQ}, temperature) pair.
 *
 * <p>The first field is parsed with the pattern {@code yyyy-MM-dd}; the second field is
 * an integer followed by the letter {@code c} (everything before the last {@code 'c'}
 * is parsed as the temperature). Lines that do not match are skipped and counted in the
 * {@code TQ / MALFORMED_RECORDS} counter instead of failing the task.
 */
public class TMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {

    private static final String COUNTER_GROUP = "TQ";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    // Created once per mapper instance instead of once per record.
    // NOTE(review): SimpleDateFormat/Calendar are not thread-safe; this assumes a mapper
    // instance is driven by a single thread — confirm if a multithreaded runner is used.
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    private final Calendar cal = Calendar.getInstance();

    // Reused output objects; their contents are overwritten for every record.
    TQ mkey = new TQ();
    IntWritable mval = new IntWritable();

    /**
     * Parses one line and emits the date/temperature key with the temperature as value.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of input text
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] strs = StringUtils.split(value.toString(), '\t');
        if (strs == null || strs.length < 2) {
            skipMalformed(context);
            return;
        }

        // The temperature is everything before the last 'c'; without it the field is invalid.
        int unitIndex = strs[1].lastIndexOf('c');
        if (unitIndex < 0) {
            skipMalformed(context);
            return;
        }

        int wd;
        Date date;
        try {
            date = sdf.parse(strs[0]);
            wd = Integer.parseInt(strs[1].substring(0, unitIndex));
        } catch (ParseException | NumberFormatException e) {
            skipMalformed(context);
            return;
        }

        cal.setTime(date);
        mkey.setYear(cal.get(Calendar.YEAR));
        mkey.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar.MONTH is zero-based
        mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
        mkey.setWd(wd);
        mval.set(wd);
        context.write(mkey, mval);
    }

    /** Records a skipped input line in the job counters. */
    private void skipMalformed(Context context) {
        context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
    }
}
TQ.java
package com
.hpe
.hadoop
.mr
.tq
;
import org
.apache
.hadoop
.io
.WritableComparable
;
import java
.io
.DataInput
;
import java
.io
.DataOutput
;
import java
.io
.IOException
;
/**
 * Composite key holding a date (year, month, day) and a temperature reading ({@code wd}).
 *
 * <p>Serialized as four consecutive ints in the order year, month, day, wd.
 * The natural ordering compares year, then month, then day; {@code wd} does not
 * take part in {@link #compareTo(TQ)}.
 */
public class TQ implements WritableComparable<TQ> {

    private int year;
    private int month;
    private int day;
    private int wd;

    /** @return the year component */
    public int getYear() {
        return year;
    }

    /** @param year the year component */
    public void setYear(int year) {
        this.year = year;
    }

    /** @return the month component */
    public int getMonth() {
        return month;
    }

    /** @param month the month component */
    public void setMonth(int month) {
        this.month = month;
    }

    /** @return the day component */
    public int getDay() {
        return day;
    }

    /** @param day the day component */
    public void setDay(int day) {
        this.day = day;
    }

    /** @return the temperature value */
    public int getWd() {
        return wd;
    }

    /** @param wd the temperature value */
    public void setWd(int wd) {
        this.wd = wd;
    }

    /**
     * Writes the four fields as ints: year, month, day, wd.
     *
     * @param out destination stream
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(getYear());
        out.writeInt(getMonth());
        out.writeInt(getDay());
        out.writeInt(getWd());
    }

    /**
     * Reads the four fields in the same order {@link #write(DataOutput)} emits them.
     *
     * @param in source stream
     * @throws IOException if the underlying stream fails or ends early
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        int readYear = in.readInt();
        int readMonth = in.readInt();
        int readDay = in.readInt();
        int readWd = in.readInt();
        setYear(readYear);
        setMonth(readMonth);
        setDay(readDay);
        setWd(readWd);
    }

    /**
     * Orders keys chronologically by year, month, and day.
     *
     * @param that the key to compare against
     * @return a negative, zero, or positive value per the {@link Comparable} contract
     */
    @Override
    public int compareTo(TQ that) {
        int byYear = Integer.compare(this.getYear(), that.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(this.getMonth(), that.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        return Integer.compare(this.getDay(), that.getDay());
    }
}
TPartitioner.java
package com
.hpe
.hadoop
.mr
.tq
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.mapreduce
.Partitioner
;
/**
 * Partitions map output by the year of the {@link TQ} key, so every record of a
 * given year is routed to the same partition.
 */
public class TPartitioner extends Partitioner<TQ, IntWritable> {

    /**
     * Maps the key's year to a partition index.
     *
     * @param key           the composite date/temperature key
     * @param value         the temperature value (unused)
     * @param numPartitions the number of partitions
     * @return an index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        // Clear the sign bit first: '%' on a negative year would yield a negative,
        // and therefore illegal, partition index. Non-negative years are unaffected.
        return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
    }
}
TSorter.java
package com
.hpe
.hadoop
.mr
.tq
;
import org
.apache
.hadoop
.io
.WritableComparable
;
import org
.apache
.hadoop
.io
.WritableComparator
;
/**
 * Sort comparator for {@link TQ} keys: year ascending, then month ascending,
 * then temperature ({@code wd}) descending. The day is not compared.
 */
public class TSorter extends WritableComparator {

    /** Registers {@link TQ} as the key class and asks the base class to create key instances. */
    public TSorter() {
        super(TQ.class, true);
    }

    /**
     * Compares two {@link TQ} keys.
     *
     * @param a first key, must be a {@link TQ}
     * @param b second key, must be a {@link TQ}
     * @return negative, zero, or positive per the ordering described on the class
     */
    @Override
    @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
    public int compare(WritableComparable a, WritableComparable b) {
        // Locals instead of instance fields: the comparator keeps no state between calls.
        TQ left = (TQ) a;
        TQ right = (TQ) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // Operands swapped to get descending temperature order.
        return Integer.compare(right.getWd(), left.getWd());
    }
}
TGroupComparator.java
package com
.hpe
.hadoop
.mr
.tq
;
import org
.apache
.hadoop
.io
.RawComparator
;
import org
.apache
.hadoop
.io
.WritableComparable
;
import org
.apache
.hadoop
.io
.WritableComparator
;
/**
 * Grouping comparator for {@link TQ} keys: two keys are equal when they share
 * the same year and month; day and temperature are ignored.
 */
public class TGroupComparator extends WritableComparator {

    /** Registers {@link TQ} as the key class and asks the base class to create key instances. */
    public TGroupComparator() {
        super(TQ.class, true);
    }

    /**
     * Compares two {@link TQ} keys by year, then month.
     *
     * @param a first key, must be a {@link TQ}
     * @param b second key, must be a {@link TQ}
     * @return negative, zero, or positive; zero when year and month both match
     */
    @Override
    @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
    public int compare(WritableComparable a, WritableComparable b) {
        // Locals instead of instance fields: the comparator keeps no state between calls.
        TQ left = (TQ) a;
        TQ right = (TQ) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}
TReduce.java
package com
.hpe
.hadoop
.mr
.tq
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Reducer
;
import java
.io
.IOException
;
/**
 * Reducer that writes at most two records per group: the key seen on the first
 * value, and the key seen on the first later value whose day differs from it.
 *
 * <p>Output key is {@code year-month-day} as text; output value is the key's {@code wd}.
 * NOTE(review): the loop re-reads {@code key} on every iteration, so it relies on the
 * framework updating the key object as the values are iterated — confirm against the
 * Hadoop grouping-comparator behaviour.
 */
public class TReduce extends Reducer<TQ, IntWritable, Text, IntWritable> {

    // Reused output objects; overwritten before every write.
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    /**
     * Emits the first record of the group and the first following record on another day.
     *
     * @param key     the current composite key of the group
     * @param values  the values of the group (only used to drive the iteration)
     * @param context task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        boolean firstWritten = false;
        int firstDay = 0;

        for (IntWritable ignored : values) {
            if (!firstWritten) {
                emit(key, context);
                firstDay = key.getDay();
                firstWritten = true;
                continue;
            }
            if (key.getDay() != firstDay) {
                // Second record found on a different day: write it and stop.
                emit(key, context);
                break;
            }
        }
    }

    /** Writes the current state of {@code key} as {@code year-month-day -> wd}. */
    private void emit(TQ key, Context context) throws IOException, InterruptedException {
        rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
        rval.set(key.getWd());
        context.write(rkey, rval);
    }
}
如上篇博文一样打jar包,在集群上运行
补充
遇到java.lang.RuntimeException: java.io.EOFException问题时,可以参考博文java.io.EOFException
转载请注明原文地址:https://tech.qufami.com/read-19004.html