MR之SequenceFile详解


package com.leaf.hadoop.second;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
//$ hadoop jar SequenceWriteSample.jar SequenceWriteSample
public class SequenceWriteSample {

	private static String meg = "hello world";
	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();//获取环境变量
		FileSystem fs = FileSystem.get(conf);//获取文件系统
		Path path = new Path("sequenceFile");//定义路径
		Random rand = new Random();
		SequenceFile.Writer write = new SequenceFile.Writer(fs, conf, path, IntWritable.class, Text.class);
		for(int i=0;i<100;i++){
			write.append(new IntWritable(rand.nextInt(100)), new Text(meg));//写操作,隐藏的为每一行添加一个偏移量
			//System.out.println(write.getLength());
		}
		IOUtils.closeStream(write);//将写出流关闭
	}
	
	/*
	 * createWriter()方法的源码;
	 * public static Writer createWriter(FileSystem fs,Configuration conf,Path name,Class keyClass,Class valClass)throws IOException{
		return createWriter(fs,conf,name,keyClass,valClass,getCompressionType(conf));
	}
	*CompressionType类用以对SequenceFile写入的文件设定是否进行压缩处理
	*
	*CompressionType.NONE:表示不对任何数据进行压缩从而直接进行存储
	*CompressionType.RECORD:表示仅仅压缩key而对value不进行压缩
	*CompressionType.BLOCK:表示对所有的key与value都进行压缩
	*/
	
	/**
	 * write实例还隐藏的为每一行添加一个偏移量,用于获取当前文件的移动位置,
	 * 并按一定顺序生成相应的"同步位置"(Sync position),同步位置是
	 * 生成在SequenceFile文件的内部划分位置处,同步位置具体用法见SequenceFile.Reader
	 */
}
class SequenceReaderSample{//运行命令:$ hadoop jar SequenceReaderSample.jar SequenceReaderSample
	public static void main(String[] args)throws Exception{
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);//获取文件系统
		Path path = new Path("cool.txt");//定义输出路径
		SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);//创建实例
		
		IntWritable key = new IntWritable();//创建待读入Key实例
		Text value = new Text();//创建待读入value实例
		while(reader.next(key,value)){
			System.out.println(key+"======"+value);//打印已读取键值对
		}
		IOUtils.closeStream(reader);//关闭读入流
	}
	/**
	 * 相对于SequenceFile.Writer方法
	 * SequenceFile.Reader提供了查找同步位置的方法,通过返回一个布尔值确认是否已经读取到偏移位置
	 * public boolean syncSeen(){return syncSeen}
	 * 
	 * 同步位置(Sync Position)的作用:
	 * 	
	 * 
	 */
	
	/*
	 * reader中源码
	 * public synchronized boolean next(Writable key, Writable val)
			throws IOException {
			boolean more = next(key);                //判断下一个值是否存在
			if (more)                        //存在则进行操作
			getCurrentValue(val);                  //获得对应的值
			return more;
			}
			keyIn = new DataInputStream(keyInFilter);            //创建输入流
			public synchronized boolean next(Writable key) throws IOException { 
			…
			key.readFields(keyIn);                    //用读方法进行数据读取
			…
			}
			*
			*/
	/**
	 * 继续查看reader方法:
	 * …
	 *key.readFields(valBuffer);                    //通过 key 读入数据
	 *valBuffer.mark(0);                      //标记
	 *if (valBuffer.getPosition() != keyLength)              //判断已读取位置
	 *throw  new  IOException(key  +  "  read  "  +  valBuffer.getPosition()   +  "  bytes,  should  read  " + 
	 *keyLength);                        //抛出异常
	 *}
	 *…
	 *解析:首先key的对象读取一个偏移位置,根据其二进制生成具体事例,然后将位置归零,
	 *	如果此时偏移位置与key的长度不相同则抛出错误。注:next进行键值读取的时候,必须有一个给定偏移量,
	 *	而同时如果偏移量大于key的长度则产生异常。而解决办法是通过获取文件的偏移位置后,从偏移的边界
	 *	开始读取,然后将整个值读入key中。
	 *	
	 *	Reader同时还提供了一个sync(long position)方法,将位置定位到当前位置的下一个同步位置开始读取。
	 *	案例代码如下:
	 */
}
class SyncPositionSample{
	public static void main(String[] args)throws Exception{
		Configuration conf = new Configuration();//获取环境变量
		FileSystem fs = FileSystem.get(conf);//获取文件系统
		Path path = new Path("leaf.txt");//获取路径
		SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);//创建reader实例
		IntWritable key = new IntWritable();//设置存放实例
		Text value = new Text();
		reader.sync(0);//读取同步点
		while(reader.next(key,value)){//将数据读入实例中
			System.out.println(key+"-"+value);
		}
	}
}







相关内容