MapReduce Example: WordCount


The WordCount Project

Writing the WordCount example in IDEA

Preparation: create a text file hello.txt; sample content is shown below.
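The exact contents of hello.txt are not important; any plain text with space-separated words works. As a purely illustrative assumption, the examples below use a file like this:

hello world
hello hadoop
hadoop mapreduce hadoop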

Create a new Maven project and add the following to pom.xml. I am using Hadoop 2.7.2 here:

<repositories>
    <repository>
        <id>cloudera</id>
        <name>cloudera</name>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>

Create a log4j.properties file under resources with the following content:

hadoop.root.logger=DEBUG, console
log4j.rootLogger=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

Next comes the Mapper code:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * LongWritable  input key: the byte offset of the line within the file (not the line number)
 * Text          input value: one line of text, the counterpart of a Java String
 * Text          output key: a single word, the counterpart of a Java String
 * IntWritable   output value: the count 1 emitted for each word, the counterpart of a Java int
 */
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // one line of input
        String line = value.toString();
        // split the line on spaces
        String[] words = line.split(" ");
        // turn each word into a (word, 1) pair and hand it to the framework
        for (String word : words) {
            System.out.println(word);
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

The Reducer code:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    // key: one word; values: all of the 1s emitted for that word
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
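Before moving on to the Driver, here is a minimal plain-Java sketch of what the framework conceptually does between map and reduce: it groups the (word, 1) pairs emitted by WordCountMap by key and hands each word's list of 1s to WordCountReduce, which sums them. No Hadoop is required to run it; the class name and the sample input are assumptions used only for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the map -> shuffle -> reduce flow of the job above.
public class ShuffleSketch {
    public static void main(String[] args) {
        // assumed sample input, matching the illustrative hello.txt
        String[] lines = {"hello world", "hello hadoop", "hadoop mapreduce hadoop"};

        // "map" + "shuffle": emit (word, 1) for every word and group the 1s by word
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
        }

        // "reduce": sum the list of 1s for each word, as WordCountReduce does
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum += one;
            }
            System.out.println(entry.getKey() + "\t" + sum); // hadoop 3, hello 2, mapreduce 1, world 1
        }
    }
}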
The Driver code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) {
        BasicConfigurator.configure(); // quickly set up a default Log4j configuration
        args = new String[2];
        // path of hello.txt
        args[0] = "D:\\hello.txt";
        // path for the output; ideally a directory that does not exist yet
        args[1] = "D:\\file\\output\\put";

        // 1: get the job instance
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration);

            // 2: set the jar by the driver class
            job.setJarByClass(WordCountDriver.class);

            // 3: wire up the custom Mapper and Reducer
            job.setMapperClass(WordCountMap.class);
            job.setReducerClass(WordCountReduce.class);

            // 4: set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // 5: set the final output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // delete the output directory if it already exists
            Path path = new Path(args[1]); // args[1] is the output directory (args[0] is the input)
            FileSystem fileSystem = path.getFileSystem(configuration); // FileSystem that owns this path
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true); // true: delete recursively even if the directory is not empty
            }

            // 6: set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 7: submit the job and wait for it to finish
            boolean a = job.waitForCompletion(true);
            System.out.println(a);
        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

Run result:
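With the illustrative hello.txt assumed above, the output directory (D:\file\output\put) would contain an empty _SUCCESS marker and a part-r-00000 file holding each word and its count, sorted by key and separated by a tab, roughly:

hadoop	3
hello	2
mapreduce	1
world	1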
