Hadoop I/O



Hadoop自带一套原子操作用于数据I/O。其中一些技术,如数据完整性保持和压缩,对于处理多达数个TB的数据时,特别值得关注。另外一些Hadoop工具或API,所形成的构建模块可用于开发分布式系统,比如序列化操作和on-disk数据结构。

本篇的内容主要有以下几点:

(1)通过检验和保证数据完整性

(2)Hadoop压缩

(3)Hadoop序列化-Writable

(4)Hadoop顺序文件-即文件序列化


数据完整性


(1)在数据的流转过中,HDFS通过“校验和”,来检验数据完整性,如果发现损坏,则新建一个replica,删除损坏的部分,是数据块的复本保持在期望的水平。

(2)datanode节点本身也会在一个后台线程中运行一个DataBlockScanner,从而定期验证本节点的所有数据块。

(3)Hadoop的LocalFileSystem执行客户端的校验和验证,在写入数据时,会新建一个名为.filename.crc的文件,用于校验

(4)如果底层文件系统本身已经有了校验机制,则可以使用一个不需要检验的文件系统RawLocalFileSystem:

Configuration conf = ...

FileSystem fs = new RawLocalFileSystem();

fs.initialize(null,conf);

(5)LocalFileSystem通过CheckSumFileSystem完成校验操作,一般用法如下:

FileSystem rawFs = ...

FileSystem checksummedFs = new ChecksumFileSystem(rawFs);


Hadoop如何使用压缩


压缩格式总结




gzip是比较通用的压缩格式,比较通用。bzip2比gzip更高效,但压缩速度慢一点。bzip2解压比压缩快,但与其他压缩格式比,还是慢一点。LZO优化压缩速度,但效率略低。

DEFLATE是一个标准压缩算法,该算法的标准实现是zlib。没有可用于生产DEFLATE文件的常用命令行工具,因为通常都用gzip格式。gzip格式只是在DEFLATE格式增加了文件头和文件尾。

所有压缩算法都需要权衡时间和空间:一般来说-1为优化速度,-9为优化压缩空间,例如:gzip -1 file,代表最快压缩创建一个file.gz。

是否可切分,代表压缩算法是否支持切分(splitable),即是否可以搜索数据流任务位置并进一步往下读取数据。可切分压缩尤其适合MapReduce。


codec


codec实现了一种压缩-解压算法。在Hadoop中,对接口CompressionCodec的实现代表一个codec,Hadoop实现的codec列表如下:


其中LZO代码库拥有GPL许可,不在Apache的发行版中,可以在http://github.com/kevinweil/hadoop-lzo下载。


(例程1)从标准输入读取数据,然后写入标准输出:


import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
	public static void main(String[] args) throws Exception {

		String codeClassname = "org.apache.hadoop.io.compress.GzipCodec";
		Class<?> codecClass = Class.forName(codeClassname);
		Configuration conf = new Configuration();
		CompressionCodec codec = (CompressionCodec) ReflectionUtils
				.newInstance(codecClass, conf);
		CompressionOutputStream out = codec.createOutputStream(System.out);
		//InputStream in = new FileInputStream("/test/input/wc/file01.txt");
		InputStream in = System.in;
		IOUtils.copyBytes(in, out, 4096, false);
		out.finish();	//这里只是完成到这个数据流的写操作,并没有关闭,所以可以接着往下流
	}
}

hadoop集群执行命令:echo "Text" | hadoop jar test.jar StreamCompressor | gunzip ,可以看到正确的输出。


(例程2)根据文件扩展名,利用工厂判断产生codec对文件进行解压缩:


import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class FileDecompressor {
	public static void main(String[] args) throws Exception {
		String uri = "/test/input/t/1901.gz";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path inputPath = new Path(uri);
		CompressionCodecFactory factory = new CompressionCodecFactory(conf);
		CompressionCodec codec = factory.getCodec(inputPath);
		if (codec == null) {
			System.err.println("No codec found for " + uri);
			System.exit(1);
		}
		String outputUri = CompressionCodecFactory.removeSuffix(uri,
				codec.getDefaultExtension());
		InputStream in = null;
		OutputStream out = null;
		try {
			in = codec.createInputStream(fs.open(inputPath));
			out = fs.create(new Path(outputUri));
			IOUtils.copyBytes(in, out, conf);
		} finally {
			IOUtils.closeStream(in);
			IOUtils.closeStream(out);
		}
	}
}

hadoop集群执行命令:

hadoop jar test.jar FileDecompressor。

hadoop fs -ls /test/input/t ,可以看到 /test/input/t/file01.gz 文件已经被解压了。

hadoop fs -cat /test/input/t/file01,可以查看文件内容。


(例程3)使用压缩池对标准输入的数据进行压缩,然后写入标准输出:


如果使用的是原生代码库,并且需要在应用中执行大量压缩和解压缩操作,可以考虑使用CodecPool,它允许你反复使用压缩和解压缩,以分摊创建这些对象所涉及的开销。

关于原生类库:

为了性能,最好使用原生(native)类库进行压缩和解压缩,例如,使用原生gzip类库可以减少大约一半的解压缩时间和10%的压缩时间(和内置的Java实现相比)。

并非每种格式都有原生实现,如下表:


默认情况下,Hadoop会根据自身运行的平台搜索原生代码库,如果找到相应代码库就会自动加载。当然,特殊情况下,也可以禁用原生代码库,设置hadoop.native.lib为false(这确保使用内置的Java代码库,如果有的话)。

代码示例:

import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.ReflectionUtils;

public class PooledStreamCompressor {
	public static void main(String[] args) throws Exception {

		String codeClassname = "org.apache.hadoop.io.compress.GzipCodec";
		Class<?> codecClass = Class.forName(codeClassname);
		Configuration conf = new Configuration();
		CompressionCodec codec = (CompressionCodec) ReflectionUtils
				.newInstance(codecClass, conf);
		Compressor compressor = null;
		try {
			compressor = CodecPool.getCompressor(codec);
			CompressionOutputStream out = codec.createOutputStream(System.out,
					compressor);
			// InputStream in = new
			// FileInputStream("/test/input/wc/file01.txt");
			InputStream in = System.in;
			IOUtils.copyBytes(in, out, 4096, false);
			out.finish(); // 这里只是完成到这个数据流的写操作,并没有关闭,所以可以接着往下流
		} finally {
			CodecPool.returnCompressor(compressor);// 返回池子
		}
	}
}


(例程4)对查找最高气温的输出进行压缩:


import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureWithCompression {

	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
		conf.setJobName("Max Temperature With Compression");

		// FileInputFormat.addInputPaths(conf, new Path(args[0]));
		// FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		FileInputFormat.setInputPaths(conf, new Path("/test/input/t"));
		FileOutputFormat.setOutputPath(conf, new Path("/test/output/t"));

		// 设置压缩(输出gz压缩文件)
		conf.setBoolean("mapred.output.compress", true);
		conf.setClass("mapred.output.compression.codec", GzipCodec.class,
				CompressionCodec.class);

		conf.setMapperClass(MaxTemperatureWithCompressionMapper.class);
		conf.setCombinerClass(MaxTemperatureWithCompressionReduce.class);
		conf.setReducerClass(MaxTemperatureWithCompressionReduce.class);

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);

		JobClient.runJob(conf);
	}
}

class MaxTemperatureWithCompressionMapper extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, IntWritable> {
	private static final int MISSING = 9999;

	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		String line = value.toString();
		String year = line.substring(15, 19);
		int airTemperature;
		if (line.charAt(87) == '+') {
			airTemperature = Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature = Integer.parseInt(line.substring(87, 92));
		}
		String quality = line.substring(92, 93);
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			output.collect(new Text(year), new IntWritable(airTemperature));
		}
	}
}

class MaxTemperatureWithCompressionReduce extends MapReduceBase implements
		Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		int maxValue = Integer.MIN_VALUE;
		while (values.hasNext()) {
			maxValue = Math.max(maxValue, values.next().get());
		}
		output.collect(key, new IntWritable(maxValue));
	}
}
查理结果:

hadoop fs -copyToLocal /test/output/t/part-00000.gz

gunzip -c part-00000.gz

应该使用哪种压缩格式


使用哪种压缩格式与具体应用相关。是希望运行速度最快,还是更关注降低存储开销?通常,需要为应用尝试不同的策略,并且为应用构建一套测试标准,从而找到最理想的压缩格式。

对于巨大、没有存储边界的文件,如日志文件,可以考虑如下选项:

(1)存储未经压缩的文件

(2)使用支持切分的存储格式,如bzip2

(3)在应用中切分文件成块,然后压缩。这种情况,需要合理选择数据库的大小,以确保压缩后数据近似HDFS块的大小

(4)使用顺序文件(Sequence File),它支持压缩和切分

(5)使用一个Avro数据文件,改文件支持压缩和切分,就像顺序文件一样,但增加了许多编程语言都可读写的优势

对于大文件来说,不应该使用不支持切分整个文件的压缩格式,否则将失去数据的本地特性,进而造成MapReduce应用效率低下。


序列化(serialization)


序列化,是将结构化对象转换为字节流,以便传输或存储。反序列化,是指字节流转回结构化对象的逆过程。

序列化在分布式数据处理的两大领域经常出现:进程间通信和永久存储。

Hadoop使用自己的序列化格式Writable,它格式紧凑,速度快,但很难用Java以外的语言进行扩展或使用。因为Writable是Hadoop的核心,大多数MapReduce程序的键和值都会使用它。


Writable接口


Writable接口定义了两个方法:一个将其状态写到DataOutput二进制流,另一个从DataInput二进制流读取状态,代码如下:

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
	void write(DataOutput output) throws IOException;
	void readFields(DataInput in) throws IOException;	
}

Writable类


Hadoop自带的org.apache.hadoop.io包中有广泛的Writable类可供选择。

Writable类的层次结构如下图:


Java基本类型的Writable类:



实现定制的Writable类型


Hadoop本身已经有如上的Writable实现可以满足大部分要求,但有些时候,我们可能还是需要根据自己的需求构造一个新的实现。由于Writable是MapReduce数据IO的核心,所以调整成二进制表示能对性能产生显著效果。

如下演示存储一对Text对象的Writable类型:

import java.io.*;
import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {
	private Text first;
	private Text second;

	public TextPair() {
		set(new Text(), new Text());
	}

	public TextPair(Text first, Text second) {
		set(first, second);
	}

	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}

	public Text getFirst() {
		return first;
	}

	public Text getSecond() {
		return second;
	}

	// @Override
	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}

	// @Override
	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}

	@Override
	public int hashCode() {
		return first.hashCode() * 163 + second.hashCode();
	}

	@Override
	public boolean equals(Object o) {
		if (o instanceof TextPair) {
			TextPair tp = (TextPair) o;
			return first.equals(tp.first) && second.equals(tp.second);
		}
		return false;
	}

	@Override
	public String toString() {
		return first + "\t" + second;
	}

	// @Override
	public int compareTo(TextPair tp) {
		int cmp = first.compareTo(tp.first);
		if (cmp != 0) {
			return cmp;
		}
		return second.compareTo(tp.second);
	}
}

如下演示在TextPair类的基础上,为了速度,实现一个RawComparator(上面的 TextPair是在对象的基础上比较,我们下面在序列化的字节流的基础上进行比较):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

public class Comparator extends WritableComparator {
	private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

	public Comparator() {
		super(TextPair.class);
	}

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		try {
			int firstL1 = WritableUtils.decodeVIntSize(b1[s1])
					+ readVInt(b1, s1);
			int firstL2 = WritableUtils.decodeVIntSize(b2[s2])
					+ readVInt(b2, s2);
			int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
			if (cmp != 0) {
				return cmp;
			}
			return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2,
					s2 + firstL2, l2 - firstL2);
		} catch (IOException e) {
			throw new IllegalArgumentException(e);
		}
	}

	static {
		WritableComparator.define(TextPair.class, new Comparator());
	}
}


以上可以看出,编写原生的comparator,需要处理字节级别的细节。


为什么不用Java Object Serialization


Doug Cutting这样解释:“为什么开始设计Hadoop的时候我不用Java Serialization?因为它看起来太复杂,而我认为需要有一个非常精简的机制,可以用于精确控制对象的读和写,因为这个机制是Hadoop的核心。使用Java Serialization后,虽然可以获得一些控制权,但用起来非常纠结。不用RMI也处于类似的考虑。高效、高性能的进程间通信是Hadoop的关键。我觉得我们需要精确控制连接、延迟和缓冲的处理方式,然而RMI对此无能为力。”

Doug认为Java序列化不满足序列化的标准:精简、快速、可扩展、互操作。

精简:Writable不把类名写到数据流,它假设客户端知道会收到什么类型,结果是这个格式比Java序列化更加精简,同时支持 随机存取和访问,因为流中的每一条记录均独立于其他记录。

高效:Writable对象可以(并且通常)重用,对于MapRe作业(主要对只有几个类型的大量对象进行序列化和反序列化),不需要为新建对象分配空间而得到的存储节省是非常可观的。


Avro


Apache Avro是一个独立于编程语言的数据序列化框架。该项目是由Doug Cutting创建的,旨在解决Hadoop中Writable类型的不足:缺乏语言的可移植性。

本篇不介绍这个框架,可以参阅官方网址:http://avro.apache.org 。


顺序文件


顺序文件,即流式文件,二进制文件。Hadoop开发了一组对象,来处理顺序文件。


SequenceFile


(1)写入SequenceFile对象

import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;

public class SequenceFileWriteDemo {
	private static final String[] DATA = { "One,two", "Threw,four", "Five,six",
			"Seven,eitht", "Nine,ten" };

	public static void main(String[] args) throws IOException {
		String uri = "/test/numbers.seq";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path path = new Path(uri);
		IntWritable key = new IntWritable();
		Text value = new Text();
		SequenceFile.Writer writer = null;
		try {
			writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
					value.getClass());
			for (int i = 0; i < 100; i++) {
				key.set(100 - i);
				value.set(DATA[i % DATA.length]);
				System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
						value);
				writer.append(key, value);
			}

		} finally {
			IOUtils.closeStream(writer);
		}
	}
}


writer.getLength()实际取出的是流式文件的偏移量,是记录的边界。

(2)读取SequenceFile

import java.io.*;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {
	public static void main(String[] args) throws IOException {
		String uri = "/test/numbers.seq";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path path = new Path(uri);
		SequenceFile.Reader reader = null;
		try {
			reader = new SequenceFile.Reader(fs, path, conf);
			Writable key = (Writable) ReflectionUtils.newInstance(
					reader.getKeyClass(), conf);
			Writable value = (Writable) ReflectionUtils.newInstance(
					reader.getValueClass(), conf);
			long position = reader.getPosition();
			while (reader.next(key, value)) {
				String syncSeen = reader.syncSeen() ? "*" : "";// 同步点
				System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
						value);
				position = reader.getPosition();// beginning of next record
			}
		} finally {
			IOUtils.closeStream(reader);
		}
	}
}

在顺序文件中可以搜索给定位置,有两种方法:第一种是reader.seek(359),如果359不是记录的边界的话,则reader.next(key,value)时,会报IOException;第二种是reader.sync(360),代表从360之后找第一个同步点。

同步点是指当数据读取的实例出错后,能够再一次与记录边界同步的数据流中的一个位置。同步点是SequenceFile.Writer记录的,在顺序文件写入过程中,每隔一定记录便插入一个特殊项标记同步标注。同步点始终位于记录的边界处。

(3)通过命令行接口显示及排序SequenceFile

hadoop fs -text 可以识别gzip压缩文件及顺序文件,其他格式,则认为是文本文件。

hadoop fs -text /test/numbers.seq


MapFile


(1)写入MapFile

MapFile是已经排序的SequenceFile,并且已经加入用于搜索键的索引。

写入MapFile代码如下:

import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;

public class MapFileWriteDemo {
	private static final String[] DATA = { "One,two", "Threw,four", "Five,six",
			"Seven,eitht", "Nine,ten" };

	public static void main(String[] args) throws IOException {
		String uri = "/test/numbers.map";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		IntWritable key = new IntWritable();
		Text value = new Text();
		MapFile.Writer writer = null;
		try {
			writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
			for (int i = 0; i < 1024; i++) {
				key.set(i+1);
				value.set(DATA[i % DATA.length]);
				writer.append(key, value);
			}

		} finally {
			IOUtils.closeStream(writer);
		}
	}
}

可以通过hadoop命令查看,发现生成了numbers.map文件夹,里面有data和index文件:

hadoop fs -text /test/numbers.map/data | head 

hadoop fs -text /test/numbers.map/index | head 

关于index文件,默认情况下,是每隔128个键才有一个,可以通过MapFile.Writer实例中的setIndexInterval()方法设置io.map.index.interval属性。增加间隔数量可以有效减少用于存储索引的内存,减小间隔数量,可以提高随机访问的时间。

相关内容