hadoop-hbase-endpoint 例子(标签:hadoop, hbase)
示例目标:统计 users 表中使用 Gmail(Google 邮箱)的用户数。
服务端:
先定义一个接口 GmailCounterProtocol,继承(extends)CoprocessorProtocol,并在接口中声明方法 public long gmailCounter(String userId) throws IOException;(在本例的实现中,这个参数实际上被当作行键前缀使用)。
再定义一个类 UserEndpointServer,继承 BaseEndpointCoprocessor,同时实现(implements)刚才定义的接口 GmailCounterProtocol,并在其中实现 gmailCounter 方法。
在 gmailCounter 方法中先构造一个 Scan,然后通过协处理器环境拿到当前 Region 的扫描器:InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan);
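这个 InternalScanner 需要用 do-while 循环取值:每次调用 next(List) 会把取到的 KeyValue 放进传入的列表,返回值表示是否还有更多数据,因此每轮处理完要清空列表。这种用法和客户端常用的迭代方式不同,用起来比较别扭;另外,用完后要记得调用 close() 关闭扫描器。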
客户端:
先构造一个 Batch.Call<GmailCounterProtocol, Long> callable,在它的 call() 方法里调用 gmailCounter。
然后执行:Map<byte[], Long> rs = table.coprocessorExec(GmailCounterProtocol.class, startKey, endKey, callable); 必须指定 startKey 和 endKey,只有落在这个行键范围内的 Region 才会执行该调用,每个 Region 返回一个结果。
最后遍历结果,把各 Region 返回的计数累加起来:for (Map.Entry<byte[], Long> e : rs.entrySet()) { sum += e.getValue().longValue(); }
接口
package test.hbase.inaction.example5_3;

import java.io.IOException;

import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

/**
 * Dynamic RPC protocol for the Gmail-counting endpoint coprocessor.
 *
 * <p>Clients invoke it through {@code HTableInterface.coprocessorExec}. The implementing
 * coprocessor answers once per region that the call is routed to.
 */
public interface GmailCounterProtocol extends CoprocessorProtocol {

  /**
   * Counts Gmail users in the region this call runs against.
   *
   * @param userId despite its name, the implementation in this example
   *     ({@code UserEndpointServer}) treats this value as a row-key prefix
   * @return the number of matching rows found by the implementation
   * @throws IOException if reading the region fails
   */
  long gmailCounter(String userId) throws IOException;
}
package test.hbase.inaction.example5_3; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.util.Bytes; import test.hbase.inaction.example2_4.UsersDAO; public class UserEndpointServer extends BaseEndpointCoprocessor implements GmailCounterProtocol { private final Log log = LogFactory.getLog(this.getClass()); @Override public long gmailCounter(String prefix) throws IOException { log.info("================================== begin"); Scan scan = new Scan(prefix.getBytes()); scan.setFilter(new PrefixFilter(prefix.getBytes())); scan.addColumn(UsersDAO.INFO_FAM, UsersDAO.EMAIL_COL); scan.setMaxVersions(1); RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment(); InternalScanner scanner = env.getRegion().getScanner(scan); long sum = 0; List<KeyValue> kvList = new ArrayList<KeyValue>(); boolean hasMore = false; do { hasMore = scanner.next(kvList); for (KeyValue kv : kvList) { log.info(Bytes.toString(kv.getRow()) + ":" + Bytes.toString(kv.getFamily()) + ":" + Bytes.toString(kv.getQualifier()) + "=" + Bytes.toString(kv.getValue())); String email = Bytes.toString(kv.getValue()); if (email != null && email.toLowerCase().endsWith("@gmail.com")) { sum++; } log.info("sum=" + sum); } kvList.clear(); } while (hasMore); scanner.close(); log.info("================================== end"); return sum; } }
package test.hbase.inaction.example5_3; import java.io.IOException; import java.util.Arrays; import java.util.Map; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.util.Bytes; import test.hbase.inaction.example2_4.UsersDAO; public class UserEndpointClient { private static HTablePool pool = new HTablePool(); private static HTableInterface table = pool.getTable(UsersDAO.TABLE_NAME); /** * @param args * @throws Throwable * @throws IOException */ public static void main(String[] args) throws Throwable { final String userId = "id"; Batch.Call<GmailCounterProtocol, Long> callable = new Batch.Call<GmailCounterProtocol, Long>() { @Override public Long call(GmailCounterProtocol instance) throws IOException { return instance.gmailCounter(userId); } }; byte[] startKey = Bytes.toBytes(userId); byte[] endKey = Arrays.copyOf(startKey, startKey.length); endKey[endKey.length - 1]++; Map<byte[], Long> rs = table.coprocessorExec( GmailCounterProtocol.class, startKey, endKey, callable); long sum = 0; for (Map.Entry<byte[], Long> e : rs.entrySet()) { sum += e.getValue().longValue(); } System.out.println(sum); } }
hbase-env.sh(把打包好的协处理器 jar 加入 HBase 的 classpath)
export HBASE_CLASSPATH=/home/u/myjar/FollowsObserver.jar
hbase-site.xml(通过 hbase.coprocessor.region.classes 为所有 Region 加载该 Endpoint)
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://172.10.10.144:9000/hbase</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>hbase.coprocessor.region.classes</name>
<value>test.hbase.inaction.example5_3.UserEndpointServer</value>
</property>
</configuration>
HBase RegionServer 日志(协处理器 UserEndpointServer 的输出)
2014-10-31 16:47:45,028 INFO test.hbase.inaction.example5_3.UserEndpointServer: id01:info:email=wyjxx@gmail.com
2014-10-31 16:47:45,028 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=1
2014-10-31 16:47:45,028 INFO test.hbase.inaction.example5_3.UserEndpointServer: id02:info:email=asdasdf@qq.com
2014-10-31 16:47:45,028 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=1
2014-10-31 16:47:45,029 INFO test.hbase.inaction.example5_3.UserEndpointServer: id03:info:email=xxx@gmail.com
2014-10-31 16:47:45,029 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=2
2014-10-31 16:47:45,029 INFO test.hbase.inaction.example5_3.UserEndpointServer: id04:info:email=yyy@163.com
2014-10-31 16:47:45,029 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=2
2014-10-31 16:47:45,030 INFO test.hbase.inaction.example5_3.UserEndpointServer: id05:info:email=zzz@sohu.com
2014-10-31 16:47:45,030 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=2
2014-10-31 16:47:45,030 INFO test.hbase.inaction.example5_3.UserEndpointServer: id06:info:email=test@gmail.com
2014-10-31 16:47:45,030 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=3
2014-10-31 16:47:45,030 INFO test.hbase.inaction.example5_3.UserEndpointServer: id07:info:email=test7@gmail.com
2014-10-31 16:47:45,030 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=4
2014-10-31 16:47:45,031 INFO test.hbase.inaction.example5_3.UserEndpointServer: id08:info:email=test8@gmail.com
2014-10-31 16:47:45,031 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=5
2014-10-31 16:47:45,031 INFO test.hbase.inaction.example5_3.UserEndpointServer: id09:info:email=test9@sina.com.cn
2014-10-31 16:47:45,031 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=5
2014-10-31 16:47:45,032 INFO test.hbase.inaction.example5_3.UserEndpointServer: id99:info:email=test99@gmail.com
2014-10-31 16:47:45,032 INFO test.hbase.inaction.example5_3.UserEndpointServer: sum=6
问题描述得太粗略了,建议去我的空间看看,有两篇关于这个问题的日志,希望对你有帮助。
我折腾了好久才解决,以下是几个思路:
1. 是否开启了安全模式(safe mode)?
关于 safe mode,网上说有 2 种方法:a. 执行 bin/hadoop dfsadmin -safemode leave;b. 修改 dfs.safemode.threshold.pct。我用的是方法 b,具体就是在 hdfs-site.xml 中添加该属性。
2. 注意运行权限。具体问题再具体讨论。
评论暂时关闭