jms 学习笔记2
JMS 异步消息传输,客户端将消息发给消息中介,可以保证消息被可靠的投递,即使在服务器服务中断,当服务恢复正常时,仍然可以恢复消息队列.? 之前学习了,JMS简单送消息,但是为了方便操作以及代码的简化,Spring提供的消息转化器来简化消息发送和接收时的转化过程:
消息转化器接口: org.springframework.jms.support.converter.MessageConverter ,些接口简单明了,只有两个方法:将消息转化成对象和将圣像转化为消息,下面是个简单的例子:
package com.kesn.jms.converter;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import com.kesn.jms.entity.Motorist;
/**
?* Spring 提供的消息转化器
?* @author LEPING LI
?*/
public class MotoristConverter implements MessageConverter {
?/**
? * 将消息报转化为对象
? */
?public Object fromMessage(Message message) throws JMSException,
???MessageConversionException {
??if(!(message instanceof MapMessage)){
???throw new MessageConversionException("Message is not a MapMessage!");
??}
??MapMessage map=(MapMessage)message;
??Motorist m=new Motorist();
??m.setName(map.getString("name"));
??m.setEmail(map.getString("email"));
??m.setAge(map.getString("age"));
??return m;
?}
?/**
? * 将对象转化为消息
? */
?public Message toMessage(Object obj, Session session) throws JMSException,
???MessageConversionException {
??if(!(obj instanceof Motorist)){
???throw new MessageConversionException("obj is not a Motorits!");
??}
??Motorist m=(Motorist)obj;
??MapMessage message=session.createMapMessage();
??message.setString("name", m.getName());
??message.setString("email", m.getEmail());
??message.setString("age", m.getAge());
??return message;
?}
}
将消息转化器置入JmsTemplate
?
<!-- JMS模板 -->
?<bean id="jmsTemplate" ref="connectionFactory"/>
??<property name="messageConverter" ref="motoristConverter"/>
?</bean>
?
如此设置,有发送消息和接收消息时使用相关方法可以消除消息的转化过程
package com.kesn.jms.service.impl;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import com.kesn.jms.entity.Motorist;
import com.kesn.jms.service.JmsTemplateReceiveService;
/**
?* 使用JmsTemplate 发送消息
?* @author LEPING LI
?*
?*/
public class KesnJmsTemplateMessageReceiver implements JmsTemplateReceiveService{
?
?private JmsTemplate jmsTemplate;
?public void setJmsTemplate(JmsTemplate jmsTemplate) {
??this.jmsTemplate = jmsTemplate;
?}
?private Destination destination;
?public void setDestination(Destination destination) {
??this.destination = destination;
?}
?@Override
?public Motorist receiveMessage() {
//??try {
//???MapMessage message=(MapMessage) jmsTemplate.receive(destination);
//???Motorist m=new Motorist();
//???m.setName(message.getString("name"));
//???m.setEmail(message.getString("email"));
//???m.setAge(message.getString("age"));
//???return m;
//??} catch (JmsException e) {
//???e.printStackTrace();
//??}
//??return null;
??return (Motorist)jmsTemplate.receiveAndConvert(destination);
?}
}
package com.kesn.jms.service.impl;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import com.kesn.jms.entity.Motorist;
import com.kesn.jms.service.JmsTemplateSendService;
/**
?* 使用JmsTemplate 发送消息
?* @author LEPING LI
?*
?*/
public class KesnJmsTemplateMessageSender implements JmsTemplateSendService{
?private JmsTemplate jmsTemplate;
?public void setJmsTemplate(JmsTemplate jmsTemplate) {
??this.jmsTemplate = jmsTemplate;
?}
?private Destination destination;
?public void setDestination(Destination destination) {
??this.destination = destination;
?}
?
?@Override
?public void sendMessage(final Motorist m) {
//??jmsTemplate.send(destination, new MessageCreator(){
//???@Override
//???public Message createMessage(Session session) throws JMSException {
//????MapMessage message=session.createMapMessage();
//????message.setString("name",m.getName());
//????message.setString("email",m.getEmail());
//????message.setString("age", m.getAge());
//????return message;
//???}
//??});
??jmsTemplate.convertAndSend(destination, m);
?}
}
在发送和接收的消息时,JmsTemplate 方法自动应该消息转化器
?
?
以上都是采用注入JmsTemplate 的方式来使用操作消息,和其它模板一样,Spring也提供了JmsTemplate 对应的支持类 JmsGatewaySupport ,通过继承该类,可能通过getJmsTemplate 方法获取到JmsTemplate 模板
?
通常的编程模式我们都更倾向于基于Pojo的编程,EJB2中MDB消息驱动Bean必须实现javax.ejb.MessageDrivenBean 接口,EJB3不再需要实现该接口,但是还是要去实现一个监听器接口:MessageListener ,更像POJO,但还是POJO,Spring2中提供的消息驱动Bean采用了EJB3类型的方式,我们称Spring消息驱动Pojo为MDP ,下面是一个简单的例子:
?
创建一个消息驱动Pojo
package com.kesn.jms.mdp;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.log4j.Logger;
import com.kesn.jms.entity.Motorist;
/**
?* 创建消息监听器
?* @author LEPING LI
?*
?*/
public class MarkingMdp implements MessageListener {
?private Logger log=Logger.getLogger(MarkingMdp.class);
?@Override
?public void onMessage(Message message) {
??MapMessage map=(MapMessage)message;
??Motorist m=new Motorist();
??try {
???m.setName(map.getString("name"));
???m.setEmail(map.getString("email"));
???m.setAge(map.getString("age"));
???log.info("name:"+m.getName());
??} catch (JMSException e) {
???e.printStackTrace();
??}
?}
}
上面MDP就是实现一个MessageListener 监听器接口,了队列和主题有消息时,系统为自动调用MDP的onMessage方法来处理消息
但是JMS是如何自动调用onMessage方法来处理消息呢?消息监听的处理方式是这样的:消息监听器容器一个用于查看JMS目标,等待消息的特殊Bean,一量消息到达,就会获取消息,并通过onMessage方法来处理消息;Spring提供的 消息监听器容器有以下三种:
SimpleMessageListenerContainer 最简单的消息监听容器,处理有限数据量的消息,不支持事务
DefaultMessageListenerContainer 支持事务
ServerSessionMessageListenerConatainer? 支持更强大的功能,支持事务,允许动态管理JMS会话
?
消息监听器有Spring中的配置如下:
<!-- 消息监听器 -->
?<bean id="markingMdp" ref="connectionFactory"/>
??<property name="destination" ref="kesnQDestination"/>
??<property name="messageListener" ref="markingMdp"/>
?</bean>
?
如果需要事务可以使用:DefaultMessageListenerConatiner 配置如下
<!-- 消息监听器 -->
?<bean ref="connectionFactory"/>
??<property name="destination" ref="kesnQDestination"/>
??<property name="messageListener" ref="markingMdp"/>
??<property name="transactionManager" ref="jmsTransactionManager"/>
?</bean>
?
?<!-- JMS事务管理器 -->
?<bean? id="jmsTransactionManager" ref="connectionFactory"/>
?</bean>
?
?
下面学习一下如何用Spring创建一个纯POJO的MDP
上面的方式或多或不都使用了一定的JMS API,如何实现一个更POJO的MDP呢,Spring提供了一适配器 MessageListenerAdpater ,是一个MessageListener ,可能委派Bean和指定的方法,所谓适配器简单的说就是包装目标对象,增强其功能;我们将采用spring 配置的方式对之前的MDP(或是一个简单的POJO)经过适配器包装后形成新的监听器,配置如下:
<!-- 纯POJO MDP Bean -->
??? <bean id="markingPojoMdp" ref="markingPojoMdp"/>
??? ??? <property name="defaultListenerMethod" value="handleMessage"/>
??? </bean>
???
??? <!-- 配置监听器容器 -->
??? <bean ref="connectionFactory"/>
??? ??? <property name="destination" ref="kesnQDestination"/>
??? ??? <property name="messageListener" ref="purePojoMdp"/>
??? </bean>
?
?
通过在监控器中转入消息转化器,可以简单化消息接收的发送代码
<bean id="purePojoMdp" ref="markingPojoMdp"/>
??? ??? <property name="defaultListenerMethod" value="handleMessage"/>
??? ??? <property name="messageConverter" ref="motoristConverter"/>
??? </bean>
?
接收消息的代码可以转化为:
public void handleMessage(Motorist m) {
??? ??? log.info("name:"+m.getName());
??? }
?
至些我们建立的MDP已经是完成基于POJO的了