1. HDFS client environment setup
Copy the Hadoop distribution compiled for your operating system to a path that contains no Chinese characters (for example E:\hadoop), then configure the HADOOP_HOME environment variable and the Path environment variable (typically by appending %HADOOP_HOME%\bin to Path).
Create a Maven project named HdfsClientDemo.
Import the dependency coordinates below and add logging; adjust the versions to match your own environment.
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>
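For the logging part, a minimal sketch of a log4j.properties file placed under src/main/resources could look like the following (log4j 1.x is pulled in transitively by hadoop-common; the appender and pattern shown here are just an example and can be adjusted):

# Print INFO and above to the console
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n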
Create the package com.ersan.hdfs.
Create a TestHDFS class:
package com.ersan.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class TestHDFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.137.34:9000");
        // Connect to the NameNode as user "root"
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");
        // Create a directory on HDFS
        fs.mkdirs(new Path("/test/ersan/myself"));
        fs.close();
    }
}
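Every example below follows the same connect / operate / close pattern. As a minimal sketch (assuming the same NameNode address and user), the mkdirs call can also be written with try-with-resources, since FileSystem implements Closeable and is then closed automatically:

package com.ersan.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class TestHDFSTryWithResources {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The FileSystem is closed automatically when the try block exits
        try (FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root")) {
            fs.mkdirs(new Path("/test/ersan/myself"));
        }
    }
}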
HDFS file upload (testing parameter priority)
package com.ersan.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class TestCopyFromLocalFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Replication factor set in client code (highest priority)
        conf.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");
        // Upload the local file E:\diray\11.txt to HDFS as /test/ersan/myself.txt
        fs.copyFromLocalFile(new Path("E:\\diray\\11.txt"), new Path("/test/ersan/myself.txt"));
        fs.close();
        System.out.println("over");
    }
}
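On parameter priority: in general, a value set in client code via conf.set() overrides a user-defined configuration file on the project classpath (for example an hdfs-site.xml under src/main/resources), which in turn overrides the server-side configuration and the built-in defaults. To check which replication factor was actually applied you can, for example, run hdfs dfs -stat %r /test/ersan/myself.txt on the cluster or look at the file in the NameNode web UI.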
HDFS file download
package com.ersan.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class TestCopyTolocalFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");
        // Arguments: delSrc (false = keep the source on HDFS), source path, destination path,
        // useRawLocalFileSystem (true = do not write a local .crc checksum file)
        fs.copyToLocalFile(false, new Path("/test/ersan/myself.txt"),
                new Path("E:\\diray\\111.txt"), true);
        fs.close();
    }
}
HDFS directory deletion
package com.ersan.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class TestDelete {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");
        // The second argument enables recursive deletion, which is required for directories
        fs.delete(new Path("/myself.txt"), true);
        fs.close();
    }
}
HDFS file rename
package com.ersan.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class TestRename {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");
        // Rename /test/ersan/myself.txt to /test/ersan/m.txt
        fs.rename(new Path("/test/ersan/myself.txt"), new Path("/test/ersan/m.txt"));
        fs.close();
    }
}
Viewing HDFS file details
package com.ersan.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.net.URI;

public class TestListFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");
        // Recursively iterate over all files under the root directory
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus status = listFiles.next();
            // File name, length, permissions and group
            System.out.println(status.getPath().getName());
            System.out.println(status.getLen());
            System.out.println(status.getPermission());
            System.out.println(status.getGroup());
            // The hosts that store each block of the file
            BlockLocation[] blockLocations = status.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("-------------------");
        }
        fs.close();
    }
}
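Note the difference between the two listing APIs: listFiles(path, true) recursively returns files only (as LocatedFileStatus objects, which already carry block locations), while listStatus(path), used below, returns both files and directories, but for a single level only.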
Distinguishing files from directories on HDFS
@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");
    FileStatus[] listStatus = fs.listStatus(new Path("/"));
    for (FileStatus fileStatus : listStatus) {
        if (fileStatus.isFile()) {
            System.out.println("f:" + fileStatus.getPath().getName());
        } else {
            System.out.println("d:" + fileStatus.getPath().getName());
        }
    }
    fs.close();
}
The API operations on HDFS shown above are all wrapped up by the framework. What if we want to implement those operations ourselves? We can use I/O streams to upload and download the data.
HDFS I/O stream operations
HDFS file upload
Requirement: upload banhua.txt from the local e: drive to the HDFS root directory.
@Test
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");
    // Input stream reading the local file, output stream writing to HDFS
    FileInputStream fis = new FileInputStream(new File("e:/banhua.txt"));
    FSDataOutputStream fos = fs.create(new Path("/banhua.txt"));
    // Copy the stream; the buffer size is taken from io.file.buffer.size in the configuration
    IOUtils.copyBytes(fis, fos, configuration);
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}
HDFS file download
Requirement: download banhua.txt from HDFS to the local e: drive.
@Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");
    // Input stream reading from HDFS, output stream writing the local file
    FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
    FileOutputStream fos = new FileOutputStream(new File("e:/banhua.txt"));
    IOUtils.copyBytes(fis, fos, configuration);
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}
Positioned (seek-based) file reading
Requirement: read a large file on HDFS in chunks, for example /hadoop-2.6.0.tar.gz in the root directory.
@Test
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.6.0.tar.gz"));
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.6.0.tar.gz.part1"));
    // Copy only the first 128 MB (1 KB buffer * 1024 * 128 iterations), i.e. the first block
    // (simplified: assumes each read fills the buffer completely)
    byte[] buf = new byte[1024];
    for (int i = 0; i < 1024 * 128; i++) {
        fis.read(buf);
        fos.write(buf);
    }
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
    fs.close();
}
@Test
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.6.0.tar.gz"));
    // Skip the first 128 MB and copy the rest of the file
    fis.seek(1024 * 1024 * 128);
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.6.0.tar.gz.part2"));
    IOUtils.copyBytes(fis, fos, configuration);
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
    fs.close();
}
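To verify the chunked read, the two parts can be merged back together on the local machine (for example, in a Windows command prompt: type hadoop-2.6.0.tar.gz.part2 >> hadoop-2.6.0.tar.gz.part1, then rename part1); the merged file is the complete archive and should extract normally.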