读书人

ActiveMQ运用笔记(三)ActiveMQ消息发

发布时间: 2012-10-25 10:58:58 作者: rapoo

ActiveMQ使用笔记(三)ActiveMQ消息发送与接收

?

配置完了持久化之后,我们就可以使用代码来发送和接收ActiveMQ中的消息了,我这里配置的持久化是KahaDB。
需要导入的jar包:

ActiveMQ运用笔记(三)ActiveMQ消息发送与接收

一段发送消息的代码:

package send;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class SendTest {public static void send(){    try {            // 创建一个连接工厂            String url = "tcp://localhost:61616";            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);            // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置            connectionFactory.setUserName("system");            connectionFactory.setPassword("manager");            // 创建连接            Connection connection = connectionFactory.createConnection();            connection.start();            // 创建Session,参数解释:            // 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。            // 第二个参数消息的确认模式:            // AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。            // CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)            // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            // 创建目标,就创建主题也可以创建队列            Destination destination = session.createQueue("test");            // 创建消息生产者            MessageProducer producer = session.createProducer(destination);            // 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT            producer.setDeliveryMode(DeliveryMode.PERSISTENT);            // 创建消息            String text = "Hello ActiveMQ!";            TextMessage message = session.createTextMessage(text);            // 发送消息到ActiveMQ            producer.send(message);            System.out.println("Message is sent!");            // 关闭资源            session.close();            connection.close();        }        catch (Exception e) {            e.printStackTrace();        }    }public static void main(String[] args) {new SendTest().send();}}

?执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:

ActiveMQ运用笔记(三)ActiveMQ消息发送与接收

?

?

点击队列名test,然后点击消息ID即可查看消息内容,如图:

ActiveMQ运用笔记(三)ActiveMQ消息发送与接收

?

?

?

如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
一段接收消息的代码:

?

?

?

package send;import javax.jms.Connection;import javax.jms.Destination;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class GetTest {public static void get(){    try{        String url = "tcp://localhost:61616";        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);        // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置        connectionFactory.setUserName("system");        connectionFactory.setPassword("manager");        // 创建连接        Connection connection = connectionFactory.createConnection();        connection.start();        // 创建Session        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建目标,就创建主题也可以创建队列        Destination destination = session.createQueue("test");        // 创建消息消费者        MessageConsumer consumer = session.createConsumer(destination);        // 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null        Message message = consumer.receive(1000);        if (message instanceof TextMessage) {            TextMessage textMessage = (TextMessage) message;            String text = textMessage.getText();            System.out.println("Received: " + text);        } else {            System.out.println("Received: " + message);        }        consumer.close();        session.close();        connection.close();    } catch (Exception e) {        e.printStackTrace();    }}public static void main(String[] args) {new GetTest().get();}}

?执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:

ActiveMQ运用笔记(三)ActiveMQ消息发送与接收

?

这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。

读书人网 >开源软件

热点推荐