[ZooKeeper] Connection loss, watch restoration, heartbeats, and client-side timeout

Published: 2013-04-09 16:45:09  Author: rapoo

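The previous article analyzed the case where the server actively expires a session. This one looks at what happens when the network between client and server is interrupted briefly.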

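1. As when the server actively closes the connection, the client throws an EndOfStreamException. At this point the client state is still CONNECTED.

2. SendThread handles the exception, cleans up the connection, and fails all outstanding requests with the error code CONNECTIONLOSS.

3. A Disconnected state notification is sent.

4. The client picks the next server and reconnects.

5. Once connected, it sends a ConnectRequest carrying the sessionid and password of the current session.

6. The server handles the request, with separate paths on the leader and on a follower. Because the client retries quickly, the session has not timed out yet, so session validation succeeds on both leader and follower. If the session happens to have expired at this moment, validation fails, and the client throws a sessionExpired exception and exits.

7. The server returns a successful ConnectResponse.

8. The client receives the response and sends a SyncConnected state notification to the watcher.

9. The client sends a SetWatches packet to re-establish its watches.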

    // Automatic watch re-registration can be disabled through configuration
    if (!disableAutoWatchReset) {
        // All watches currently registered on the client
        List<String> dataWatches = zooKeeper.getDataWatches();
        List<String> existWatches = zooKeeper.getExistWatches();
        List<String> childWatches = zooKeeper.getChildWatches();
        if (!dataWatches.isEmpty()
                || !existWatches.isEmpty() || !childWatches.isEmpty()) {
            // Send the re-registration request
            SetWatches sw = new SetWatches(lastZxid,
                    prependChroot(dataWatches),
                    prependChroot(existWatches),
                    prependChroot(childWatches));
            RequestHeader h = new RequestHeader();
            h.setType(ZooDefs.OpCode.setWatches);
            h.setXid(-8);
            Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
            outgoingQueue.addFirst(packet);
        }
    }

10. The server receives the setWatches request. On a follower it goes straight to FinalRequestProcessor; no proposal is needed.

    case OpCode.setWatches: {
        lastOp = "SETW";
        SetWatches setWatches = new SetWatches();
        // XXX We really should NOT need this!!!!
        request.request.rewind();
        ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
        long relativeZxid = setWatches.getRelativeZxid();
        // Add the watches
        zks.getZKDatabase().setWatches(relativeZxid,
                setWatches.getDataWatches(),
                setWatches.getExistWatches(),
                setWatches.getChildWatches(), cnxn);
        break;
    }


    // When adding a watch, check whether it should fire immediately
    public void setWatches(long relativeZxid, List<String> dataWatches,
            List<String> existWatches, List<String> childWatches,
            Watcher watcher) {
        for (String path : dataWatches) {
            DataNode node = getNode(path);
            WatchedEvent e = null;
            if (node == null) {
                e = new WatchedEvent(EventType.NodeDeleted,
                        KeeperState.SyncConnected, path);
            } else if (node.stat.getCzxid() > relativeZxid) {
                e = new WatchedEvent(EventType.NodeCreated,
                        KeeperState.SyncConnected, path);
            } else if (node.stat.getMzxid() > relativeZxid) {
                e = new WatchedEvent(EventType.NodeDataChanged,
                        KeeperState.SyncConnected, path);
            }
            if (e != null) {
                watcher.process(e);
            } else {
                this.dataWatches.addWatch(path, watcher);
            }
        }
        for (String path : existWatches) {
            DataNode node = getNode(path);
            WatchedEvent e = null;
            if (node == null) {
                // This is the case when the watch was registered
            } else if (node.stat.getMzxid() > relativeZxid) {
                e = new WatchedEvent(EventType.NodeDataChanged,
                        KeeperState.SyncConnected, path);
            } else {
                e = new WatchedEvent(EventType.NodeCreated,
                        KeeperState.SyncConnected, path);
            }
            if (e != null) {
                watcher.process(e);
            } else {
                this.dataWatches.addWatch(path, watcher);
            }
        }
        for (String path : childWatches) {
            DataNode node = getNode(path);
            WatchedEvent e = null;
            if (node == null) {
                e = new WatchedEvent(EventType.NodeDeleted,
                        KeeperState.SyncConnected, path);
            } else if (node.stat.getPzxid() > relativeZxid) {
                e = new WatchedEvent(EventType.NodeChildrenChanged,
                        KeeperState.SyncConnected, path);
            }
            if (e != null) {
                watcher.process(e);
            } else {
                this.childWatches.addWatch(path, watcher);
            }
        }
    }
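The data-watch branch of setWatches comes down to comparing the node's zxids with the last zxid the client saw. The following is a minimal, self-contained sketch of that decision, not ZooKeeper's own code: WatchResetSketch, NodeStat, Outcome and decideDataWatch are hypothetical names, and a null stat stands in for a node that no longer exists.

```java
// Hypothetical model of the data-watch branch of setWatches; not ZooKeeper code.
class WatchResetSketch {
    enum Outcome { FIRE_NODE_DELETED, FIRE_NODE_CREATED, FIRE_NODE_DATA_CHANGED, REGISTER_WATCH }

    // Holds only the two zxids that the data-watch branch inspects.
    static final class NodeStat {
        final long czxid; // zxid of the transaction that created the node
        final long mzxid; // zxid of the transaction that last modified the node

        NodeStat(long czxid, long mzxid) {
            this.czxid = czxid;
            this.mzxid = mzxid;
        }
    }

    // stat == null models "node does not exist";
    // relativeZxid is the last zxid the client saw before the connection dropped.
    static Outcome decideDataWatch(NodeStat stat, long relativeZxid) {
        if (stat == null) {
            return Outcome.FIRE_NODE_DELETED;
        }
        if (stat.czxid > relativeZxid) {
            return Outcome.FIRE_NODE_CREATED;
        }
        if (stat.mzxid > relativeZxid) {
            return Outcome.FIRE_NODE_DATA_CHANGED;
        }
        // Nothing changed while the client was away: just re-register the watch.
        return Outcome.REGISTER_WATCH;
    }

    public static void main(String[] args) {
        long lastSeenZxid = 100L; // illustrative value
        System.out.println(decideDataWatch(null, lastSeenZxid));
        System.out.println(decideDataWatch(new NodeStat(150L, 150L), lastSeenZxid));
        System.out.println(decideDataWatch(new NodeStat(50L, 120L), lastSeenZxid));
        System.out.println(decideDataWatch(new NodeStat(50L, 80L), lastSeenZxid));
    }
}
```

The point of the comparison is that a change made during the outage is delivered as an event right away instead of being lost, while an unchanged node simply gets its watch back.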

11. On the leader there is one extra layer, PrepRequestProcessor, which checks whether the session still exists.
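Seen from the client, steps 3 through 8 reduce to a short sequence of outcomes. The sketch below is a deliberately simplified model, not ZooKeeper code: ReconnectFlowSketch, Outcome and outcomesAfterOutage are hypothetical names, and comparing the outage length directly with the session timeout is an assumption that stands in for the server's real session tracking.

```java
// Hypothetical, simplified model of the reconnect flow; not ZooKeeper code.
class ReconnectFlowSketch {
    enum Outcome { DISCONNECTED, SYNC_CONNECTED, SESSION_EXPIRED }

    // Returns, in order, what the client observes after a network outage.
    // Assumption: the session survives exactly when the outage is shorter
    // than the session timeout.
    static Outcome[] outcomesAfterOutage(long outageMs, long sessionTimeoutMs) {
        boolean sessionStillValid = outageMs < sessionTimeoutMs;
        return new Outcome[] {
            Outcome.DISCONNECTED, // step 3: connection lost
            sessionStillValid
                    ? Outcome.SYNC_CONNECTED   // steps 7 and 8: session resumed
                    : Outcome.SESSION_EXPIRED  // step 6: validation failed
        };
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30000L; // illustrative value
        for (Outcome o : outcomesAfterOutage(2000L, sessionTimeoutMs)) {
            System.out.println("short outage: " + o);
        }
        for (Outcome o : outcomesAfterOutage(45000L, sessionTimeoutMs)) {
            System.out.println("long outage: " + o);
        }
    }
}
```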


Now let's look at the case where the client itself times out the session, and at heartbeats. This is the main loop of SendThread:

    public void run() {
        clientCnxnSocket.introduce(this, sessionId);
        clientCnxnSocket.updateNow();
        clientCnxnSocket.updateLastSendAndHeard();
        // Timeout for the selector's select(); recomputed on every iteration
        int to;
        long lastPingRwServer = System.currentTimeMillis();
        while (state.isAlive()) {
            try {
                ......
                // Once the session is established, `to` is the read timeout minus the receive idle time
                if (state.isConnected()) {
                    ......
                    to = readTimeout - clientCnxnSocket.getIdleRecv();
                } else {
                    to = connectTimeout - clientCnxnSocket.getIdleRecv();
                }
                // If the client has not received a packet from the server for a long time,
                // the receive idle time exceeds the read timeout and an exception is thrown
                if (to <= 0) {
                    throw new SessionTimeoutException(
                            "Client session timed out, have not heard from server in "
                                    + clientCnxnSocket.getIdleRecv() + "ms"
                                    + " for sessionid 0x"
                                    + Long.toHexString(sessionId));
                }
                // Once the session is established, send heartbeats
                if (state.isConnected()) {
                    // With frequent writes the send idle time stays small and no heartbeat is needed
                    int timeToNextPing = readTimeout / 2
                            - clientCnxnSocket.getIdleSend();
                    // Few writes: send a heartbeat
                    if (timeToNextPing <= 0) {
                        sendPing();
                        // Record the time of this send
                        clientCnxnSocket.updateLastSend();
                    } else {
                        // Writes are busy: no heartbeat needed
                        if (timeToNextPing < to) {
                            to = timeToNextPing;
                        }
                    }
                }
                .....
                // Every doTransport call updates now; lastHeard and lastSend
                // are updated only when something is actually read or written
                clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                ....
                    clientCnxnSocket.updateNow();
                    clientCnxnSocket.updateLastSendAndHeard();
                }
            }
        }
        .....
    }
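The timing arithmetic in the connected branch of this loop can be isolated into a small pure function. This is a sketch under stated assumptions, not the library's code: PingTimingSketch, Decision and decide are hypothetical names, IllegalStateException stands in for SessionTimeoutException, and the 20000 ms read timeout is only an illustrative value.

```java
// Hypothetical model of the connected-state timing logic in SendThread.run().
class PingTimingSketch {
    static final class Decision {
        final boolean sendPing;    // whether a heartbeat should be sent now
        final int selectTimeoutMs; // how long select() may block

        Decision(boolean sendPing, int selectTimeoutMs) {
            this.sendPing = sendPing;
            this.selectTimeoutMs = selectTimeoutMs;
        }
    }

    // idleRecvMs: time since the last packet was received from the server.
    // idleSendMs: time since the last packet was sent to the server.
    static Decision decide(int readTimeoutMs, int idleRecvMs, int idleSendMs) {
        int to = readTimeoutMs - idleRecvMs;
        if (to <= 0) {
            // Stand-in for SessionTimeoutException in the real client.
            throw new IllegalStateException(
                    "have not heard from server in " + idleRecvMs + "ms");
        }
        int timeToNextPing = readTimeoutMs / 2 - idleSendMs;
        if (timeToNextPing <= 0) {
            // Send side idle for at least half the read timeout: ping now.
            return new Decision(true, to);
        }
        // Otherwise wake up no later than the moment the next ping is due.
        return new Decision(false, Math.min(to, timeToNextPing));
    }

    public static void main(String[] args) {
        Decision idle = decide(20000, 1000, 12000);
        System.out.println("idle sender: ping=" + idle.sendPing + " to=" + idle.selectTimeoutMs);
        Decision busy = decide(20000, 1000, 2000);
        System.out.println("busy sender: ping=" + busy.sendPing + " to=" + busy.selectTimeoutMs);
    }
}
```

Because the ping threshold is half the read timeout, a ping is scheduled before the read timeout can elapse, so a healthy but quiet connection still receives a reply that resets the receive idle time.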

The heartbeat packet uses xid -2:

    private void sendPing() {
        lastPingSentNs = System.nanoTime();
        RequestHeader h = new RequestHeader(-2, OpCode.ping);
        queuePacket(h, null, null, null, null, null, null, null, null);
    }

The server handles the ping packet. On a follower it goes straight to FinalRequestProcessor:

    case OpCode.ping: {
        zks.serverStats().updateLatency(request.createTime);
        lastOp = "PING";
        cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                request.createTime, System.currentTimeMillis());
        // The heartbeat response also uses xid -2
        cnxn.sendResponse(new ReplyHeader(-2,
                zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
        return;
    }

On the leader there is one extra layer, PrepRequestProcessor, which checks whether the session still exists.

When the client receives the heartbeat response, it does nothing beyond writing a debug log entry:

    if (replyHdr.getXid() == -2) {
        // -2 is the xid for pings
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x"
                    + Long.toHexString(sessionId)
                    + " after "
                    + ((System.nanoTime() - lastPingSentNs) / 1000000)
                    + "ms");
        }
        return;
    }


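From the above we can see:

1. A heartbeat is sent only when the send side is idle, that is, when the client has sent nothing for at least readTimeout / 2.

2. Every transport call updates the current time now.

3. lastHeard and lastSend are updated only when there is actual read or write traffic.

4. A client-side session timeout is handled the same way as a closed connection (CONNECTIONLOSS): both lead to a retry.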
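Since every outstanding request fails with CONNECTIONLOSS while the client reconnects, application code often wraps calls in a retry loop. The following is one possible sketch of such a wrapper, not part of ZooKeeper: ConnectionLossRetrySketch, ConnectionLoss, SessionExpired, Operation and retryOnConnectionLoss are all hypothetical names, and the two exceptions are unchecked only to keep the example short.

```java
// Hypothetical application-side retry wrapper; none of these types are ZooKeeper classes.
class ConnectionLossRetrySketch {
    // Stand-ins for the two failure modes discussed in this article.
    static class ConnectionLoss extends RuntimeException {}
    static class SessionExpired extends RuntimeException {}

    interface Operation<T> {
        T call();
    }

    // Retries only on ConnectionLoss. SessionExpired propagates immediately,
    // because the session cannot be resumed by retrying.
    static <T> T retryOnConnectionLoss(Operation<T> op, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ConnectionLoss last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (ConnectionLoss e) {
                last = e; // the client reconnects by itself; simply try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        String value = retryOnConnectionLoss(() -> {
            if (++calls[0] < 3) {
                throw new ConnectionLoss(); // simulate two failed attempts
            }
            return "data";
        }, 5);
        System.out.println(value + " after " + calls[0] + " attempts");
    }
}
```

A caveat on this design: a request that failed with CONNECTIONLOSS may still have been applied on the server, so blind retries are only safe for idempotent operations.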
