HBase扫描器与过滤器,HBase扫描器过滤器


扫描器

HBase在扫描数据的时候,使用scanner表扫描器。
HTable通过一个Scan实例,调用getScanner(scan)来获取扫描器。可以配置扫描起止位以及其他的过滤条件。
通过迭代器返回查询结果,使用起来虽然不是很方便,不过并不复杂。

但是这里有一点可能被忽略的地方,就是返回的scanner迭代器,每次调用next的获取下一条记录的时候,默认配置下会访问一次RegionServer。这在网络不是很好的情况下,对性能的影响是很大的,建议配置扫描器缓存

扫描器缓存

hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。
有三个地方可以对其进行配置:
1、在HBase的conf配置文件中进行配置。
2、通过调用HTable.setScannerCaching(int scannerCaching)进行配置。
3、通过调用Scan.setCaching(int caching)进行配置。
三者的优先级越来越高。

扫描器Demo

package Scanner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class Scanner {

    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf; 
    private HConnection hConn = null;

    private Scanner(String rootDir,String zkServer,String port) throws IOException{
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);

        hConn = HConnectionManager.createConnection(conf);  
    }

    public void scanTable(String tablename){
        Scan scan = new Scan();
        //设置扫描缓存
        scan.setCaching(1000);
        try {
            HTableInterface table = hConn.getTable(tablename);

            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                format(result);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void format(Result result){
        //行键
        String rowkey = Bytes.toString(result.getRow());

        //Return an cells of a Result as an array of KeyValues
        KeyValue[] kvs = result.raw();

        for (KeyValue kv : kvs) {
            //列族名
            String family = Bytes.toString(kv.getFamily());
            //列名
            String qualifier = Bytes.toString(kv.getQualifier());

            String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)));

            System.out.println("rowkey->"+rowkey+", family->"
            +family+", qualifier->"+qualifier);
            System.out.println("value->"+value);

        }
    }

    //命令行 scan 'students'
    public static void main(String[] args) throws IOException {
        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";
        //初始化
        Scanner conn = new Scanner(rootDir,zkServer,port);

        conn.scanTable("students");
    }
}

过滤器

1、使用过滤器可以提高操作表的效率, HBase中两种数据读取函数get()和scan()都支持过滤器,支持直接访问和通过指定起止行键来访问,但是缺少细粒度的筛选功能(如基于正则表达式对行键或值进行筛选的功能)。
2、可以使用预定义好的过滤器或者是实现自定义过滤器。
3、 过滤器在客户端创建,通过RPC传送到服务器端,在服务器端执行过滤操作,把数据返回给客户端。

过滤器分类

1、Comparision Filters(比较过滤器)

RowFilter
FamilyFilter
QualifierFilter
ValueFilter
DependentColumnFilter

2、Dedicated Filters(专用过滤器)

SingleColumnValueFilter
SingleColumnValueExcludeFilter
PrefixFilter
PageFilter
KeyOnlyFilter
FirstKeyOnlyFilter
TimestampsFilter
RandomRowFilter

3、 Decorating Filters(附加过滤器)

SkipFilter
WhileMatchFilters

过滤器Demo

package Filter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;


public class FilterDemo {

    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf; 
    private HConnection hConn = null;

    private FilterDemo(String rootDir,String zkServer,String port) throws IOException{
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);

        hConn = HConnectionManager.createConnection(conf);  
    }


    //比较过滤器
    public void filterTable(String tablename){
        Scan scan = new Scan();
        scan.setCaching(1000);

        RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("Tom")));
        scan.setFilter(filter);
        try {
            HTableInterface table = hConn.getTable(tablename);

            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                format(result);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void filterTableRegex(String tablename){
        Scan scan = new Scan();
        scan.setCaching(1000);

        RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new RegexStringComparator("T\\w+"));
        scan.setFilter(filter);
        try {
            HTableInterface table = hConn.getTable(tablename);

            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                format(result);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //专用过滤器
    public void filterTablePage(String tablename){
        PageFilter pageFilter = new PageFilter(4);//每一行3行记录,预留一行。
        byte[] lastRow = null;//记录最后一次读到的rowkey作为下一次查询的rowkey
        int pageCount = 0;//表示第几页
        try {
            HTableInterface table = hConn.getTable(tablename);

            while(++pageCount>0){
                System.out.println("pageCount = " + pageCount);
                Scan scan = new Scan();
                scan.setFilter(pageFilter);
                if (lastRow != null) {
                    scan.setStartRow(lastRow);
                }

                ResultScanner resultScanner = table.getScanner(scan);
                int count=0;
                for (Result result : resultScanner) {

                    lastRow = result.getRow();
                    if (++count>3) {
                        break;
                    }   
                    format(result);

                }
                if(count<3){//当某次读取的数据小于3表示要结束了,终止循环
                    break;
                }

            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }



    public void format(Result result){
        //行键
        String rowkey = Bytes.toString(result.getRow());

        //Return an cells of a Result as an array of KeyValues
        KeyValue[] kvs = result.raw();

        for (KeyValue kv : kvs) {
            //列族名
            String family = Bytes.toString(kv.getFamily());
            //列名
            String qualifier = Bytes.toString(kv.getQualifier());

            String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)));

            System.out.println("rowkey->"+rowkey+", family->"
            +family+", qualifier->"+qualifier);
            System.out.println("value->"+value);

        }
    }

    public static void main(String[] args) throws IOException {
        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";
        //初始化
        FilterDemo filterDemo = new FilterDemo(rootDir, zkServer, port);
        //filterDemo.filterTable("students");

        //filterDemo.filterTableRegex("students");
        filterDemo.filterTablePage("students");
    }

}

版权声明:本文为博主原创文章,未经博主允许不得转载。

相关内容