读书人

运用ActiveMQ的来传送文件

发布时间: 2012-06-30 17:20:12 作者: rapoo

使用ActiveMQ的来传送文件

1. ActiveMQ 报出这样的信息:

运用ActiveMQ的来传送文件INFO?|?Usage?Manager?memory?limit?(1048576)?reached?for?topic://EXCHANGE.FILE. Producers?will?be?throttled?to?the?rate?at?which?messages?are?removed?from?this
运用ActiveMQ的来传送文件destination?to?prevent?flooding?it.?See?http://activemq.apache.org/producer-flow-control.html?for?more?info

2. 这种以异步方式传送资料,能保证客户端能以正确的顺序接收到文件段麽?

使用ActiveMQ传送文件,发送端必须将文件拆成一段一段,每段封装在独立的Message中,逐次发送到客户端。例如下面的例子,Producer通过发送命令,告诉文件传送的开始,发送中,结束。客户端接收到这些命令之后,就知道如何接收资料了。

客户端收到内容后,根据命令将内容合并到一个文件中。?

package?org.apache.activemq.exchange.file;

import?java.io.BufferedOutputStream;
import?java.io.FileOutputStream;
import?java.io.IOException;

import?javax.jms.Connection;
import?javax.jms.ConnectionFactory;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageConsumer;
import?javax.jms.Session;
import?javax.jms.StreamMessage;

import?org.apache.activemq.ActiveMQConnectionFactory;

public?class?Consumer?{

????/**
?????*?@param?args
?????*/
????public?static?void?main(String[]?args)?throws?JMSException,?IOException?{
????????ConnectionFactory?factory?=?new?ActiveMQConnectionFactory("tcp://localhost:61616");

????????Connection?connection?=?factory.createConnection();
????????connection.start();

????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);

????????Destination?destination?=?session.createTopic("EXCHANGE.FILE");

????????MessageConsumer?consumer?=?session.createConsumer(destination);

????????boolean?appended?=?false;
????????try?{
????????????while?(true)?{
????????????????Message?message?=?consumer.receive(5000);
????????????????if?(message?==?null)?{
????????????????????continue;
????????????????}

????????????????if?(message?instanceof?StreamMessage)?{
????????????????????StreamMessage?streamMessage?=?(StreamMessage)?message;
????????????????????String?command?=?streamMessage.getStringProperty("COMMAND");
????????????????????
????????????????????if?("start".equals(command))?{
????????????????????????appended?=?false;
????????????????????????continue;
????????????????????}

????????????????????if?("sending".equals(command))?{
????????????????????????byte[]?content?=?new?byte[4096];
????????????????????????String?file_name?=?message.getStringProperty("FILE_NAME");
????????????????????????BufferedOutputStream?bos?=?null;
????????????????????????bos?=?new?BufferedOutputStream(new?FileOutputStream("c:/"?+?file_name,?appended));
????????????????????????if?(!appended)?{
????????????????????????????appended?=?true;
????????????????????????}
????????????????????????while?(streamMessage.readBytes(content)?>?0)?{
????????????????????????????bos.write(content);
????????????????????????}
????????????????????????bos.close();
????????????????????????continue;
????????????????????}

????????????????????if?("end".equals(command))?{
????????????????????????appended?=?false;
????????????????????????continue;
????????????????????}
????????????????}
????????????}
????????}?catch?(JMSException?e)?{
????????????throw?e;
????????}?finally?{
????????????if?(connection?!=?null)?{
????????????????connection.close();
????????????}
????????}

????}

}


发送端将文件分包,逐次发送到客户端?

package?org.apache.activemq.exchange.file;

import?java.io.BufferedInputStream;
import?java.io.IOException;
import?java.io.InputStream;

import?javax.jms.Connection;
import?javax.jms.ConnectionFactory;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.MessageProducer;
import?javax.jms.Session;
import?javax.jms.StreamMessage;

import?org.apache.activemq.ActiveMQConnectionFactory;

public?class?Publisher?{

????public?static?String?FILE_NAME?=?"01.mp3";
????
????public?static?void?main(String[]?args)?throws?JMSException,?IOException?{
????????ConnectionFactory?factory?=?new?ActiveMQConnectionFactory("tcp://localhost:61616");
????????Connection?connection?=?factory.createConnection();
????????connection.start();
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????Destination?destination?=?session.createTopic("EXCHANGE.FILE");????????
????????MessageProducer?producer?=?session.createProducer(destination);
????????long?time?=?System.currentTimeMillis();
????????
????????//通知客户端开始接受文件
????????StreamMessage?message?=?session.createStreamMessage();
????????message.setStringProperty("COMMAND",?"start");
????????producer.send(message);
????????
????????//开始发送文件
????????byte[]?content?=?new?byte[4096];
????????InputStream?ins?=?Publisher.class.getResourceAsStream(FILE_NAME);
????????BufferedInputStream?bins?=?new?BufferedInputStream(ins);
????????while?(bins.read(content)?>?0)?{
????????????//
????????????message?=?session.createStreamMessage();
????????????message.setStringProperty("FILE_NAME",?FILE_NAME);
????????????message.setStringProperty("COMMAND",?"sending");
????????????message.clearBody();
????????????message.writeBytes(content);
????????????producer.send(message);
????????}
????????bins.close();
????????ins.close();
????????
????????//通知客户端发送完毕
????????message?=?session.createStreamMessage();
????????message.setStringProperty("COMMAND",?"end");
????????producer.send(message);
????????
????????connection.close();
????????
????????System.out.println("Total?Time?costed?:?"?+?(System.currentTimeMillis()?-?time)?+?"?mili?seconds");
????}
}

读书人网 >开源软件

热点推荐