
HTablePool implementation in hbase-0.92.1-cdh4.1.3

Published: 2013-07-01 12:33:04  Author: rapoo


How HTablePool is implemented in hbase-0.92.1-cdh4.1.3:


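1. PoolType:
Reusable: (the default) a single pool of instances reused by multiple threads; internally, the HTable instances are held in a ConcurrentLinkedQueue.
ThreadLocal: each thread gets exactly one instance, held in a ThreadLocal, so threads do not affect one another. As the number of threads grows, so does the number of HTables in the pool, but they stay independent of each other.
RoundRobin: the HTables in the pool are held in a CopyOnWriteArrayList.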
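The three pool types differ mainly in the container behind them. The following is a simplified, self-contained model of those three strategies, not HBase's PoolMap: the Pool interface and the class names are hypothetical, and details of the real implementation (for example how each pool interacts with maxSize) are left out.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolTypeSketch {
    // Hypothetical minimal pool contract; NOT the HBase PoolMap.Pool interface.
    interface Pool<R> {
        R get();        // an instance to use, or null if the caller should create one
        void put(R r);  // hand an instance back to the pool
        int size();     // instances currently tracked by the pool
    }

    // Reusable: one shared queue; whichever thread asks next takes an idle instance.
    static class ReusablePool<R> implements Pool<R> {
        private final ConcurrentLinkedQueue<R> queue = new ConcurrentLinkedQueue<>();
        public R get() { return queue.poll(); }   // removed from the queue while in use
        public void put(R r) { queue.offer(r); }
        public int size() { return queue.size(); }
    }

    // ThreadLocal: each thread only ever sees the instance it stored itself.
    static class ThreadLocalPool<R> implements Pool<R> {
        private final ThreadLocal<R> local = new ThreadLocal<>();
        private final AtomicInteger threadsWithInstance = new AtomicInteger();
        public R get() { return local.get(); }    // stays bound to the calling thread
        public void put(R r) {
            if (local.get() == null) {
                local.set(r);
                threadsWithInstance.incrementAndGet(); // grows with the number of threads
            }
        }
        public int size() { return threadsWithInstance.get(); }
    }

    // RoundRobin: instances stay in a list and are handed out in rotation.
    static class RoundRobinPool<R> implements Pool<R> {
        private final CopyOnWriteArrayList<R> list = new CopyOnWriteArrayList<>();
        private final AtomicInteger next = new AtomicInteger();
        public R get() {
            int n = list.size();
            if (n == 0) return null;
            // floorMod keeps the index valid even after the counter overflows
            return list.get(Math.floorMod(next.getAndIncrement(), n));
        }
        public void put(R r) { list.addIfAbsent(r); }
        public int size() { return list.size(); }
    }
}
```

One difference worth noticing in this model: the Reusable pool removes an instance from the queue while it is in use, whereas the RoundRobin pool leaves it in the list, so the same instance can be handed to two callers at once.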

2. Initialization:
// Defaults to PoolType.Reusable
HTablePool pool = new HTablePool(conf, 5);
// PoolMap.PoolType.ThreadLocal
pool = new HTablePool(conf, maxSize, tableFactory, PoolMap.PoolType.ThreadLocal);
// PoolMap.PoolType.RoundRobin
pool = new HTablePool(conf, maxSize, tableFactory, PoolMap.PoolType.RoundRobin);
Constructing the HTablePool instantiates its PoolMap; at this point no HTable instance exists yet, and tables is empty.

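3. Obtaining an HTableInterface:
pool.getTable(TEST_TABLE_NAME);
getTable checks whether tables already holds an instance for that table; if not, it creates a new HTable instance.
The HTable instance is wrapped in a PooledHTable, and that PooledHTable is what gets returned.
When you are done with the PooledHTable, calling .close() on it puts the underlying table back into the PoolMap.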

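4. An HTablePool can hold HTableInterface instances for any table.
The HTable instances share the same ZooKeeper connection.
HTable instances that talk to the same RegionServer share a single HBaseClient$Connection.
HTablePool has a maximum size, but it does not stop more HTable instances than that from being created: once the size is exceeded, new instances are still created, but when one is returned while the pool is full, it is discarded.
HTable instances are not thread-safe.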
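Points 3 and 4 together describe a get-or-create / return-on-close cycle. Below is a simplified, self-contained model of that cycle in plain Java, not HBase code: FakeTable, Pooled and TablePoolSketch are hypothetical names, and one ConcurrentLinkedQueue per table name stands in for a PoolMap using PoolType.Reusable. Pooled plays the role of PooledHTable, and returnTable applies the same "pool full, so release" size check as HTablePool's returnTable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TablePoolSketch {
    // Hypothetical stand-in for HTable: remembers its table name and whether it was closed.
    static class FakeTable {
        final String name;
        volatile boolean closed;
        FakeTable(String name) { this.name = name; }
        void close() { closed = true; }
    }

    // Proxy handed to callers (the role PooledHTable plays): close() returns the real table.
    class Pooled implements AutoCloseable {
        final FakeTable table;
        Pooled(FakeTable table) { this.table = table; }
        @Override public void close() { returnTable(table); }
    }

    private final int maxSize;
    private final Map<String, ConcurrentLinkedQueue<FakeTable>> tables = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();
    private final AtomicInteger released = new AtomicInteger();

    TablePoolSketch(int maxSize) { this.maxSize = maxSize; } // no table instances exist yet

    Pooled getTable(String name) {
        FakeTable t = idleQueue(name).poll();  // reuse an idle instance if there is one
        if (t == null) {                       // otherwise create one; maxSize is NOT checked here
            t = new FakeTable(name);
            created.incrementAndGet();
        }
        return new Pooled(t);
    }

    private void returnTable(FakeTable t) {
        ConcurrentLinkedQueue<FakeTable> q = idleQueue(t.name);
        // Pool full: release the instance instead of keeping it.
        // Check-then-offer is not atomic, so under contention the queue may briefly exceed maxSize.
        if (q.size() >= maxSize) {
            t.close();
            released.incrementAndGet();
            return;
        }
        q.offer(t);
    }

    private ConcurrentLinkedQueue<FakeTable> idleQueue(String name) {
        return tables.computeIfAbsent(name, k -> new ConcurrentLinkedQueue<>());
    }

    int idle(String name) { return idleQueue(name).size(); }
    int created() { return created.get(); }
    int released() { return released.get(); }
}
```

With maxSize = 2, three getTable calls made before any close() still create three instances; when all three are closed, two go back to the idle queue and the third is released, which is the "created beyond the maximum, discarded on return" behavior described in point 4.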


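Caveats:
1. When multiple threads use HTablePool to get HTables for the same table and the number of threads exceeds maxSize, writes effectively end up autoflushed every time!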

public HTableInterface getTable(String tableName) {
  // call the old getTable implementation renamed to findOrCreateTable
  HTableInterface table = findOrCreateTable(tableName);
  // return a proxy table so when user closes the proxy, the actual table
  // will be returned to the pool
  return new PooledHTable(table);
}

// PooledHTable.close(): hands the wrapped table back to the pool
public void close() throws IOException {
  returnTable(table);
}

private void returnTable(HTableInterface table) throws IOException {
  // this is the old putTable method renamed and made private
  String tableName = Bytes.toString(table.getTableName());
  if (tables.size(tableName) >= maxSize) {
    // release table instance since we're not reusing it
    this.tables.remove(tableName, table);
    this.tableFactory.releaseHTableInterface(table);
    return;
  }
  tables.put(tableName, table);
}


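If tables.size(tableName) has already reached maxSize, the HTable being returned is not pooled; it is removed and released instead, and releaseHTableInterface actually just calls HTable's close method. close in turn forces a flush of the HTable's write buffer, so if we turned autoflush off to speed up writes, that optimization stops working.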
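The effect can be reproduced without a cluster. This sketch is a deterministic simulation, not HBase code (BufferedTable and simulateWave are hypothetical names): it lets a given number of writers hold tables at the same time, as concurrent threads would, buffers one row per writer with autoflush off, and counts how many tables are released on return. Here, as with HTable, close() flushes whatever is still buffered.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AutoFlushCaveatSketch {
    // Hypothetical table with autoflush turned off: puts wait in a client-side buffer.
    static class BufferedTable {
        final List<String> buffer = new ArrayList<>();
        int flushes;
        void put(String row) { buffer.add(row); }
        void flushCommits() {
            if (!buffer.isEmpty()) {  // a real client would send one batch here
                flushes++;
                buffer.clear();
            }
        }
        void close() { flushCommits(); } // like HTable.close(): pending writes are flushed
    }

    // One "wave" of `threads` concurrent writers: each takes a table, buffers one row,
    // then returns it. Returns how many tables were released (closed, hence flushed).
    static int simulateWave(Deque<BufferedTable> idle, int maxSize, int threads) {
        List<BufferedTable> inUse = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            BufferedTable t = idle.poll();
            inUse.add(t != null ? t : new BufferedTable()); // created even beyond maxSize
        }
        for (BufferedTable t : inUse) {
            t.put("row");
        }
        int released = 0;
        for (BufferedTable t : inUse) {
            if (idle.size() >= maxSize) { // pool full: release the table...
                t.close();                // ...which flushes its tiny buffer immediately
                released++;
            } else {
                idle.offer(t);            // kept: its buffered row stays unflushed
            }
        }
        return released;
    }
}
```

With maxSize = 10, a wave of 8 writers releases nothing, while a wave of 15 releases 5 tables, each of which is flushed with only a row or two in its buffer; the more the thread count exceeds maxSize, the closer the behavior gets to autoflush.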

2. Subclass HTable to change when flushCommits happens (at a fixed interval, plus when buffered memory exceeds 1 MB):

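// In an HTable subclass; the lastFlushTime and flushInterval fields are not shown
@Override
public void put(final List<Put> puts) throws IOException {
    super.put(puts);
    needFlush();
}

// Flush the write buffer if more than flushInterval ms have passed since the last flush
private void needFlush() throws IOException {
    long currentTime = System.currentTimeMillis();
    if ((currentTime - lastFlushTime.longValue()) > flushInterval) {
        super.flushCommits();
        lastFlushTime.set(currentTime);
    }
}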
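The idea of flushing on a timer as well as on size can be tested in isolation. This is a minimal plain-Java sketch, not an HTable subclass: BufferedWriter, maxBufferBytes and the injectable clock are hypothetical, and the size check is written out by hand for illustration, whereas a real HTable triggers its size-based flush from its own write buffer limit (set with setWriteBufferSize). The time check follows the same logic as needFlush().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class IntervalFlushSketch {
    // Hypothetical stand-in for an HTable subclass with autoflush off.
    static class BufferedWriter {
        private final List<byte[]> buffer = new ArrayList<>();
        private long bufferedBytes;
        private final long maxBufferBytes;   // size trigger, e.g. 1 MB
        private final long flushIntervalMs;  // time trigger
        private final LongSupplier clock;    // injectable so the timing can be tested
        private long lastFlushTime;
        int flushCount;                      // number of non-empty batches sent

        BufferedWriter(long maxBufferBytes, long flushIntervalMs, LongSupplier clock) {
            this.maxBufferBytes = maxBufferBytes;
            this.flushIntervalMs = flushIntervalMs;
            this.clock = clock;
            this.lastFlushTime = clock.getAsLong();
        }

        void put(List<byte[]> rows) {
            for (byte[] row : rows) {
                buffer.add(row);
                bufferedBytes += row.length;
            }
            if (bufferedBytes > maxBufferBytes) {
                flushCommits();              // size-based flush
            } else {
                needFlush();                 // time-based flush
            }
        }

        private void needFlush() {
            long now = clock.getAsLong();
            if (now - lastFlushTime > flushIntervalMs) {
                flushCommits();
            }
        }

        void flushCommits() {
            if (!buffer.isEmpty()) {
                flushCount++;                // a real client would send the batch here
                buffer.clear();
                bufferedBytes = 0;
            }
            lastFlushTime = clock.getAsLong(); // any flush restarts the interval
        }

        int buffered() { return buffer.size(); }
    }
}
```

Injecting the clock as a LongSupplier (System::currentTimeMillis in production) keeps the time-based branch testable without sleeping. Restarting the interval after a size-based flush is a design choice of this sketch; the needFlush() logic only resets it after its own flushes.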


Sample code for initializing and using the pool


import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;

public class HTablePoolTest2 {
    protected static String TEST_TABLE_NAME = "testtable";
    protected static String ROW1_STR = "row1";
    protected static String COLFAM1_STR = "colfam1";
    protected static String QUAL1_STR = "qual1";

    private final static byte[] ROW1 = Bytes.toBytes(ROW1_STR);
    private final static byte[] COLFAM1 = Bytes.toBytes(COLFAM1_STR);
    private final static byte[] QUAL1 = Bytes.toBytes(QUAL1_STR);

    private static HTablePool pool;

    @BeforeClass
    public static void runBeforeClass() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // PoolType.Reusable is used by default
        pool = new HTablePool(conf, 10);
        // Pre-fill the pool
        HTableInterface[] tables = new HTableInterface[10];
        for (int n = 0; n < 10; n++) {
            tables[n] = pool.getTable(TEST_TABLE_NAME);
        }
        // After close(), each PooledHTable's table is back in the pool
        for (HTableInterface table : tables) {
            table.close();
        }
    }

    @Test
    public void testHTablePool() throws IOException, InterruptedException,
            ExecutionException {
        Callable<Result> callable = new Callable<Result>() {
            public Result call() throws Exception {
                return get();
            }
        };
        FutureTask<Result> task1 = new FutureTask<Result>(callable);
        FutureTask<Result> task2 = new FutureTask<Result>(callable);
        Thread thread1 = new Thread(task1, "THREAD-1");
        thread1.start();
        Thread thread2 = new Thread(task2, "THREAD-2");
        thread2.start();
        Result result1 = task1.get();
        System.out.println(Bytes.toString(result1.getValue(COLFAM1, QUAL1)));
        Result result2 = task2.get();
        System.out.println(Bytes.toString(result2.getValue(COLFAM1, QUAL1)));
    }

    // Borrows a table from the pool for one Get and always returns it in finally
    private Result get() {
        HTableInterface table = pool.getTable(TEST_TABLE_NAME);
        Get get = new Get(ROW1);
        try {
            Result result = table.get(get);
            return result;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

