ActiveMQ 定时调度的使用查看任务消息
当我们发送消息之后,如果想查看当前有多少消息尚未发送,可以自己编写代码实现,例如定时获取调度任务或者发送消息。实现时必须使用 SchedulerBroker 和 JobScheduler,测试实现如下:
package easyway.app.activemq.demo.schedules;import java.io.File;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ScheduledMessage;import org.apache.activemq.broker.BrokerService;import org.apache.activemq.broker.scheduler.Job;import org.apache.activemq.broker.scheduler.JobScheduler;import org.apache.activemq.broker.scheduler.SchedulerBroker;import org.apache.activemq.command.ActiveMQDestination;import org.apache.activemq.command.ActiveMQQueue;import org.apache.activemq.command.ActiveMQTopic;import org.apache.activemq.util.IOHelper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.jms.core.JmsTemplate;public class ActiveMQCronSchedulere { private static final Logger LOG = LoggerFactory.getLogger(ActiveMQCronSchedulere.class); protected String bindAddress = "vm://localhost"; protected BrokerService broker; // protected String bindAddress = "tcp://localhost:61616"; protected ConnectionFactory connectionFactory; protected boolean useTopic; protected ActiveMQDestination destination; protected JmsTemplate template; protected BrokerService createBroker() throws Exception { return createBroker(true); } protected boolean isPersistent() { return false; } protected BrokerService createBroker(boolean delete) throws Exception { File schedulerDirectory = new File("data/scheduler"); if (delete) { IOHelper.mkdirs(schedulerDirectory); IOHelper.deleteChildren(schedulerDirectory); } BrokerService answer = new BrokerService(); answer.setPersistent(isPersistent()); 
answer.getManagementContext().setCreateConnector(false); answer.setDeleteAllMessagesOnStartup(true); answer.setDataDirectory("data"); answer.setSchedulerDirectoryFile(schedulerDirectory); answer.setSchedulerSupport(true); answer.setUseJmx(false); answer.addConnector(bindAddress); return answer; } protected ConnectionFactory createConnectionFactory() throws Exception { return new ActiveMQConnectionFactory(bindAddress); } protected Connection createConnection() throws Exception { return createConnectionFactory().createConnection(); } protected ActiveMQDestination createDestination() { return createDestination(getDestinationString()); } protected ActiveMQDestination createDestination(String subject) { if (useTopic) { return new ActiveMQTopic(subject); } else { return new ActiveMQQueue(subject); } } protected String getDestinationString() { return getClass().getName() + "." + "Queue"; } protected JmsTemplate createJmsTemplate() { return new JmsTemplate(connectionFactory); } protected void startBroker() throws Exception { broker.start(); } public void init() throws Exception{ if (broker == null) { broker = createBroker(); } startBroker(); connectionFactory = createConnectionFactory(); destination = createDestination(); template = createJmsTemplate(); template.setDefaultDestination(destination); template.setPubSubDomain(useTopic); template.afterPropertiesSet(); } public void cron() throws Exception { final int COUNT = 10; final AtomicInteger count = new AtomicInteger(); Connection connection = createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(destination); final CountDownLatch latch = new CountDownLatch(COUNT); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { latch.countDown(); count.incrementAndGet(); TextMessage txtmessage=(TextMessage)message; try {System.out.println("activemq receive message "+txtmessage.getText());} catch 
(JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();} LOG.debug("Received one Message, count is at: " + count.get()); } }); connection.start(); for (int i = 0; i < COUNT; i++) { MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("activemq send test message "+i); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *"); producer.send(message); producer.close(); //wait a couple sec so cron start time is different for next message Thread.sleep(100); } SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class); JobScheduler js = sb.getJobScheduler(); List<Job> list = js.getAllJobs(); System.out.println("COUNT ="+COUNT+" list.size()="+list.size()); latch.await(2, TimeUnit.MINUTES); //All should messages should have been received by now System.out.println("COUNT ="+COUNT+" count.get()="+count.get()); }public static void main(String[] args) throws Exception {ActiveMQCronSchedulere test=new ActiveMQCronSchedulere();test.init();test.cron();} }
?