读书人

[ZooKeeper]Client Session失效

发布时间: 2013-04-20 19:43:01 作者: rapoo

[ZooKeeper]Client Session失效
if (sockKey.isReadable()) {
    // read() returns -1 when the peer has closed the stream
    int rc = sock.read(incomingBuffer);
    if (rc < 0) {
        throw new EndOfStreamException(
                "Unable to read additional data from server sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely server has closed socket");
    }
    // (excerpt ends here; the enclosing if-block continues in the original source)

SendThread处理异常:

 else {
     // this is ugly, you have a better way speak up
     if (e instanceof SessionExpiredException) {
         LOG.info(e.getMessage() + ", closing socket connection");
     } else if (e instanceof SessionTimeoutException) {
         LOG.info(e.getMessage() + RETRY_CONN_MSG);
     }
     // The connection was closed by the server (EndOfStreamException from the read path)
     else if (e instanceof EndOfStreamException) {
         LOG.info(e.getMessage() + RETRY_CONN_MSG);
     } else if (e instanceof RWServerFoundException) {
         LOG.info(e.getMessage());
     } else {
         LOG.warn(
                 "Session 0x"
                         + Long.toHexString(getSessionId())
                         + " for server "
                         + clientCnxnSocket.getRemoteSocketAddress()
                         + ", unexpected error"
                         + RETRY_CONN_MSG, e);
     }
     // Tear down the connection and fail the requests that are currently queued
     cleanup();
     // In the scenario described, state is still CONNECTED (alive) here,
     // so a Disconnected notification is delivered to watchers
     if (state.isAlive()) {
         eventThread.queueEvent(new WatchedEvent(
                 Event.EventType.None,
                 Event.KeeperState.Disconnected,
                 null));
     }
     clientCnxnSocket.updateNow();
     clientCnxnSocket.updateLastSendAndHeard();
 }

接下来client重新寻找下一个server进行session恢复,此时client的sessionId和password仍然是上一次创建的session信息。

//此处返回true,因为连接已经被置为null了if (!clientCnxnSocket.isConnected()) {//重连时,先sleep一下                        if(!isFirstConnect){                            try {                                Thread.sleep(r.nextInt(1000));                            } catch (InterruptedException e) {                                LOG.warn("Unexpected exception", e);                            }                        }                        // don't re-establish connection if we are closing                        if (closing || !state.isAlive()) {                            break;                        }//重新开始连接                        startConnect();                        clientCnxnSocket.updateLastSendAndHeard();                    }

取下一个server:

// Pick the next server address to connect to.
// NOTE(review): the argument 1000 looks like a spin/back-off delay in ms — confirm against HostProvider
addr = hostProvider.next(1000);

之后的流程和新建session时类似,区别是发送ConnectRequest时sessionId和password都是老的:

//新建session成功时,seenRwServerBefore已经被置为truelong sessId = (seenRwServerBefore) ? sessionId : 0;            ConnectRequest conReq = new ConnectRequest(0, lastZxid,                    sessionTimeout, sessId, sessionPasswd);

接下来是server端的处理:

//此时sessionId不为0 long sessionId = connReq.getSessionId();        if (sessionId != 0) {            long clientSessionId = connReq.getSessionId();            LOG.info("Client attempting to renew session 0x"                    + Long.toHexString(clientSessionId)                    + " at " + cnxn.getRemoteSocketAddress());//先关闭老的连接,如果有的话,删除watch            serverCnxnFactory.closeSession(sessionId);            cnxn.setSessionId(sessionId);            reopenSession(cnxn, sessionId, passwd, sessionTimeout);        } else {            LOG.info("Client attempting to establish new session at "                    + cnxn.getRemoteSocketAddress());            createSession(cnxn, passwd, sessionTimeout);        }

重新打开session(reopenSession):

    /**
     * Re-attaches a reconnecting client to an existing session.
     * If the password check fails, the session init is finished as invalid;
     * otherwise the session is revalidated (leader and follower do this differently).
     */
    public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
            int sessionTimeout) throws IOException {
        // Password mismatch: finish the init as invalid, so the client is sent session id 0.
        // (The original note adds that the check is also false when sessionId is 0.)
        if (!checkPasswd(sessionId, passwd)) {
            finishSessionInit(cnxn, false);
        }
        // Password OK: now check whether the session is still valid; each server type handles this differently
        else {
            revalidateSession(cnxn, sessionId, sessionTimeout);
        }
    }

如果重连的server是leader:

    /**
     * Leader-side revalidation: the parent class checks the session through the
     * sessionTracker, then the leader records itself as the session owner.
     */
    @Override
    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
        int sessionTimeout) throws IOException {
        // The parent class performs the check via the sessionTracker
        super.revalidateSession(cnxn, sessionId, sessionTimeout);
        try {
            // setowner as the leader itself, unless updated
            // via the follower handlers
            setOwner(sessionId, ServerCnxn.me);
        } catch (SessionExpiredException e) {
            // this is ok, it just means that the session revalidation failed.
        }
    }
} // closing brace of the enclosing class (included in the excerpt)

父类的revalidateSession方法:

    /**
     * Touches the session in the sessionTracker with the requested timeout and
     * finishes the connection handshake with the result (true = session still valid).
     */
    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
            int sessionTimeout) throws IOException {
        boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                                     "Session 0x" + Long.toHexString(sessionId) +
                    " is valid: " + rc);
        }
        finishSessionInit(cnxn, rc);
    }

leader的SessionTracker为SessionTrackerImpl,其touchSession方法如下:

/**
 * SessionTrackerImpl (the leader's tracker): refreshes a session with the given timeout.
 * Returns false if the session does not exist or is marked as closing.
 * (Excerpt: the remainder of the method is elided in this article.)
 */
synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                                 ZooTrace.CLIENT_PING_TRACE_MASK,
                                 "SessionTrackerImpl --- Touch session: 0x"
                + Long.toHexString(sessionId) + " with timeout " + timeout);
    }
    // The session timed out and was already removed, so the lookup returns null
    // and the check result is false
    SessionImpl s = sessionsById.get(sessionId);
    // Return false, if the session doesn't exists or marked as closing
    if (s == null || s.isClosing()) {
        return false;
    }
.....

对于leader来说,session检查的结果是false。

如果是follower,其校验方法如下:

/**
 * Follower-side revalidation: delegates to the learner, which asks the leader
 * whether the session is still valid.
 */
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
        int sessionTimeout) throws IOException {
    // Ask the leader whether the session is still valid
    getLearner().validateSession(cnxn, sessionId, sessionTimeout);
}

follower向leader询问session是否有效:

     /**
      * Asks the leader whether a reconnecting client's session is still valid by
      * sending a Leader.REVALIDATE packet (payload: session id, timeout). The
      * connection is kept in pendingRevalidations until the leader's answer arrives.
      */
    void validateSession(ServerCnxn cnxn, long clientId, int timeout)
            throws IOException {
        LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeLong(clientId);
        dos.writeInt(timeout);
        dos.close();
        // The REVALIDATE packet asks the leader whether the session is still valid
        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
                .toByteArray(), null);
        // Remember the connection so the leader's reply can be matched to it
        pendingRevalidations.put(clientId, cnxn);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                                     ZooTrace.SESSION_TRACE_MASK,
                                     "To validate session 0x"
                                     + Long.toHexString(clientId));
        }
        writePacket(qp, true);
    }

leader端处理REVALIDATE包:

// Leader side: answer a follower's REVALIDATE request.
// Request payload: session id (long) + timeout (int); reply payload: session id (long) + valid (boolean).
case Leader.REVALIDATE:
    bis = new ByteArrayInputStream(qp.getData());
    dis = new DataInputStream(bis);
    long id = dis.readLong();
    int to = dis.readInt();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(bos);
    dos.writeLong(id);
    // The expired session has already been removed, so this returns false
    boolean valid = leader.zk.touch(id, to);
    if (valid) {
        try {
            //set the session owner
            // as the follower that
            // owns the session
            leader.zk.setOwner(id, this);
        } catch (SessionExpiredException e) {
            LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
        }
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                                 ZooTrace.SESSION_TRACE_MASK,
                                 "Session 0x" + Long.toHexString(id)
                                 + " is valid: "+ valid);
    }
    // In the expired-session scenario the result written back is false
    dos.writeBoolean(valid);
    qp.setData(bos.toByteArray());
    queuedPackets.add(qp);
    break;

follower处理leader返回的结果:

// Follower side: dispatch the leader's REVALIDATE reply
case Leader.REVALIDATE:
    revalidate(qp);
    break;

/**
 * Handles the leader's REVALIDATE reply: reads the session id and validity flag,
 * removes the waiting connection from pendingRevalidations and finishes its
 * session init with that flag (a warning is logged if no connection is waiting).
 */
protected void revalidate(QuorumPacket qp) throws IOException {
    ByteArrayInputStream bis = new ByteArrayInputStream(qp
            .getData());
    DataInputStream dis = new DataInputStream(bis);
    long sessionId = dis.readLong();
    boolean valid = dis.readBoolean();
    ServerCnxn cnxn = pendingRevalidations
    .remove(sessionId);
    if (cnxn == null) {
        LOG.warn("Missing session 0x"
                + Long.toHexString(sessionId)
                + " for validation");
    } else {
        zk.finishSessionInit(cnxn, valid);
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "Session 0x" + Long.toHexString(sessionId)
                + " is valid: " + valid);
    }
}

可以看到,无论是leader还是follower,最后都会调用zk.finishSessionInit(cnxn, valid)进行处理,而由于session已经失效,所以valid为false。

/**
 * Completes the connect handshake by sending a ConnectResponse to the client.
 * When valid is false the response carries timeout 0, session id 0 and a
 * 16-byte all-zero password. (Parts of the method are elided in this excerpt.)
 */
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    ......
    try {
        // valid is false here, so the client receives session id 0 and a zeroed password
        ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                        // longer valid
                        valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        // Placeholder length; overwritten below once the serialized size is known
        bos.writeInt(-1, "len");
        rsp.serialize(bos, "connect");
        if (!cnxn.isOldClient) {
            bos.writeBool(
                    this instanceof ReadOnlyZooKeeperServer, "readOnly");
        }
        baos.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        // Write the real payload length (total minus the 4-byte length field) at offset 0
        bb.putInt(bb.remaining() - 4).rewind();
        cnxn.sendBuffer(bb);
            ......
}

client端处理ConnectResponse:

/**
 * Reads the server's ConnectResponse and hands timeout, session id and password
 * to SendThread.onConnected. (Parsing code is elided in this excerpt.)
 */
void readConnectResult() throws IOException {
    ......
    // The server has reset the session id to 0
    this.sessionId = conRsp.getSessionId();
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
            conRsp.getPasswd(), isRO);
}

SendThread.onConnected处理连接结果:

/**
 * Called with the server's connect result. A non-positive negotiated timeout means
 * the server rejected the session: the client is closed and SessionExpiredException
 * is thrown. (The success path is elided in this excerpt.)
 */
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
            byte[] _sessionPasswd, boolean isRO) throws IOException {
        negotiatedSessionTimeout = _negotiatedSessionTimeout;
        // The server answers an invalid session with timeout 0 (and session id 0);
        // note the check is on the timeout, not on the session id
        if (negotiatedSessionTimeout <= 0) {
            // Setting state to CLOSED makes the SendThread's while (state.isAlive()) loop exit
            state = States.CLOSED;
            // Deliver the Expired notification to watchers
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.Expired, null));
            // Makes the EventThread exit after the queued events have been processed
            eventThread.queueEventOfDeath();
            // Throw so that the SendThread's exception handling runs
            throw new SessionExpiredException(
                    "Unable to reconnect to ZooKeeper service, session 0x"
                            + Long.toHexString(sessionId) + " has expired");
        }
       ......
    }

SendThread处理SessionExpiredException,并关闭SendThread:

 while (state.isAlive()) {
     ......
     // this is ugly, you have a better way speak up
     if (e instanceof SessionExpiredException) {
         LOG.info(e.getMessage() + ", closing socket connection");
     }
     ......
     // Close the connection and fail all queued requests
     cleanup();
     // state has already been set to CLOSED, so no Disconnected event is sent
     // and the loop condition ends the SendThread
     if (state.isAlive()) {
         eventThread.queueEvent(new WatchedEvent(
                 Event.EventType.None,
                 Event.KeeperState.Disconnected,
                 null));
     }
     clientCnxnSocket.updateNow();
     clientCnxnSocket.updateLastSendAndHeard();
 }
 // Final cleanup after leaving the loop
 cleanup();
 // Close the selector
 clientCnxnSocket.close();
 if (state.isAlive()) {
     eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
             Event.KeeperState.Disconnected, null));
 }
 ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                          "SendThread exitedloop.");

清理动作:

/**
 * Closes the socket and fails every request in pendingQueue and outgoingQueue
 * through conLossPacket, then empties both queues.
 */
private void cleanup() {
    // Close the socket
    clientCnxnSocket.cleanup();
    // Fail requests already sent and awaiting a reply; with state CLOSED the error is SESSIONEXPIRED
    synchronized (pendingQueue) {
        for (Packet p : pendingQueue) {
            conLossPacket(p);
        }
        pendingQueue.clear();
    }
    // Fail requests still waiting to be sent; with state CLOSED the error is SESSIONEXPIRED
    synchronized (outgoingQueue) {
        for (Packet p : outgoingQueue) {
            conLossPacket(p);
        }
        outgoingQueue.clear();
    }
}

conLossPacket根据当前state设置错误码:

/**
 * Fails a packet with an error code chosen from the current client state:
 * AUTH_FAILED gives AUTHFAILED, CLOSED gives SESSIONEXPIRED, any other state gives
 * CONNECTIONLOSS. Packets without a reply header are left untouched.
 */
private void conLossPacket(Packet p) {
    if (p.replyHeader == null) {
        return;
    }
    switch (state) {
    case AUTH_FAILED:
        p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
        break;
    // Once closed, the error code is SESSIONEXPIRED
    case CLOSED:
        p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
        break;
    // Any other state yields CONNECTIONLOSS
    default:
        p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
    }
    finishPacket(p);
}

EventThread关闭:

/**
 * EventThread main loop: processes queued events until the eventOfDeath marker has
 * been seen and the waiting queue is empty, then exits.
 */
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            // kill signal
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            if (wasKilled)
                // Exit only after every pending notification has been delivered
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }
    LOG.info("EventThread shut down");
}

之后client将不可用,所有请求在发送时都会收到SESSIONEXPIRED异常码,因为queuePacket中有如下判断:

/**
 * Builds a Packet for the request and queues it for the SendThread. If the client
 * is no longer alive (e.g. state CLOSED after session expiry) or is closing, the
 * packet is failed immediately via conLossPacket instead of being queued.
 * The SendThread's socket is woken up in either case.
 */
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    Packet packet = null;
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        // After session expiry the state is CLOSED, so the packet fails right away
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}

从以上分析可知,SESSIONEXPIRED异常码代表比较严重的事件,之后这个zookeeper实例就不可用了,如果需要恢复,则需要重新创建zookeeper实例。而CONNECTIONLOSS异常码是比较常见的,比如网络暂时中断的时候;在这个状态码下zookeeper会自动重连恢复,因为只要session尚未超时,server端就还保留着session信息。

1 楼 panggezi 2013-04-09
这篇文章说得更全面:
http://www.ngdata.com/so-you-want-to-be-a-zookeeper/

2 楼 iwinit 2013-04-09
> panggezi 写道:这篇文章说得更全面:
> http://www.ngdata.com/so-you-want-to-be-a-zookeeper/
不错不错,学习学习

读书人网 >开源软件

热点推荐