Bulk Load-HBase数据导入最佳实践,load-hbase最佳实践


一、概述

HBase本身提供了很多种数据导入的方式,通常有两种常用方式:

1、使用HBase提供的TableOutputFormat,原理是通过一个Mapreduce作业将数据导入HBase

2、另一种方式就是使用HBase原生Client API

这两种方式由于需要频繁的与数据所存储的RegionServer通信,一次性入库大量数据时,特别占用资源,所以都不是最有效的。了解过HBase底层原理的应该都知道,HBase在HDFS中是以HFile文件结构存储的,一个比较高效便捷的方法就是使用 “Bulk Loading”方法直接生成HFile,即HBase提供的HFileOutputFormat类。


二、Bulk Load基本原理

Bulk Load处理由两个主要步骤组成

1、准备数据文件

Bulk Load的第一步,会运行一个Mapreduce作业,其中使用到了HFileOutputFormat输出HBase数据文件:StoreFile。HFileOutputFormat的作用在于使得输出的HFile文件可以适应单个region,使用TotalOrderPartitioner类将map输出结果分区到各个不同的key区间中,每个key区间都对应着HBase表的region。

2、导入HBase表

第二步使用completebulkload工具将第一步的结果文件依次交给负责文件对应region的RegionServer,并将文件move到region在HDFS上的存储目录中,一旦完成,将数据开放给clients。

如果在bulk load准备导入或在准备导入与完成导入的临界点上发现region的边界已经改变,completebulkload工具会自动split数据文件到新的边界上,但是这个过程并不是最佳实践,所以用户在使用时需要最小化准备导入与导入集群间的延时,特别是当其他client在同时使用其他工具向同一张表导入数据。


注意:

bulk load的completebulkload步骤,就是简单的将importtsv或HFileOutputFormat的结果文件导入到某张表中,使用类似以下命令

hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable

命令会很快执行完成,将/user/todd/myoutput下的HFile文件导入到mytable表中。注意:如果目标表不存在,工具会自动创建表。


三、生成HFile程序说明:

1、最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
2、最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
3、MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件。
4、MR例子中HFileOutputFormat.configureIncrementalLoad(job, table);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。
5、MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。


四、示例

1、创建表

create 'hfiletable','fm1','fm2'

2、准备原始数据

key1	fm1:col1	value1
key1	fm1:col2	value2
key1	fm2:col1	value3
key4	fm1:col1	value4

3、导入HBase MR

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class BulkLoadJob {
    static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);

    public static class BulkLoadMap extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] valueStrSplit = value.toString().split("\t");
            String hkey = valueStrSplit[0];
            String family = valueStrSplit[1].split(":")[0];
            String column = valueStrSplit[1].split(":")[1];
            String hvalue = valueStrSplit[2];
            final byte[] rowKey = Bytes.toBytes(hkey);
            final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
            Put HPut = new Put(rowKey);
            byte[] cell = Bytes.toBytes(hvalue);
            HPut.add(Bytes.toBytes(family), Bytes.toBytes(column), cell);
            context.write(HKey, HPut);

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String inputPath = args[0];
        String outputPath = args[1];
        HTable hTable = null;
        try {
            Job job = Job.getInstance(conf, "ExampleRead");
            job.setJarByClass(BulkLoadJob.class);
            job.setMapperClass(BulkLoadJob.BulkLoadMap.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            // speculation
            job.setSpeculativeExecution(false);
            job.setReduceSpeculativeExecution(false);
            // in/out format
			job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);

            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            hTable = new HTable(conf, args[2]);
            HFileOutputFormat2.configureIncrementalLoad(job, hTable);

            if (job.waitForCompletion(true)) {
                FsShell shell = new FsShell(conf);
                try {
                    shell.run(new String[]{"-chmod", "-R", "777", args[1]});
                } catch (Exception e) {
                    logger.error("Couldnt change the file permissions ", e);
                    throw new IOException(e);
                }
                //加载到hbase表
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(new Path(outputPath), hTable);
            } else {
                logger.error("loading failed.");
                System.exit(1);
            }

        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } finally {
            if (hTable != null) {
                hTable.close();
            }
        }
    }
}

4、查看数据

hbase(main):003:0> scan 'hfiletable'
ROW                                                   COLUMN+CELL                                                                                                                                                  
 key2                                                 column=fm1:col1, timestamp=1437794332921, value=value1                                                                                                       
 key2                                                 column=fm1:col2, timestamp=1437794332921, value=value2                                                                                                       
 key2                                                 column=fm2:col1, timestamp=1437794332921, value=value3                                                                                                       
 key3                                                 column=fm1:col1, timestamp=1437794332921, value=value4                                                                                                       
2 row(s) in 0.1910 seconds

五、总结

虽然importtsv工具使用与大多数场景,用户有时希望自己编程生成数据,或以其他格式导入数据,比如importtsv需要在导入前确定每条数据column维度,一旦我们的数据的维度是根据数据内容本身的,importtsv就无法满足需求,这时就需要对工具改造,可以查看ImportTsv.java和HFileOutputFormat的javaDoc。

completebulkload同样可以编程化实现,可以查看LoadIncrementalHFiles类。







版权声明:本文为博主原创文章,未经博主允许不得转载。

相关内容