读书人

使用 ActiveMQ 示范

发布时间: 2013-09-09 20:31:09 作者: rapoo

使用 ActiveMQ 示例

企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。

在 Java 里有 JMS 的多个实现。其中 apache 下的 ActiveMQ 就是不错的选择。还有一个比较热的是 RabbitMQ (是 erlang 语言实现的)。这里示例下使用 ActiveMQ

用 ActiveMQ 最好还是了解下 JMS

JMS 公共点对点域发布/订阅域ConnectionFactoryQueueConnectionFactoryTopicConnectionFactoryConnectionQueueConnectionTopicConnectionDestinationQueueTopicSessionQueueSessionTopicSessionMessageProducerQueueSenderTopicPublisherMessageConsumerQueueReceiverTopicSubscriber

JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。

ConnectionFactory 是连接工厂,负责创建Connection。

Connection 负责创建 Session。

Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。

Destination 是消息的目的地。

详细的可以网上找些 JMS 规范(有中文版)。

下载 apache-activemq-5.3.0。http://activemq.apache.org/download.html, 解压,然后双击 bin/activemq.bat。运行后,可以在?http://localhost:8161/admin?观察。也有 demo,http://localhost:8161/demo。 把 activemq-all-5.3.0.jar 加入 classpath。

Jms 发送 代码:

  1. public?static?void?main(String[]?args)?throws?Exception?{??
  2. ????ConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();??
  3. ??
  4. ????Connection?connection?=?connectionFactory.createConnection();??
  5. ????connection.start();??
  6. ??
  7. ????Session?session?=?connection.createSession(Boolean.TRUE,?Session.AUTO_ACKNOWLEDGE);??
  8. ????Destination?destination?=?session.createQueue("my-queue");??
  9. ??
  10. ????MessageProducer?producer?=?session.createProducer(destination);??
  11. ????for(int?i=0;?i<3;?i++)?{??
  12. ????????MapMessage?message?=?session.createMapMessage();??
  13. ????????message.setLong("count",?new?Date().getTime());??
  14. ????????Thread.sleep(1000);??
  15. ????????//通过消息生产者发出消息??
  16. ????????producer.send(message);??
  17. ????}??
  18. ????session.commit();??
  19. ????session.close();??
  20. ????connection.close();??
  21. }??

Jms 接收代码:

  1. public?static?void?main(String[]?args)?throws?Exception?{??
  2. ????ConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();??
  3. ??
  4. ????Connection?connection?=?connectionFactory.createConnection();??
  5. ????connection.start();??
  6. ??
  7. ????final?Session?session?=?connection.createSession(Boolean.TRUE,?Session.AUTO_ACKNOWLEDGE);??
  8. ????Destination?destination?=?session.createQueue("my-queue");??
  9. ??
  10. ????MessageConsumer?consumer?=?session.createConsumer(destination);??
  11. ????/*//listener?方式?
  12. ????consumer.setMessageListener(new?MessageListener()?{?
  13. ?
  14. ????????public?void?onMessage(Message?msg)?{?
  15. ????????????MapMessage?message?=?(MapMessage)?msg;?
  16. ????????????//TODO?something....?
  17. ????????????System.out.println(" 收到消息:"?+?new?Date(message.getLong("count")));?
  18. ????????????session.commit();?
  19. ????????}?
  20. ?
  21. ????});?
  22. ????Thread.sleep(30000);?
  23. ????*/??
  24. ????int?i=0;??
  25. ????while(i<3)?{??
  26. ????????i++;??
  27. ????????MapMessage?message?=?(MapMessage)?consumer.receive();??
  28. ????????session.commit();??
  29. ??
  30. ????????//TODO?something....??
  31. ????????System.out.println("收到消 息:"?+?new?Date(message.getLong("count")));??
  32. ????}??
  33. ??
  34. ????session.close();??
  35. ????connection.close();??
  36. }??

启动 JmsReceiver 和 JmsSender 可以在看输出三条时间信息。当然 Jms 还指定有其它格式的数据,如 TextMessage

结合 Spring 的 JmsTemplate 方便用:

xml:

  1. <?xml?version="1.0"?encoding="UTF-8"?>??
  2. <beans?xmlns="http://www.springframework.org/schema/beans"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"??
  3. ????????xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">??
  4. ??
  5. <!--?在非?web?/?ejb?容器中使用?pool?时,要手动?stop,spring?不会为 你执行?destroy-method?的方法??
  6. ????<bean?id="jmsFactory"?class="org.apache.activemq.pool.PooledConnectionFactory"?destroy-method="stop">??
  7. ????????<property?name="connectionFactory">??
  8. ????????????<bean?class="org.apache.activemq.ActiveMQConnectionFactory">??
  9. ????????????????<property?name="brokerURL"?value="tcp://localhost:61616"?/>??
  10. ????????????</bean>??
  11. ????????</property>??
  12. ????</bean>??
  13. -->??
  14. ????<bean?id="jmsFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">??
  15. ????????<property?name="brokerURL"?value="tcp://localhost:61616"?/>??
  16. ????</bean>??
  17. ????<bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">??
  18. ????????<property?name="connectionFactory"?ref="jmsFactory"?/>??
  19. ????????<property?name="defaultDestination"?ref="destination"?/>??
  20. ????????<property?name="messageConverter">??
  21. ????????????<bean?class="org.springframework.jms.support.converter.SimpleMessageConverter"?/>??
  22. ????????</property>??
  23. ????</bean>??
  24. ??
  25. ????<bean?id="destination"?class="org.apache.activemq.command.ActiveMQQueue">??
  26. ????????<constructor-arg?index="0"?value="my-queue"?/>??
  27. ????</bean>??
  28. ??
  29. </beans>??

sender:

  1. public?static?void?main(String[]?args)?{??
  2. ????ApplicationContext?ctx?=?new?FileSystemXmlApplicationContext("classpath:app*.xml");??
  3. ??
  4. ????JmsTemplate?jmsTemplate?=?(JmsTemplate)?ctx.getBean("jmsTemplate");??
  5. ??
  6. ????jmsTemplate.send(new?MessageCreator()?{??
  7. ??
  8. ????????public?Message?createMessage(Session?session)?throws?JMSException?{??
  9. ????????????MapMessage?mm?=?session.createMapMessage();??
  10. ????????????mm.setLong("count",?new?Date().getTime());??
  11. ????????????return?mm;??
  12. ????????}??
  13. ??
  14. ????});??
  15. }??

receiver:

  1. public?static?void?main(String[]?args)?{??
  2. ????ApplicationContext?ctx?=?new?FileSystemXmlApplicationContext("classpath:app*.xml");??
  3. ??
  4. ????JmsTemplate?jmsTemplate?=?(JmsTemplate)?ctx.getBean("jmsTemplate");??
  5. ????while(true)?{??
  6. ????????Map<String,?Object>?mm?=??(Map<String,?Object>)?jmsTemplate.receiveAndConvert();??
  7. ????????System.out.println("收到消 息:"?+?new?Date((Long)mm.get("count")));??
  8. ????}??
  9. }??

注意:直接用 Jms 接口时接收了消息后要提交一下,否则下次启动接收者时还可以收到旧数据。有了 JmsTemplate 就不用自己提交 session.commit() 了。如果使用了 PooledConnectionFactory 要把 apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar 加到 classpath

读书人网 >软件架构设计

热点推荐