1. 新建Java项目
2. 导入开发Hadoop所需的jar包
此处jar包比较多,可以专门整理下放在一个文件夹里。我放在了D:\source\hadoop\hadoop-lib目录下。导入jar包过程参照博客idea导入jar
3. 编写程序
主类MyWC.java
package com.hpe.hadoop.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * WordCount driver: configures and submits the MapReduce job.
 *
 * <p>Usage: {@code hadoop jar <jar> com.hpe.hadoop.mr.wc.MyWC <input> <output>}
 */
public class MyWC {

    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MyWC <input path> <output path>");
            System.exit(2);
        }

        // true -> load cluster configuration (core-site.xml / hdfs-site.xml)
        // from the classpath.
        Configuration conf = new Configuration(true);

        Job job = Job.getInstance(conf);
        job.setJobName("myjob");
        job.setJarByClass(MyWC.class);

        // Input path(s).
        Path input = new Path(args[0]);
        FileInputFormat.addInputPath(job, input);

        // MapReduce refuses to start if the output directory already exists,
        // so remove a stale one first.
        Path output = new Path(args[1]);
        FileSystem fs = output.getFileSystem(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // BUG FIX: the final (reducer) output types were never declared;
        // without them the job falls back to the LongWritable/Text defaults.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // BUG FIX: propagate job success/failure to the shell; the original
        // discarded waitForCompletion's result and always exited 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
逻辑实现类MyMapper.java
package com.hpe.hadoop.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * WordCount mapper: splits each input line into whitespace-delimited tokens
 * and emits a (token, 1) pair for every token found.
 */
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    // Writables are serialized on context.write(), so a single instance can
    // safely be recycled across records to avoid per-record allocation.
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Walk the line token by token, emitting each word with a count of 1.
        for (StringTokenizer tokens = new StringTokenizer(value.toString());
                tokens.hasMoreTokens(); ) {
            word.set(tokens.nextToken());
            context.write(word, one);
        }
    }
}
逻辑实现类MyReducer.java
package com.hpe.hadoop.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * WordCount reducer: totals the per-word counts emitted by the mapper and
 * writes a single (word, total) pair per key to the job output.
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Recycled output writable — serialized on context.write(), so one
    // instance suffices for every key.
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate all partial counts for this word.
        int total = 0;
        for (IntWritable count : values) {
            total += count.get();
        }
        result.set(total);
        context.write(key, result);
    }
}
4. 将所编写的程序打成jar包
打jar包的过程参照博客idea打jar包
5. 将jar包上传到集群上
使用WinSCP工具上传jar包
6. 安装zip命令,删除jar包中一个文件
安装 zip 命令:yum install zip(提示输入时,请输入 y);安装 unzip 命令:yum install unzip(提示输入时,请输入 y)。然后删除 jar 包中的一个文件:
zip -d hadoop.jar META-INF
7. 执行jar
命令:hadoop jar 具体Jar名称 主类全类名 输入参数文件(或文件夹) 输出路径文件夹
hadoop jar hadoop.jar com.hpe.hadoop.mr.wc.MyWC /user/root/hello.txt /output1
补充
访问分布式文件系统 Web 页面:http://192.168.228.11:50070;访问 ResourceManager 页面:http://192.168.228.13:8088/cluster。运行下面的基础程序之前,需要先下载集群的配置文件 core-site.xml 和 hdfs-site.xml 并放到类路径下。
package com.hpe.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;

/**
 * Basic HDFS client operations. Requires core-site.xml / hdfs-site.xml on the
 * classpath so {@code FileSystem.get()} can locate the cluster.
 */
public class TestHDFS {

    Configuration conf = null;
    FileSystem fs = null;

    /** Opens the HDFS connection before each test. */
    @Before
    public void conn() throws Exception {
        conf = new Configuration(true);
        fs = FileSystem.get(conf);
    }

    /** Recreates /tempDir from scratch. */
    @Test
    public void mkdir() throws Exception {
        Path path = new Path("/tempDir");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        // BUG FIX: the original discarded the mkdirs() boolean into an unused
        // local, silently passing even when the directory was not created.
        if (!fs.mkdirs(path)) {
            throw new IOException("mkdirs failed for " + path);
        }
    }

    /** Uploads a local file (D:\test.txt) to /tempDir/test1.txt on HDFS. */
    @Test
    public void uploadFile() throws Exception {
        Path path = new Path("/tempDir/test1.txt");
        // BUG FIX: try-with-resources — the original leaked the local input
        // stream if fs.create() threw before IOUtils.copyBytes took ownership.
        try (InputStream is = new BufferedInputStream(
                     new FileInputStream(new File("D:\\test.txt")));
             FSDataOutputStream out = fs.create(path)) {
            // close=false: the try-with-resources block owns both streams now.
            IOUtils.copyBytes(is, out, conf, false);
        }
    }

    // TODO(review): not implemented in the original; left as a stub.
    public void downloadFile() {
    }

    /** Releases the FileSystem handle after each test. */
    @After
    public void close() throws Exception {
        fs.close();
    }
}