HBase学习笔记,hbase权威指南


一、HBase简介
HBase(Hadoop Database)是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。
HBase利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具

二、HBASE基础知识

表(table),是存储管理数据的。表由行和列组成。

数据模型

行键(row key),类似于MySQL中的主键。
行键是HBase表天然自带的。

列族(column family),列的集合。
HBase中列族是需要在定义表时指定的,列是在插入记录时动态增加的。
HBase表中的数据,每个列族单独一个文件。
一个Column Family中可以有任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型,所有Column均以二进制格式存储,用户需要自行进行类型转换。

时间戳(timestamp),列(也称作标签、修饰符)的一个属性。
行键和列确定的单元格,可以存储多个数据,每个数据含有时间戳属性,数据具有版本特性。
如果不指定时间戳或者版本,默认取最新的数据。

逻辑数据模型

数据模型-
字符串、整数、二进制串甚至串行化的结构都可以作为行键,
表按照行键的“逐字节排序”顺序对行进行有序化处理,
表内数据非常‘稀疏’,不同的行的列的数目完全可以大不相同,
可以只对一行上“锁”,
对行的写操作是始终是“原子”的。

数据模型-
“族:标签”
其中,族和标签都可为任意形式的串,
物理上将同“族”数据存储在一起,
数据可通过时间戳区分版本。


物理数据模型

将逻辑模型中的一个Row分割为根据Column family存储的物理模型。

HBase是适合海量数据(如20PB)的秒级简单查询的数据库。

HBase表中的记录,按照行键进行拆分, 拆分成一个个的region。
许多个region存储在region server(单独的物理机器)中的。
这样,对表的操作转化为对多台region server的并行查询。

Table在行的方向上分割为多个HRegion,一个region由(startkey,endkey)表示,每个HRegion分散在不同的RegionServer中。


HBase中的每一张表,就是所谓的BigTable,稀疏表。
RowKey 和 ColumnKey 是二进制值byte[],按字典顺序排序;
Timestamp 是一个 64 位整数;
value 是一个未解释的字节数组byte[]。

表中的不同行可以拥有不同数量的成员。即支持“动态模式“模型。


存储的数据都是字节数组。
表中的数据是按照行键的顺序物理存储的。


三、HBase的体系结构

HBase是主从式结构,HMaster、HRegionServer。

HMaster(决策性的事务)
Master 可以启动多个HMaster,通过Zookeeper的Master Election机制保证总有一个HMaster运行
为Region server 分配region。
负责region server 的负载均衡。
发现失效的region server ,并重新分配其上的region。

Region Server
维护**Master 分配给它的**region处理对这些region 的IO 请求
负责切分在运行过程中变得过大的region

Client
client包含访问hbase 的接口,client 维护着一些cache 来加快对hbase 的访问,比如region的位置信息等。

Zookeeper
保证任何时候,集群中只有一个running master
Zookeeper存贮所有Region 的寻址入口(Region是存储表中数据的)。
实时监控Region Server 的状态,将Region server 的上线和下线信息,实时通知给Master。

为什么不存储在HMaster中?
如果这个机器宕机了,他的所有信息就会丢失,如果存储在Zookeeper中,他就会在很多机器上有备份。

存储Hbase 的schema(图表),包括有哪些table,每个table 有哪些column family。

可以看出,client 访问hbase 上数据的过程并不需要master 参与,寻址访问zookeeper 和region server,数据读写访问regionserver。HRegionServer主要负责响应用户I/O请求,向HDFS文件系统中读写数据,是HBase中最核心的模块。

四、Region定位流程

HBase中有两张特殊的Table:-ROOT-和.META.
.META.:记录了用户表的Region信息,.META.可以有多个regoin。
-ROOT-:记录了.META.表的Region信息,-ROOT-只有一个region。
Zookeeper中记录了-ROOT-表的location。

Client访问用户数据之前需要首先访问zookeeper,然后访问-ROOT-表,接着访问.META.表,最后才能找到用户数据的位置去访问。

ZooKeeper–> -ROOT-(单Region)–> .META.–> 用户表

五、HBase伪分布安装
1 、解压缩、重命名、设置环境变量

tar -zxvf hbase-0.94.7-security.tar.gz
mv hbase-0.94.7-security hbase
vi /etc/profile

export HBASE_HOME=/usr/local/hbase
export PATH=.:$HADOOP_HOME/bin:$ZOOKEEPER_HOME/bin:$HBASE_HOME/bin:$JAVA_HOME/bin:$PATH

source /etc/profile

2、 修改$HBASE_HOME/conf/hbase-env.sh,修改内容如下:

export JAVA_HOME=/usr/local/jdk/
export HBASE_MANAGES_ZK=true

3、 修改$HBASE_HOME/conf/hbase-site.xml,修改内容如下:

这里写代码片
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://liguodong:9000/hbase</value>
</property>
<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>liguodong</value>
</property>
<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>

4.3 (可选)文件regionservers的内容为liguodong

[root@liguodong conf]# vi regionservers
[root@liguodong conf]# more regionservers
liguodong(主机名)

启动hbase,执行命令start-hbase.sh
注意:启动hbase之前,确保hadoop是运行正常的,并且可以写入文件。

验证:
(1)执行jps,发现新增加了3个java进程,分别是HMaster、HRegionServer、HQuorumPeer
(2)使用浏览器访问http://liguodong:60010/

五、HBASE Shell 操作

进入HBase命令行,输入:hbase shell

DDL操作
创建表

create ‘users’,’user_id’,’address’,’info’

表users,有三个列族user_id,address,info。

列出全部表

list

得到表的描述

describe ‘users’

创建表

create ‘users_tmp’,’user_id’,’address’,’info’

删除表

disable ‘users_tmp’
drop ‘users_tmp’

DML操作

添加记录

put 'users','xiaoming','info:age','24'
put 'users','xiaoming','info:birthday','1987-06-17'
put 'users','xiaoming','info:company','alibaba'
put 'users','xiaoming','address:contry','china'
put 'users','xiaoming','address:province','zhejiang'
put 'users','xiaoming','address:city','hangzhou'
put 'users','zhangyifei','info:birthday','1987-4-17'
put 'users','zhangyifei','info:favorite','movie'
put 'users','zhangyifei','info:company','alibaba'
put 'users','zhangyifei','address:contry','china'
put 'users','zhangyifei','address:province','guangdong'
put 'users','zhangyifei','address:city','jieyang'
put 'users','zhangyifei','address:town','xianqiao'

获取一条记录
1.取得一个id的所有数据

get 'users','xiaoming'

2.获取一个id,一个列族的所有数据

get 'users','xiaoming','info'

3.获取一个id,一个列族中一个列的所有数据

get 'users','xiaoming','info:age'

更新记录

put 'users','xiaoming','info:age' ,'29'
get 'users','xiaoming','info:age'

put 'users','xiaoming','info:age' ,'30'
get 'users','xiaoming','info:age'

获取单元格数据的版本数据

get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>1}
get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>2}
get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>3}

获取单元格数据的某个版本数据

get 'users','xiaoming',{COLUMN=>'info:age',TIMESTAMP=>1364874937056}

全表扫描

scan 'users'

删除xiaoming值的’info:age’字段

delete 'users','xiaoming','info:age'
get 'users','xiaoming'

删除整行

deleteall 'users','xiaoming'

统计表的行数

count 'users'

清空表

truncate 'users'

六、HBASE的Java API 操作

基本操作

package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

public class HBaseApp {

    //创建表,插入记录,查询一条记录,遍历所有记录,删除表


    //表名
    public static final String TABLE_NAME="table1";
    //列族名称
    public static final String FAMILY_NAME="family1";
    //行键
    public static final String ROW_KEY="rowkey1";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        conf.set("hbase.rootdir", "hdfs://liguodong:9000/hbase");

        //使用eclipse时必须添加这个,否则无法定位
        conf.set("hbase.zookeeper.quorum", "liguodong");

        final HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);

        //1、创建表,删除表使用HBaseAdmin
        //HBase shell : describe 'table1        
        createTable(hBaseAdmin);

        //2、删除表
        //HBase shell : list
        //deleteTable(hBaseAdmin);


        //3、插入记录,查询一条记录,遍历所有记录HTable   
        //HBase shell : scan 'table1'
        final HTable hTable = new HTable(conf,TABLE_NAME);
        //插入
        //putRecord(hTable);

        //方法抽取 shift+alt+m
        //4、查询某条记录
        //getRecord(hTable);

        //5、全表扫描
        scanTable(hTable);

        hTable.close();

    }


    public static void scanTable(final HTable hTable) throws IOException {
        Scan scan = new Scan();
        final ResultScanner scanner = hTable.getScanner(scan);
        for (Result result : scanner) {
            final byte[]  value = result.getValue(FAMILY_NAME.getBytes(), "age".getBytes());
            System.out.println(result);
            System.out.println(new String(value));
        }
    }


    public static void getRecord(final HTable hTable) throws IOException {
        Get get = new Get(ROW_KEY.getBytes());
        //hTable.get(get);
        final Result result = hTable.get(get);
        final byte[]  value = result.getValue(FAMILY_NAME.getBytes(), "age".getBytes());
        System.out.println(result);
        System.out.println(new String(value));
    }


    public static void putRecord(final HTable hTable) 
            throws IOException{

        Put  put= new Put(ROW_KEY.getBytes());
        put.add(FAMILY_NAME.getBytes(), "age".getBytes(), "25".getBytes());
        hTable.put(put);


    }

    public static void deleteTable(final HBaseAdmin hBaseAdmin) throws IOException{
        hBaseAdmin.disableTable(TABLE_NAME);
        hBaseAdmin.deleteTable(TABLE_NAME);
    }



    public static void createTable(final HBaseAdmin hBaseAdmin) throws IOException{
        if(!hBaseAdmin.tableExists(TABLE_NAME))
        {
            HTableDescriptor tableDescriptor = new HTableDescriptor(TABLE_NAME);
            //添加列族
            HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
            tableDescriptor.addFamily(family);          
            hBaseAdmin.createTable(tableDescriptor);
        }
    }

}

将HDFS数据批量导入到HBase中,便于查询

上传数据到HDFS
hadoop fs -put HTTP_20130313143750.dat /input

HBASE表定义为:create 'wlan_log', 'cf'

RowKey设计为:
msisdn:日期时间串(yyyyMMddHHmmss)

package hbase;

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * 将HDFS数据批量导入到HBase中
 * @author liguodong
 */
public class BatchImport {

    public static void main(String[] args) throws Exception {
        final Configuration configuration = new Configuration();
        //设置zookeeper
        configuration.set("hbase.zookeeper.quorum", "liguodong");
        //设置hbase表名称
        configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
        //将该值改大,防止hbase超时退出
        configuration.set("dfs.socket.timeout", "180000");

        final Job job = new Job(configuration, "HBaseBatchImport");

        //输入路径
        FileInputFormat.setInputPaths(job, "hdfs://liguodong:9000/input");

        //指定对输入数据进行格式化处理的类
        job.setInputFormatClass(TextInputFormat.class);

        //指定自定义的Mapper类
        job.setMapperClass(BatchImportMapper.class);

        //设置map的输出,不设置reduce的输出类型
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        //指定自定义的Reducer类
        job.setReducerClass(BatchImportReducer.class);

        //不再设置输出路径,而是设置输出格式类型
        job.setOutputFormatClass(TableOutputFormat.class);

        job.waitForCompletion(true);
    }


    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{

        SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
        Text v2 = new Text();

        protected void map(LongWritable key, Text value, Context context) 
        throws java.io.IOException ,InterruptedException {

            //对value(v1)进行拆分
            final String[] splited = value.toString().split("\t");

            try {
                //时间戳转化成指定格式的年月日时分秒
                final Date date = new Date(Long.parseLong(splited[0].trim()));
                //转化成指定格式
                final String dateFormat = dateformat1.format(date);

                //行键
                String rowKey = splited[1]+":"+dateFormat;
                v2.set(rowKey+"\t"+value.toString());

                context.write(key, v2);

            } catch (NumberFormatException e) {
                final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
                counter.increment(1L);
                System.out.println("出错了"+splited[0]+" "+e.getMessage());
            }
        };
    }

    /**
     * TableReducer<LongWritable, Text, NullWritable>
     * 分别对应:k2 v2 k3 
     * @author liguodong
     */
    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
        protected void reduce(LongWritable key, java.lang.Iterable<Text> values,Context context) 
        throws java.io.IOException ,InterruptedException {

            for (Text text : values) {

                final String[] splited = text.toString().split("\t");

                //传入行键
                final Put put = new Put(Bytes.toBytes(splited[0]));

                //列族    列   具体值
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"), Bytes.toBytes(splited[2]));
                //省略其他字段,调用put.add(....)即可
                context.write(NullWritable.get(), put);
            }
        };
    }
}

七、HBase集群搭建
hbase的集群搭建过程是在原来的hbase伪分布基础上进行搭建

1、 集群结构:
主节点(hmaster)是liguodong,
从节点(region server)是hadoop1和hadoop2

2、 修改liguodong上的hbase的几个文件

(1)HBASE_MANAGES_ZK:告诉HBASE是否使用自己Zookeeper的一个实例。这里不用自己的,用外部的。
修改hbase-env.sh的最后一行export HBASE_MANAGES_ZK=false

(2)新增了hadoop1,hadoop2节点
修改hbase-site.xml文件的hbase.zookeeper.quorum的值为liguodong,hadoop1,hadoop2

(3)修改regionservers文件(存放的region server的hostname),
内容修改为hadoop1、hadoop2(以前是liguodong)

3、复制liguodong节点中的hbase文件夹到hadoop1、hadoop2中
scp -r hbase hadoop1:/usr/local/
scp -r hbase hadoop2:/usr/local/

复制liguodong中的/etc/profile到hadoop1、hadoop2中,在hadoop1、hadoop2上执行source /etc/profile

scp /etc/profile hadoop1:/etc/
scp /etc/profile hadoop2:/etc/

source /etc/profile

4、启动运行
(1)、首先启动hadoop 执行:start-all.sh 并且保证hadoop是可读写的。(主节点)
(2)、然后启动zookeeper集群,执行:zkServer.sh start(每个节点分别启动)

[root@liguodong ~]# zkServer.sh start

JMX enabled by default
Using config: /usr/local/zk/bin/../conf/zoo.cfg
Starting zookeeper … STARTED

[root@liguodong ~]# jps

2823 QuorumPeerMain
2536 NameNode
2689 JobTracker
2841 Jps

3、最后在liguodong节点上启动hbase集群,执行:start-hbase.sh。(主节点)

关闭时

先关闭hbase: stop-hbase.sh
再关闭zookeeper: zkServer.sh stop
最后关闭hadoop: stop-all.sh

相关内容