HDFS Client Operations (2020-08-23)


1. HDFS Client Environment Setup

Copy the pre-built Hadoop package that matches your operating system to a path containing no Chinese characters (e.g. E:\hadoop), then set the HADOOP_HOME environment variable and update the Path environment variable accordingly.
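As a quick sanity check (a minimal sketch; E:\hadoop is just the example path from above, not a required location), you can confirm from Java that the JVM actually sees the variable before running any HDFS code:

public class CheckHadoopEnv {
    public static void main(String[] args) {
        // Print the HADOOP_HOME environment variable visible to the JVM
        String hadoopHome = System.getenv("HADOOP_HOME");
        System.out.println("HADOOP_HOME = " + hadoopHome);

        // If the variable is not picked up (e.g. the IDE was started before it was set),
        // hadoop.home.dir can be set programmatically as a workaround
        if (hadoopHome == null) {
            System.setProperty("hadoop.home.dir", "E:\\hadoop");
        }
    }
}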

Create a Maven project named HdfsClientDemo.

Import the following dependency coordinates and add logging (a sample log4j configuration is sketched after the dependency list below). Adjust the versions to match your own environment.

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>
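For the logging mentioned above, a typical log4j.properties placed under src/main/resources looks like the following (a minimal sketch; the file name and layout pattern are conventional choices, not something mandated by this post):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n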

Create the package com.ersan.hdfs.

Create the TestHDFS class.

package com.ersan.hdfs.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * @Author :Ersan
 * @Date 2020/9/4
 * @Description Create a directory on HDFS
 */
public class TestHDFS {
    public static void main(String[] args) throws Exception {
        // 1 Get the file system
        Configuration conf = new Configuration();
        // Configure the client to run against the cluster
        conf.set("fs.defaultFS", "hdfs://192.168.137.34:9000");
        // FileSystem fs = FileSystem.get(conf);
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");

        // 2 Create the directory
        fs.mkdirs(new Path("/test/ersan/myself"));

        // 3 Close the resource
        fs.close();
    }
}

HDFS File Upload (Testing Configuration Parameter Priority)

package com.ersan.hdfs.hd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * @Author :Ersan
 * @Date 2020/9/4
 * @Description HDFS file upload (testing configuration parameter priority)
 */
public class TestCopyFromLocalFile {
    public static void main(String[] args) throws Exception {
        // 1 Get the file system; the value set on the Configuration in code overrides the config files
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");

        // 2 Upload the file
        fs.copyFromLocalFile(new Path("E:\\diray/11.txt"), new Path("/test/ersan/myself.txt"));

        // 3 Close the resource
        fs.close();
        System.out.println("over");
    }
}
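To actually observe the parameter priority the heading refers to, you can read back the replication factor of the uploaded file: values set on the Configuration in code take precedence over an hdfs-site.xml on the project classpath, which in turn takes precedence over the server-side defaults. A minimal sketch (run inside a main or @Test method), reusing the cluster address and paths from the example above:

// Minimal sketch: verify which replication value won (address/paths reused from the example above)
Configuration conf = new Configuration();
conf.set("dfs.replication", "2");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");

FileStatus status = fs.getFileStatus(new Path("/test/ersan/myself.txt"));
// Expect 2 here, since the value set in code has the highest priority
System.out.println("replication = " + status.getReplication());

fs.close();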

HDFS File Download

package com.ersan.hdfs.hd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * @Author :Ersan
 * @Date 2020/9/4
 * @Description File download
 */
public class TestCopyTolocalFile {
    public static void main(String[] args) throws Exception {
        // 1 Get the file system
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");

        // 2 Perform the download
        // boolean delSrc                  whether to delete the source file
        // Path src                        the HDFS path of the file to download
        // Path dst                        the local path to download the file to
        // boolean useRawLocalFileSystem   whether to use the raw local file system (true skips writing the .crc checksum file)
        fs.copyToLocalFile(false, new Path("/test/ersan/myself.txt"), new Path("E:\\diray/111.txt"), true);

        // 3 Close the resource
        fs.close();
    }
}

HDFS Directory Deletion

package com.ersan.hdfs.hd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * @Author :Ersan
 * @Date 2020/9/4
 * @Description File and directory deletion
 */
public class TestDelete {
    public static void main(String[] args) throws Exception {
        // 1 Get the file system
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");

        // 2 Perform the deletion; the second argument enables recursive delete
        fs.delete(new Path("/myself.txt"), true);

        // 3 Close the resource
        fs.close();
    }
}

HDFS File Rename

package com.ersan.hdfs.hd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * @Author :Ersan
 * @Date 2020/9/4
 * @Description File rename
 */
public class TestRename {
    public static void main(String[] args) throws Exception {
        // 1 Get the file system
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");

        // 2 Rename the file
        fs.rename(new Path("/test/ersan/myself.txt"), new Path("/test/ersan/m.txt"));

        // 3 Close the resource
        fs.close();
    }
}

Viewing HDFS File Details

package com.ersan.hdfs.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.net.URI;

/**
 * @Author :Ersan
 * @Date 2020/9/8
 * @Description List file details: name, length, permission, group and block locations
 */
public class TestListFiles {
    public static void main(String[] args) throws Exception {
        // 1 Get the file system
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), conf, "root");

        // 2 Get the file details (recursively)
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus status = listFiles.next();

            // Print the details
            // File name
            System.out.println(status.getPath().getName());
            // Length
            System.out.println(status.getLen());
            // Permission
            System.out.println(status.getPermission());
            // Group
            System.out.println(status.getGroup());

            // Get the block location information
            BlockLocation[] blockLocations = status.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                // Get the host nodes that store this block
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("-------------------");
        }

        // 3 Close the resource
        fs.close();
    }
}

Distinguishing HDFS Files from Directories

@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");

    // 2 Determine whether each entry is a file or a directory
    FileStatus[] listStatus = fs.listStatus(new Path("/"));
    for (FileStatus fileStatus : listStatus) {
        if (fileStatus.isFile()) {
            // It is a file
            System.out.println("f:" + fileStatus.getPath().getName());
        } else {
            // It is a directory
            System.out.println("d:" + fileStatus.getPath().getName());
        }
    }

    // 3 Close the resource
    fs.close();
}

The API operations on HDFS that we have used so far are all wrapped for us by the framework. What if we want to implement those operations ourselves? We can use I/O streams to implement the upload and download of data directly.

HDFS I/O Stream Operations

// File upload via I/O streams
@Test
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");

    // 2 Create the input stream from the local file
    FileInputStream fis = new FileInputStream(new File("e:/banhua.txt"));

    // 3 Get the output stream to HDFS
    FSDataOutputStream fos = fs.create(new Path("/banhua.txt"));

    // 4 Copy the stream
    IOUtils.copyBytes(fis, fos, configuration);

    // 5 Close the resources
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}

HDFS File Download via I/O Streams

Requirement: download banhua.txt from HDFS to the local E: drive.

// File download
@Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");

    // 2 Get the input stream from HDFS
    FSDataInputStream fis = fs.open(new Path("/banhua.txt"));

    // 3 Get the output stream to the local file
    FileOutputStream fos = new FileOutputStream(new File("e:/banhua.txt"));

    // 4 Copy the stream
    IOUtils.copyBytes(fis, fos, configuration);

    // 5 Close the resources
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}

Seek-Based File Reading

Requirement: read a large file on HDFS in block-sized pieces, e.g. /hadoop-2.6.0.tar.gz in the root directory.

// Read only the first block (128 MB)
@Test
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");

    // 2 Get the input stream
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.6.0.tar.gz"));

    // 3 Create the output stream
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.6.0.tar.gz.part1"));

    // 4 Copy the first 128 MB (128 * 1024 reads of a 1 KB buffer); only write the bytes actually read
    byte[] buf = new byte[1024];
    for (int i = 0; i < 1024 * 128; i++) {
        int len = fis.read(buf);
        if (len == -1) {
            break;
        }
        fos.write(buf, 0, len);
    }

    // 5 Close the resources
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
}

// Read the rest of the file, starting after the first block
@Test
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.34:9000"), configuration, "root");

    // 2 Open the input stream
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.6.0.tar.gz"));

    // 3 Seek to the position right after the first 128 MB
    fis.seek(1024 * 1024 * 128);

    // 4 Create the output stream
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.6.0.tar.gz.part2"));

    // 5 Copy the stream
    IOUtils.copyBytes(fis, fos, configuration);

    // 6 Close the resources
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
}
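Once both parts have been written, they can be stitched back together to check that the split is lossless. A minimal sketch (file names assumed from the example above) that appends part2 to part1; renaming the result to hadoop-2.6.0.tar.gz should then give a usable archive:

@Test
public void mergeParts() throws IOException {
    // Append part2 to the end of part1 (the 'true' flag opens the output stream in append mode)
    try (FileInputStream part2 = new FileInputStream("e:/hadoop-2.6.0.tar.gz.part2");
         FileOutputStream out = new FileOutputStream("e:/hadoop-2.6.0.tar.gz.part1", true)) {
        byte[] buf = new byte[4096];
        int len;
        while ((len = part2.read(buf)) != -1) {
            out.write(buf, 0, len);
        }
    }
}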