读书人

spring hornetq selector 过滤讯息

发布时间: 2012-12-24 10:43:14 作者: rapoo

spring hornetq selector 过滤消息

在接收 JMS消息的时候,我们经常要在消息队列里面过滤出自己需要的消息,摒弃我们不需要的消息。这个时候就需要用到 JMS的selector功能。这里结合spring3.1,给出一个例子。

?

发送消息的配置:

?

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Sender-side Spring context: a HornetQ queue, a Netty transport to localhost:5445,
  a connection factory, a JmsTemplate and the Sender bean that uses them.

  NOTE(review): the published listing had lost every bean "class" attribute, the first
  child element of several beans, and the whitespace between attributes. Everything
  marked "reconstructed" below is inferred from the surrounding Java code and from
  common HornetQ 2.2 / Spring 3 usage. Confirm against your library versions.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
         http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
         http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
         http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- Destination shared by sender and receiver.
         Reconstructed: class and queue name (the name is the one quoted later in the article). -->
    <bean id="selectorQueue" class="org.hornetq.jms.client.HornetQQueue">
        <constructor-arg value="org.spring.jms.selector.queue"/>
    </bean>

    <!-- Netty connector to the HornetQ server.
         Reconstructed: class and the connector factory class name argument. -->
    <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
        <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"/>
        <constructor-arg>
            <map key-type="java.lang.String" value-type="java.lang.Object">
                <entry key="host" value="localhost"/>
                <entry key="port" value="5445"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- Reconstructed: class and the "ha" flag (false = no high-availability failover). -->
    <bean id="connectionFactory" class="org.hornetq.jms.client.HornetQJMSConnectionFactory">
        <constructor-arg value="false"/>
        <constructor-arg ref="transportConfiguration"/>
    </bean>

    <!-- Reconstructed: class and the property name. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!-- Reconstructed: class and the "jmsTemplate" property name (Sender exposes setJmsTemplate). -->
    <bean id="sender" class="com.wanmei.jms.spring.selector.config.sender.Sender">
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <property name="destination" ref="selectorQueue"/>
    </bean>
</beans>

?

发送消息的 Sender 类:

?

/** * <pre> * </pre> */package com.wanmei.jms.spring.selector.config.sender;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.log4j.Logger;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import com.wanmei.jms.spring.selector.java.State;/** * <pre> * date 2012-12-20 * </pre> */public class Sender implements State {private static final Logger LOGGER = Logger.getLogger(Sender.class);private JmsTemplate jmsTemplate;private Destination destination;public void send(final String message, final String fromNode,final String toNode) {try {LOGGER.info("start to send message to " + destination+ " [message:" + message + ",fromNode:" + fromNode+ ",toNode:" + toNode);jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session)throws JMSException {LOGGER.info("session:"+session + "\nmessage : " + message);TextMessage msg = session.createTextMessage(message);msg.setStringProperty(FROM_NODE, fromNode);msg.setStringProperty(TO_NODE, toNode);LOGGER.info("-->"+msg);LOGGER.info(TO_NODE+"-->"+toNode);return msg;}});LOGGER.info("send message to " + destination + " successfully!");} catch (Throwable t) {LOGGER.error("Error:" + t.getMessage(), t);}}// ----------------- setter / getterpublic JmsTemplate getJmsTemplate() {return jmsTemplate;}public void setJmsTemplate(JmsTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}public Destination getDestination() {return destination;}public void setDestination(Destination destination) {this.destination = destination;}public static Logger getLogger() {return LOGGER;}}
?

?

发送消息的main函数类

?

/**
 * Command-line entry point for the sending side of the selector example.
 */
package com.wanmei.jms.spring.selector.config.sender;

import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * Boots the sender Spring context and sends a fixed batch of demo messages
 * whose direction alternates between two node addresses.
 *
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class BootstrapSender {

    private static final Logger LOGGER = Logger.getLogger(BootstrapSender.class);

    /** Config file used when no path is passed on the command line. */
    private static final String DEFAULT_CONFIG_LOCATION =
            "E:/workspace_java/hornetq/src/com/wanmei/jms/spring/selector/config/sender/applicationContext.xml";

    /** Number of demo messages sent per run. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Starts the Spring context, sends the demo messages, then closes the context.
     *
     * @param args optional; {@code args[0]} overrides the Spring config file location
     */
    public static void main(String[] args) {
        LOGGER.info("start to work and initialize spring frame.");
        String configLocation = (args != null && args.length > 0) ? args[0] : DEFAULT_CONFIG_LOCATION;
        FileSystemXmlApplicationContext applicationContext = new FileSystemXmlApplicationContext(configLocation);
        try {
            LOGGER.info("initialize spring frame successfully!");
            sendMessages(applicationContext);
        } finally {
            // Release the beans created by the context once sending is done.
            applicationContext.close();
        }
    }

    /** Looks up the "sender" bean and sends {@link #MESSAGE_COUNT} demo messages through it. */
    private static void sendMessages(ApplicationContext applicationContext) {
        Sender sender = applicationContext.getBean("sender", Sender.class);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Msg msg = createMessage(i);
            sender.send(msg.getMessage() + "-" + i, msg.getFromNode(), msg.getToNode());
        }
    }

    /**
     * Builds the demo message for index {@code i}: even indexes go from
     * 127.0.0.2 to 127.0.0.1, odd indexes go the other way.
     */
    private static Msg createMessage(int i) {
        String base1 = "127.0.0.1";
        String base2 = "127.0.0.2";
        if (i % 2 == 0) {
            return new Msg(message(base2), from(base2), to(base1));
        }
        return new Msg(message(base1), from(base1), to(base2));
    }

    private static String message(String str) {
        return "send " + str;
    }

    private static String from(String str) {
        return "from " + str;
    }

    private static String to(String str) {
        return "to " + str;
    }
}

/**
 * Mutable holder for a message text together with its source and target node labels.
 */
class Msg {

    private String message;

    private String fromNode;

    private String toNode;

    /** Creates an empty instance; all fields are {@code null}. */
    public Msg() {
        super();
    }

    /**
     * @param message  message text
     * @param fromNode label of the sending node
     * @param toNode   label of the receiving node
     */
    public Msg(String message, String fromNode, String toNode) {
        super();
        this.message = message;
        this.fromNode = fromNode;
        this.toNode = toNode;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getFromNode() {
        return fromNode;
    }

    public void setFromNode(String fromNode) {
        this.fromNode = fromNode;
    }

    public String getToNode() {
        return toNode;
    }

    public void setToNode(String toNode) {
        this.toNode = toNode;
    }
}

?

用到的一个接口类:

?

/** * <pre> * </pre> */package com.wanmei.jms.spring.selector.java;/** * <pre> * date 2012-12-20 * </pre> */public interface State {public final static String FROM_NODE = "FROM_NODE";public final static String TO_NODE = "TO_NODE";}

?

?

------------------------------------------------------ 开始接收消息-------------------------------------

接收的spring的配置文件:

?

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Receiver-side Spring context: the same queue and connection factory as the sender,
  plus a listener container that only delivers messages matching the selector
  TO_NODE='to 127.0.0.1' to the ReceiveListener bean.

  NOTE(review): the published listing had lost every bean "class" attribute, the first
  child element of several beans, and the whitespace between attributes. Everything
  marked "reconstructed" below is inferred from the surrounding Java code and from
  common HornetQ 2.2 / Spring 3 usage. Confirm against your library versions.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
         http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
         http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
         http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- Reconstructed: class and queue name (the name is the one quoted later in the article). -->
    <bean id="selectorQueue" class="org.hornetq.jms.client.HornetQQueue">
        <constructor-arg value="org.spring.jms.selector.queue"/>
    </bean>

    <!-- Reconstructed: class and the connector factory class name argument. -->
    <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
        <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"/>
        <constructor-arg>
            <map key-type="java.lang.String" value-type="java.lang.Object">
                <entry key="host" value="localhost"/>
                <entry key="port" value="5445"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- Reconstructed: class and the "ha" flag (false = no high-availability failover). -->
    <bean id="connectionFactory" class="org.hornetq.jms.client.HornetQJMSConnectionFactory">
        <constructor-arg value="false"/>
        <constructor-arg ref="transportConfiguration"/>
    </bean>

    <!-- Reconstructed: class. -->
    <bean id="receiveListener" class="com.wanmei.jms.spring.selector.java.ReceiveListener"/>

    <!-- Reconstructed: bean id, class and the "connectionFactory" property name.
         The destination, messageListener and messageSelector properties are from the original listing. -->
    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="selectorQueue"/>
        <property name="messageListener" ref="receiveListener"/>
        <property name="messageSelector" value="TO_NODE='to 127.0.0.1'"/>
    </bean>
</beans>

?

实现了 MessageListener 接口的类:

?

/** * <pre> * </pre> */package com.wanmei.jms.spring.selector.java;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;import org.apache.log4j.Logger;/** * <pre> * date 2012-12-20 * </pre> */public class ReceiveListener implements MessageListener, State {private static final Logger LOGGER = Logger.getLogger(ReceiveListener.class);/* * (non-Javadoc) *  * @see javax.jms.MessageListener#onMessage(javax.jms.Message) */@Overridepublic void onMessage(Message msg) {try {// LOGGER.info("start to receive from " + msg.getJMSDestination());TextMessage message = (TextMessage) msg;String fromNode = message.getStringProperty(FROM_NODE);String toNode = message.getStringProperty(TO_NODE);LOGGER.info("receive message from " + message.getJMSDestination()+ ", msg : " + message.getText() + ", fromNode : " + fromNode+ ", toNode : " + toNode);} catch (JMSException e) {LOGGER.error("Error" + e.getMessage(), e);}}}

?

含有 main 函数的引导类:

?

/**
 * Command-line entry point for the receiving side of the selector example.
 */
package com.wanmei.jms.spring.selector;

import org.apache.log4j.Logger;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * Boots the receiver Spring context. The beans declared in the config file
 * do the actual message consumption; this class only starts the context.
 *
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class Bootstrap {

    private static final Logger LOGGER = Logger.getLogger(Bootstrap.class);

    /** Config file used when no path is passed on the command line. */
    private static final String DEFAULT_CONFIG_LOCATION =
            "E:/workspace_tmp_copy/hornetq/src/com/wanmei/jms/spring/selector/config/receiver/c1/applicationContext.xml";

    /**
     * Starts the Spring context and registers a JVM shutdown hook that closes it.
     *
     * @param args optional; {@code args[0]} overrides the Spring config file location
     */
    public static void main(String[] args) {
        LOGGER.info("start to work and initialize spring frame.");
        String configLocation = (args != null && args.length > 0) ? args[0] : DEFAULT_CONFIG_LOCATION;
        FileSystemXmlApplicationContext context = new FileSystemXmlApplicationContext(configLocation);
        // The context must stay open while messages are being received, so it is not closed here;
        // the hook closes it when the JVM shuts down.
        context.registerShutdownHook();
        LOGGER.info("initialize spring frame successfully!");
        LOGGER.info("spring : " + configLocation);
    }
}

?

------------ 备注:

上面接收端的 Spring 配置文件还可以改用 JMS Namespace Support(jms 命名空间)的方式来配置。注意:destination="org.spring.jms.selector.queue" 这里填写的是 queue 的名称,而不是对某个 bean 的引用。

配置文件如下:

?

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Receiver-side Spring context written with the Spring "jms" namespace.
  The listener's "destination" attribute is the queue NAME, not a bean reference.

  NOTE(review): the published listing had lost every bean "class" attribute, the
  opening jms:listener-container tag, the start of the jms:listener element, and the
  whitespace between attributes. Everything marked "reconstructed" below is inferred
  from the surrounding article and from common HornetQ 2.2 / Spring 3 usage.
  Confirm against your library versions.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
         http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
         http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
         http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- Reconstructed: class and the connector factory class name argument. -->
    <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
        <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"/>
        <constructor-arg>
            <map key-type="java.lang.String" value-type="java.lang.Object">
                <entry key="host" value="localhost"/>
                <entry key="port" value="5445"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- Reconstructed: class and the "ha" flag (false = no high-availability failover). -->
    <bean id="connectionFactory" class="org.hornetq.jms.client.HornetQJMSConnectionFactory">
        <constructor-arg value="false"/>
        <constructor-arg ref="transportConfiguration"/>
    </bean>

    <!-- Reconstructed: class. -->
    <bean id="receiveListener" class="com.wanmei.jms.spring.selector.java.ReceiveListener"/>

    <!-- Reconstructed: the container element and the "destination" attribute
         (its value is the one quoted in the article text). "ref" and "selector" are from the original listing. -->
    <jms:listener-container connection-factory="connectionFactory">
        <jms:listener destination="org.spring.jms.selector.queue"
                      ref="receiveListener"
                      selector="TO_NODE='to 127.0.0.1'"/>
    </jms:listener-container>
</beans>

具体内容请参考 Spring 3.1 的参考文档(Reference Documentation)。

读书人网 >软件架构设计

热点推荐