java NIO读出数据比写入数据少
简单写了个C/S模式的NIO,例子:客户端向服务器写50000次数据,服务接受到数据后打印出来。
由于首次使用NIO,碰到个问题:通过计数显示,客户端写了50000次。
但是,服务器端执行读操作时,每次读到10000次左右时就不再读了,跳回了selector.select()监听。
按理说,数据没读完,我每次读取完也执行了key.interestOps(SelectionKey.OP_READ),应该是能直到数据读取完才对,才会回到事件监听。
可是,现在就是不对了,求教。是我理解有错,代码不会,还是什么原因?
客户端代码:
- Java code
public class MyMQService{ // 信道选择器 private Selector selector = null; // 与服务器通信的信道 private SocketChannel socketChannel = null; // 要连接的服务器Ip地址 private String hostIp = "localhost"; // 要连接的远程服务器在监听的端口 private int hostListenningPort = 33445; private byte[] buffer = new byte[256]; private static int count; /** * 构造函数 */ public MyMQService() { try { initialize(); } catch (IOException e) { System.out.println("初始化服务器连接异常" + e.getMessage()); e.printStackTrace(); } } /** * 初始化函数 * * @throws IOException 异常 */ private void initialize() throws IOException { // 打开监听信道并设置为非阻塞模式 socketChannel = SocketChannel.open(new InetSocketAddress(hostIp, hostListenningPort)); socketChannel.configureBlocking(false); // 打开并注册选择器到信道 selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); // 启动读取线程 new TCPClientReadThread(selector); } public void put(Object obj) throws IOException { synchronized (this) { count++; System.out.println("第@"+count); Message meg = new Message(1, obj); // 对象转数组 byte[] objarr = NioUtil.obj2Byte(meg); // 将对象赋值给固定长度的数组 System.arraycopy(objarr, 0, buffer, 0, objarr.length); // 发送本次对象 socketChannel.write(ByteBuffer.wrap(buffer)); } } public static void main(String[] args) throws IOException { MyMQService client1 = new MyMQService(); for (int i = 0; i < 10000; i++) { client1.put("dataxxxx"); } }}public class TCPClientReadThread implements Runnable { private Selector selector; // 超时时间,单位毫秒 private static final int TimeOut = 3000; public TCPClientReadThread(Selector selector) { this.selector = selector; new Thread(this).start(); } public void run() { // Object obj = null; try { while (true) { // 等待某信道就绪(或超时) if (selector.select(TimeOut) == 0) { // System.out.println("客户端运行中……"); continue; } // 遍历每个有可用IO操作Channel对应的SelectionKey for (SelectionKey key : selector.selectedKeys()) { // 如果该SelectionKey对应的Channel中有可读的数据 if (key.isReadable()) { ClientTCPProtocol ci = new ClientTCPProtocol(1024); ci.handleRead(key); // obj = ci.getMegObj(); // System.out.println("服务器返回消息:" + obj); // 为下一次读取作准备 key.interestOps(SelectionKey.OP_READ); } // 删除正在处理的SelectionKey selector.selectedKeys().remove(key); } } } catch (Exception ex) { ex.printStackTrace(); } }}服务器端代码:
- Java code
public class MyMQServer { // 超时时间,单位毫秒 private static final int TimeOut = 6000; // 本地监听端口 private static final int ListenPort = 33445; public static void main(String[] args) throws IOException { // 创建选择器 Selector selector = Selector.open(); // 打开监听信道 ServerSocketChannel listenerChannel = ServerSocketChannel.open(); // 与本地端口绑定 listenerChannel.socket().bind(new InetSocketAddress(ListenPort)); // 设置为非阻塞模式 listenerChannel.configureBlocking(false); // 将选择器绑定到监听信道,只有非阻塞信道才可以注册选择器.并在注册过程中指出该信道可以进行Accept操作 listenerChannel.register(selector, SelectionKey.OP_ACCEPT); // 创建一个处理协议的实现类,由它来具体操作 TCPProtocol protocol = new ServerTCPProtocol3(); // 反复循环,等待IO while (true) { // 等待某信道就绪(或超时) if (selector.select(TimeOut) == 0) { System.out.println("服务器运行中……"); continue; } // 取得迭代器.selectedKeys()中包含了每个准备好某一I/O操作的信道的SelectionKey Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); try { if (key.isAcceptable()) { // 有客户端连接请求时 protocol.handleAccept(key); } if (key.isReadable()) { // 从客户端读取数据 protocol.handleRead(key); } if (key.isValid() && key.isWritable()) { // 客户端可写时 protocol.handleWrite(key); } } catch (IOException e) { // 出现IO异常(如客户端断开连接)时移除处理过的键 e.printStackTrace(); key.channel().close(); continue; } } } }}public class ServerTCPProtocol3 implements TCPProtocol { private static int icount = 0; private byte[] allBuffer = new byte[256]; // 当存在客户端访问时,判断是读,还是写 public void handleAccept(SelectionKey key) throws IOException { SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(256)); } public void handleRead(SelectionKey key) throws IOException { readObjByChannel(key); } /** * 从通道中读取对象放到服务器队列中 * * @param clientChannel 客户端通道 */ public void readObjByChannel(SelectionKey key) throws IOException { icount++; System.out.println("进入1"); SocketChannel channel = (SocketChannel) key.channel(); // 拿到256长度的缓冲区,一个key一个 ByteBuffer buffer = (ByteBuffer) key.attachment(); int count = channel.read(buffer); if (count > 0) { buffer.flip(); Message meg = (Message)NioUtil.byte2Obj(buffer.array()); // 客户端请求类型 int type = meg.getType(); if (type == 1) { // 向服务器发数据 try { System.out.println("进入2"); // 将对象消息放到队列中 System.out.println("接收到来自客户端:" + channel.socket().getRemoteSocketAddress() + "的消息," + meg.getMeg() + "@第" + icount + "次"); // 向客户端写成功消息 meg.setMeg("OK"); byte[] objarr = NioUtil.obj2Byte(meg); System.arraycopy(objarr, 0, allBuffer, 0, objarr.length); channel.write(ByteBuffer.wrap(allBuffer)); buffer.clear(); } catch (Exception e) { e.printStackTrace(); } } } else if(count < 0){ System.out.println("错啦"); channel.close(); } key.interestOps(SelectionKey.OP_READ); System.out.println("结束4"); } public void handleWrite(SelectionKey key) throws IOException { }
[解决办法]
Java对象的编码过程,应该不会有什么问题。
解码过程本身,也应该不会有什么问题。
问题的关键,应该是在接收缓冲区上面。
服务端对数据的接收上面,没有明显的,对TCP通讯过程当中,半包,粘包,的处理过程。
也就是说,客户端每次发送的数据之间,看不到是通过什么方式来区分的。
通俗点说,就是,本次发送的数据和下一次发送的数据,两次发送的数据,接收端通过什么方式来区分这两次的数据?
一般而言,我们会自己定义一个内部协议,来区分每次发送的数据。
比如,我在发送数据的前边,加上整个数据的长度信息,这样,通过长度就可以区分本次发送的数据到什么位置结束。
服务器每次读取数据,都会创建一个缓冲区来接收数据, 但是,如果缓冲区里面有一个以上的数据,或者,由于网络不稳定,本次只接收到了半个数据(当然,这里的一个、半个是指的每次发送的数据包),楼主的解码过程,就会出现明显的漏洞了。
看样子,楼主在测试代码的时候,选用的都是在同一个主机上,或者在局域网的环境下测试的吧。
这种情况下,网络通信的质量非常高,客户端发送的数据,基本上能够无阻碍的被服务端收到,所以,每次处理接收数据的过程中,其实,接收缓冲区里面可能存在一个以上的数据了。由于程序每次只处理一个数据包,就造成了丢包问题,那些没有被处理的数据包,被程序丢弃了。
正常的通信测试应该测试三个方面,半包、粘包和压力。
[解决办法]
“阻塞和非阻塞的形式,写到一起了”,数据丢失的原因是,服务器还没接收到数据,发送端就已经关闭,是么?
不是这样的,楼主你的代码改一个地方就可以了。
- Java code
public void put(Object obj) throws IOException { ... //关键是这一句 //socketChannel.write(ByteBuffer.wrap(buffer)); //改成如下 while(buff.hasRemaining()){ socketChannel.write(buff); } }