ActiveMQ基于Queue的发送消息模式
???? activemq既然是基于jms的就存在所谓的点对点模式和发布订阅模式,下面编写基于queue的发送消息方式。
启动activemq的broker进程。
代码如下:
package easyway.app.activemq.demo14;import javax.jms.JMSException;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.QueueConnectionFactory;import javax.jms.QueueSender;import javax.jms.QueueSession;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * ActiveMQ的Queue消息队列的 * @author longgangbai * */public final class SimpleQueueSender { private static final Logger LOG = LoggerFactory.getLogger(SimpleQueueSender.class); private SimpleQueueSender() { } public static void main(String[] args) { String queueName = "activemqqueue"; QueueConnectionFactory queueConnectionFactory = null; QueueConnection queueConnection = null; QueueSession queueSession = null; Queue queue = null; QueueSender queueSender = null; TextMessage message = null; final int numMsgs=10; try { //创建链接工厂 queueConnectionFactory=new ActiveMQConnectionFactory(); //创建连接 queueConnection = queueConnectionFactory.createQueueConnection(); //创建会话 queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列 queue = queueSession.createQueue(queueName); //创建消息发送者 queueSender = queueSession.createSender(queue); message = queueSession.createTextMessage(); for (int i = 0; i < numMsgs; i++) { message.setText("This is message " + (i + 1)); LOG.info("Sending message: " + message.getText()); queueSender.send(message); } //发送消息 queueSender.send(queueSession.createMessage()); } catch (JMSException e) { LOG.info("Exception occurred: " + e.toString()); } finally { if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { } } } }}?
package easyway.app.activemq.demo14;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.QueueConnectionFactory;import javax.jms.QueueReceiver;import javax.jms.QueueSession;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * ActiveMQ的Queue消息队列的 * @author longgangbai * */public final class SimpleQueueReceiver { private static final Logger LOG = LoggerFactory.getLogger(SimpleQueueReceiver.class); private SimpleQueueReceiver() { } public static void main(String[] args) { String queueName = "activemqqueue"; QueueConnectionFactory queueConnectionFactory = null; QueueConnection queueConnection = null; QueueSession queueSession = null; Queue queue = null; QueueReceiver queueReceiver = null; TextMessage message = null; try { //创建连接工厂 queueConnectionFactory=new ActiveMQConnectionFactory(); //创建连接 queueConnection = queueConnectionFactory.createQueueConnection(); //创建连接会话 queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列 queue = queueSession.createQueue(queueName); //创建消息接受者 queueReceiver = queueSession.createReceiver(queue); queueConnection.start(); while (true) { Message m = queueReceiver.receive(1); if (m != null) { if (m instanceof TextMessage) { message = (TextMessage)m; LOG.info("Reading message: " + message.getText()); } else { break; } } } } catch (JMSException e) { LOG.info("Exception occurred: " + e.toString()); } finally { if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { } } } }}?