JMS与MQ的几种结合方式
本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/guozhanxian/archive/2007/01/21/1489309.aspx
?
配置JNDI
用JMS实现消息的发送和接收时,经常会用到JNDI。因为JNDI这种方式比较灵活,对于编程也比较简单。
在安装了MQSeries Client for
Java之后,在\java\bin目录下找到JMSAdmin.config文件。该文件主要用来说明Context的存储方式及存储地址,对应于文件中的两个参数INITIAL_CONTEXT_FACTORY和PROVIDER_URL。典型的JMSAdmin.config文件内容如下:
#INITIAL_CONTEXT_FACTORY=com.sun.jndi.ldap.LdapCtxFactory
INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
#INITIAL_CONTEXT_FACTORY=com.ibm.ejs.ns.jndi.CNInitialContextFactory
#
#PROVIDER_URL=ldap://polaris/o=ibm,c=us
PROVIDER_URL=file:/d:/temp
#PROVIDER_URL=iiop://localhost/
#
SECURITY_AUTHENTICATION=none
INITIAL_CONTEXT_FACTORY表示JMSAdmin
Tool使用的服务提供商。当前有三种受支持的值。com.sun.jndi.ldap.LdapCtxFactory用于LDAP,如果使用它就必须安装一个LDAP服务器。com.sun.jndi.fscontext.RefFSContextFactory用于文件系统上下文,它只需要使用者提供存放上下文的文件路径。com.ibm.ejs.ns.jndi.CNInitialContextFactory是专门为websphere提供的,它需要和websphere的CosNaming资源库一起使用。
PROVIDER_URL表示会话初始上下文的URL,由JMSAdmin
tool实现的所有JNDI操作的根。它和INITIAL_CONTEXT_FACTORY一一对应。
ldap://hostname/contextname 用于LDAP
file:[drive:]/pathname 用于文件系统上下文
iiop://hostname[:port]/[?TargetContext=ctx] 用于访问websphere
CosNaming名称空间
最后还有一个参数SECURITY_AUTHENTICATION,用于说明JNDI是否把安全性凭证传递给了您使用的服务供应商。只有当使用了LDAP服务供应商时,才使用此参数。此参数有三个值,none(匿名认证)、simple(简单认证)和CRAM-MD5认证机制。如果没有提供有效值,缺省值为none。
确认配置文件之后,可以在\java\bin目录下启动JMSAdmin控制台。也可以在任何目录下用下面的命令来启动控制台:
JMSAdmin -cfg MQ_JAVA_INSTALL_PATH\java\bin\JMSAdmin.config
其中MQ_JAVA_INSTALL_PATH为MQSeries Client for Java安装的根目录。
若启动失败,则好好检查一下您的环境变量是否设置正确。根据我个人的经验,除了把com.ibm.mq.jar和com.ibm.mqjms.jar加入到环境变量外,还要把fscontext.jar和providerutil.jar加入到环境变量。
进入JMSAdmin控制台后,您可以自由定义sub context。对于子上下文的操作,主要有一下命令:
display ctx
define ctx(ctxname)
change ctx(ctxname)
change ctx(=up)
change ctx(=init)
delete ctx(ctxname)
当然,在这里的主要任务并非是用来定义sub context,而是用来定义以下几个对象:
MQQueueConnectionFactory
MQTopicConnectionFactory
MQQueue
MQTopic
(还有其它的一些对象,如MQXAQueueConnectionFactory等,不常用到,在此不作说明。)
可以使用很多动词来操纵目录名称空间中的受管理对象。ALTER、DEFINE、DISPLAY、DELETE、COPY和MOVE,它们的用法都算比较简单,这里只列举一二以作说明。
例一:定义一QueueConnectionFactory,连接主机10.10.10.18,端口1414
DEFINE QCF(EXAMPLEQCF)+
DESC(Example Queue Connection Factory)+
TRAN(CLIENT)+
HOST(10.10.10.18)+
QMGR(QM_EXAMPLE)+
CHAN(S_EXAMPLE)+
PORT(1414)+
CCSID(1381)
例二:定义一Queue,其对应于MQ中的Q_EXAMPLE
DEFINE Q(EXAMPLEQL)+
DESC(Local queue)+
QMGR(QM_EXAMPLE)+
QUEUE(Q_EXAMPLE)+
CCSID(1381)
?
用JMS实现MQ编程
上面我们说明了怎样用JMSAdmin
Tool定义MQ对象的上下文。我们的最终目的是要用JMS来实现MQ编程,以实现在程序中对MQ队列进行收、发消息。所以,下面我们将重点讨论一下MQ的JMS实现。
如果您对JMS编程很熟悉,那么您也就会用JMS来实现MQ编程,因为用JMS来编写MQ程序与编写一般的JMS程序没有太大的差别。举个例子,当我们想发送一条消息到MQ的队列中,再从该队列中取回消息时,我们编程时主要有四个步骤。首先我们要初始化在程序中要用到的对象,然后才可以发送消息到队列中去,再就是收取消息了,最后要清除那些永久对象。这些都和普通的JMS程序相当。程序的源代码如下:
代码
import java.util.Hashtable;
import javax.jms.*;
import javax.naming.*;
import javax.naming.directory.*;
public class sample ...{
protected QueueConnectionFactory factory=null;
protected QueueConnection connection;
protected QueueSession queueSession;
protected TextMessage outMessage;
protected QueueSender queueSender;
protected QueueReceiver queueReceiver;
public static final String qcfLookup="EXAMPLEQCF";
public static final String qLookup="EXAMPLEQL";
public static final String icf =
"com.sun.jndi.fscontext.RefFSContextFactory";
public String url ="file:/d:/temp";
public void sampleInit() throws Exception ...{
Hashtable environment = new Hashtable();
environment.put(Context.INITIAL_CONTEXT_FACTORY, icf);
environment.put(Context.PROVIDER_URL, url);
environment.put(Context.REFERRAL, "throw");
Context ctx=new InitialDirContext(environment);
factory = (QueueConnectionFactory)ctx.lookup(qcfLookup);
Queue q1=null;
q1=(Queue)ctx.lookup(qLookup);
connection = factory.createQueueConnection();
queueSession = connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queueSender = queueSession.createSender(q1);
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
outMessage = queueSession.createTextMessage();
queueReceiver = queueSession.createReceiver(q1);
connection.start();
}
public void sendMessageOut(String message) throws JMSException ...{
outMessage.setText(message);
queueSender.send(outMessage);
}
public String receiveMessage() throws Exception...{
return ((TextMessage)queueReceiver.receive()).getText();
}
public void sampleClose() throws JMSException ...{
queueSession.close();
connection.close();
}
public static void main(String[] args)...{
String rec;
sample sp = new sample();
try ...{
sp.sampleInit();
sp.sendMessageOut("Hello World!");
java.lang.Thread.sleep(4000);
rec=sp.receiveMessage();
System.out.println("Receive text is : "+rec);
sp.sampleClose();
}catch(Exception e) ...{
e.printStackTrace();
}
}
}
?
?
理解示例类
如果你编写过JMS应用程序,就很容易理解JNDIUtil 和 Tester示例类(从http://assets.devx.com/sourcecode/WebSphereMQ_JMSSource&Classes.zip下载Java源文件和编译好的类),你创建的JMS管理对象隐藏了所有厂家专利实现。
JNDIUtil类
JNDIUtil包括使用名字通过JNDI查找检索对象的方法,参考清单1,你可以使用这个类中的方法检索你在MQ管理器中定义的JMS对象的引用情况。
清单1 JNDIUtil.java
package devx.articles.mqjms;
// JMS 类
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
// JNDI 类
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
// 标准 Java类
import java.util.Hashtable;
/**
*
* A wrapper class for JNDI calls
*
*/
public class JNDIUtil
{
private Context context;
public JNDIUtil(String icf, String url) throws JMSException, NamingException
{
Hashtable environment = new Hashtable();
environment.put(Context.INITIAL_CONTEXT_FACTORY, icf );
environment.put(Context.PROVIDER_URL, url);
context= new InitialContext( environment );
}
/**
*
* @param ObjName Object Name to be retrieved
* @return Retrieved Object
* @throws NamingException
*/
private Object getObjectByName(String ObjName) throws NamingException
{
return context.lookup( ObjName );
}
/**
* A convenience method that returns QueueConnectionFactory objects (no casting required)
* @param factoryName QueueConnectionFactory JNDI name
* @return QueueConnectionFactory object
* @throws NamingException
*/
public QueueConnectionFactory getQueueConnectionFactory(String factoryName) throws NamingException
{
return (QueueConnectionFactory) getObjectByName(factoryName);
}
/**
* A convenience method that returns Queue objects (no casting required)
* @param queueName
* @return
* @throws NamingException
*/
public Queue getQueue(String queueName) throws NamingException
{
return (Queue) getObjectByName(queueName);
}
} Tester类
Tester类向OUT.QUEUE中写入消息,从IN.QUEUE中读取消息。参考清单2.
清单2 Tester.java
package devx.articles.mqjms;
//JMS 类
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.QueueSender;
import javax.jms.QueueReceiver;
import javax.jms.TextMessage;
import javax.jms.Message;
//JNDI 类
import javax.naming.NamingException;
//标准 Java 类
/**
*
* A class to test JMS with IBM MQ
*
*/
public class Tester
{
public static String icf = "com.sun.jndi.fscontext.RefFSContextFactory";
public static String url = "file:/C:/JNDI-Directory";
public static void main(String[] vars) throws JMSException, NamingException
{
QueueSession session = null;
QueueConnection connection = null;
QueueConnectionFactory factory = null;
QueueSender queueSender = null;
QueueReceiver queueReceiver= null;
Queue oQueue = null; // 消息发送到的队列
Queue iQueue = null; // 接收消息的队列
try
{
JNDIUtil jndiUtil= new JNDIUtil(icf,url);
factory= jndiUtil.getQueueConnectionFactory("TestQM_QCF");
connection = factory.createQueueConnection();
// 启动(或重新启动)入站消息的连接地址,如果没有这个调用消息不会被接收
connection.start();
//表示一个非相互操作会话
boolean transacted = false;
session = connection.createQueueSession( transacted, Session.AUTO_ACKNOWLEDGE);
oQueue= jndiUtil.getQueue("OutputTestQueue");
queueSender = session.createSender(oQueue);
TextMessage oMsg = session.createTextMessage();
oMsg.setText("www.devx.com");
// 你还可以设置其他消息属性
queueSender.send(oMsg);
iQueue= jndiUtil.getQueue("InputTestQueue");
queueReceiver = session.createReceiver(iQueue);
Message iMsg = queueReceiver.receive(1000);
if ( iMsg != null )
System.out.println( ((TextMessage)iMsg).getText() );
else
System.out.println( "No messages in queue " );
}
finally
{
//总是释放资源
if ( queueReceiver!= null )
queueReceiver.close();
if ( queueSender!= null )
queueSender.close();
if ( session!= null )
session.close();
if ( connection!= null )
{
connection.close();
}
}
}
}
开始点是连接工厂查找,这个工厂用于创建一个连接:
factory= jndiUtil.getQueueConnectionFactory("TestQM_QCF");
connection = factory.createQueueConnection();
连接对象用于创建一个会话:
session = connection.createQueueSession( transacted, Session.AUTO_ACKNOWLEDGE);
要将消息写入IN.QUEUE queue,查找前面创建的目的地对象OutputTestQueue:
oQueue= jndiUtil.getQueue("OutputTestQueue");
最后创建一个QueueSender对象将消息写入队列:
queueSender = session.createSender(oQueue);
TextMessage oMsg = session.createTextMessage();
oMsg.setText("www.devx.com");
queueSender.send(oMsg);
从OUT.QUEUE读取消息的过程相同,但使用的是QueueReceiver。
编译运行示例类
当你安装WMQ时会自动将编译和运行示例类需要的jar文件添加到CLASSPATH环境变量中,需要的jar文件位于C:\Program Files\IBM\WebSphere MQ\Java\lib,包括JMS和JNDI需要的jar。
在运行Tester类之前,使用MQ管理器向IN.QUEUE增加一条测试消息:
1.在 IN.QUEUE上点击右键,选择放入测试消息。
2.输入任意文本,点击放入消息。
如果你还没有在IN.QUEUE队列中放入过消息,Tester类会显示“队列中无消息”。
要运行Tester类,将Tester.class 和 JNDIUtil.class添加到CLASSPATH,然后在命令提示符输入:
java devx.articles.mqjms.Tester 应用程序向OUT.QUEUE写入一条消息,并显示从IN.QUEUE检索到的消息,要检查发送到OUT.QUEUE中的消息,进入MQ管理器,在OUT.QUEUE上点击右键?选择“浏览消息”即可。
企业中的Java和WMQ MOM
在大型企业环境下,可以充分利用WMQ的性能和稳定性优势,只要你的代码遵循Java标准接口,你就可以受益。
?
本篇文章来源于:开发学院 http://edu.codepub.com?? 原文链接:http://edu.codepub.com/2009/0419/3122_2.php
?
?
这个是自己写的
?
所使用jar包为:ibmmq.jar,com.ibm.mq.jar,jms.jar,com.ibm.mq_jms.jar
?
?
?
package com.learn.mq;
import com.ibm.mq.*;
import javax.jms.*;
import com.ibm.mq.jms.*;
public class PTPSender {
?
?String host = "localhost";
?String qm = "learn.mq";
?//服务器连接通道,前面的为自己建的,后面的为系统默认的
?String channel = "LEARN.SVRCON.CHANNEL";//"SYSTEM.DEF.SVRCONN";
?String queue = "LEARN.QUEUE";
?QueueConnection qConn = null;
?QueueSession session = null;
?//由此值得到回复消息的CorrelationID
?String msgID = null;
?public void init(){
??//连接工厂,用com.ibm.mq.jms中的类实现javax.jms中的接口
??QueueConnectionFactory qcf = new MQQueueConnectionFactory();
??
??//设置连接工厂属性
??try {
???((MQQueueConnectionFactory)qcf).setHostName(host);
???((MQQueueConnectionFactory)qcf).setQueueManager(qm);
???((MQQueueConnectionFactory)qcf).setCCSID(1381);
???((MQQueueConnectionFactory)qcf).setChannel(channel);
???((MQQueueConnectionFactory)qcf).setPort(11001);
???qConn = qcf.createQueueConnection();
???
???//JMS规范指出要在停止的状态下创建连接,使用时需要start
???qConn.start();
??} catch (JMSException e) {
???// TODO Auto-generated catch block
???e.printStackTrace();
???return;
??}
??
??System.out.println("MQ连接成功");
??
?}
?//将消息发送到inputQ中
?public void sendMessage(){
??try{
???boolean transacted = false;
???
???//JMS会话是创建和使用消息的单线程上下文
???//非事务处理(分别接收或发送消息)[事务处理(全部发送或者全部接收作为一个单元的一组消息)]
???session = qConn.createQueueSession(transacted
?????, Session.AUTO_ACKNOWLEDGE);
???
???//对队列管理器上队列的映射
???Queue inputQ = session.createQueue("inputQ");
???QueueSender qSender = session.createSender(inputQ);
???
???//消息由会话创建
???TextMessage message = session.createTextMessage();
???message.setText("this is input message from Sender");
???
???//消息为非持久消息
???message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
???
???//创建队列的另一种方式,learn.mq也可以指定为其它队列管理器
???Queue outputQ = session.createQueue("queue://learn.mq/outputQ");
???//设置消息的回复队列,JMSReplyTo为消息头
???message.setJMSReplyTo(outputQ);
???
???//发送消息,里面后两个参数暂时不知道什么含义
???qSender.send(message,DeliveryMode.NON_PERSISTENT,7,0);
???
???//获得发送消息的ID,放入msgID中暂存,留给取消息时用
???String messageID = message.getJMSMessageID();
???msgID = messageID;
???
??} catch (JMSException e) {
???// TODO Auto-generated catch block
???e.printStackTrace();
??}
?}
?
?//将消息从inputQ中转移到outputQ并设置消息的CorrectionID
?public void moveMessage(){
??Queue moveQ_get;
??Queue moveQ_put;
??try {
???moveQ_get = session.createQueue("inputQ");
???QueueReceiver qReceiver = session.createReceiver(moveQ_get);
???TextMessage message = (TextMessage)qReceiver.receive();
???
???message.setJMSCorrelationID(message.getJMSMessageID());
???
???System.out.println(message.getJMSReplyTo().toString());
???moveQ_put = session.createQueue(message.getJMSReplyTo().toString());
???QueueSender qSender = session.createSender(moveQ_put);
???qSender.send(message);
??} catch (JMSException e) {
???// TODO Auto-generated catch block
???e.printStackTrace();
??}
??
?}
?
?public void receiveMessage(){
??try{
???Queue outputQ = session.createQueue("outputQ");
???
???//获取发送消息对应的回复消息
???QueueReceiver qReceiver = session.createReceiver(outputQ
?????,"JMSCorrelationID='"+msgID+"'");
???//设置接收消息的超时时间
???TextMessage message = (TextMessage)qReceiver.receive(10000);
???System.out.println("receive message is:"+message.getText());
???
???//注册的监听器需要在一线程中运行
???//MyMsgListener msgListener = new MyMsgListener();
???//qReceiver.setMessageListener(msgListener);
???//wait();
???
??} catch (JMSException e) {
???// TODO Auto-generated catch block
???e.printStackTrace();
??}
?}
?
?//释放会话及连接
?public void destroy(){
??
??try {
???session.close();
???qConn.close();
??} catch (JMSException e) {
???// TODO Auto-generated catch block
???e.printStackTrace();
??}
?}
?
?
?public static void main(String[] args) {
??PTPSender sender = new PTPSender();
??sender.init();
??sender.sendMessage();
??sender.moveMessage();
??sender.receiveMessage();
??sender.destroy();
?}
}
//消息监听器
class MyMsgListener implements MessageListener{
?public void onMessage(Message message){
??try{
???if(message instanceof TextMessage){
????System.out.println("Listener 接收消息:"+((TextMessage)message).getText());
???}
??}catch(JMSException e){
???e.printStackTrace();
??}
?}
}
//异常监听器,需要有消息监听器一同运行,它将会发送MyExceptionListener对象到onMessage中
//异常监听器注册到连接对象上,也就是qConn.setExceptionListener(myExceptionListener)
class MyExceptionListener implements ExceptionListener{
?//此处可放入更多逻辑
?public void onException(JMSException e) {
??System.out.println("队列管理器异常");
??e.printStackTrace();
??System.exit(0);
?}
?
}