Accessing HBase Data with MapReduce

The example below uses a TableMapper to scan the hobby column of the person table's charactor family, inverts each cell so that the hobby value becomes the map output key and the row key becomes the value, and writes the grouped result to HDFS as text.


package com.mr.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.Logger;


public class MRHbase {
	private static Logger log = Logger.getLogger(MRHbase.class);
	public static String family = "charactor";
	public static String col = "hobby";

	public static class HMap extends TableMapper<Text, Text> {
		@Override
		protected void map(ImmutableBytesWritable key, Result value,
				Context context) throws IOException, InterruptedException {
			// Emit the cell value as the key and the row key as the value,
			// so the reducer groups row keys by hobby.
			byte[] v = value.getValue(Bytes.toBytes(family), Bytes.toBytes(col));
			byte[] r = value.getRow();
			if (v != null) {
				context.write(new Text(Bytes.toString(v)),
						new Text(Bytes.toString(r)));
			}
		}
	}

	// The reducer must use the new org.apache.hadoop.mapreduce API to match
	// the Job-based driver below; the old mapred.Reducer interface is not
	// compatible with it.
	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text value : values) {
				context.write(key, value);
			}
		}
	}

	public static void main(String[] args) {
		Configuration conf = HBaseConfiguration.create();
		try {
			Job job = new Job(conf, "hbase test");
			job.setJarByClass(MRHbase.class);
			Scan scan = new Scan();
			scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(col));
			TableMapReduceUtil.initTableMapperJob("person", scan, HMap.class,
					Text.class, Text.class, job);
			job.setReducerClass(Reduce.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			job.setOutputFormatClass(TextOutputFormat.class);
			FileOutputFormat.setOutputPath(job, new Path(args[0]));
			System.exit(job.waitForCompletion(true) ? 0 : 1);
		} catch (Exception e) {
			log.error("hbase test job failed", e);
		}
	}
}
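
For reference, here is a minimal sketch of how the person table could be populated so the job has something to scan. The class name PersonLoader, the row key, and the hobby value are made-up sample data, and it uses the old-style HTable/Put client API to match the code above; the table itself is assumed to have been created first (e.g. create 'person', 'charactor' in the hbase shell).

package com.mr.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PersonLoader {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		HTable table = new HTable(conf, "person"); // assumes 'person' already exists
		Put put = new Put(Bytes.toBytes("row1"));  // sample row key
		put.add(Bytes.toBytes("charactor"),        // column family
				Bytes.toBytes("hobby"),            // qualifier
				Bytes.toBytes("reading"));         // sample cell value
		table.put(put);
		table.close();
	}
}

Once the table has data, the job can be packaged and launched with something like hadoop jar mrhbase.jar com.mr.test.MRHbase /tmp/mrhbase-out, where the jar name and output path are placeholders.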

Note: the ZooKeeper jar has to be added to the hadoop/lib directory on the master and all slaves, or the job will not be able to reach HBase at runtime.
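
Alternatively, newer HBase releases can ship these dependencies with the job itself, so nothing needs to be copied into hadoop/lib by hand; a one-line sketch, added in main() before submitting the job:

			// Ship HBase's dependency jars (ZooKeeper included) with the job
			// instead of copying them into every node's hadoop/lib.
			TableMapReduceUtil.addDependencyJars(job);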
