IDEA开发第一个Hadoop程序WordCount

tech2022-09-20  124

1. 新建Java项目

2. 导入开发Hadoop所需的jar包

此处jar包比较多,可以专门整理下放在一个文件夹里。我放在了D:\source\hadoop\hadoop-lib目录下。导入jar包过程参照博客idea导入jar

3. 编写程序

主类MyWC.java package com.hpe.hadoop.mr.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author achu * @create 2020-09-01 10:19 */ public class MyWC { public static void main(String[] args) throws Exception { // 1.读取配置文件 Configuration conf = new Configuration(true); // 创建作业并设置名称 Job job = Job.getInstance(conf); job.setJobName("myjob"); // 2.设置作业的处理类 job.setJarByClass(MyWC.class); // 3.设置作业的输入数据路径和输出结果路径 // 输入路径 Path input = new Path(args[0]); FileInputFormat.addInputPath(job,input); // 4. 判断输出路径是否存在,若存在则删除 Path output = new Path(args[1]); // 根据输出目录获取分布式文件系统对象fs FileSystem fs = output.getFileSystem(conf); if(fs.exists(output)){ fs.delete(output,true); } // 设置输出路径 FileOutputFormat.setOutputPath(job,output); // 5.设置作业处理的Mapper类 job.setMapperClass(MyMapper.class); // 6. 设置作业处理的Reduce类 job.setReducerClass(MyReducer.class); // 7.设置Mapper输出key和value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 8.提交作业 job.waitForCompletion(true); } } 逻辑实现类MyMapper.java package com.hpe.hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; /** * @author achu * @create 2020-09-01 11:05 */ public class MyMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // StringTokenizerzi是一个接口,处理字符串,进行分割,最终形成一个迭代器 StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ // 指定key word.set(itr.nextToken()); // 通过上下文对象指定Map的输出key和value context.write(word,one); } } } 逻辑实现类MyReducer.java package com.hpe.hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author achu * @create 2020-09-02 8:29 * Map输出: * <hello,1> * <world,1> * <world,1> * Reduce输入: * <hello,list(1)> * <world,list(1,1)> * Reduce输出 * <hello,1> * <world,2> */ public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // 输出的value private IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 存放每个key对应的的单词总数 int sum = 0; // 遍历value集合 for (IntWritable value : values) { // 将Hadoop类型转化为Java类型,再累加 sum += value.get(); } // 将java类型转化为hadoop类型 result.set(sum); // 通过上下文对象输出key和value context.write(key,result); } }

4. 将所编写的程序打成jar包

打jar包的过程参照博客idea打jar包

5. 将jar包上传到集群上

使用WinSCP工具上传jar包

6. 安装zip命令,删除jar包中一个文件

安装命令 安装命令zip: yum install zip #提示输入时,请输入y; 安装命令unzip:yum install unzip #提示输入时,请输入y;删除jar包中一文件 zip -d hadoop.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF

7. 执行jar

命令:hadoop jar 具体Jar名称 主类全类名 输入参数文件(或文件夹) 输出路径文件夹 hadoop jar hadoop.jar com.hpe.hadoop.mr.wc.MyWC /user/root/hello.txt /output1

补充

访问分布式文件系统web网址:http://192.168.228.11:50070访问resourcemanager网址:http://192.168.228.13:8088/cluster基础程序,测试该程序之前需要下载配置文件core-site.xml,hdfs-site.xml package com.hpe.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.*; /** * @author achu * @create 2020-08-25 21:13 */ public class TestHDFS { Configuration conf = null; FileSystem fs = null; // 打开 @Before public void conn() throws Exception { // 读取配置文件(true:默认读取) conf = new Configuration(true); fs = FileSystem.get(conf); } // 创建目录 @Test public void mkdir() throws Exception { Path path = new Path("/tempDir"); // 如果目录存在则删除,若不存在则创建 if(fs.exists(path)){ // 删除 fs.delete(path,true); } // 创建 boolean mkdirs = fs.mkdirs(path); } // 上传文件 @Test public void uploadFile() throws Exception { // 准备输入流 InputStream is = new BufferedInputStream(new FileInputStream(new File("D:\\test.txt"))); // 准备输出流,得到输出流对象 Path path = new Path("/tempDir/test1.txt"); FSDataOutputStream fsDataOutputStream = fs.create(path); // 写入文件 IOUtils.copyBytes(is,fsDataOutputStream,conf,true); } // 下载文件 public void downloadFile(){ } // 关闭 @After public void close() throws Exception { fs.close(); } }
最新回复(0)