读书人

Jetty 七 Continuation 总结

发布时间: 2012-11-01 11:11:32 作者: rapoo

Jetty 7 Continuation 总结
最近开始研究jetty 7。已经出到稳定版了。我相信大多JETTY的爱好者已经看过了。
这里呢。对jetty 7的continuation总结一下。

为了做一个server long push的WEB应用。我选择了jetty。对于Jetty,我只能说是一个新手,在网上搜资料的时候,发现相关资料少个可怜,中文的资料都是一个抄一个,或者就是翻译的。对于jetty我走了弯路。迫使自己看是看jetty的源文件。

大家肯定都知道jetty的continuation是建立的NIO技术的基础上。使WEB 服务器对大量的HTTP申请做阻塞不必开启过多的线程。避免了浪费。而jetty是将封装好的servlet加入一个等待队列。之后做轮询。哪个满足条件。就调用用户的方法去操作response。

刚开始作为一个第一次用jetty的人,我把suspend/complete和suspend/resunme产生了一个错误的理解。我以为这个方法只是阻塞申请。于是我建立了一个方法。里面做了一个循环去判断某个值,如果存在就反馈并complete。而最后发现jetty自带了遍历的功能。不用你去写循环。
正确的写法如下:

//判断applition中是否有msg有的话反馈,没有的话就阻塞。protected void processRequest(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {   //得到applition对象   ServletContext applition = getServletConfig().getServletContext();   //获取continuation    Continuation continuation = ContinuationSupport.getContinuation(request);   //设定超时时间、可以不设置。默认为30秒   continuation.setTimeout(0);   //阻塞   continuation.suspend();   //如果applition中有msg这个字符。开放阻塞   if(applition.getAttribute("msg").toString() != null){      continuation.complete();      response.getServletResponse().getWriter().print(             applition.getAttribute("msg").toString()     );   }}

其他你都不用管。服务器会做循环来调用你的serlvet。然后调用你的判断方法做处理。
要注意的一点是:
我做了一个小小的测试。我挂起了20个长连接去判断自己需求的信息,但是呢。我第一步就给与第9个长连接做判断的对象赋值,按理论上来说,它应该立即反馈的。
错!
它不会反馈给你。因为服务器端对每个客户端挂起的长连接做了限制。代码如下:
package org.eclipse.jetty.continuation;import java.util.ArrayList;import javax.servlet.ServletRequest;import javax.servlet.ServletResponse;import javax.servlet.ServletResponseWrapper;class FauxContinuation  implements ContinuationFilter.FilteredContinuation{  private static final ContinuationThrowable __exception = new ContinuationThrowable();  //这里就是限制你为什么不能一台机器申请挂起多个长连接的原因。  private static final int __HANDLING = 1;  private static final int __SUSPENDING = 2;  private static final int __RESUMING = 3;  private static final int __COMPLETING = 4;  private static final int __SUSPENDED = 5;  private static final int __UNSUSPENDING = 6;  private static final int __COMPLETE = 7;  private final ServletRequest _request;  private ServletResponse _response;  private int _state = 1;  private boolean _initial = true;  private boolean _resumed = false;  private boolean _timeout = false;  private boolean _responseWrapped = false;  private long _timeoutMs = 30000L;  private ArrayList<ContinuationListener> _listeners;  FauxContinuation(ServletRequest request)  {    this._request = request;  }  public void onComplete()  {    if (this._listeners != null)      for (ContinuationListener l : this._listeners)        l.onComplete(this);  }  public void onTimeout()  {    if (this._listeners != null)      for (ContinuationListener l : this._listeners)        l.onTimeout(this);  }  public boolean isResponseWrapped()  {    return this._responseWrapped;  }  public boolean isInitial()  {    synchronized (this)    {      return this._initial;    }  }  public boolean isResumed()  {    synchronized (this)    {      return this._resumed;    }  }  public boolean isSuspended()  {    synchronized (this)    {      switch (this._state)      {      case 1:        return false;      case 2:      case 3:      case 4:      case 5:        return true;      case 6:      }      return false;    }  }  public boolean isExpired()  {    synchronized (this)    {      return this._timeout;    }  }  public void setTimeout(long timeoutMs)  {    this._timeoutMs = timeoutMs;  }  public void suspend(ServletResponse response)  {    this._response = response;    this._responseWrapped = response instanceof ServletResponseWrapper;    suspend();  }  public void suspend()  {    synchronized (this)    {      switch (this._state)      {      case 1:        this._timeout = false;        this._resumed = false;        this._state = 2;        return;      case 2:      case 3:        return;      case 4:      case 5:      case 6:        throw new IllegalStateException(getStatusString());      }      throw new IllegalStateException("" + this._state);    }  }  public void resume()  {    synchronized (this)    {      switch (this._state)      {      case 1:        this._resumed = true;        return;      case 2:        this._resumed = true;        this._state = 3;        return;      case 3:      case 4:        return;      case 5:        fauxResume();        this._resumed = true;        this._state = 6;        break;      case 6:        this._resumed = true;        return;      default:        throw new IllegalStateException(getStatusString());      }    }  }  public void complete()  {    synchronized (this)    {      switch (this._state)      {      case 1:        throw new IllegalStateException(getStatusString());      case 2:        this._state = 4;        break;      case 3:        break;      case 4:        return;      case 5:        this._state = 4;        fauxResume();        break;      case 6:        return;      default:        throw new IllegalStateException(getStatusString());      }    }  }  public boolean enter(ServletResponse response)  {    this._response = response;    return true;  }  public ServletResponse getServletResponse()  {    return this._response;  }  void handling()  {    synchronized (this)    {      this._responseWrapped = false;      switch (this._state)      {      case 1:        throw new IllegalStateException(getStatusString());      case 2:      case 3:        throw new IllegalStateException(getStatusString());      case 4:        return;      case 5:        fauxResume();      case 6:        this._state = 1;        return;      }      throw new IllegalStateException("" + this._state);    }  }  public boolean exit()  {    synchronized (this)    {      switch (this._state)      {      case 1:        this._state = 7;        onComplete();        return true;      case 2:        this._initial = false;        this._state = 5;        fauxSuspend();        if ((this._state == 5) || (this._state == 4))        {          onComplete();          return true;        }        this._initial = false;        this._state = 1;        return false;      case 3:        this._initial = false;        this._state = 1;        return false;      case 4:        this._initial = false;        this._state = 7;        onComplete();        return true;      case 5:      case 6:      }      throw new IllegalStateException(getStatusString());    }  }  protected void expire()  {    synchronized (this)    {      this._timeout = true;    }    onTimeout();    synchronized (this)    {      switch (this._state)      {      case 1:        return;      case 2:        this._timeout = true;        this._state = 3;        fauxResume();        return;      case 3:        return;      case 4:        return;      case 5:        this._timeout = true;        this._state = 6;        break;      case 6:        this._timeout = true;        return;      default:        throw new IllegalStateException(getStatusString());      }    }  }  private void fauxSuspend()  {    long expire_at = System.currentTimeMillis() + this._timeoutMs;    long wait = this._timeoutMs;    while ((this._timeoutMs > 0L) && (wait > 0L))    {      try      {        super.wait(wait);      }      catch (InterruptedException e)      {        break label51:      }      wait = expire_at - System.currentTimeMillis();    }    if ((this._timeoutMs > 0L) && (wait <= 0L))      label51: expire();  }  private void fauxResume()  {    this._timeoutMs = 0L;    super.notifyAll();  }  public String toString()  {    return getStatusString();  }  String getStatusString()  {    synchronized (this)    {      return ((this._state == 4) ? "COMPLETING" : (this._state == 6) ? "UNSUSPENDING" : (this._state == 3) ? "RESUMING" : (this._state == 5) ? "SUSPENDED" : (this._state == 2) ? "SUSPENDING" : (this._state == 1) ? "HANDLING" : new StringBuilder().append("???").append(this._state).toString()) + ((this._initial) ? ",initial" : "") + ((this._resumed) ? ",resumed" : "") + ((this._timeout) ? ",timeout" : "");    }  }  public void addContinuationListener(ContinuationListener listener)  {    if (this._listeners == null)      this._listeners = new ArrayList();    this._listeners.add(listener);  }  public Object getAttribute(String name)  {    return this._request.getAttribute(name);  }  public void removeAttribute(String name)  {    this._request.removeAttribute(name);  }  public void setAttribute(String name, Object attribute)  {    this._request.setAttribute(name, attribute);  }  public void undispatch()  {    if (isSuspended())    {      if (ContinuationFilter.__debug)        throw new ContinuationThrowable();      throw __exception;    }    throw new IllegalStateException("!suspended");  }}

所以遇到这个问题的朋友不必担心。我使用多台机器测试过。服务器的servlet轮询队列是很长的。但对于单独用户来说只能接受6个suspend.在发送长连接挂起请求。他们加入到你所处的用户等待列表。当你挂起的6个连接中有一个被释放了。你的第7个连接将被挂起。

之前用了webbench做压测。

不管你怎么测非阻塞技术的长连接。你都会被jetty拒之门外。。。

就是因为这个做了限制。这样的限制大大降低对服务器的压力。
1 楼 jamesqiu 2009-12-19 很好的技术原创文章,很有含量,谢谢。 2 楼 beingchou 2009-12-20 呵呵、谢谢支持。 3 楼 javagui 2009-12-21 我也刚接触jetty,是做邮件推送业务。
一般suspend之后要接return,suspend是不会阻塞住当前线程的,是把请求抛回给了容器的过滤链。当超时或resume,请求会从过滤链里恢复,从新执行servlet的service,因此你的doget、dopost一开始需要判断request的一个特殊的属性,request.getAttibute("特殊属性")如果不满足条件可以这个属性应该是空或你定义一个特殊的值,当条件满足resume前要request.setAttibute("特殊属性","满足条件的值")。
eclipse的官网上的教程挺详细的呀。 4 楼 xingqiliudehuanghun 2009-12-22 问楼主一个很菜的问题,jetty是一个开源servlet jsp容器,他的一些特性可能比较有用
但如果基于这个特性做开发,部署的时候服务器是非jetty的,那是不是就不能运行了?
大家现在都用jetty做什么,jsp容器,testcase? 5 楼 javagui 2009-12-22 jetty用作嵌入式服务器是最大的亮点。 6 楼 beingchou 2009-12-22 谢谢。。我明白了。。呵呵。原来我之前理解错了。 7 楼 javagui 2009-12-23 http://wiki.eclipse.org/Jetty/Feature/Continuations

读书人网 >软件架构设计

热点推荐