读书人

NIO Reactor方式(阅读NIO笔记)

发布时间: 2012-12-22 12:05:06 作者: rapoo

NIO Reactor模式(阅读NIO笔记)

注:内容均节选自附件中的ppt文档。

?

1.网络服务一般的结构:

? 读取请求--->解码请求--->处理服务--->编码响应--->发送响应

经典的服务设计是“每一个请求一个线程”,如下图


NIO Reactor方式(阅读NIO笔记)
?2.Reactor模式

Reactor响应I/O事件,分发到合适的Handler处理。

Handler执行非阻塞的动作。

基本的Reactor设计,单线程版本


NIO Reactor方式(阅读NIO笔记)

示例代码:

?

package com.zhang.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Set;public class Reactor implements Runnable {    final Selector selector;    final ServerSocketChannel serverSocketChannel;    public Reactor(int port) throws IOException {        selector = Selector.open();        serverSocketChannel = ServerSocketChannel.open();        serverSocketChannel.socket().bind(new InetSocketAddress(port));        serverSocketChannel.configureBlocking(false);        SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        key.attach(new Acceptor());    }    @Override    public void run() {        while (!Thread.interrupted()) {            try {                selector.select();                Set<SelectionKey> selectionKeys = selector.selectedKeys();                for(SelectionKey selectionKey : selectionKeys){                    dispatch(selectionKey);                }                selectionKeys.clear();            } catch (IOException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }    }    private void dispatch(SelectionKey selectionKey) {        Runnable run = (Runnable) selectionKey.attachment();        if(run != null){            run.run();        }    }        class Acceptor implements Runnable{        @Override        public void run() {            try {                SocketChannel channel = serverSocketChannel.accept();                if(channel != null){                    new Handler(selector,channel);                }            } catch (IOException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }                    }    }}
?

?

import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;public class Handler implements Runnable {    private final static int DEFAULT_SIZE = 8092;    private final SocketChannel socketChannel;    private final SelectionKey seletionKey;    private static final int READING = 0;    private static final int SENDING = 1;        private int state = READING;    ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);    ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);    public Handler(Selector selector, SocketChannel channel) throws IOException {        this.socketChannel = channel;        socketChannel.configureBlocking(false);        this.seletionKey = socketChannel.register(selector, 0);        seletionKey.attach(this);        seletionKey.interestOps(SelectionKey.OP_READ);        selector.wakeup();    }    @Override    public void run() {        if(state == READING){            read();        }else if(state == SENDING){            write();        }    }        class Sender implements Runnable {        @Override        public void run() {            try {                socketChannel.write(outputBuffer);            } catch (IOException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }            if(outIsComplete()){                seletionKey.cancel();            }                                }            }    private void write() {        try {            socketChannel.write(outputBuffer);        } catch (IOException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        while(outIsComplete()){            seletionKey.cancel();        }            }    private void read() {        try {            socketChannel.read(inputBuffer);            if(inputIsComplete()){                process();                seletionKey.attach(new Sender());                seletionKey.interestOps(SelectionKey.OP_WRITE);                seletionKey.selector().wakeup();            }        } catch (IOException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }                    }        public boolean inputIsComplete(){        return false;    }        public boolean outIsComplete(){        return false;            }        public void process(){            }}
?

NIO支持的特性:

?

Channels,连接到文件、socket等等,支持非阻塞读。Buffers,与数组相似的对象,能直接从Channels中读写。Selectors,辨识哪一个通道的集合有IO事件。(Tell which of a set of Channels have IO events)SelectionKeys,维护IO事件的状态和绑定。多线程设计添加线程增加可扩展性,主要应用在多核处理器中Worker 线程,Reactors要快速出发handlers。handlers的处理降低了Reactor的速度。将非I/O操作分离到其他线程中处理。Multiple Reactor Threads,多Reactor线程。Reactor线程可以使IO操作饱和,分布负载到其他的reactors,负载均衡来匹配CPU和IO之间的速度差异。
NIO Reactor方式(阅读NIO笔记)
?使用多个Reactors
NIO Reactor方式(阅读NIO笔记)
?

?

读书人网 >编程

热点推荐