读书人

ActiveMQ 定时支配的使用查看任务消息

发布时间: 2012-11-03 10:57:43 作者: rapoo

ActiveMQ 定时调度的使用查看任务消息

?? 当我们发送消息之后,如果想查看当前有多少消息尚未发送可以自己写一些代码实现,可以写一些定时获取任务或者发送消息的实现,必须使用SchedulerBroker,JobScheduler,测试实现如下:

package easyway.app.activemq.demo.schedules;import java.io.File;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ScheduledMessage;import org.apache.activemq.broker.BrokerService;import org.apache.activemq.broker.scheduler.Job;import org.apache.activemq.broker.scheduler.JobScheduler;import org.apache.activemq.broker.scheduler.SchedulerBroker;import org.apache.activemq.command.ActiveMQDestination;import org.apache.activemq.command.ActiveMQQueue;import org.apache.activemq.command.ActiveMQTopic;import org.apache.activemq.util.IOHelper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.jms.core.JmsTemplate;public class ActiveMQCronSchedulere {  private static final Logger LOG = LoggerFactory.getLogger(ActiveMQCronSchedulere.class);  protected String bindAddress = "vm://localhost";   protected BrokerService broker;    // protected String bindAddress = "tcp://localhost:61616";    protected ConnectionFactory connectionFactory;    protected boolean useTopic;    protected ActiveMQDestination destination;    protected JmsTemplate template;            protected BrokerService createBroker() throws Exception {        return createBroker(true);    }    protected boolean isPersistent() {        return false;    }    protected BrokerService createBroker(boolean delete) throws Exception {        File schedulerDirectory = new File("data/scheduler");        if (delete) {            IOHelper.mkdirs(schedulerDirectory);            IOHelper.deleteChildren(schedulerDirectory);        }        BrokerService answer = new BrokerService();        answer.setPersistent(isPersistent());        answer.getManagementContext().setCreateConnector(false);        answer.setDeleteAllMessagesOnStartup(true);        answer.setDataDirectory("data");        answer.setSchedulerDirectoryFile(schedulerDirectory);        answer.setSchedulerSupport(true);        answer.setUseJmx(false);        answer.addConnector(bindAddress);        return answer;    }    protected ConnectionFactory createConnectionFactory() throws Exception {        return new ActiveMQConnectionFactory(bindAddress);    }    protected Connection createConnection() throws Exception {        return createConnectionFactory().createConnection();    }        protected ActiveMQDestination createDestination() {        return createDestination(getDestinationString());    }    protected ActiveMQDestination createDestination(String subject) {        if (useTopic) {            return new ActiveMQTopic(subject);        } else {            return new ActiveMQQueue(subject);        }    }    protected String getDestinationString() {        return getClass().getName() + "." + "Queue";    }    protected JmsTemplate createJmsTemplate() {        return new JmsTemplate(connectionFactory);    }        protected void startBroker() throws Exception {        broker.start();    }    public void init() throws Exception{       if (broker == null) {               broker = createBroker();           }           startBroker();           connectionFactory = createConnectionFactory();           destination = createDestination();           template = createJmsTemplate();           template.setDefaultDestination(destination);           template.setPubSubDomain(useTopic);           template.afterPropertiesSet();    }    public void cron() throws Exception {        final int COUNT = 10;        final AtomicInteger count = new AtomicInteger();        Connection connection = createConnection();        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        MessageConsumer consumer = session.createConsumer(destination);        final CountDownLatch latch = new CountDownLatch(COUNT);        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                latch.countDown();                count.incrementAndGet();                TextMessage  txtmessage=(TextMessage)message;        try {System.out.println("activemq receive message "+txtmessage.getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}            LOG.debug("Received one Message, count is at: " + count.get());            }        });        connection.start();        for (int i = 0; i < COUNT; i++) {        MessageProducer producer = session.createProducer(destination);        TextMessage message = session.createTextMessage("activemq send  test message  "+i);        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");        producer.send(message);        producer.close();        //wait a couple sec so cron start time is different for next message            Thread.sleep(100);        }        SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);        JobScheduler js = sb.getJobScheduler();        List<Job> list = js.getAllJobs();        System.out.println("COUNT ="+COUNT+" list.size()="+list.size());        latch.await(2, TimeUnit.MINUTES);        //All should messages should have been received by now        System.out.println("COUNT ="+COUNT+" count.get()="+count.get());    }public static void main(String[] args) throws Exception {ActiveMQCronSchedulere test=new ActiveMQCronSchedulere();test.init();test.cron();}        }

?

读书人网 >编程

热点推荐