读书人

讯息读

发布时间: 2013-12-13 13:57:17 作者: rapoo

消息读
netty使用了相关的算法计算出比较合适缓冲区大小,整个流程图如下

ReceiveBufferSizePredictor可以根据实际读取的字节大小数设置下次读写叫合适的缓冲区大小。类结构如下



AdaptiveReceiveBufferSizePredictor 提供了一种自适应的计算方式,如下代码所述,当改类初始化的时候,会填充SIZE_TABLE数组。

  private static final int[] SIZE_TABLE;    static {        List<Integer> sizeTable = new ArrayList<Integer>();        for (int i = 1; i <= 8; i ++) {            sizeTable.add(i);        }        for (int i = 4; i < 32; i ++) {            long v = 1L << i;            long inc = v >>> 4;            v -= inc << 3;            for (int j = 0; j < 8; j ++) {                v += inc;                if (v > Integer.MAX_VALUE) {                    sizeTable.add(Integer.MAX_VALUE);                } else {                    sizeTable.add((int) v);                }            }        }        SIZE_TABLE = new int[sizeTable.size()];//        for (int i = 0; i < SIZE_TABLE.length; i ++) {//            SIZE_TABLE[i] = sizeTable.get(i);//            System.out.println(SIZE_TABLE[i]);//        }    }

SIZE_TABLE数据长度为232,从以下数据可以看出
1-8的数据增量为1
8-16增量为1
16-32增量为2
32-64增量为4
64-128增量为8
128-256增量为16
依次类推如算法所述,1-8直接加入到数据,8以后的分段可以如下表示2^m到2^(m+1)之间的增量是2^(m-3)次方。如上算法的结果如下
123456789101112131415161820222426283032364044485256606472808896104112120128144160176192208224240256288320352384416448480512576640704768832896960102411521280140815361664179219202048230425602816307233283584384040964608512056326144665671687680819292161024011264122881331214336153601638418432204802252824576266242867230720327683686440960450564915253248573446144065536737288192090112983041064961146881228801310721474561638401802241966082129922293762457602621442949123276803604483932164259844587524915205242885898246553607208967864328519689175049830401048576117964813107201441792157286417039361835008196608020971522359296262144028835843145728340787236700163932160419430447185925242880576716862914566815744734003278643208388608943718410485760115343361258291213631488146800641572864016777216188743682097152023068672251658242726297629360128314572803355443237748736419430404613734450331648545259525872025662914560671088647549747283886080922746881006632961090519041174405121258291201342177281509949441677721601845493762013265922181038082348810242516582402684354563019898883355443203690987524026531844362076164697620485033164805368709126039797766710886407381975048053063688724152329395240961006632960107374182412079595521342177280147639500816106127361744830464187904819220132659202147483647

初始缓存大小为1024,可以在SIZE_TABLE中找到对应数组位置。当实际读取字节小于1024数组左偏,否则右偏;下次预计的时候根据本次已设置的索引情况来判断;直到超过最大或最小值时,以最值为主。
SocketReceiveBufferAllocator
SocketReceiveBufferAllocator.get(size)处理逻辑如下所述,没什么特别的。值得借鉴下段位移代码部分
Buffer = null                分配DirectBufferBuffer.capacity < size 分配DirectBufferBuffer.capacity * percent > size && exceedCount=maxExceedCount 分配DirectBufferBuffer.capacity * percent > size && exceedCount <maxExceedCount buffer重复使用Buffer.capacity * percent <= size exceedCount =0;buffer重复使用分配DirectBuffer  释放已申请的Buffer占用的系统内存;计算size进1024计算size进1024    // Normalize to multiple of 1024        int q = capacity >>> 10;        int r = capacity & 1023;        if (r != 0) {            q ++;        }        return q << 10;

java.nio.DirectByteBuffer
直接向操作系统请求分配一些资源空间,改空间并不是JAVA推内存。之后的所有写入读取操作都不通过堆内存处理,直接操作系统资源空间。通过unsafe.allocateMemory,setMemory操作资源空间。而最后一行代码,Cleaner是一个PhantomReference实现类
  DirectByteBuffer(int cap) {// package-privatesuper(-1, 0, cap, cap, false);Bits.reserveMemory(cap);int ps = Bits.pageSize();long base = 0;try {    base = unsafe.allocateMemory(cap + ps);} catch (OutOfMemoryError x) {    Bits.unreserveMemory(cap);    throw x;}unsafe.setMemory(base, cap + ps, (byte) 0);if (base % ps != 0) {    // Round up to page boundary    address = base + ps - (base & (ps - 1));} else {    address = base;}         //关键代码cleaner = Cleaner.create(this, new Deallocator(base, cap));

从如下代码可以看出Cleaner接受一个runnable对象,clean方法中调用runnable对象的run方法。
  private Cleaner(Object obj, Runnable runnable)    {        super(obj, dummyQueue);        next = null;        prev = null;        thunk = runnable;    }    public static Cleaner create(Object obj, Runnable runnable)    {        if(runnable == null)            return null;        else            return add(new Cleaner(obj, runnable));    }    public void clean()    {        if(!remove(this))            return;        try        {            thunk.run();

在这里,Deallocator即为runnable对象,可以看到由unsafe.freeMemory
public void run() {    if (address == 0) {// Paranoiareturn;    }    unsafe.freeMemory(address);    address = 0;    Bits.unreserveMemory(capacity);}

那clean方法调用时机在哪呢,正如JDK文档中指出当虚拟机确定对象虚可到达时,那么在那时将会被加入pending队列。在Reference中体现,
    /* List of References waiting to be enqueued.  The collector adds     * References to this list, while the Reference-handler thread removes     * them.  This list is protected by the above lock object.     */    private static Reference pending = null;

Reference的静态语句块中初始化一个后台线程,不断的遍历pending,处理pending中的每个节点
 static {ThreadGroup tg = Thread.currentThread().getThreadGroup();for (ThreadGroup tgn = tg;     tgn != null;     tg = tgn, tgn = tg.getParent());Thread handler = new ReferenceHandler(tg, "Reference Handler");/* If there were a special system-only priority greater than * MAX_PRIORITY, it would be used here */handler.setPriority(Thread.MAX_PRIORITY);handler.setDaemon(true);handler.start();    }

处理pending代码,注意Cleaner部分
public void run() {    for (;;) {Reference r;synchronized (lock) {    if (pending != null) {r = pending;Reference rn = r.next;pending = (rn == r) ? null : rn;r.next = r;    } else {try {    lock.wait();} catch (InterruptedException x) { }continue;    }}// 原来clean是在这处理的if (r instanceof Cleaner) {    ((Cleaner)r).clean();    continue;}ReferenceQueue q = r.queue;if (q != ReferenceQueue.NULL) q.enqueue(r);    }}    }

这种由JVM确定的调用时机,其实对开发来讲是被动的,不“靠谱的”,鬼知道它何时释放。所以我们只要争取主动释放。即调用Cleaner的clean即可。
netty中的ByteBufferUtil就干这事。
/* * Copyright 2012 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */package org.jboss.netty.util.internal;import java.lang.reflect.Method;import java.nio.ByteBuffer;/** * This is fork of ElasticSearch's ByteBufferAllocator.Cleaner class */public final class ByteBufferUtil {    private static final boolean CLEAN_SUPPORTED;    private static final Method directBufferCleaner;    private static final Method directBufferCleanerClean;    static {        Method directBufferCleanerX = null;        Method directBufferCleanerCleanX = null;        boolean v;        try {            directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");            directBufferCleanerX.setAccessible(true);            directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");            directBufferCleanerCleanX.setAccessible(true);            v = true;        } catch (Exception e) {            v = false;        }        CLEAN_SUPPORTED = v;        directBufferCleaner = directBufferCleanerX;        directBufferCleanerClean = directBufferCleanerCleanX;    }    /**     * Destroy the given {@link ByteBuffer} if possible     */    public static void destroy(ByteBuffer buffer) {        if (CLEAN_SUPPORTED && buffer.isDirect()) {            try {                Object cleaner = directBufferCleaner.invoke(buffer);                directBufferCleanerClean.invoke(cleaner);            } catch (Exception e) {                // silently ignore exception            }        }    }    private ByteBufferUtil() {        // Utility class    }}

读书人网 >开源软件

热点推荐