Hbase + Mapreduce + eclipse实例,hbasemapreduce


前面blog中提到了 eclipse操作单机版的Hbase列子  不熟悉的朋友可以去看看  

eclipse 连接并操作单机版Hbase


本篇文章介绍一个 Mapreduce   读取   Hbase  中数据    并进行计算 列子    类似与    wordcount   不过  此时的输入  是从 Hbase中读取


首先  需要创建输入源   

启动hbase,打开Hbase shell   这里  我的配置文件中  不再是单机了  而是采用了hdfs作为 文件系统


<span style="font-size:18px;"><configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9000/hbase</value>
<description>数据存放的位置。</description>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
<description>指定副本个数为1,因为伪分布式。</description>
</property>
</configuration>
</span>

进入hbase shell之后  创建 表

<span style="font-size:18px;">hbase(main):007:0> create 'data_input', 'message'
0 row(s) in 1.1110 seconds

hbase(main):008:0> create 'data_output',{NAME=>'message',VERSION=>1}
0 row(s) in 1.0900 seconds

</span>

data_input表用来存放 mapreduce的输入数据

data_output 用来存放mapreduce的输出数据


然后往data_inout表中生成随机数据,这里用eclipse来操作Hbase  往表  data_input 里面写数据  代码如下:


<span style="font-size:18px;">package hbase_mapred1;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;


public class Importer1 {

    public static void main(String[] args) throws Exception {
        
        String [] pages = {"/", "/a.html", "/b.html", "/c.html"};
        
        //HBaseConfiguration hbaseConfig = new HBaseConfiguration();
        
      
        
     Configuration hbaseConfig=HBaseConfiguration.create();
        HTable htable = new HTable(hbaseConfig, "data_input");
        htable.setAutoFlush(false);
        htable.setWriteBufferSize(1024 * 1024 * 12);
        
        int totalRecords = 100000;
        int maxID = totalRecords / 1000;
        Random rand = new Random();
        System.out.println("importing " + totalRecords + " records ....");
        for (int i=0; i < totalRecords; i++)
        {
            int userID = rand.nextInt(maxID) + 1;
            byte [] rowkey = Bytes.add(Bytes.toBytes(userID), Bytes.toBytes(i));
            String randomPage = pages[rand.nextInt(pages.length)];
            Put put = new Put(rowkey);
            put.add(Bytes.toBytes("message"), Bytes.toBytes("page"), Bytes.toBytes(randomPage));
            htable.put(put);
        }
        htable.flushCommits();
        htable.close();
        System.out.println("done");
    }
}
</span>

到这里为止,数据已经写入到 表data_input表中去了,接下来用该表的数据

作为Mapreduce的输入数据 

代码如下:


<span style="font-size:18px;">package hbase_mapred1;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;


public class FreqCounter1 {

    static class Mapper1 extends TableMapper<ImmutableBytesWritable, IntWritable> {

        private int numRecords = 0;
        private static final IntWritable one = new IntWritable(1);

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
            // extract userKey from the compositeKey (userId + counter)
            ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_INT);
            try {
                context.write(userKey, one);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
            numRecords++;
            if ((numRecords % 10000) == 0) {
                context.setStatus("mapper processed " + numRecords + " records so far");
            }
        }
    }

    public static class Reducer1 extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

        public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            Put put = new Put(key.get());
            put.add(Bytes.toBytes("message"), Bytes.toBytes("total"), Bytes.toBytes(sum));
            System.out.println(String.format("stats :   key : %d,  count : %d", Bytes.toInt(key.get()), sum));
            context.write(key, put);
        }
    }
    
    public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        Job job = new Job(conf, "Hbase_FreqCounter1");
        job.setJarByClass(FreqCounter1.class);
        Scan scan = new Scan();
   //     String columns = "details"; // comma seperated
      //  scan.addColumns(columns);
        scan.setFilter(new FirstKeyOnlyFilter());
        TableMapReduceUtil.initTableMapperJob("data_input", scan, Mapper1.class, ImmutableBytesWritable.class,
                IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("data_output", Reducer1.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}</span>

大概逻辑为:


map阶段读取datainput中的数据,然后标记为 1    熟悉map  reduce 流程的同学应该很容易理解  


将 data input表中相同的key  在shuffle阶段在一起   然后 reduce阶段读取有多少个 相同的key  相加  得到一个总数  将该总数存入data_output表中


最后来验证结果,

由于 无法直接阅读 Hbase中的数据 ,用以个程序将 hbase中的数据转换为可以阅读的数据格式  代码如下


<span style="font-size:18px;">package hbase_mapred1;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

public class PrintUserCount {

    public static void main(String[] args) throws Exception {

        HBaseConfiguration conf = new HBaseConfiguration();
        HTable htable = new HTable(conf, "data_output");

        Scan scan = new Scan();
        ResultScanner scanner = htable.getScanner(scan);
        Result r;
        while (((r = scanner.next()) != null)) {
            ImmutableBytesWritable b = r.getBytes();
            byte[] key = r.getRow();
            int userId = Bytes.toInt(key);
            byte[] totalValue = r.getValue(Bytes.toBytes("message"), Bytes.toBytes("total"));
            int count = Bytes.toInt(totalValue);

            System.out.println("key: " + userId+ ",  count: " + count);
        }
        scanner.close();
        htable.close();
    }
}</span>


<span style="font-size:18px;">key: 1,  count: 1007
key: 2,  count: 1034
key: 3,  count: 962
key: 4,  count: 1001
key: 5,  count: 1024
key: 6,  count: 1033
key: 7,  count: 984
key: 8,  count: 987
key: 9,  count: 988
key: 10,  count: 990
key: 11,  count: 1069
key: 12,  count: 965
key: 13,  count: 1000
key: 14,  count: 998
key: 15,  count: 1002
key: 16,  count: 983
。。。</span>



注意相应的包需要导入程序   目录结构为



相关内容