读书人

QX项目实战-8.ActiveMQ的Queue讯息和T

发布时间: 2012-11-23 00:03:43 作者: rapoo

QX项目实战-8.ActiveMQ的Queue消息和Topic消息

项目使用ActiveMQ传递消息,保证大量数据信息的同步任务。对于这种基础架构层面的东西必须进行测试和熟悉。前几天已经下载安装和测试了ActiveMQ服务器[1],对该服务有了直观认识,下面我们深入了解下ActiveMQ到底能完成什么样的功能。

首先测试Queue消息的生产者消费者问题,在cmd下进入ActiveMQ的example目录,运行ant consumer命令,编译执行消费者程序,消费者运行起来后,在等待传递2000条消息。在另外的cmd下也进入example目录,执行antproducer指令,编译执行生产者程序,生产者程序执行发送消息的命令,给ActiveMQ服务器发送了2000条消息。这时消息由消费者接收到。随后消费者和生产者先后完成、关闭程序。需要注意的是执行ant编译指令,必须安装apache-ant自动构建程序,参考[2-4]。

Queue消息是点对点模式消息传播机制,一条消息仅能被一个消费者收到,如果未处理,将会一直保存着。多个消费者的情况下,消费者会自动实现负载均衡。

再来测试Topic消息,Topic消息是发布者/订阅者模式,这种模式下,发布者的消息发布到服务器,订阅者接收到该消息的一份拷贝。发布者/订阅者模式有非持久订阅和持久订阅两种。本例中首先运行订阅者,在example目录下执行anttopic-listener命令执行topic消息消费者,提示接受消息。再在example下执行anttopic-publisher命令执行Topic消息生产者。生产的消息十轮分发给订阅者。

下面分析Queue消息的生产者ProducerTool.java代码:

ProducerTool代码实现了Thread线程类,其中的Run方法定义如下:

ActiveMQConnectionFactory connectionFactory= new ActiveMQConnectionFactory(user, password, url);            Connectionconnection = connectionFactory.createConnection();            if (durable&& clientId != null && clientId.length() > 0 &&!"null".equals(clientId)) {               connection.setClientID(clientId);            }           connection.setExceptionListener(this);            connection.start();             session =connection.createSession(transacted, ackMode);            if (topic) {                destination =session.createTopic(subject);            } else {                destination =session.createQueue(subject);            }             replyProducer =session.createProducer(null);           replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);             MessageConsumerconsumer = null;            if (durable&& topic) {                consumer =session.createDurableSubscriber((Topic) destination, consumerName);            } else {                consumer =session.createConsumer(destination);            }             if (maxiumMessages> 0) {               consumeMessagesAndClose(connection, session, consumer);            } else {                if(receiveTimeOut == 0) {                   consumer.setMessageListener(this);                } else {                   consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);                }            }

参考

1. QX项目实战-7.ActiveMQ的安装与测试

2. Ant系统构建工具的使用

3. 使用Subversionant结合Subversion进行项目构建

4. Eclipse下用Ant运行JUnit

读书人网 >系统运维

热点推荐