读书人

施用Java的nio实现高效能的网络通信

发布时间: 2012-08-26 16:48:05 作者: rapoo

使用Java的nio实现高效能的网络通信

在使用Java的API构建网络通信的应用时(尤其是基于移动的网络),有两种技术可供选择。第一种为直接使用Socket/Server Socket+输入输出流来构建,另外一种使用ServerSocketChannel+Selector。前者为阻塞式通信,需要服务端启动很多线程监听每一个客户端的Socket,通信效率低,且服务端容易受客户端的影响。比如当某个客户端当掉之后,服务的向这个客户端写数据的线程就会一直阻塞,直到写数据超时。所以我们在开发网络通信的应用时,首选后者。如下的代码实现了网络聊天/的功能,在加上语音或视频的录制功能又可实现网络的音频会议或视屏会议。网上和书上有很多类似的例子,但大多没有经过很好的测试,比如有一个客户端突然退出或者当掉,会影响服务端的运行。而我们的实际情况是要求服务端不应该受客户端的影响。

?

代码如下:

?

服务端:

package com.nio;import java.io.IOException;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.nio.channels.Channel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;/** * 非阻塞通信的服务端 * @author Alex * */public class NioServer {private class NSServerThread extends Thread {private String ip;private int port;public NSServerThread(String ip, int port) {this.ip = ip;this.port = port;}public void run() {try {init(this.ip, this.port);} catch (UnknownHostException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}public void init(String ip, int port) throws IOException { // 用于检测所有Channel状态的SelectorSystem.out.println("打开NioPTTServer 端口为:" + port);Selector selector = null;// 定义实现编码、解码的字符集对象selector = Selector.open();// 通过open方法来打开一个未绑定的ServerSocketChannel实例ServerSocketChannel server = ServerSocketChannel.open();// InetSocketAddress isa = new InetSocketAddress(ip, port);InetSocketAddress isa = new InetSocketAddress(ip, port);// 将该ServerSocketChannel绑定到指定IP地址server.socket().bind(isa);// 设置ServerSocket以非阻塞方式工作server.configureBlocking(false);// 将server注册到指定Selector对象server.register(selector, SelectionKey.OP_ACCEPT);while (true) {int keys = selector.select();// System.out.println("keys:"+keys);if (keys > 0) {// 依次处理selector上的每个已选择的SelectionKey// SelectionKey removeReadSk=null;try {for (SelectionKey sk : selector.selectedKeys()) {// 从selector上的已选择Key集中删除正在处理的SelectionKeyselector.selectedKeys().remove(sk);// 如果sk对应的通道包含客户端的连接请求if (sk.isAcceptable()) {// 调用accept方法接受连接,产生服务器端对应的SocketChannelSocketChannel sc = server.accept();// 设置采用非阻塞模式sc.configureBlocking(false);// 将该SocketChannel也注册到selectorsc.register(selector, SelectionKey.OP_READ);// 将sk对应的Channel设置成准备接受其他请求sk.interestOps(SelectionKey.OP_ACCEPT);}// 如果sk对应的通道有数据需要读取if (sk.isReadable()) {// 获取该SelectionKey对应的Channel,该Channel中有可读的数据SocketChannel sc = (SocketChannel) sk.channel();// 定义准备执行读取数据的ByteBufferByteBuffer buff = ByteBuffer.allocate(1024);// 开始读取数据int len = 0;try {if ((len = sc.read(buff)) > 0) {buff.flip();// 缓存 2指针复位 准备下次读取数据System.out.println("读取数据:" + buff.array());sk.interestOps(SelectionKey.OP_READ);}else{System.out.println("没有数据读取,关闭通道");sk.cancel();if (sk.channel() != null) {sk.channel().close();}}// 将sk对应的Channel设置成准备下一次读取}// 如果捕捉到该sk对应的Channel出现了异常,即表明该Channel// 对应的Client出现了问题,所以从Selector中取消sk的注册catch (IOException ex) {// 从Selector中删除指定的SelectionKeyex.printStackTrace();sk.cancel();if (sk.channel() != null) {sk.channel().close();}System.out.println("关闭一个客户端");}// 如果content的长度大于0,即聊天信息不为空if (len > 0) {// 遍历该selector里注册的所有SelectKeySelectionKey writeKey = null;for (SelectionKey key : selector.keys()) {// 获取该key对应的ChannelChannel targetChannel = key.channel();// 如果该channel是SocketChannel对象if (targetChannel instanceof SocketChannel) {// && key != sk) {// 将读到的内容写入该Channel中 ,返回到客户端if (targetChannel instanceof SocketChannel&& key != sk) {SocketChannel dest = null;try {// 将读到的内容写入该Channel中 ,返回到客户端dest = (SocketChannel) targetChannel;System.out.println("写数据:"+ buff.array());dest.write(buff);} catch (Exception e) {// 写异常,关闭通道e.printStackTrace();if (dest != null) {dest.close();}targetChannel.close();writeKey = key;}}}}if (writeKey != null) {writeKey.cancel();if (writeKey.channel() != null) {writeKey.channel().close();}}}}}} catch (Exception e) {e.printStackTrace();// selector.close();// server.close();// System.out.println("发生异常,重新打开端口");// init(ip,port);// throw new RuntimeException(e);}}}}public static void main(String[] args) throws IOException {// System.out.println(InetAddress.getLocalHost().getHostAddress());String host = InetAddress.getLocalHost().getHostAddress();int port = 30000;new NioServer().new NSServerThread(host, port).start();// new NServer().init(InetAddress.getLocalHost().getHostAddress(),// 30000);System.out.println("Nio服务端启动了,host:" + host);}}

?

?

客户端代码:

package com.nio;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStream;import java.net.InetAddress;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Scanner;/** * 非阻塞通信的服务端 * @author Alex * */public class NioClient {public void init(String ip, int port) throws IOException {// 定义检测SocketChannel的Selector对象Selector selector = null;// 定义处理编码和解码的字符集//Charset charset = Charset.forName("UTF-8");// 客户端SocketChannelSocketChannel sc = null;selector = Selector.open();InetSocketAddress isa = new InetSocketAddress(ip, port);// 调用open静态方法创建连接到指定主机的SocketChannelsc = SocketChannel.open(isa);// 设置该sc以非阻塞方式工作sc.configureBlocking(false);// 将SocketChannel对象注册到指定Selectorsc.register(selector, SelectionKey.OP_READ);// 启动读取服务器端数据的线程------------new ReadDataThread(selector).start();new SendDataThread(sc,selector).start();}private class SendDataThread extends Thread{SocketChannel sc;Selector selector;public SendDataThread(SocketChannel sc,Selector selector){this.sc=sc;this.selector=selector;}public void run(){// 创建键盘输入流-----------Scanner scan = new Scanner(System.in);ByteBuffer byteBuf=ByteBuffer.allocate(152);File file=new File("D:/未命名2.jpg");try{InputStream in=new FileInputStream(file);int len=0;while (scan.hasNextLine()) {// 读取键盘输入String line = scan.nextLine();// 将键盘输入的内容输出到SocketChannel中if("send".equals(line.trim())){byte[] buf=new byte[152];if((len=in.read(buf))>-1){byteBuf.put(buf);byteBuf.flip();sc.write(byteBuf);byteBuf.clear();}byteBuf.put(buf);byteBuf.flip();sc.write(byteBuf);byteBuf.clear();}else if("quit".equals(line.trim())){sc.close();selector.selectedKeys().clear();selector.close();}//sc.write(charset.encode(line));}}catch(Exception e){e.printStackTrace();}}}//读取服务器数据的----客户端监听线程private class ReadDataThread extends Thread {Selector selector;public ReadDataThread(Selector selector)throws IOException {this.selector = selector;}public void run() {try {ByteBuffer buff = ByteBuffer.allocate(152);while (selector.select() > 0) {// 遍历每个有可用IO操作Channel对应的SelectionKeyfor (SelectionKey sk : selector.selectedKeys()) {// 删除正在处理的SelectionKeyselector.selectedKeys().remove(sk);// 如果该SelectionKey对应的Channel中有可读的数据if (sk.isReadable()) {// 使用NIO读取Channel中的数据SocketChannel sc = (SocketChannel) sk.channel();String content = "";if (sc.read(buff) > 0) {sc.read(buff);buff.flip();content = new String(buff.array());// charset.decode(buff);buff.clear();}// 打印输出读取的内容System.out.println("接收的数据:" + content);// 为下一次读取作准备sk.interestOps(SelectionKey.OP_READ);}}}} catch (Exception ex) {ex.printStackTrace();}}}public static void main(String[] args) throws IOException {new NioClient().init("192.168.42.128", 30000);}}

?

读书人网 >网络协议

热点推荐