读书人

activeMQ跟spring集成

发布时间: 2012-11-01 11:11:31 作者: rapoo

activeMQ和spring集成
activeMQ和spring集成

activeMQ的服务端还是像以前那样配置,注册成为WINDOWS的服务,自己启动。
和SPRING集成的,其实是消息发送和消息消费的CLIENT端

使用到了MDP,这样可以把一些发送邮件、发送短信的耗时操作都变成异步的。调用发送邮件、发送短信等,SERVICE马上
返回了,其实耗时操作没有完成,而是把这个任务放置到了队列里面,由MDP去调用真正的处理程序和方法,来处理队列里面
的数据。

貌似加入了这个JAR包:
apache-activemq-4.1.0-incubator.jar

在CONF里面增加了配置文件applicationContext-activemq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">

<beans>
<bean id="jmsConnectionFactory"
/>
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="userMessageConverter" />
</bean>

<!-- Queue模式 -->
<bean id="destinationQueue"
/>

<!-- POJO which send Message uses Spring JmsTemplate -->
<bean id="userMessageProducer"
ref="jmsTemplate" />
<property name="destination" ref="destinationQueue" />
</bean>

<!-- Message Driven POJO (MDP) -->
<bean id="messageListener"
value="printUser" />
<property name="messageConverter" ref="userMessageConverter" />
</bean>

<!-- listener container,MDP无需实现接口 -->
<bean id="listenerContainer"
ref="jmsConnectionFactory" />
<property name="destination" ref="destinationQueue" />
<property name="messageListener" ref="messageListener" />
</bean>
</beans>

其中配置引用了三个BEAN,其中一个比较重要的工具BEAN是转化MESSAGE的UserMessageConverter.java:

package com.sillycat.plugin.activemq;


import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.springframework.jms.support.converter.MessageConverter;

import com.sillycat.core.model.User;


public class UserMessageConverter implements MessageConverter {


/*
* (non-Javadoc)
*
* @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object,
* javax.jms.Session)
*/
public Message toMessage(Object obj, Session session) throws JMSException {
//check Type
if (obj instanceof User) {
ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
HashMap map = new HashMap();
try {
//Order,Order,Product must implements Seralizable
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
bos.close();
map.put("User", bos.toByteArray());
objMsg.setObjectProperty("Map", map);

} catch (IOException e) {
e.printStackTrace();
}

return objMsg;
} else {
throw new JMSException("Object:[" + obj + "] is not User");
}

}

/*
* (non-Javadoc)
*
* @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message)
*/
public Object fromMessage(Message msg) throws JMSException {
if (msg instanceof ObjectMessage) {
HashMap map= (HashMap) ((ObjectMessage) msg)
.getObjectProperty("Map");
try {
// Order,Order,Product must implements Seralizable
ByteArrayInputStream bis=new ByteArrayInputStream((byte[]) map.get("User"));
ObjectInputStream ois=new ObjectInputStream(bis);
return ois.readObject();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
} else {
throw new JMSException("Msg:[" + msg + "] is not Map");
}
}

}

注意传递的对象要序列化

一个发送消息的BEAN是UserMessageProducer.java:

package com.sillycat.plugin.activemq;

import javax.jms.Queue;

import org.springframework.jms.core.JmsTemplate;

import com.sillycat.core.model.User;


public class UserMessageProducer {
private JmsTemplate template;

private Queue destination;

public void setTemplate(JmsTemplate template) {
this.template = template;
}

public void setDestination(Queue destination) {
this.destination = destination;
}

public void send(User order) {
template.convertAndSend(this.destination, order);
}

}

一个接受消息的BEAN是UserMessageConsumer.java:
package com.sillycat.plugin.activemq;

import com.sillycat.core.model.User;


public class UserMessageConsumer {


public void printUser(User user) {
user.getId();
user.getName();
System.out.println(user);
}

}

这里只是一个DEMO,所以打印出来对象信息就结束了,其实真正的可以实现一些异步的调用,比如说耗时比较常的发送邮件,发送短信啊。这些就可以写到这里

当然老习惯,配一个单元测试UserMessageTest.java:

package com.sillycat.plugin.activemq;

import com.sillycat.core.model.User;
import com.sillycat.plugin.commons.base.ServiceTestBase;

public class UserMessageTest extends ServiceTestBase{

private UserMessageProducer userMessageProducer;

private User user;

protected void setUp() throws Exception {
super.setUp();
userMessageProducer = (UserMessageProducer) appContext.getBean("userMessageProducer");
user = getUser();
}

protected void tearDown() throws Exception {
super.tearDown();
}

public void testDumy(){
assertTrue(true);
}

public void testSendUserBean(){
for(int i = 0;i<10;i++){
userMessageProducer.send(user);
}
System.out.println("test end");
}



private User getUser() {
User user = new User();
user.setName("luohua");
user.setId(Integer.valueOf("2"));
return user;
}

}



读书人网 >软件架构设计

热点推荐