Spring整合JMS——基于ActiveMQ实现(一)
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>javax.annotation</groupId> <artifactId>jsr250-api</artifactId> <version>1.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency></dependencies>
<bean id="connectionFactory" name="code"> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" value="tcp://localhost:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" ref="targetConnectionFactory"/> </bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" ref="connectionFactory"/> </bean>
<!--这个是队列目的地,点对点的--> <bean id="queueDestination" name="code">package com.tiantian.springintejms.service.impl; import javax.annotation.Resource;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session; import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component; import com.tiantian.springintejms.service.ProducerService; @Componentpublic class ProducerServiceImpl implements ProducerService { private JmsTemplate jmsTemplate; public void sendMessage(Destination destination, final String message) { System.out.println("---------------生产者发送消息-----------------"); System.out.println("---------------生产者发了一个消息:" + message); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } public JmsTemplate getJmsTemplate() { returnjmsTemplate; } @Resource public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
package com.tiantian.springintejms.listener; import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage; public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage TextMessage textMsg = (TextMessage) message; System.out.println("接收到一个纯文本消息。"); try { System.out.println("消息内容是:" + textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
<!--这个是队列目的地--> <bean id="queueDestination" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean>
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <context:component-scan base-package="com.tiantian" /> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" ref="connectionFactory"/> </bean> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" value="tcp://localhost:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" ref="targetConnectionFactory"/> </bean> <!--这个是队列目的地--> <bean id="queueDestination" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean></beans>
package com.tiantian.springintejms.test; import javax.jms.Destination; import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import com.tiantian.springintejms.service.ProducerService; @RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest { @Autowired private ProducerService producerService; @Autowired @Qualifier("queueDestination") private Destination destination; @Test public void testSend() { for (int i=0; i<2; i++) { producerService.sendMessage(destination, "你好,生产者!这是消息:" + (i+1)); } } }
?
?
?附:
Spring整合JMS(二)——消息监听器
?
6 楼 234390216 2013-06-24 jysemel 写道希望继续 有更多类似深入的文章分享后续会接着发布该系列的二和三的。 7 楼 lvwenwen 2013-06-24 麻烦楼主上传工程,谢谢 8 楼 234390216 2013-06-24 lvwenwen 写道麻烦楼主上传工程,谢谢
已经上传lvwenwen 写道麻烦楼主上传工程,谢谢
。 9 楼 liu_shui8 2013-06-25 学习学习!!!! 10 楼 zhangzhiwei1403 2013-06-25 不错。。。 11 楼 至尊宝_唯一 2013-06-25 写的很好,希望再写深入一点 12 楼 追求技术 2013-06-25 至尊宝_唯一 写道写的很好,希望再写深入一点
你觉得要怎么深入呢?我觉得博主这种实用性的文章讲明白了怎么用就可以了。 13 楼 234390216 2013-06-25 追求技术 写道至尊宝_唯一 写道写的很好,希望再写深入一点
你觉得要怎么深入呢?我觉得博主这种实用性的文章讲明白了怎么用就可以了。
确实,可以说一下需要往哪方面深入。 14 楼 至尊宝_唯一 2013-06-25 234390216 写道追求技术 写道至尊宝_唯一 写道写的很好,希望再写深入一点
你觉得要怎么深入呢?我觉得博主这种实用性的文章讲明白了怎么用就可以了。
确实,可以说一下需要往哪方面深入。
可以深入的有很多的方面,例如消息持久化,消息的安全性,使用Hook保证消息传送事务的原子性等等 15 楼 234390216 2013-06-25 至尊宝_唯一 写道234390216 写道追求技术 写道至尊宝_唯一 写道写的很好,希望再写深入一点
你觉得要怎么深入呢?我觉得博主这种实用性的文章讲明白了怎么用就可以了。
确实,可以说一下需要往哪方面深入。
可以深入的有很多的方面,例如消息持久化,消息的安全性,使用Hook保证消息传送事务的原子性等等
这个会在后续第四篇讲到。 16 楼 至尊宝_唯一 2013-06-26 234390216 写道至尊宝_唯一 写道234390216 写道追求技术 写道至尊宝_唯一 写道写的很好,希望再写深入一点
你觉得要怎么深入呢?我觉得博主这种实用性的文章讲明白了怎么用就可以了。
确实,可以说一下需要往哪方面深入。
可以深入的有很多的方面,例如消息持久化,消息的安全性,使用Hook保证消息传送事务的原子性等等
这个会在后续第四篇讲到。
继续加油哈 17 楼 huangshurenjk 2013-06-26 写得挺好,能说说实际应用中消息丢失的情况和应对方式吗 18 楼 234390216 2013-06-26 huangshurenjk 写道写得挺好,能说说实际应用中消息丢失的情况和应对方式吗
如果是在接收过程中的消息丢失的话,那属于整合后事务控制的范围,这个在以后会讲到。如果是发送时候的消息丢失的话,那就属于对JMS的讨论了,不在本文讨论范围之内。http://www.myexception.cn/software-architecture-design/899510.html这篇文章有讨论,希望对你有帮助。 19 楼 aa1asdasd 2013-06-26 MQ控制太输出
java.lang.IllegalStateException: Cannot remove session from connection that had not been registered: ID:user-PC-61504-13
72256221562-1:1
at org.apache.activemq.broker.TransportConnection.processRemoveSession(TransportConnection.java:678)
at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:74)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:329)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:288)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
at java.lang.Thread.run(Thread.java:722)
WARN | Async error occurred: java.lang.IllegalStateException: Cannot remove a consumer from a connection that had not b
een registered: ID:user-PC-61504-1372256221562-1:1
java.lang.IllegalStateException: Cannot remove a consumer from a connection that had not been registered: ID:user-PC-615
04-1372256221562-1:1
at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:637)
at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:76)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:329)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:288)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
at java.lang.Thread.run(Thread.java:722)
WARN | Transport Connection to: tcp://127.0.0.1:61505 failed: java.net.SocketException: Software caused connection abor
t: recv failed 20 楼 yzsunlight 2013-06-26 JMS 一般用在哪里 21 楼 yzsunlight 2013-06-26 JMS 一般用在哪里,能举个具体的实际例子吗 22 楼 234390216 2013-06-27 aa1asdasd 写道MQ控制太输出
java.lang.IllegalStateException: Cannot remove session from connection that had not been registered: ID:user-PC-61504-13
72256221562-1:1
at org.apache.activemq.broker.TransportConnection.processRemoveSession(TransportConnection.java:678)
at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:74)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:329)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:288)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
at java.lang.Thread.run(Thread.java:722)
WARN | Async error occurred: java.lang.IllegalStateException: Cannot remove a consumer from a connection that had not b
een registered: ID:user-PC-61504-1372256221562-1:1
java.lang.IllegalStateException: Cannot remove a consumer from a connection that had not been registered: ID:user-PC-615
04-1372256221562-1:1
at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:637)
at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:76)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:329)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:288)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
at java.lang.Thread.run(Thread.java:722)
WARN | Transport Connection to: tcp://127.0.0.1:61505 failed: java.net.SocketException: Software caused connection abor
t: recv failed
我回头研究一下。 23 楼 234390216 2013-06-27 yzsunlight 写道JMS 一般用在哪里,能举个具体的实际例子吗
JMS就是生产者与消费者模式。消费者负责消费生产者产生的消息。通过JMS可以做后台的异步操作,应用到具体工作中的话,有用它来发内部消息的、发邮件的、发短信的,做大操作时在后台做异步操作的。 24 楼 yzsunlight 2013-06-27 234390216 写道yzsunlight 写道JMS 一般用在哪里,能举个具体的实际例子吗
JMS就是生产者与消费者模式。消费者负责消费生产者产生的消息。通过JMS可以做后台的异步操作,应用到具体工作中的话,有用它来发内部消息的、发邮件的、发短信的,做大操作时在后台做异步操作的。
跟task 有什么区别 25 楼 234390216 2013-06-27 yzsunlight 写道234390216 写道yzsunlight 写道JMS 一般用在哪里,能举个具体的实际例子吗
JMS就是生产者与消费者模式。消费者负责消费生产者产生的消息。通过JMS可以做后台的异步操作,应用到具体工作中的话,有用它来发内部消息的、发邮件的、发短信的,做大操作时在后台做异步操作的。
跟task 有什么区别
你的task一般是定时任务,而这个JMS是在有需要的时候由人工发送消息;另外我们发送的消息可以当做参数,消费者接收到消息之后可以根据消息内容的不同做不同的事情。