HBase 5种写入数据方式


Version: hadoop 1.2.1; hbase 0.94.16;

HBase写入数据方式(参考:《HBase The Definitive Guide》),可以简单分为下面几种:

1. 直接使用HTable进行导入,代码如下:

package hbase.curd;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Minimal example of writing to HBase directly through {@link HTable},
 * without MapReduce: builds ten sample rows and sends them as one batch.
 */
public class PutExample {

	/** Handle for the target table, obtained from the shared HTableUtil helper. */
	private HTable  table = HTableUtil.getHTable("testtable");

	/**
	 * Entry point: writes the sample rows.
	 *
	 * @param args ignored
	 * @throws IOException not thrown by the visible code; kept so the signature is unchanged
	 */
	public static void main(String[] args) throws IOException {
		PutExample pe = new PutExample();
		pe.putRows();
	}

	/**
	 * Builds rows "row_0" .. "row_9", each with a random subset of the
	 * columns colfam1:qual1 .. colfam1:qual5, writes them in one batch and
	 * closes the table. Failures are printed and suppress the final
	 * "done put rows" message.
	 */
	public void putRows(){
		// One generator for the whole run instead of a new one per row.
		Random random = new Random();
		byte[] family = Bytes.toBytes("colfam1");
		List<Put> puts = new ArrayList<Put>();
		for(int i=0;i<10;i++){
			Put put = new Put(Bytes.toBytes("row_"+i));
			// NOTE(review): every qualifier stores the same "colfam1_qual1_value_" text,
			// exactly as the original did; confirm whether per-qualifier values were intended.
			byte[] value = Bytes.toBytes("colfam1_qual1_value_"+i);
			for(int q=1;q<=5;q++){
				if(random.nextBoolean()){
					put.add(family, Bytes.toBytes("qual"+q), value);
				}
			}
			// A row for which no column was picked would make the client reject
			// the whole batch ("No columns to insert"), so leave it out.
			if(!put.isEmpty()){
				puts.add(put);
			}
		}
		boolean written = false;
		try{
			table.put(puts);
			written = true;
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			// Release the table even when the put failed.
			if(table!=null){
				try{
					table.close();
				}catch(IOException e){
					e.printStackTrace();
					written = false;
				}
			}
		}
		if(!written){
			return ;
		}
		System.out.println("done put rows");
	}

}

其中HTableUtil如下:

package hbase.curd;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Static helper that holds the shared HBase client configuration and hands
 * out an {@link HTable} handle.
 */
public class HTableUtil {
	/** Most recently opened table handle; null when opening failed. */
	private static HTable table;
	/** Client configuration pointing at the "hbase" host. */
	private static Configuration conf;
	
	static{
		conf =HBaseConfiguration.create();
		conf.set("mapred.job.tracker", "hbase:9001");
		conf.set("fs.default.name", "hbase:9000");
		conf.set("hbase.zookeeper.quorum", "hbase");
		try {
			// Eagerly open the default table, as the original class did.
			table = new HTable(conf,"testtable");
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * @return the shared configuration built in the static initializer
	 */
	public static Configuration getConf(){
		return conf;
	}

	/**
	 * Returns a handle for the named table. The cached handle is reused only
	 * when it belongs to the requested table; otherwise a new handle is
	 * opened and becomes the cached one.
	 *
	 * @param tablename name of the table to open
	 * @return the table handle, or null when it could not be opened
	 */
	public static HTable getHTable(String tablename){
		if(table!=null && Bytes.equals(table.getTableName(), Bytes.toBytes(tablename))){
			return table;
		}
		// The previous handle is not closed here because an earlier caller
		// may still be using it.
		try {
			table= new HTable(conf,tablename);
		} catch (IOException e) {
			e.printStackTrace();
			// Never hand back a handle that points at a different table.
			table = null;
		}
		return table;
	}
	
	/**
	 * Shorthand for {@code Bytes.toBytes(name)}.
	 *
	 * @param name string to encode
	 * @return the encoded bytes
	 */
	public static  byte[] gB(String name){
		return Bytes.toBytes(name);
	}
}

这一种是没有使用MR的,下面介绍的几种方式都是使用MR的。

2.1 从HDFS文件导入HBase,继承自Mapper,代码如下:

package hbase.mr;

import java.io.IOException;

import hbase.curd.HTableUtil;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Map-only MapReduce job that imports the lines of an HDFS file or directory
 * into an existing HBase table. Each line becomes one row whose key is the
 * MD5 digest of the line and whose single cell holds the line itself.
 */
public class ImportFromFile {

	/** Job name; also printed as the command name in the usage message. */
	public static final String NAME="ImportFromFile";
	/** Job counters: LINES is incremented once per line written. */
	public enum Counters{LINES}
	
	/**
	 * Mapper that turns each text line into a Put for the configured column.
	 */
	static class ImportMapper extends Mapper<LongWritable,Text,
		ImmutableBytesWritable,Writable>{
		private byte[] family =null;
		private byte[] qualifier = null;

		/**
		 * Reads the target column ("family" or "family:qualifier") from the
		 * "conf.column" job setting.
		 */
		@Override
		protected void setup(Context cxt){
			String column = cxt.getConfiguration().get("conf.column");
			byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
			family = colkey[0];
			if(colkey.length>1){
				qualifier = colkey[1];
			}
		}

		/**
		 * Emits one Put per input line, keyed by the MD5 digest of the line.
		 * Any failure is printed and the line is skipped, so a single bad
		 * record does not fail the task.
		 *
		 * @param offset byte offset of the line in the input (unused)
		 * @param line the input line
		 * @param cxt the task context used for output and counters
		 */
		@Override
		public void map(LongWritable offset,Text line,Context cxt){
			try{
				String lineString= line.toString();
				byte[] rowkey= DigestUtils.md5(lineString);
				Put put = new Put(rowkey);
				put.add(family,qualifier,Bytes.toBytes(lineString));
				cxt.write(new ImmutableBytesWritable(rowkey), put);
				cxt.getCounter(Counters.LINES).increment(1);
			}catch(Exception e){
				e.printStackTrace();
			}
		}
	}

	/**
	 * Parses the job's own options (-t table, -c column, -i input, -d debug).
	 * Prints the usage text and exits with status -1 when parsing fails.
	 *
	 * @param args arguments left over after generic Hadoop options were removed
	 * @return the parsed command line
	 */
	private static CommandLine parseArgs(String[] args){
		Options options = new Options();
		Option o = new Option("t" ,"table",true,"table to import into (must exist)");
		o.setArgName("table-name");
		o.setRequired(true);
		options.addOption(o);
		
		o= new Option("c","column",true,"column to store row data into");
		o.setArgName("family:qualifier");
		o.setRequired(true);
		options.addOption(o);
		
		o = new Option("i", "input", true,
		"the directory or file to read from");
		o.setArgName("path-in-HDFS");
		o.setRequired(true);
		options.addOption(o);
		// NOTE(review): the -d flag is accepted but nothing in this class reads it.
		options.addOption("d", "debug", false, "switch on DEBUG log level");
		CommandLineParser parser = new PosixParser();
		CommandLine cmd = null;
		try {
			cmd = parser.parse(options, args);
		} catch (Exception e) {
			System.err.println("ERROR: " + e.getMessage() + "\n");
			HelpFormatter formatter = new HelpFormatter();
			formatter.printHelp(NAME + " ", options, true);
			System.exit(-1);
		}
		return cmd;
	}

	/**
	 * Configures and runs the import job, exiting with 0 on success and 1 on
	 * failure.
	 *
	 * @param args generic Hadoop options followed by -t/-c/-i; when no job
	 *             options are supplied the built-in defaults from
	 *             {@link #initialArg()} are used
	 */
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		
		Configuration conf = HTableUtil.getConf();
		// Honor the real command line; the original always parsed the defaults
		// and silently ignored whatever the caller passed.
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
		if(otherArgs.length==0){
			otherArgs = initialArg();
		}
		CommandLine cmd = parseArgs(otherArgs);
		String table = cmd.getOptionValue("t");
		String input = cmd.getOptionValue("i");
		String column = cmd.getOptionValue("c");
		conf.set("conf.column", column);
		Job job = new Job(conf, "Import from file " + input + " into table " + table);
		job.setJarByClass(ImportFromFile.class);
		job.setMapperClass(ImportMapper.class);
		job.setOutputFormatClass(TableOutputFormat.class);
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Writable.class);
		// Map-only job: the mapper's Puts go straight to the table output format.
		job.setNumReduceTasks(0); 
		FileInputFormat.addInputPath(job, new Path(input));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
	/**
	 * @return the default job options used when none are given on the command line
	 */
	private static String[] initialArg(){
		String []args = new String[6];
		args[0]="-c";
		args[1]="fam:data";
		args[2]="-i";
		args[3]="/user/hadoop/input/picdata";
		args[4]="-t";
		args[5]="testtable";
		return args;
	}
}

2.2 读取HBase表写入HBase表中字段,代码如下:

package hbase.mr;

import hadoop.util.HadoopUtils;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParseDriver {

	/**
	 * 把hbase表中数据拷贝到其他表(或本表)相同字段
	 * @param args
	 */
	enum Counters{
		VALID, ROWS, COLS, ERROR
	}
	private static Logger log = LoggerFactory.getLogger(ParseDriver.class);
	static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
		private byte[] columnFamily =null ;
		private byte[] columnQualifier =null;
		@Override
		protected void setup(Context cxt){
			columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
			columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
		}
		@Override 
		public void map(ImmutableBytesWritable row,Result columns,Context cxt){
			cxt.getCounter(Counters.ROWS).increment(1);
			String value =null;
			try{
				Put put = new Put(row.get());
				for(KeyValue kv : columns.list()){
					cxt.getCounter(Counters.COLS).increment(1);
					value= Bytes.toStringBinary(kv.getValue());
					if(equals(columnQualifier,kv.getQualifier())){  // 过滤column
						put.add(columnFamily,columnQualifier,kv.getValue());
						cxt.write(row, put);
						cxt.getCounter(Counters.VALID).increment(1);
					}
				}
			}catch(Exception e){
				log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
						",Value:"+value);
				cxt.getCounter(Counters.ERROR).increment(1);
			}
		}
		private boolean equals(byte[] a,byte[] b){
			String aStr= Bytes.toString(a);
			String bStr= Bytes.toString(b);
			if(aStr.equals(bStr)){
				return true;
			}
			return false;
		}
	}
	
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		byte[] columnFamily = Bytes.toBytes("fam");
		byte[] columnQualifier = Bytes.toBytes("data");
		Scan scan = new Scan ();
		scan.addColumn(columnFamily, columnQualifier);
		HadoopUtils.initialConf("hbase");
		Configuration conf = HadoopUtils.getConf();
		conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
		conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
		
		String input ="testtable" ;//
		String output="testtable1"; // 
				
		Job job = new Job(conf,"Parse data in "+input+",write to"+output);
		job.setJarByClass(ParseDriver.class);
		TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, 
				ImmutableBytesWritable.class, Put.class,job);
		TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job);
		
		System.exit(job.waitForCompletion(true)?0:1);
		
	}

}

其中HadoopUtils代码如下:

package hadoop.util;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

/**
 * Static helper that builds the Hadoop configuration used by the example
 * jobs and reads text files from HDFS.
 */
public class HadoopUtils {

	/** Shared configuration; created lazily by {@link #getConf()}. */
	private static Configuration conf;

	/**
	 * Builds the configuration for the default host "hbase".
	 */
	public  static void initialConf(){
		initialConf("hbase");
	}

	/**
	 * Builds a fresh configuration whose job tracker, default file system and
	 * ZooKeeper quorum all point at the given host.
	 *
	 * @param host host name of the cluster node
	 */
	public  static void initialConf(String host){
		conf = new Configuration();
		conf.set("mapred.job.tracker", host+":9001");
		conf.set("fs.default.name", host+":9000");
		conf.set("hbase.zookeeper.quorum", host);
	}

	/**
	 * @return the shared configuration, building the default one on first use
	 */
	public static Configuration getConf(){
		if(conf==null){
			initialConf();
		}
		return conf;
	}
	
	/**
	 * Reads a text file line by line.
	 *
	 * @param fileName path or URI of the file to read
	 * @return the lines of the file, in order
	 * @throws IOException when the file cannot be opened or read
	 */
	public static List<String> readFromHDFS(String fileName) throws IOException {
		Configuration conf = getConf();
		FileSystem fs = FileSystem.get(URI.create(fileName), conf);
		try {
			FSDataInputStream hdfsInStream = fs.open(new Path(fileName));
			try {
				LineReader inLine = new LineReader(hdfsInStream, conf);
				Text txtLine = new Text();
				List<String> list = new ArrayList<String>();
				// Same loop condition as before: stop when readLine reports
				// that it consumed nothing.
				while (inLine.readLine(txtLine) > 0) {
					list.add(txtLine.toString());
				}
				return list;
			} finally {
				// Close the stream even when reading fails part-way.
				hdfsInStream.close();
			}
		} finally {
			// NOTE(review): FileSystem.get may return a cached, shared instance;
			// closing it is kept from the original but can affect other users
			// of the same file system in this JVM — confirm this is intended.
			fs.close();
		}
	}
}

2.3 MR和HTable结合,代码如下:

package hbase.mr;

import hadoop.util.HadoopUtils;
import hbase.mr.AnalyzeDriver.Counters;

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParseSinglePutDriver {

	/**
	 * 使用HTable进行写入
	 * 把infoTable 表中的 qualifier字段复制到qualifier1字段
	 * 单个Put
	 * @param args
	 */
	private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
	static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
		private HTable infoTable =null ;
		private byte[] columnFamily =null ;
		private byte[] columnQualifier =null;
		private byte[] columnQualifier1 =null;
		@Override 
		protected void setup(Context cxt){
			log.info("ParseSinglePutDriver setup,current time: "+new Date());
			try {
				infoTable = new HTable(cxt.getConfiguration(),
						cxt.getConfiguration().get("conf.infotable"));
				infoTable.setAutoFlush(false);
			} catch (IOException e) {
				log.error("Initial infoTable error:\n"+e.getMessage());
			}
			columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
			columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
			columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));
		}
		@Override 
		protected void cleanup(Context cxt){
			try {
				infoTable.flushCommits();
				log.info("ParseSinglePutDriver cleanup ,current time :"+new Date());
			} catch (IOException e) {
				log.error("infoTable flush commits error:\n"+e.getMessage());
			}
		}
		@Override 
		public void map(ImmutableBytesWritable row,Result columns,Context cxt){
			cxt.getCounter(Counters.ROWS).increment(1);
			String value =null ;
			try{
				Put put = new Put(row.get());
				for(KeyValue kv : columns.list()){
					cxt.getCounter(Counters.COLS).increment(1);
					value= Bytes.toStringBinary(kv.getValue());
					if(equals(columnQualifier,kv.getQualifier())){  // 过滤column
						put.add(columnFamily,columnQualifier1,kv.getValue());
						infoTable.put(put);
					}
				}
			}catch(Exception e){
				log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
						",Value:"+value);
				cxt.getCounter(Counters.ERROR).increment(1);
			}
		}
		private boolean equals(byte[] a,byte[] b){
			String aStr= Bytes.toString(a);
			String bStr= Bytes.toString(b);
			if(aStr.equals(bStr)){
				return true;
			}
			return false;
		}
	}
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		String input ="testtable";
		byte[] columnFamily = Bytes.toBytes("fam");
		byte[] columnQualifier = Bytes.toBytes("data");
		byte[] columnQualifier1 = Bytes.toBytes("data1");
		Scan scan = new Scan ();
		scan.addColumn(columnFamily, columnQualifier);
		HadoopUtils.initialConf("hbase");
		Configuration conf = HadoopUtils.getConf();
		conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
		conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
		conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));
		conf.set("conf.infotable", input);
		
		Job job = new Job(conf,"Parse data in "+input+",into tables");
		job.setJarByClass(ParseSinglePutDriver.class);
		TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, 
				ImmutableBytesWritable.class, Put.class,job);	
		job.setOutputFormatClass(NullOutputFormat.class);
		job.setNumReduceTasks(0);
		System.exit(job.waitForCompletion(true)?0:1);
	}

}

2.4 上面2.3中的HTable其实也是可以put一个List的,下面的方式就是put一个list的方式,这样效率会高。

package hbase.mr;

import hadoop.util.HadoopUtils;
import hbase.mr.AnalyzeDriver.Counters;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParseListPutDriver {

	/**
	 * 使用HTable进行写入
	 * List <Put> 进行测试,查看效率
	 * 把infoTable 表中的 qualifier字段复制到qualifier1字段
	 * @param args
	 */
	private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
	static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
		private HTable infoTable =null ;
		private byte[] columnFamily =null ;
		private byte[] columnQualifier =null;
		private byte[] columnQualifier1 =null;
		private List<Put> list = new ArrayList<Put>();
		@Override 
		protected void setup(Context cxt){
			log.info("ParseListPutDriver setup,current time: "+new Date());
			try {
				infoTable = new HTable(cxt.getConfiguration(),
						cxt.getConfiguration().get("conf.infotable"));
				infoTable.setAutoFlush(false);
			} catch (IOException e) {
				log.error("Initial infoTable error:\n"+e.getMessage());
			}
			columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
			columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
			columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));
		}
		@Override 
		protected void cleanup(Context cxt){
			try {
				infoTable.put(list);
				infoTable.flushCommits();
				log.info("ParseListPutDriver cleanup ,current time :"+new Date());
			} catch (IOException e) {
				log.error("infoTable flush commits error:\n"+e.getMessage());
			}
		}
		@Override 
		public void map(ImmutableBytesWritable row,Result columns,Context cxt){
			cxt.getCounter(Counters.ROWS).increment(1);
			String value =null ;
			try{
				Put put = new Put(row.get());
				for(KeyValue kv : columns.list()){
					cxt.getCounter(Counters.COLS).increment(1);
					value= Bytes.toStringBinary(kv.getValue());
					if(equals(columnQualifier,kv.getQualifier())){  // 过滤column
						put.add(columnFamily,columnQualifier1,kv.getValue());
						list.add(put);
					}
				}
			}catch(Exception e){
				log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
						",Value:"+value);
				cxt.getCounter(Counters.ERROR).increment(1);
			}
		}
		private boolean equals(byte[] a,byte[] b){
			String aStr= Bytes.toString(a);
			String bStr= Bytes.toString(b);
			if(aStr.equals(bStr)){
				return true;
			}
			return false;
		}
	}
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		String input ="testtable";
		byte[] columnFamily = Bytes.toBytes("fam");
		byte[] columnQualifier = Bytes.toBytes("data");
		byte[] columnQualifier1 = Bytes.toBytes("data2");
		Scan scan = new Scan ();
		scan.addColumn(columnFamily, columnQualifier);
		HadoopUtils.initialConf("hbase");
		Configuration conf = HadoopUtils.getConf();
		conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
		conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
		conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));
		conf.set("conf.infotable", input);
		
		Job job = new Job(conf,"Parse data in "+input+",into tables");
		job.setJarByClass(ParseListPutDriver.class);
		TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, 
				ImmutableBytesWritable.class, Put.class,job);	
		job.setOutputFormatClass(NullOutputFormat.class);
		job.setNumReduceTasks(0);
		System.exit(job.waitForCompletion(true)?0:1);
	}

}

数据记录条数为:26632,可以看到下面图片中的时间记录对比:



由于结合了hbase,所以需要在hadoop_home/lib目录下面加些额外的包,其整个包如下(hbase1.0.jar为编译打包的MR程序):



分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990


相关内容

    暂无相关文章