CRUD operations (add, delete, modify, query) with the HDFS Java API



Three issues need special attention when doing this experiment:

1. HDFS safe mode needs to be turned off; the command is: ./hadoop dfsadmin -safemode leave

2. The Hadoop version the project depends on must match the cluster's, otherwise a version-mismatch error is reported

3. Hadoop cluster user permissions, and the role of each directory (a sketch of one workaround follows below)

Why these three issues occur in the first place is still to be investigated!!!
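A minimal sketch of one workaround for issue 3, assuming a pre-security Hadoop 0.20.x cluster: the client identity can be forced through the hadoop.job.ugi property before the FileSystem is obtained. The user/group pair hadoop,supergroup and the namenode URI below are assumptions, not values taken from this cluster.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UgiExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pre-security Hadoop (0.20.x) reads the client identity from this
        // property as "user,group1,group2"; hadoop,supergroup is hypothetical.
        conf.set("hadoop.job.ugi", "hadoop,supergroup");
        FileSystem fs = FileSystem.get(URI.create("hdfs://hadoopm:9000/"), conf);
        System.out.println(fs.exists(new Path("/user/hadoop")));
        fs.close();
    }
}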

I have not verified whether the Hadoop version in use (release-0.20.0) supports WebHDFS; in any case I could not connect to it no matter what!!!

Judging from the information below, 0.20.0 probably does not support it:

https://jira.springsource.org/browse/IMPALA-15?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

Serengeti Distro:
Apache Hadoop:1.0.1
GreenPlum HD:1.1(Apache Hadoop 1.0.0)
Cloudera: CDH3 (Apache Hadoop 0.20.2, WebHDFS is not supported in this version)
Hortonworks: 1.0.7 (Apache Hadoop 1.0.2)
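For reference, on a release that does support it (and with dfs.webhdfs.enabled turned on), WebHDFS is just an HTTP endpoint and can be probed from the command line; the host name hadoopm is the one used later in this post, and listing the root path is a made-up example:

curl -i "http://hadoopm:50070/webhdfs/v1/?op=LISTSTATUS"

On 0.20.x this request simply fails, which matches the connection problems described above.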

The steps are as follows:

Project structure, as shown below:

(screenshot: project structure)

Now for the code O(∩_∩)O haha~

pom.xml is configured as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yun.hdfs</groupId>
    <artifactId>hdfs</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.yun.hdfs.WangPan</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>0.20.2</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

WangPan.java, the main entry point that dispatches to the utility methods:

package com.yun.hdfs;

import java.io.IOException;

public class WangPan {

    private static String result = "";

    public static void main(String[] args) {
        try {
            // check that the command-line arguments are valid
            if (args.length > 0 && args[0] != null && !"".equals(args[0])) {
                if ("upload".equals(args[0])) {
                    result = "upload:" + WangPanUtils.uploadFile(args);
                } else if ("delete".equals(args[0])) {
                    result = "delete:" + WangPanUtils.deleteFile(args);
                } else if ("query".equals(args[0])) {
                    if (WangPanUtils.listFile(args) == null) {
                        result = "query:fail!";
                    } else {
                        result = "query:success";
                    }
                } else if ("read".equals(args[0])) {
                    result = "read:" + WangPanUtils.readFile(args);
                } else {
                    System.out.println("sorry, we have no such service!");
                }
                System.out.println(result);
            } else {
                System.out.println("fail!");
                System.exit(1);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
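Once the project has been packaged into a runnable jar (see the PS further down), an invocation would look roughly like the line below. The jar name is inferred from the pom above, and the local file and the hdfs://hadoopm:9000 URI are made-up examples, not the author's paths:

java -jar hdfs-0.0.1-SNAPSHOT.jar upload /tmp/test.txt hdfs://hadoopm:9000/user/hadoop/test.txt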

WangPanUtils.java, the add/delete/query/read utilities:

package com.yun.hdfs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class WangPanUtils {

    // upload: args = { "upload", <local source path>, <HDFS destination path> }
    public static String uploadFile(String[] args) throws IOException {
        if (args.length < 3) {
            return "fail";
        }
        String localSrc = args[1];
        String dst = args[2];
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst));
        IOUtils.copyBytes(in, out, 4096, true);
        return "success";
    }

    // query: args = { "query", <HDFS directory path> }
    public static Path[] listFile(String[] args) throws IOException {
        if (args.length < 2) {
            return null;
        }
        String dst = args[1];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FileStatus[] status = fs.listStatus(new Path(dst));
        Path[] listPaths = FileUtil.stat2Paths(status);
        return listPaths;
    }

    // delete: args = { "delete", <HDFS file path> }
    public static String deleteFile(String[] args) throws IOException {
        if (args.length < 2) {
            return "fail";
        }
        String fileName = args[1];
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(URI.create(fileName), config);
        Path path = new Path(fileName);
        if (!hdfs.exists(path)) {
            return "fail";
        }
        boolean isDeleted = hdfs.delete(path, false);
        if (isDeleted) {
            return "success";
        } else {
            return "fail";
        }
    }

    // read: args = { "read", <HDFS source path>, <local destination path> }
    public static String readFile(String[] args) throws IOException {
        if (args.length < 3) {
            return "fail";
        }
        String dst = args[1];
        String newPath = args[2];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(dst));
        OutputStream out = new FileOutputStream(newPath);
        byte[] ioBuffer = new byte[1024];
        int readLen = hdfsInStream.read(ioBuffer);
        while (-1 != readLen) {
            out.write(ioBuffer, 0, readLen);
            readLen = hdfsInStream.read(ioBuffer);
        }
        out.close();
        hdfsInStream.close();
        fs.close();
        return "success";
    }

    // mkdir: args = { "mkdir", <HDFS directory path> }
    public static String mkdir(String[] args) throws IOException {
        if (args.length < 2) {
            return "fail";
        }
        String dst = args[1];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        Path path = new Path(dst);
        if (fs.exists(path)) {
            return "fail";
        }
        fs.mkdirs(path);
        return "success";
    }
}
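Two small gaps worth pointing out: WangPan.main() never prints the paths returned by listFile, and it has no branch for mkdir. Below is a minimal standalone sketch of how the query results could actually be printed; this is my addition, not part of the original project, and the directory URI passed in is whatever HDFS path you want to list.

package com.yun.hdfs;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class ListDemo {
    public static void main(String[] args) throws Exception {
        // e.g. hdfs://hadoopm:9000/user/hadoop (hypothetical URI)
        String dst = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        // listStatus + stat2Paths is exactly what listFile() does; here we
        // print each path instead of only reporting success or failure.
        FileStatus[] status = fs.listStatus(new Path(dst));
        for (Path p : FileUtil.stat2Paths(status)) {
            System.out.println(p);
        }
        fs.close();
    }
}

Wiring a "mkdir" branch into the if/else chain of WangPan.main() would work the same way as the existing upload/delete branches.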

PS: note that we need to use Maven to package this project into a runnable jar, using the following command:

(screenshot: packaging command)
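The screenshot itself is not reproduced here. With the assembly plugin bound to the package phase as in the pom above, a command along the lines of mvn clean package (an assumption, not necessarily the author's exact command) should produce the runnable jar with all dependencies bundled.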

The invocation for each operation is noted in the comments above each method; the results look like this:

(screenshot: results of the add/delete/query/read operations)

We also need to visit http://hadoopm:50070/ -> Browse the filesystem to check whether the HDFS file operations really succeeded.
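Alternatively, a quick listing from the command line on the cluster, for example ./hadoop fs -ls /user/hadoop (the path is a made-up example), shows the same information without going through the web UI.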

(screenshot: HDFS web UI)
