读书人

ActiveMQ模式:Queue与Topic 详解 教程

发布时间: 2012-12-21 12:03:49 作者: rapoo

ActiveMQ方式:Queue与Topic 详解 教程 加入代码解释说明
一、特性及优势


1、实现 JMS1.1 规范,支持 J2EE1.4以上
2、可运行于任何 jvm和大部分 web 容器(ActiveMQ works great in any JVM)
3、支持多种语言客户端(java, C, C++, AJAX, ACTIONSCRIPT 等等)
4、支持多种协议(stomp,openwire,REST)
5、良好的 spring 支持(ActiveMQ has great Spring Support)
6、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than
JBossMQ.)
7、与 OpenJMS、JbossMQ等开源jms provider 相比,ActiveMQ有 Apache 的支
持,持续发展的优势明显。



二、下载部署

1、下载
http://activemq.apache.org/activemq-510-release.html ,下载 5.1.0 Windows
Distribution版本
2、安装
直接解压至任意目录(如:d:\ apache-activemq-5.1.0)
3、启动 ActiveMQ服务器
方法 1:
直接运行 bin\activemq.bat
方法 2(在 JVM 中嵌套启动):
cd example
ant embedBroker
4、ActiveMQ消息管理后台系统:
http://localhost:8161/admin





三、运行附带的示例程序

1、Queue 消息示例:(点对点)
* 启动 Queue 消息消费者
cd example ant consumer
* 启动 Queue 消息生产者
cd example
ant producer
简要说明:生产者(producer)发消息,消费者(consumer)接消息,发送/接
收 2000 个消息后自动关闭
2、Topic 消息示例:(群组订阅)
* 启动 Topic 消息消费者
cd example
ant topic-listener
* 启动 Topic 消息生产者
cd example
ant topic-publisher
简要说明:重复 10 轮,publisher每轮发送2000 个消息,并等待获取 listener
的处理结果报告,然后进入下一轮发送,最后统计全局发送时间。


四、Queue与 Topic 的比较


1、JMS Queue 执行 load balancer语义:
一条消息仅能被一个 consumer(消费者) 收到。如果在 message 发送的时候没有可用的
consumer,那么它将被保存一直到能处理该 message 的 consumer 可用。如果一
个 consumer 收到一条 message 后却不响应它,那么这条消息将被转到另一个
consumer 那儿。一个 Queue 可以有很多 consumer,并且在多个可用的 consumer
中负载均衡。





注:


点对点消息传递域的特点如下:
? 每个消息只能有一个消费者。
? 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发
送消息的时候是否处于运行状态,它都可以提取消息。


2、Topic 实现 publish和 subscribe 语义:
一条消息被 publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber
将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的
subscriber能够获得消息的一个拷贝。



注:



发布/订阅消息传递域的特点如下:
? 每个消息可以有多个消费者。
? 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费
自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程
度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激
活状态时发送的消息。


3、分别对应两种消息模式:
Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者) 其中在 Publicher/Subscriber 模式下又有Nondurable subscription(非持久订阅)
和 durable subscription (持久化订阅)2种消息处理方式(支持离线消息)。



注:

在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递
域中,目的地被成为主题(topic)。


五、Point-to-Point (点对点)消息模式开发流程


1、生产者(producer)开发流程(ProducerTool.java):

1.1 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection。


据 url,user 和 password 创建一个 jms Connection。


Java代码

1. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);     2.             connection = connectionFactory.createConnection();     3.             connection.start();  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);            connection = connectionFactory.createConnection();            connection.start();






1.2 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。

Java代码

   1. Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);  Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); 



1.3 创建 Destination对象:
需指定其对应的主题(subject)名称,producer 和 consumer 将根据 subject
来发送/接收对应的消息。


Java代码

  1. if (topic) {     2.     destination = session.createTopic(subject);     3. } else {     4.     destination = session.createQueue(subject);     5. }              if (topic) {                destination = session.createTopic(subject);            } else {                destination = session.createQueue(subject);            }





1.4 创建 MessageProducer:
根据 Destination创建MessageProducer 对象,同时设置其持久模式。


Java代码

 1. MessageProducer producer = session.createProducer(destination);                        MessageProducer producer = session.createProducer(destination);          

1.5 发送消息到队列(Queue):
封装 TextMessage 消息, 使用 MessageProducer 的 send 方法将消息发送出去。


Java代码

1. TextMessage message = session.createTextMessage(createMessageText(i));     2. producer.send(message);              TextMessage message = session.createTextMessage(createMessageText(i));            producer.send(message);




2、消费者(consumer)开发流程(ConsumerTool.java):
2.1 实现 MessageListener 接口:
消费者类必须实现MessageListener 接口,然后在onMessage()方法中监听消息的
到达并处理。


Java代码

1. public class ConsumerTool extends Thread implements MessageListener, ExceptionListener  public class ConsumerTool extends Thread implements MessageListener, ExceptionListener


实现 onMessage(Message message)方法,实现监听消息的到达


2.2 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection,如果是durable 模式,
还需要给 connection设置一个 clientId。


Java代码
   1. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);     2. Connection connection = connectionFactory.createConnection();     3. //是否是 durable 模式.(离线消息持久化支持)      4. if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {     5.     connection.setClientID(clientId);     6. }     7. connection.setExceptionListener(this);     8. connection.start();              ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);            Connection connection = connectionFactory.createConnection();            //是否是 durable 模式.(离线消息持久化支持)             if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {                connection.setClientID(clientId);            }            connection.setExceptionListener(this);            connection.start(); 



2.3 创建 Session 和 Destination:
与 ProducerTool.java 中的流程类似,不再赘述。


Java代码

 1. session = connection.createSession(transacted, ackMode);   session = connection.createSession(transacted, ackMode); 

2.4 创建 replyProducer【可选】:
可以用来将消息处理结果发送给 producer。

2.5 创建 MessageConsumer:
根据 Destination创建MessageConsumer 对象。


Java代码
   1. MessageConsumer consumer = null;     2. if (durable && topic) {     3.     consumer = session.createDurableSubscriber((Topic) destination, consumerName);     4. } else {     5.     consumer = session.createConsumer(destination);     6. }              MessageConsumer consumer = null;            if (durable && topic) {                consumer = session.createDurableSubscriber((Topic) destination, consumerName);            } else {                consumer = session.createConsumer(destination);            } 




2.6 消费 message:
在 onMessage()方法中接收producer 发送过来的消息进行处理,并可以通过
replyProducer 反馈信息给 producer



Java代码

 1. if (message.getJMSReplyTo() != null) {     2.     replyProducer.send(message.getJMSReplyTo()     3.                                    , session.createTextMessage("Reply: "      4.                                                                              + message.getJMSMessageID()));     5. }              if (message.getJMSReplyTo() != null) {                replyProducer.send(message.getJMSReplyTo()                                               , session.createTextMessage("Reply: "                                                                                          + message.getJMSMessageID()));            }




六、Publisher/Subscriber(发布/订阅者)消息模式开发流程


1、订阅者(Subscriber)开发流程(TopicListener.java):
1.1 实现 MessageListener 接口:
在 onMessage()方法中监听发布者发出的消息队列,并做相应处理。


Java代码
   1. public void onMessage(Message message) {     2.     if (checkText(message, "SHUTDOWN")) {     3.      4.         try {     5.             connection.close();     6.         } catch (Exception e) {     7.             e.printStackTrace(System.out);     8.         }     9.     10.     } else if (checkText(message, "REPORT")) {    11.         // send a report:    12.         try {    13.             long time = System.currentTimeMillis() - start;    14.             String msg = "Received " + count + " in " + time + "ms";    15.             producer.send(session.createTextMessage(msg));    16.         } catch (Exception e) {    17.             e.printStackTrace(System.out);    18.         }    19.         count = 0;    20.     21.     } else {    22.     23.         if (count == 0) {    24.             start = System.currentTimeMillis();    25.         }    26.     27.         if (++count % 1000 == 0) {    28.             System.out.println("Received " + count + " messages.");    29.         }    30.     }    31. }      public void onMessage(Message message) {        if (checkText(message, "SHUTDOWN")) {            try {                connection.close();            } catch (Exception e) {                e.printStackTrace(System.out);            }        } else if (checkText(message, "REPORT")) {            // send a report:            try {                long time = System.currentTimeMillis() - start;                String msg = "Received " + count + " in " + time + "ms";                producer.send(session.createTextMessage(msg));            } catch (Exception e) {                e.printStackTrace(System.out);            }            count = 0;        } else {            if (count == 0) {                start = System.currentTimeMillis();            }            if (++count % 1000 == 0) {                System.out.println("Received " + count + " messages.");            }        }    } 


1.2 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection。



Java代码

   1. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);     2. connection = factory.createConnection();          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);        connection = factory.createConnection();





1.3 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。


Java代码
   1. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 



1.4 创建 Topic:
创建 2 个Topic, topictest.messages用于接收发布者发出的消息,
topictest.control用于向发布者发送消息,实现双方的交互。


Java代码

  1. topic = session.createTopic("topictest.messages");     2. control = session.createTopic("topictest.control");          topic = session.createTopic("topictest.messages");        control = session.createTopic("topictest.control");





1.5 创建 consumer 和 producer 对象:
根据topictest.messages创建consumer,根据topictest.control创建producer。


Java代码

   1. MessageConsumer consumer = session.createConsumer(topic);//创建消费者     2. consumer.setMessageListener(this);     3.      4. connection.start();     5.      6. producer = session.createProducer(control);//创建生产者          MessageConsumer consumer = session.createConsumer(topic);//创建消费者        consumer.setMessageListener(this);        connection.start();        producer = session.createProducer(control);//创建生产者 


1.6 接收处理消息:
在 onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消
息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作
等等),并且可以使用 producer对象将处理结果返回给发布者。


Java代码

 1. //可以先检查消息类型     2. ate static boolean checkText(Message m, String s) {     3.     try {     4.         return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);     5.     } catch (JMSException e) {     6.         e.printStackTrace(System.out);     7.         return false;     8.     }     9. }      //可以先检查消息类型private static boolean checkText(Message m, String s) {        try {            return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);        } catch (JMSException e) {            e.printStackTrace(System.out);            return false;        }    }



Java代码

  1. //然后     2.   if (checkText(message, "SHUTDOWN")) {     3.      4.           //关机     5.      6.         } else if (checkText(message, "REPORT")) {     7.             // 打印     8.                9.     10.         } else {    11.             //别的操作    12.              13.         }  //然后  if (checkText(message, "SHUTDOWN")) {          //关机        } else if (checkText(message, "REPORT")) {            // 打印                  } else {            //别的操作                 } 



2、发布者(Publisher)开发流程(TopicPublisher.java):


2.1 实现 MessageListener 接口:
在 onMessage()方法中接收订阅者的反馈消息。


Java代码
   1. public void onMessage(Message message) {     2.     synchronized (mutex) {     3.         System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");     4.         if (remaining == 0) {     5.             mutex.notify();     6.         }     7.     }     8. }      public void onMessage(Message message) {        synchronized (mutex) {            System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");            if (remaining == 0) {                mutex.notify();            }        }    } 



2.2 创建 Connection:
根据 url 创建一个 jms Connection。


Java代码

 1. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);     2. connection = factory.createConnection();          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);        connection = factory.createConnection(); 



2.3 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。


Java代码
   1. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);




2.4 创建 Topic:
创建 2 个Topic,topictest.messages用于向订阅者发布消息,topictest.control用
于接收订阅者反馈的消息。这2个topic与订阅者开发流程中的topic是一一对应
的。


Java代码
   1. topic = session.createTopic("topictest.messages");     2. control = session.createTopic("topictest.control");          topic = session.createTopic("topictest.messages");        control = session.createTopic("topictest.control");




2.5 创建 consumer 和 producer 对象:
根据topictest.messages创建publisher;
根据topictest.control创建consumer,同时监听订阅者反馈的消息。


Java代码

  1. publisher = session.createProducer(topic);     2. publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化模式     3.      4. session.createConsumer(control).setMessageListener(this);//加入监听     5. connection.start();          publisher = session.createProducer(topic);        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化模式        session.createConsumer(control).setMessageListener(this);//加入监听        connection.start();




2.6 给所有订阅者发送消息,并接收反馈消息:
示例代码中,一共重复 10 轮操作。


Java代码
   1. for (int i = 0; i < batch; i++) {     2.     if (i > 0) {     3.         Thread.sleep(delay * 1000);     4.     }     5.     times[i] = batch(messages);     6.     System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms.");     7. }          for (int i = 0; i < batch; i++) {            if (i > 0) {                Thread.sleep(delay * 1000);            }            times[i] = batch(messages);            System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms.");        }




每轮先向所有订阅者发送 2000 个消息;


Java代码

 1. private long batch(int msgCount) throws Exception {     2.     long start = System.currentTimeMillis();     3.     remaining = subscribers;     4.     publish();     5.     waitForCompletion();     6.     return System.currentTimeMillis() - start;     7. }      private long batch(int msgCount) throws Exception {        long start = System.currentTimeMillis();        remaining = subscribers;        publish();        waitForCompletion();        return System.currentTimeMillis() - start;    }





Java代码

 1. private void publish() throws Exception {     2.      3.     // send events     4.     BytesMessage msg = session.createBytesMessage();     5.     msg.writeBytes(payload);     6.     for (int i = 0; i < messages; i++) {     7.         publisher.send(msg);     8.         if ((i + 1) % 1000 == 0) {     9.             System.out.println("Sent " + (i + 1) + " messages");    10.         }    11.     }    12.     13.     // request report    14.     publisher.send(session.createTextMessage("REPORT"));    15. }      private void publish() throws Exception {        // send events        BytesMessage msg = session.createBytesMessage();        msg.writeBytes(payload);        for (int i = 0; i < messages; i++) {            publisher.send(msg);            if ((i + 1) % 1000 == 0) {                System.out.println("Sent " + (i + 1) + " messages");            }        }        // request report        publisher.send(session.createTextMessage("REPORT"));    }



然后堵塞线程,开始等待;


Java代码
   1. private void waitForCompletion() throws Exception {     2.     System.out.println("Waiting for completion...");     3.     synchronized (mutex) {     4.         while (remaining > 0) {     5.             mutex.wait();//赌赛线程     6.         }     7.     }     8. }      private void waitForCompletion() throws Exception {        System.out.println("Waiting for completion...");        synchronized (mutex) {            while (remaining > 0) {                mutex.wait();//赌赛线程            }        }    }




最后通过 onMessage()方法,接收到订阅者反馈的“REPORT”类信息后,才
print 反馈信息并解除线程堵塞,进入下一轮。


Java代码

 1. public void onMessage(Message message) {     2.     synchronized (mutex) {     3.         System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");     4.         if (remaining == 0) {     5.             mutex.notify();//唤醒线程     6.         }     7.     }     8. }      public void onMessage(Message message) {        synchronized (mutex) {            System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");            if (remaining == 0) {                mutex.notify();//唤醒线程            }        }    }





注:可同时运行多个订阅者测试查看此模式效果

读书人网 >编程

热点推荐