

Published: 2013-09-04 10:34:09   Author: rapoo

HBase CoprocessorProtocol and a Simple, Generic Extension Implementation
The CoprocessorProtocol mechanism in HBase.

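The principle behind CoprocessorProtocol is fairly simple and resembles a MapReduce framework. The client splits a scan into requests aimed at the individual regions the scan covers and sends those requests to the regions in parallel. Each region computes a partial result over its own rows, and the client then performs a reduce step over the partial results to obtain the final answer.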
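To make the split / parallel request / client-side reduce flow concrete, here is a minimal in-memory sketch in plain Java. It does not use HBase at all: each element of `regions` stands in for one region, `perRegion` plays the role of the code a region server would run, and `reducer` is the client-side merge. The names `ScatterGatherSketch` and `scatterGather` are hypothetical. In HBase 0.92/0.94 the real client entry point for this pattern is, to my knowledge, `HTable.coprocessorExec(...)`, which returns one result per region for the client to combine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical in-memory model of the CoprocessorProtocol flow; no HBase classes involved.
class ScatterGatherSketch {

    /**
     * Runs perRegion once per region in parallel ("map" on each region),
     * then folds the partial results on the caller's thread ("reduce" on the client).
     */
    static <T, R> R scatterGather(List<T> regions,
                                  Function<T, R> perRegion,
                                  BinaryOperator<R> reducer,
                                  R identity) {
        // One thread per region so all requests are in flight at once (at least one thread).
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, regions.size()));
        try {
            List<Future<R>> partials = new ArrayList<>();
            for (T region : regions) {
                // Scatter: submit one request per region.
                partials.add(pool.submit(() -> perRegion.apply(region)));
            }
            R result = identity;
            for (Future<R> partial : partials) {
                try {
                    // Gather: fold each region's partial result into the running total.
                    result = reducer.apply(result, partial.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a region", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("a region task failed", e.getCause());
                }
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, passing lists of rows as the regions, `r -> (long) r.size()` as the per-region function, `Long::sum` as the reducer and `0L` as the identity yields the total row count. Sizing the pool to the number of regions mirrors the "all regions queried at once" behavior; a real client would cap the thread count.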


First, an example: with HBase's AggregationClient you can compute simple statistics, such as a row count or a sum, over a single column.


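Since AggregationClient only aggregates one column at a time, the classes below count rows and sum several columns in a single scan: CountAndSumResult holds a partial result, CountAndSumHandler processes each row on the region side, and CountAndSumReducer merges the per-region results on the client.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

// Each public class belongs in its own .java file.
// RowHandler and MyReducer are the author's own interfaces; their definitions are not shown in this post.

// Partial (per-region) or final result: a row count plus one running sum per requested column.
public class CountAndSumResult implements Writable {
    private List<Long> resultList = new ArrayList<Long>();
    private Long count = 0L;

    // No-arg constructor required for Writable deserialization.
    public CountAndSumResult() {
    }

    public CountAndSumResult(int resultSize) {
        for (int i = 0; i < resultSize; i++) {
            resultList.add(0L);
        }
    }

    public Long getCount() { return count; }

    public void setCount(Long count) { this.count = count; }

    public Long getSum(int i) { return resultList.get(i); }

    public void setSum(int i, Long sum) { resultList.set(i, sum); }

    public int getResultSize() { return resultList.size(); }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(count);
        out.writeInt(resultList.size());
        for (Long v : resultList) {
            out.writeLong(v);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        count = in.readLong();
        int size = in.readInt();
        resultList.clear(); // do not append to stale values if the instance is reused
        for (int i = 0; i < size; i++) {
            resultList.add(in.readLong());
        }
    }
}

// Region side: folds every row of the scan into a CountAndSumResult.
public class CountAndSumHandler implements RowHandler<CountAndSumResult> {
    private List<String> columns = new ArrayList<String>();

    public CountAndSumHandler() {
    }

    public CountAndSumHandler(List<String> columns) {
        this.columns = columns;
    }

    // The column list is serialized so the handler can be shipped to the region servers.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(columns.size());
        for (String s : columns) {
            out.writeUTF(s);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            columns.add(in.readUTF());
        }
    }

    @Override
    public CountAndSumResult handle(List<KeyValue> keyValues, CountAndSumResult t) {
        // Count each non-empty row once.
        if (!keyValues.isEmpty()) {
            t.setCount(t.getCount() + 1);
        }
        for (int i = 0; i < columns.size(); i++) {
            String column = columns.get(i);
            for (KeyValue kv : keyValues) {
                // Columns are matched by qualifier only, regardless of family.
                if (column.equals(Bytes.toString(kv.getQualifier()))) {
                    byte[] value = kv.getValue();
                    // Empty cells are skipped; others must hold 8-byte longs (written with Bytes.toBytes(long)).
                    if (value != null && value.length > 0) {
                        t.setSum(i, t.getSum(i) + Bytes.toLong(value));
                    }
                    break;
                }
            }
        }
        return t;
    }

    @Override
    public CountAndSumResult getInitValue() {
        return new CountAndSumResult(columns.size());
    }
}

// Client side: merges the per-region results into the final one.
public class CountAndSumReducer implements MyReducer<CountAndSumResult, CountAndSumResult> {
    @Override
    public CountAndSumResult getInitValue() {
        return null; // null means "no region result merged yet"
    }

    @Override
    public CountAndSumResult reduce(CountAndSumResult r, CountAndSumResult t) {
        if (r == null) {
            return t;
        }
        if (t == null) {
            return r;
        }
        r.setCount(r.getCount() + t.getCount());
        int size = r.getResultSize();
        for (int i = 0; i < size; i++) {
            r.setSum(i, r.getSum(i) + t.getSum(i));
        }
        return r;
    }
}
```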
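The `write`/`readFields` pair in `CountAndSumResult` is what carries a partial result from a region server back to the client, so both methods must handle the fields in the same order. The JDK-only sketch below replicates that wire format and the merge rule of `CountAndSumReducer` in a hypothetical `PartialResult` class. It deliberately does not implement Hadoop's `Writable`, so it compiles without Hadoop on the classpath; it is an illustration, not the author's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only stand-in for CountAndSumResult: same fields and wire format, no Writable.
class PartialResult {
    long count;
    final List<Long> sums = new ArrayList<>();

    PartialResult(int columns) {
        for (int i = 0; i < columns; i++) {
            sums.add(0L);
        }
    }

    // Same field order as CountAndSumResult.write: count, number of sums, then each sum.
    void write(DataOutput out) throws IOException {
        out.writeLong(count);
        out.writeInt(sums.size());
        for (long v : sums) {
            out.writeLong(v);
        }
    }

    // Reads back in exactly the order write() produced.
    void readFields(DataInput in) throws IOException {
        count = in.readLong();
        int size = in.readInt();
        sums.clear();
        for (int i = 0; i < size; i++) {
            sums.add(in.readLong());
        }
    }

    // Same rule as CountAndSumReducer.reduce: null means "nothing merged yet".
    static PartialResult merge(PartialResult r, PartialResult t) {
        if (r == null) {
            return t;
        }
        if (t == null) {
            return r;
        }
        r.count += t.count;
        for (int i = 0; i < r.sums.size(); i++) {
            r.sums.set(i, r.sums.get(i) + t.sums.get(i));
        }
        return r;
    }

    // Serializes and deserializes p, simulating the trip from region server to client.
    static PartialResult roundTrip(PartialResult p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            p.write(out);
            out.flush();
            PartialResult copy = new PartialResult(0);
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping two reads in `readFields` (for example reading the size before the count) would silently produce wrong numbers rather than an error, which is why a round-trip check like this is cheap insurance for any custom result type.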


With CoprocessorProtocol you can build many more features on top of HBase; it is a genuinely powerful mechanism.
