HDFS 文件操作


HDFS 的文件操作通过 FileSystem 类提供的方法实现,代码如下:
package hdfs.fs.nefu;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

/**
 * Demonstrates basic HDFS file operations through the Hadoop {@link FileSystem} API.
 *
 * <p>Every operation follows the same three steps: build a {@link Configuration},
 * obtain a {@link FileSystem} from it, then perform the file operation. All paths
 * are hard-coded sample values.
 */
public class hdfsFileOperation {

	/**
	 * Runs every demo operation once, in a fixed order.
	 *
	 * @param args command-line arguments (unused)
	 * @throws IOException if any file system operation fails
	 */
	public static void main(String[] args) throws IOException {
		// NOTE(review): the sample paths are not coordinated with each other.
		// createFiles() writes a regular file at /hello while createDir() and
		// fileModifyTime() use paths underneath /hello, and deleteFile() targets
		// /test1 right after fileRename() moved it to /test2. Adjust the paths
		// before running the whole sequence against a real cluster.

		// Create a file.
		createFiles();
		// Create a directory.
		createDir();
		// Rename a file.
		fileRename();
		// Delete a file.
		deleteFile();
		// Upload a local file.
		uploadFile();
		// Check whether a file exists.
		fileExists();
		// Show a file's last modification time.
		fileModifyTime();
		// List all entries in a directory.
		showFile();
		// Show which hosts store a file's blocks.
		showLocation();
		// List the DataNodes of the cluster.
		showDatanodeName();
	}

	/**
	 * Prints the host name of every DataNode reported by the cluster.
	 *
	 * @throws IOException if the configured file system is not HDFS or the
	 *         DataNode report cannot be fetched
	 */
	private static void showDatanodeName() throws IOException {
		Configuration conf = new Configuration();
		FileSystem file = FileSystem.get(conf);
		// DataNode statistics only exist on HDFS; fail with a clear message
		// instead of a bare ClassCastException when another file system
		// (e.g. the local one) is configured as the default.
		if (!(file instanceof DistributedFileSystem)) {
			throw new IOException("Default file system is not HDFS: " + file.getClass().getName());
		}
		DistributedFileSystem hdfs = (DistributedFileSystem) file;
		// Fetch the per-node information.
		DatanodeInfo[] datanodeStatus = hdfs.getDataNodeStats();
		for (int i = 0; i < datanodeStatus.length; i++) {
			System.out.println("datanode_name:" + datanodeStatus[i].getHostName());
		}
	}

	/**
	 * Prints, for each block of {@code /usr/hadoop/hello}, the first host that
	 * stores the block.
	 *
	 * @throws IOException if the file status or block locations cannot be read
	 */
	private static void showLocation() throws IOException {
		Configuration conf = new Configuration();
		FileSystem file = FileSystem.get(conf);

		Path dest = new Path("/usr/hadoop/hello");

		FileStatus filestatus = file.getFileStatus(dest);
		// Fetch the block locations covering the whole file.
		BlockLocation[] location = file.getFileBlockLocations(filestatus, 0, filestatus.getLen());
		// A file may consist of several blocks, each possibly on several hosts.
		int length = location.length;
		for (int i = 0; i < length; i++) {
			// getHosts() returns the host names that hold this block.
			String[] host = location[i].getHosts();
			// Guard against a block that reports no hosts so that indexing
			// host[0] cannot throw ArrayIndexOutOfBoundsException.
			if (host == null || host.length == 0) {
				System.out.println("block_" + i);
			} else {
				System.out.println("block_" + i + host[0]);
			}
		}
	}

	/**
	 * Prints the path of every entry directly under the root directory.
	 *
	 * @throws IOException if the directory cannot be listed
	 */
	private static void showFile() throws IOException {
		Configuration conf = new Configuration();
		FileSystem file = FileSystem.get(conf);

		Path dest = new Path("/");

		FileStatus files[] = file.listStatus(dest);
		// Print each entry.
		for (int i = 0; i < files.length; i++) {
			System.out.println(files[i].getPath().toString());
		}
	}

	/**
	 * Prints the modification time of {@code hdfs://localhost/hello/text.txt}
	 * as the raw {@code long} returned by {@link FileStatus#getModificationTime()}.
	 *
	 * @throws IOException if the file status cannot be read
	 */
	private static void fileModifyTime() throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);

		Path path = new Path("hdfs://localhost/hello/text.txt");
		// Fetch the file metadata.
		FileStatus fstatus = fs.getFileStatus(path);
		// Read the modification time.
		long modify_time = fstatus.getModificationTime();
		System.out.println("文件修改时间" + modify_time);
	}

	/**
	 * Prints whether {@code /test2} exists.
	 *
	 * @throws IOException if the existence check fails
	 */
	private static void fileExists() throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		Path path = new Path("/test2");
		boolean isExists = fs.exists(path);
		System.out.println(isExists ? "文件存在" : "文件不存在");
	}

	/**
	 * Copies the local file {@code D:\log.txt} to the root directory and then
	 * lists the root directory.
	 *
	 * @throws IOException if the copy or the listing fails
	 */
	private static void uploadFile() throws IOException {
		Configuration conf = new Configuration();
		FileSystem file = FileSystem.get(conf);

		// Local source file. Plain Java APIs take file names as strings,
		// whereas Hadoop APIs take Path objects.
		Path src = new Path("D:\\log.txt");

		// Destination on the target file system.
		Path dest = new Path("/");

		file.copyFromLocalFile(src, dest);
		// NOTE(review): "fs.default.name" is the legacy key for the default
		// file system URI; newer Hadoop releases name it "fs.defaultFS" - confirm
		// against the Hadoop version in use.
		System.out.println("Upload to " + conf.get("fs.default.name"));

		// Fetch the entries of the destination directory.
		FileStatus files[] = file.listStatus(dest);

		// Print each entry.
		for (FileStatus value : files) {
			System.out.println(value.getPath());
		}
	}

	/**
	 * Deletes {@code /test1} non-recursively.
	 *
	 * @throws IOException if the delete call fails
	 */
	private static void deleteFile() throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		Path path = new Path("/test1");
		// Second argument is the "recursive" flag: a directory is only deleted
		// when it is true, otherwise an exception is thrown; for a plain file
		// either value works. The boolean result of delete() is not checked here.
		fs.delete(path, false);
	}

	/**
	 * Renames {@code /test1} to {@code /test2} and prints whether it succeeded.
	 *
	 * @throws IOException if the rename call fails
	 */
	private static void fileRename() throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);

		Path file1 = new Path("/test1");
		Path file2 = new Path("/test2");
		boolean isRename = fs.rename(file1, file2);

		System.out.println(isRename ? "已经重命名" : "未被重命名");
	}

	/**
	 * Creates the directory {@code hdfs://localhost/hello/Dir}.
	 *
	 * @throws IOException if the directory cannot be created
	 */
	private static void createDir() throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);

		Path dst = new Path("hdfs://localhost/hello/Dir");
		// The boolean result of mkdirs() is not checked here.
		fs.mkdirs(dst);
	}

	/**
	 * Creates the file {@code hdfs://localhost/hello} and writes the bytes of
	 * {@code "hello xd"} into it.
	 *
	 * @throws IOException if the file cannot be created, written or closed
	 */
	private static void createFiles() throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);

		Path dst = new Path("hdfs://localhost/hello");

		// File content as a byte array; the charset is given explicitly so the
		// bytes do not depend on the platform default encoding.
		byte[] buffer = "hello xd".getBytes("UTF-8");
		// Creating a file yields an output stream to write into.
		FSDataOutputStream output = fs.create(dst);
		try {
			// Write the content.
			output.write(buffer, 0, buffer.length);
		} finally {
			// Always close the stream: without this the stream leaks and the
			// data may never be flushed to the file system.
			output.close();
		}
	}
}
以上代码均在 Eclipse 中编写并调试,可能会存在出入。



相关内容