Comet技术在项目中的使用
Comet技术在项目中的使用?Comet是一种服务器端推的技术,所谓服务器端推也就是当有事件要通知给某个用户的时候,是由服务器端直接发送到用户的浏览器。
服务器端Push目前一般有两种方式,HTTP streaming和Long polling。详细的介绍可以看这里 http://en.wikipedia.org/wiki/Push_technology
有一个Comet的框架叫做Cometd,使用的方式为Long polling。它是使用了jetty continuations特性,jetty continuations使得异步的request成为可能,这里我们来讨论下为何需要jetty continuations呢?
比如我们的浏览器的一个请求发送到服务器端了,并进行长轮询,保持了连接不结束,直到一次长轮询timeout或者有事件发生,并接收到服务端推来的消息,所以在一次长轮询的过程中,大部分时间都是在等待,如果使用老式同步的方式进行编程的话,那么有多少个连接就需要多少个线程在那里,而大都数都是在等待,所以这无疑是系统资源的巨大浪费。
jetty continuations很好的解决了这一问题,当有请求过来之后,将连接的相关信息封装到一个continuation的对象中,通过调用continuation的suspend方法,然后返回,把当前线程交还到线程池,所以这个时候线程可以返回到线程池等待并处理其他新的请求。
当有事件要发给之前的某个请求的时候,再调用对应的continuation的resume方法,将原来的哪个请求重新发送到servelt进行处理,并将消息发送给客户端,然后客户端会重新进行一次长轮询。
Jetty是一个纯java实现的非常轻量级的web容器,高度组件化,可以很方便的将各种组件进行组装,而且可以非常容易的将jetty嵌入到自己的应用中。
jetty运行时的核心类是Server类,这个类的配置一般在jetty.xml中配置,然后jetty自带的一个简单的ioc容器将server加载初始化。
下图主要描述了Jetty在NIO的模式下工作的情形,这里只说到将任务分配到ThreadPool,后面的ThreadPool的处理没有说,大家可以去看下源码。
在jetty中,web容器启动是从Server开始的,一个Server可以对应多个Connector,从名字就可以知道,Connector是来处理外部连接的,Connector的实现有多种,即可以是非阻塞的(如SelectChannelConnector),也可以是阻塞的(如BlockingChannelConnector,当然jetty中这个阻塞的已经使用nio优化过,性能应该比使用java io实现的好),
我们不能直接说谁的性能好,谁的性能不好,关键还是看应用场景,因为NIO实现的非阻塞的话,doSelect的过程是阻塞的。所以当并发量小,且请求可以快速得到响应的话,用阻塞的就可以很好的满足了,但是当并发量很大,且后端资源紧张,请求需要等待很长一段时间的(比如长轮询),那么NIO的性能肯定必传统的高很多很多倍。
这里稍微讲一下NIO的概念把,在NIO的Scoket通讯模型中,一个socket连接对应一个SocketChannel,SocketChannel可以将某个事件注册到某一个Selector上,然后对Selector进行select操作,当有请求来的时候,并可以通过Selector的selectedKeys()获得所有收到事件的channel,然后便可以对channel进行操作了。这个其实和linux中的select函数类似,只不过这里是面向对象的,在linux中,我们将需要监听的sockt连接加入到一个文件描述符的集合中FD_SET中,然后select函数对这个集合进行检测,根据得到的结果来判断某个fd对应的标志位是否为1来判断是否有数据。这样也就是一个线程可以同事处理多个连接。
换话题了,我们都知道请求最终都是在Servlet中被处理的,而Servlet得到的是request,response,这些对象什么时候出来的呢?不急,上面不是说到一个EndPoint(实现了Runnable接口)EndPoint对象在被初始化的时候就对其_connection成员进行了初始化,生成一个HttpConnection对象,newConnection的方法其实在SelectChannelConnector中被覆盖了。然后这个EndPoint对象不是被分配到ThreadPool了么,ThreadPool将其加入到队列中,当有空闲线程的时候,就对这个endPoint对象进行处理了,运行EndPoint的run方法,然后会调用自己的connection对象的handle方法,最终将connection对象交给Server的handler进行处理。Server本身继承自HandlerWrapper,自己的_handler是一个HandlerCollection的实例,HandlerCollection实例的配置在jetty.xml中有配置,在处理httpconnection对象的时候所配置的handler会依次被执行。
DefaultHandler中就涉及到上下文处理,然后交给各个项目的servlet进行处理。
环境配置方法:
服务器端:
??? 类库清单:WEB-INF/lib
??? ??? jetty-6.1.9.jar
??? ??? jetty-util-6.1.9.jar
??? ??? servlet-api-2.5-6.1.9.jar
??? ??? (以上Jetty服务器自带)
??? ??? cometd-api-0.9.20080221.jar
??? ??? cometd-bayeux-6.1.9.jar
??? web.xml配置:
??? ?
????<!--??配置ContinuationCometdServlet,?这个是必须的。配置后就可以支持comted??-->?
???<?servlet?>?
?????<?servlet-name?>?cometd?</?servlet-name?>?
?????<?servlet-class?>?org.mortbay.cometd.continuation.ContinuationCometdServlet?</?servlet-class?>?
?????<!--??对队列的内容进行过滤??-->?
?????<?init-param?>?
???????<?param-name?>?filters?</?param-name?>?
???????<?param-value?>?/WEB-INF/filters.json?</?param-value?>?
?????</?init-param?>?
?????<!--??超时设置The?server?side?poll?timeout?in?milliseconds?(default?250000).?This?is?how?long?the?server?will?
hold?a?reconnect?request?before?responding.??-->?
?????<?init-param?>?
???????<?param-name?>?timeout?</?param-name?>?
???????<?param-value?>?120000?</?param-value?>?
?????</?init-param?>?
?????<!--??The?client?side?poll?timeout?in?milliseconds?(default?0).?How?long?a?client?will?wait?between?
reconnects??-->?
?????<?init-param?>?
???????<?param-name?>?interval?</?param-name?>?
???????<?param-value?>?0?</?param-value?>?
?????</?init-param?>?
?????<!--??the?client?side?poll?timeout?if?multiple?connections?are?detected?from?the?same?browser?
(default?1500).??-->?
?????<?init-param?>?
???????<?param-name?>?multiFrameInterval?</?param-name?>?
???????<?param-value?>?1500?</?param-value?>?
?????</?init-param?>?
?????<!--??0=none,?1=info,?2=debug??-->?
?????<?init-param?>?
???????<?param-name?>?logLevel?</?param-name?>?
???????<?param-value?>?0?</?param-value?>?
?????</?init-param?>?
?????<!--?If?"true"?then?the?server?will?accept?JSON?wrapped?in?a?comment?and?will?generate?JSON?wrapped?
in?a?comment.?This?is?a?defence?against?Ajax?Hijacking.??-->?
?????<?init-param?>?
???????<?param-name?>?JSONCommented?</?param-name?>?
???????<?param-value?>?true?</?param-value?>?
?????</?init-param?>?
?????<?init-param?>?
???????<?param-name?>?alwaysResumePoll?</?param-name?>?
???????<?param-value?>?false?</?param-value?>???<!--??use?true?for?x-site?cometd??-->?
?????</?init-param?>?
?????<?load-on-startup?>?1?</?load-on-startup?>?
???</?servlet?>?
???<?servlet-mapping?>?
?????<?servlet-name?>?cometd?</?servlet-name?>?
?????<?url-pattern?>?/cometd/*?</?url-pattern?>?
???</?servlet-mapping?>???
filters.json内容如下:
格式如下:
??? {
??? ??? "channels": "/**", --要过滤的队列(支持通配符)
??? ??? "filter":"org.mortbay.cometd.filter.NoMarkupFilter", --使用的过滤器,实现接口dojox.cometd.DataFilter
??? ??? "init"??? : {} --初始化的值,调用 DataFilter.init方法传入
??? }
示例内容如下:
[
??{
?????"?channels?"?:??"?/**?"?,?
?????"?filter?"???:??"?org.mortbay.cometd.filter.NoMarkupFilter?"?,?
?????"?init?"?????:?{}
??}?,?
??{
?????"?channels?"?:??"?/chat/*?"?,?
?????"?filter?"????:??"?org.mortbay.cometd.filter.RegexFilter?"?,?
?????"?init?"?????:??[
???????????????????[??"[fF?]?.ck?"?,?"?dang?"??],
??????????????????[??"?teh??"?,?"?the??"?]
????????????????]
??},
?
??{
?????"?channels?"?:??"?/chat/**?"?,
?????"?filter?"????:??"?org.mortbay.cometd.filter.RegexFilter?"?,
?????"?init?"?????:?[
??????????????????[??"?[?Mm?]?icrosoft?"?,??"?Micro\\$oft?"??],
??????????????????[??"?.*tomcat.*?"?,?null?]
????????????????]
??}
]
这时,服务器端的配置就已经完成的,基本的cometd功能就可以使用了。
客户端通过dojox.cometd.init("http://127.0.0.2:8080/cometd");就可以进行连接。
代码开发:
接下来,我们要准备客户端(使用dojo来实现)
一共三个文件
index.html
chat.js
chat.css(不是必须)
下面来看一下这两个文件的内容(加入注释)
index.html
<?html?>?
<?head?>?
?????<?title?>?Cometd?chat?</?title?>?
?????<?script??type?="text/javascript"??src?="../dojo/dojo/dojo.js"?></?script?>?<!--??dojo类库??-->?
?????<?script??type?="text/javascript"??src?="../dojo/dojox/cometd.js.uncompressed.js"?></?script?>?<!--?dojo-cometd类库??-->?
?????<?script??type?="text/javascript"??src?="chat.js"?></?script?>?<!--??chat?js文件,控制cometd的连接,消息的发送与接收??-->?
?????<?link??rel?="stylesheet"??type?="text/css"??href?="chat.css"?>?
</?head?>?
<?body?>?
<?h1?>?Cometd?Chat?</?h1?>?
<?div??id?="chatroom"?>?
??<?div??id?="chat"?></?div?>?
??<?div??id?="input"?>?
????<?div??id?="join"???>?<!--??未登录时,显示的登录名和登录按钮??-->?
?????Username:? ?<?input??id?="username"??type?="text"?/>
<?input??id?="joinB"??class?="button"??type?="submit"??name?="join"??value?="Join"?/>?
????</?div?>?
????<?div??id?="joined"??class?="hidden"?>?<!--??登录后,显示的消息框和发送,退出按钮(默认为隐藏)?-->?
?????Chat:? ?<?input??id?="phrase"??type?="text"?></?input?>?
??????<?input??id?="sendB"??class?="button"??type?="submit"??name?="join"??value?="Send"?/>?
??????<?input??id?="leaveB"??class?="button"??type?="submit"??name?="join"??value?="Leave"?/>?
????</?div?>?
???</?div?>?
??</?div?>?
</?body?>
chat.js文件
??1??//?引入所需要的类?
??2??dojo.require(?"?dojox.cometd?"?);
??3??dojo.require(?"?dojox.cometd.timestamp?"?);
??4??
??5??//?定义一个room类?
??6??var??room??=??{
??7???????//?定义属性?
??8??????_last:??""?,??//?最后发送消息的人员(如果不是本人,则显示为空)??
??9??????_username:??null?,??//?当前的用户名?
?10??????_connected:??true?,??//?当前的连接状态?true已经连接,?false表示未连接?
?11??????groupName:??"?whimsical?"?,??//?组名(未知)?
?12??
?13???????//?登录操作?
?14??????join:??function?(name){
?15??
?16???????????if?(name??==???null???||??name.length?==?0??){
?17??????????????alert('Please?enter?a?username?!?');
?18??????????}?else?{
?19??
?20??????????????dojox.cometd.init(
new??String(document.location).replace(?/?http:\?/?\?/?[?^?\?/?]?*/?,'').replace(?/?\?/?examples\?/?.?*?$?/?,'')?+?"/cometd?"?);
?21???????????????//??dojox.cometd.init("http://127.0.0.2:8080/cometd");?
?22???????????????this?._connected??=???true?;
?23??
?24???????????????this?._username??=??name;
?25??????????????dojo.byId('join').className?=?'hidden';
?26??????????????dojo.byId('joined').className?=?'';
?27??????????????dojo.byId('phrase').focus();
?28??
?29???????????????//??subscribe?and?join?
?30??????????????dojox.cometd.startBatch();
?31??????????????dojox.cometd.subscribe(?"?/chat/demo?"?,?room,??"?_chat?"?,?{?groupName:??this.groupName});
?32??????????????dojox.cometd.publish(?"?/chat/demo?"?,?{?
?33??????????????????user:?room._username,
?34??????????????????join:??true?,
?35??????????????????chat?:?room._username?+?"??has?joined?"?
?36??????????????},?{?groupName:??this?.groupName?});
?37??????????????dojox.cometd.endBatch();
?38??
?39???????????????//??handle?cometd?failures?while?in?the?room?
?40??????????????room._meta??=??dojo.subscribe(?"?/cometd/meta?"?,??this?,??function?(event){
?41??????????????????console.debug(event);???
?42???????????????????if?(event.action??==???"?handshake?"?){
?43??????????????????????room._chat({?data:?{
?44??????????????????????????join:??true?,
?45??????????????????????????user:?"?SERVER?"?,
?46??????????????????????????chat:?"?reinitialized?"?
?47??????????????????????}?});
?48??????????????????????dojox.cometd.subscribe(?"?/chat/demo?"?,?room,??"?_chat?"?,?{?groupName:??this.groupName?});
?49??????????????????}?else???if?(event.action??==???"?connect?"?){
?50???????????????????????if?(event.successful??&&???!?this?._connected){
?51??????????????????????????room._chat({?data:?{
?52??????????????????????????????leave:??true?,
?53??????????????????????????????user:??"?SERVER?"?,
?54??????????????????????????????chat:??"?reconnected!?"?
?55??????????????????????????}?});
?56??????????????????????}
?57???????????????????????if?(?!?event.successful??&&???this?._connected){
?58??????????????????????????room._chat({?data:?{
?59??????????????????????????????leave:??true?,
?60??????????????????????????????user:??"?SERVER?"?,
?61??????????????????????????????chat:??"?disconnected!?"?
?62??????????????????????????}?});
?63??????????????????????}
?64???????????????????????this?._connected??=??event.successful;
?65??????????????????}
?66??????????????},?{groupName:??this?.groupName?});
?67??????????}
?68??????},
?69??
?70???????//?离开操作?
?71??????leave:??function?(){
?72???????????if?(?!?room._username){
?73???????????????return?;
?74??????????}
?75??
?76???????????if?(room._meta){
?77??????????????dojo.unsubscribe(room._meta,??null?,??null?,?{?groupName:??this?.groupName?});
?78??????????}
?79??????????room._meta?=?null?;
?80??
?81??????????dojox.cometd.startBatch();
?82??????????dojox.cometd.unsubscribe(?"?/chat/demo?"?,?room,??"?_chat?"?,?{?groupName:??this.groupName?});
?83??????????dojox.cometd.publish(?"?/chat/demo?"?,?{?
?84??????????????user:?room._username,
?85??????????????leave:??true?,
?86??????????????chat?:?room._username?+?"??has?left?"?
?87??????????},?{?groupName:??this?.groupName?});
?88??????????dojox.cometd.endBatch();
?89??
?90???????????//??switch?the?input?form?
?91??????????dojo.byId('join').className?=?'';
?92??????????dojo.byId('joined').className?=?'hidden';
?93??????????dojo.byId('username').focus();
?94??????????room._username??=???null?;
?95??????????dojox.cometd.disconnect();
?96??????},
?97??
?98???????//?发送消息?
?99??????chat:??function?(text){
100???????????if?(?!?text??||???!?text.length){
101???????????????return???false?;
102??????????}
103??????????dojox.cometd.publish(?"?/chat/demo?"?,?{?user:?room._username,?chat:?text},?{?groupName:?this?.groupName?});
104??????},
105??
106???????//?从服务器收到消息后,回调的方法?
107??????_chat:??function?(message){
108???????????var??chat?=?dojo.byId('chat');
109???????????if?(?!?message.data){
110??????????????console.debug(?"?bad?message?format??"?+?message);
111???????????????return?;
112??????????}
113???????????var??from?=?message.data.user;
114???????????var??special?=?message.data.join??||??message.data.leave;
115???????????var??text?=?message.data.chat;
116???????????if?(?!?text){??return?;?}
117??
118???????????if?(??!?special??&&??from??==??room._last?){
119??????????????from?=?"?
?"?;
120??????????}?else?{
121??????????????room._last?=?from;
122??????????????from?+=?"?:?"?;
123??????????}
124??
125???????????if?(special){
126??????????????chat.innerHTML??+=???"?<span?class=\?"?alert\?"?><span?class=\?"?from\?"?>?"?+?from?+?"
</span><span?class=\?"?text\?"?>?"?+?text?+?"?</span></span><br/>?"?;
127??????????????room._last?=?""?;
128??????????}?else?{
129??????????????chat.innerHTML??+=???"?<span?class=\?"?from\?"?>?"?+?from?+?"? </span><span?class=\?"?text\?"?>?"?+?text?+?"?</span><br/>?"?;
130??????????}?
131??????????chat.scrollTop??=??chat.scrollHeight??-??chat.clientHeight;????
132??????},
133??????
134???????//?初始操作?
135??????_init:??function?(){
136??????????dojo.byId('join').className?=?'';
137??????????dojo.byId('joined').className?=?'hidden';
138??????????dojo.byId('username').focus();
139??
140???????????var??element?=?dojo.byId('username');
141??????????element.setAttribute(?"?autocomplete?"?,?"?OFF?"?);?
142??????????dojo.connect(element,??"?onkeyup?"?,??function?(e){??//?支持回车,登录???
143???????????????if?(e.keyCode??==??dojo.keys.ENTER){
144??????????????????room.join(dojo.byId('username').value);
145???????????????????return???false?;
146??????????????}
147???????????????return???true?;
148??????????});
149??
150??????????dojo.connect(dojo.byId('joinB'),??"?onclick?"?,??function?(e){??//?绑定?room.join方法到?Join按扭
151??????????????room.join(dojo.byId('username').value);
152??????????????e.preventDefault();
153??????????});
154??
155??????????element?=?dojo.byId('phrase');?//?取得消息框?
156??????????element.setAttribute(?"?autocomplete?"?,?"?OFF?"?);
157??????????dojo.connect(element,??"?onkeyup?"?,??function?(e){??//?支持回车发送消息??
158???????????????if?(e.keyCode??==??dojo.keys.ENTER){
159??????????????????room.chat(dojo.byId('phrase').value);
160??????????????????dojo.byId('phrase').value?=?'';
161??????????????????e.preventDefault();
162??????????????}
163??????????});
164??
165??????dojo.connect(dojo.byId('sendB'),??"?onclick?"?,??function?(e){???//?绑定?room.chat方法到?sendB按扭??
166??????????????room.chat(dojo.byId('phrase').value);
167??????????????dojo.byId('phrase').value?=?'';
168??????});
169??????????dojo.connect(dojo.byId('leaveB'),??"?onclick?"?,?room,??"?leave?"?);??//?绑定?room.leave方法到?leaveB按扭??
170??????}?
171??};
172??
173??//?页面装载时,调用room._init方法?
174??dojo.addOnLoad(room,??"?_init?"?);
175??//?页面关闭时,调用?room.leave方法?
176??dojo.addOnUnload(room,?"?leave?"?);
177??
178??//?vim:ts=4:noet:
补充:服务器端如何监控消息队列,以及进行订阅,发送消息操作
要进行 监控消息队列,以及进行订阅,发送消息操作的关键就是取得 Bayeux接口实现类 的实例
可以通过 ServletContextAttributeListener 这个监听器接口,通过attributeAdded方式加入
实现方法如下:
?1??public???class??BayeuxStartupListener??implements??ServletContextAttributeListener
?2??{
?3???????public???void??initialize(Bayeux?bayeux)
?4??????{
?5???????????synchronized?(bayeux)
?6??????????{
?7???????????????if??(?!?bayeux.hasChannel(?"?/service/echo?"?))
?8??????????????{
?9???????????????????????????????????//?取得?bayeux实例????????????????
10??????????????}
11??????????}
12??????}
13??????
14???????public???void??attributeAdded(ServletContextAttributeEvent?scab)
15??????{
16???????????if??(scab.getName().equals(Bayeux.DOJOX_COMETD_BAYEUX))
17??????????{
18??????????????Bayeux?bayeux?=?(Bayeux)?scab.getValue();
19??????????????initialize(bayeux);
20??????????}
21??????}
22??
23???????public???void??attributeRemoved(ServletContextAttributeEvent?scab)
24??????{
25??
26??????}
27??
28???????public???void??attributeReplaced(ServletContextAttributeEvent?scab)
29??????{
30??
31??????}
32??}
取到 Bayeux实例后,就可以借助BayeuxService类帮我们实现消息队列的监听,订阅消息以及发送消息
?1???????public???void??initialize(Bayeux?bayeux)
?2??????{
?3???????????synchronized?(bayeux)
?4??????????{
?5???????????????if??(?!?bayeux.hasChannel(?"?/service/echo?"?))
?6??????????????{
?7???????????????????????????????????//?取得?bayeux实例???
?8???????????????????????????????????new??ChatService(bayeux);?????????????
?9??????????????}
10??????????}
11??????}
具体方法请看下面这段代码:
?1??//?定义?ChatService类,继承?BayeuxService?
?2??public???static???class??ChatService??extends??BayeuxService?{
?3??
?4??????ConcurrentMap?<?String,Set?<?String?>>??_members??=???new??ConcurrentHashMap?<String,Set?<?String?>>?();
?5??????
?6???????public??ChatService(Bayeux?bayeux)
?7??????{
?8???????????super?(bayeux,??"?chat?"?);?//?必须,把?Bayeux传入到?BayeuxService对象中?
?9??????????subscribe(?"?/chat/**?"?,??"?trackMembers?"?);??//?订阅队列,收到消息后,会回调trackMembers方法?
10???????????/*?
11??????????????subscribe支持回调的方法如下:
12??????????????#?myMethod(Client?fromClient,?Object?data)
13??????????????????????#?myMethod(Client?fromClient,?Object?data,?String?id)
14??????????????????????#?myMethod(Client?fromClient,?String?channel,?Object?data,String?id)
15??????????????#?myMethod(Client?fromClient,?Message?message)
16??????????????
17??????????????????参数:
18??????????????????????Client?fromClient?发送消息的客户端
19??????????????????????Object?data?消息内容
20??????????????????????id?The?id?of?the?message?
21??????????????????????channel?队列名称
22??????????????????????Message?message?消息对象。继承于Map
23??????????
24???????????*/?
25??????}
26??????
27???????//?发布消息到队列?
28???????public???void??sendMessage(String?message)?{
29??????????????Map?<?String,Object?>??mydata??=???new??HashMap?<?String,?Object?>?();
30??????????????mydata.put(?"?chat?"?,?message);
31??????????????
32??????????????Client?sender??=??getBayeux().newClient(?"?server?"?);
33??????????????
34??????????????getBayeux().getChannel(?"?/chat/demo?"?,??false?).publish(sender,?mydata,??"?0?"?/*?null?*/?);
35??
36??????}
37??????
38???????//?发送消息给指定的client(非广播方式)?
39???????public???void??sendMessageToClient(Client?joiner,?String?message)?{
40??????????????Map?<?String,Object?>??mydata??=???new??HashMap?<?String,?Object?>?();
41??????????????mydata.put(?"?chat?"?,?message);
42?????????????
43??????????????send(joiner,??"?/chat/demo?"?,?mydata,??"?0?"?/*?null?*/?);
44??????}????
45??????
46???????//?订阅消息回调方法?
47???????public???void??trackMembers(Client?joiner,?String?channel,?Map?<?String,Object?>?data,?String?id)
48??????{
49???????????????//?解释消息内容,如果消息内容中?有?join这个字段且值为true?
50???????????if??(Boolean.TRUE.equals(data.get(?"?join?"?)))
51??????????{
52???????????????????//?根据队列,取得当前登录的人员?
53??????????????Set?<?String?>??m??=??_members.get(channel);
54???????????????if??(m?==?null?)
55??????????????{
56???????????????????????//?如果为空,则创建一个新的Set实现?
57??????????????????Set?<?String?>??new_list?=?new??CopyOnWriteArraySet?<?String?>?();
58??????????????????m?=?_members.putIfAbsent(channel,new_list);
59???????????????????if??(m?==?null?)
60??????????????????????m?=?new_list;
61??????????????}
62??????????????
63???????????????final??Set?<?String?>??members?=?m;
64???????????????final??String?username?=?(String)data.get(?"?user?"?);
65??????????????
66??????????????members.add(username);
67???????????????????????????//?为该client增加事件,Remove事件。当用户退出时,触发该方法。?????????????
68??????????????joiner.addListener(?new??RemoveListener(){
69???????????????????public???void??removed(String?clientId,??boolean??timeout)
70??????????????????{
71??????????????????????members.remove(username);
72??????????????????}
73??????????????});
74??
75???????????????????????????//?为该client增加事件,消息的发送和接收事件。当用户退出时,触发该方法。?
76??????????????joiner.addListener(?new??MessageListener()?{
77???????????????????????????????????public???void??deliver(Client?fromClient,?Client?toClient,?Message?message)?{
78??????????????????????????????????????System.out.println(?"?message?from??"???+??fromClient.getId()??+???"??to??"?
79???????????????????????????????????????????????+??toClient.getId()??+???"??message?is??"???+??message.getData());
80??????????????????????????????????}??????
81??????????????});
82??
83??????????????Map?<?String,Object?>??mydata??=???new??HashMap?<?String,?Object?>?();
84??????????????mydata.put(?"?chat?"?,??"?members=?"???+??members);
85???????????????//?把已经登录的人员信息列表,发送回给消息发送者?
86??????????????send(joiner,channel,mydata,id);
87???????????
88??????????}
89??????}
90??}
91
服务器端Push目前一般有两种方式,HTTP streaming和Long polling。详细的介绍可以看这里 http://en.wikipedia.org/wiki/Push_technology
jetty continuations很好的解决了这一问题,当有请求过来之后,将连接的相关信息封装到一个continuation的对象中,通过调用continuation的suspend方法,然后返回,把当前线程交还到线程池,所以这个时候线程可以返回到线程池等待并处理其他新的请求。
当有事件要发给之前的某个请求的时候,再调用对应的continuation的resume方法,将原来的哪个请求重新发送到servelt进行处理,并将消息发送给客户端,然后客户端会重新进行一次长轮询。
jetty运行时的核心类是Server类,这个类的配置一般在jetty.xml中配置,然后jetty自带的一个简单的ioc容器将server加载初始化。
下图主要描述了Jetty在NIO的模式下工作的情形,这里只说到将任务分配到ThreadPool,后面的ThreadPool的处理没有说,大家可以去看下源码。
在jetty中,web容器启动是从Server开始的,一个Server可以对应多个Connector,从名字就可以知道,Connector是来处理外部连接的,Connector的实现有多种,即可以是非阻塞的(如SelectChannelConnector),也可以是阻塞的(如BlockingChannelConnector,当然jetty中这个阻塞的已经使用nio优化过,性能应该比使用java io实现的好),
我们不能直接说谁的性能好,谁的性能不好,关键还是看应用场景,因为NIO实现的非阻塞的话,doSelect的过程是阻塞的。所以当并发量小,且请求可以快速得到响应的话,用阻塞的就可以很好的满足了,但是当并发量很大,且后端资源紧张,请求需要等待很长一段时间的(比如长轮询),那么NIO的性能肯定必传统的高很多很多倍。
这里稍微讲一下NIO的概念把,在NIO的Scoket通讯模型中,一个socket连接对应一个SocketChannel,SocketChannel可以将某个事件注册到某一个Selector上,然后对Selector进行select操作,当有请求来的时候,并可以通过Selector的selectedKeys()获得所有收到事件的channel,然后便可以对channel进行操作了。这个其实和linux中的select函数类似,只不过这里是面向对象的,在linux中,我们将需要监听的sockt连接加入到一个文件描述符的集合中FD_SET中,然后select函数对这个集合进行检测,根据得到的结果来判断某个fd对应的标志位是否为1来判断是否有数据。这样也就是一个线程可以同事处理多个连接。
DefaultHandler中就涉及到上下文处理,然后交给各个项目的servlet进行处理。
服务器端:
??? 类库清单:WEB-INF/lib
??? ??? jetty-6.1.9.jar
??? ??? jetty-util-6.1.9.jar
??? ??? servlet-api-2.5-6.1.9.jar
??? ??? (以上Jetty服务器自带)
??? ??? cometd-api-0.9.20080221.jar
??? ??? cometd-bayeux-6.1.9.jar
??? web.xml配置:
??? ?
filters.json内容如下:
格式如下:
??? {
??? ??? "channels": "/**", --要过滤的队列(支持通配符)
??? ??? "filter":"org.mortbay.cometd.filter.NoMarkupFilter", --使用的过滤器,实现接口dojox.cometd.DataFilter
??? ??? "init"??? : {} --初始化的值,调用 DataFilter.init方法传入
??? }
示例内容如下:
这时,服务器端的配置就已经完成的,基本的cometd功能就可以使用了。
客户端通过dojox.cometd.init("http://127.0.0.2:8080/cometd");就可以进行连接。
代码开发:
接下来,我们要准备客户端(使用dojo来实现)
一共三个文件
index.html
chat.js
chat.css(不是必须)
下面来看一下这两个文件的内容(加入注释)
index.html
chat.js文件
补充:服务器端如何监控消息队列,以及进行订阅,发送消息操作
要进行 监控消息队列,以及进行订阅,发送消息操作的关键就是取得 Bayeux接口实现类 的实例
可以通过 ServletContextAttributeListener 这个监听器接口,通过attributeAdded方式加入
实现方法如下:
取到 Bayeux实例后,就可以借助BayeuxService类帮我们实现消息队列的监听,订阅消息以及发送消息
具体方法请看下面这段代码:
?"?;