用JMS手工实现私聊和公聊
?? 最近接触J2EE JMS,准备做个简单的聊天系统,其实都是在命令行下完成,没有用到Eclipce,全部用最原始的方式编代码、编译、部署、运行!其实我们最主要的是学会原理然后在逐步深入!这里用到服务器是jboss-5.0.0.GA,下面就让我们来动手吧!
??? 1) 初始化上下文
?配置JNDI的一个方法是通过属性文件jndi.properties! 这里会随着服务器的不同而不同!
我的jndi.properties配置文件内容如下:
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
java.naming.provider.url=localhost
?
你也可以在代码中实现,如下:
Hashtable props = new Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY,
??????????????????????????? "org.jnp.interfaces.NamingContextFactory");
props.put(Context.PROVIDER_URL, "localhost:1099");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
?
下面运行程序的时候,要把jndi.properties和class文件放在同一个目录下,不然找不到命名服务接口!
?
?2)在JBOSS下面创—estination ,包括我们要用到的queue和topic两种方式!我们可以写个Xxx-service.xml 内容如下:
?xml version="1.0" encoding="UTF-8"?>
<server>
<mbean code="org.jboss.mq.server.jmx.Queue"
name="jboss.mq.destination:service=Queue,name=queue1">
<attribute name="JNDIName">queue/queue1</attribute>
<depends
optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager
</depends>
</mbean>
?
<mbean code="org.jboss.mq.server.jmx.Queue"
name="jboss.mq.destination:service=Queue,name=queue2">
<attribute name="JNDIName">queue/queue2</attribute>
<depends
optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager
</depends>
</mbean>
?
<mbean code="org.jboss.mq.server.jmx.Queue"
name="jboss.mq.destination:service=Queue,name=queue3">
<attribute name="JNDIName">queue/queue3</attribute>
<depends
optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager
</depends>
</mbean>
?
<mbean code="org.jboss.mq.server.jmx.Topic"
name="jboss.mq.destination:service=Topic,name=mytopic">
<attribute name="JNDIName">topic/mytopic</attribute>
<depends
optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager
</depends>
</mbean>
</server>?
?
我们这里创建了三个queue队列(为了体现出私聊的效果) 和 一个topic/mytopic!其中粗黑体字为需要改动的地方!
?
把Xxx-service.xml 拷贝到 JBOSS服务器的deploy 下面,如:D:\jboss-5.0.0.GA\server\default\deploy 这样会引发队列的热部署,你可以通过http://localhost:8080/jmx-console 进入Jboss 管理台,查看刚才部署的队列或者主题,如果已近启动了服务器,也可以在Console 看到部署的信息!
3)针对该队列编写消息生产者,下面是MyMessageProducer.java代码:import javax.jms.*;import javax.naming.InitialContext;import javax.naming.NamingException;import java.util.Scanner;public class MyMessageProducer { //消息连接池名称 protected String JMS_FACTORY="ConnectionFactory"; // 同步接收消息的点对点(PTP) protected QueueConnectionFactory qconFactory; protected QueueConnection qcon; protected QueueSession qsession; protected QueueSender qsender; protected QueueReceiver qreceiver; protected Queue queue; ////////////////////////////////////////////////////// public void init(String queueName){ //初始化JMS运行环境 try { if (qsession == null) { InitialContext ctx = new InitialContext(); System.out.println("-----------"+JMS_FACTORY); qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY); qcon = qconFactory.createQueueConnection(); qcon.start(); qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); queue = (Queue) ctx.lookup(queueName); System.out.println("-----------"+queueName) ; qsender=qsession.createSender(queue); } } catch (NamingException ne) { System.out.println("JNDI 查找失败,不能找到指定队列,错误原因:" + ne.toString()); ne.printStackTrace();System.exit(0); } catch (JMSException je) { System.out.println("JMS发生异常错误,错误原因:" + je.toString()); je.printStackTrace(); System.exit(0); } } void sendMessage(String msg) { try { TextMessage message = qsession.createTextMessage(); message.setText(msg); qsender.send(message); } catch (Exception e) { System.out.println("-------------Failed to send message"); } } public MyMessageProducer() { } public void close() throws JMSException {qsession.close();qcon.close();} //Main method public static void main(String[] args) { MyMessageProducer publisher = new MyMessageProducer(); Scanner in = new Scanner(System.in); System.out.println("请你输入要和那个队列通话:(queue/queue1、queue/queue2 or queue/queue3)"); String queueName = in.nextLine(); publisher.init(queueName); String msg; System.out.println("连接" + queueName +"成功"); System.out.println("请你输入要私聊的消息!(q or Q 退出):"); msg = in.nextLine(); while(!msg.equals("q") && !msg.equals("Q")){ publisher.sendMessage(msg); System.out.println("请你输入要私聊的消息!(q or Q 退出):"); msg = in.nextLine(); } System.out.println("你已成功退出了与" + queueName +"的会话"); try{ publisher.close();} catch (JMSException je) { System.out.println("JMS关闭异常错误,错误原因:" + je.toString()); je.printStackTrace(); System.exit(0); } } }??
4) 针对该队列编写消息接受者,下面是MyMessageProducer.java代码:import javax.jms.*;import javax.naming.InitialContext;import javax.naming.NamingException;import java.util.Scanner;public class MessageReciever implements MessageListener { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("Message content is:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } //消息连接池名称 protected String JMS_FACTORY="ConnectionFactory"; // 同步接收消息的点对点(PTP) protected QueueConnectionFactory qconFactory; protected QueueConnection qcon; protected QueueSession qsession; protected QueueReceiver qreceiver; protected Queue queue; ////////////////////////////////////////////////////// public void init(String queueName){ //初始化JMS运行环境 try { if (qsession == null) { InitialContext ctx = new InitialContext(); System.out.println("-----------"+JMS_FACTORY); qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY); qcon = qconFactory.createQueueConnection(); qcon.start(); qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); queue = (Queue) ctx.lookup(queueName); System.out.println("-----------"+queueName) ; qreceiver = qsession.createReceiver(queue); } } catch (NamingException ne) { System.out.println("JNDI 查找失败,不能找到指定队列,错误原因:" + ne.toString()); ne.printStackTrace(); System.exit(0); } catch (JMSException je) { System.out.println("JMS发生异常错误,错误原因:" + je.toString()); je.printStackTrace(); System.exit(0); } } void recieveMessage(MessageReciever msgRcvr) { try { qreceiver.setMessageListener(msgRcvr); qcon.start(); synchronized(msgRcvr){ msgRcvr.wait(1000000); } } catch (Exception e) { System.out.println("-------------Failed to recieve message"); } } public MessageReciever() { } //Main method public static void main(String[] args) { MessageReciever reciever = new MessageReciever(); Scanner in = new Scanner(System.in); System.out.println("请你输入要接受私聊消息的队列:(queue/queue1、queue/queue2 or queue/queue3)"); String queueName = in.nextLine(); reciever.init(queueName); reciever. recieveMessage(reciever); } }?
5)编译MyMessageProducter.java?
javac -cp D:/jboss-5.0.0.GA/client/jboss-javaee.jar MessageProducter.java
?
注意这里要用到JBOSS client 下面的jar包,所以编译的时候要制定classpath!
?
?
6)编译MyMessageReceiver.java?
javac -cp D:/jboss-5.0.0.GA/client/jboss-javaee.jar MessageReciever.java
?
7)在命令终端运行应用程序!当然首先要开启服务器!假定你已经把jndi.properties 和 编译好的class文件放在一起!
?
首先你要运行消息生产者,要指定用到的queue 队列! 如下:
java -cp D:/jboss-5.0.0.GA/client/jbossall-client.jar;D:/jboss-5.0.0.GA/client/jboss-javaee.jar;D:/jboss-5.0.0.GA/client/jnp-client.jar;D:/jboss-5.0.0.GA/client/jboss-logging-log4j.jar;./ MessageProducter
?
然后要指定用到的queue 队列!如: queue/queue1
?
这里要用到服务器client目录下面的JAR包,所以特别不方便,你可以打成JAR包,不过好像要用到全部的client文件!
这样也好,手工部署和运行让我们多了解一些程序究竟是怎样运行的!不然总是依赖工具,让我们一遇到问题,就不知所措!呵呵!
?
再次运行消息接受者,同样要指定用到的queue 队列!
java -cp D:/jboss-5.0.0.GA/client/jbossall-client.jar;D:/jboss-5.0.0.GA/client/jboss-javaee.jar;D:/jboss-5.0.0.GA/client/jnp-client.jar;./ MessageReciever
?
这只是私聊,即PTP,指定队列后,发送到队列,只有一个消息消费者!
至于公聊,即Pub/Sub, 用到主题(topic),订阅者订阅消息,才可以查看发布者发布的消息!
实现代码有MyMessagePublisher.java 、MessageSubscriber1.java 、MessageSubscriber2.java
已上传!
?
大家有什么问题或者好的建议,希望留言!谢谢!
1 楼 luven 2009-08-21 大哥 : tomcat 和 openjms 怎么整合? 2 楼 godson_2003 2010-07-24 这样的私聊实际中是不可行的吧如果10个人 100个人私聊 那得多少个queue?