读书人

activemq 发送资料(转)

发布时间: 2012-07-15 20:20:06 作者: rapoo

activemq 发送文件(转)

这里使用的 MQ 中间件是开源的 ActiveMQ,我们没有采用 BytesMessage 来按字节传送文件,而是 ActiveMQ 为我们提供了 org.apache.activemq.BlobMessage,可以用它来传送大对象。org.apache.activemq.ActiveMQSession 中有以下几个创建 BlobMessage 对象的方法:

createBlobMessage(URL url)
createBlobMessage(URL url, boolean deletedByBroker)
createBlobMessage(File file)
createBlobMessage(InputStream in)

接收到 BlobMessage 消息后,可以调用其 getInputStream() 方法获得数据,然后写成磁盘文件,文件名、文件大小等可通过 Message 的 getXxxProperty("Property.Name") 取的。

注意,传输入文件的时候,发送方创建 ConnectionFactory 时的 brokerURL 需要指定 jms.blobTransferPolicy.uploadUrl 或者jms.blobTransferPolicy.defaultUploadUrl 属性为 ActiveMQ 中 fileserver 应用的 URI,即指定传输 BlogMessage 的 BlobTransferPolicy 策略,参看 Configuring the BLOB Transfer Policy。
1. 启动 ActiveMQ

2. 编写发送文件的程序 FileSender.java

view sourceprint?01.package com.unmi.jms;

* 通过 ActiveMQ 发送文件的程序?
* @author Unmi?
*/
public class FileSender {???
/**????
* @param args????
* @throws JMSException?????
*/???
public static void main(String[] args) throws JMSException {???????
// 选择文件???????
JFileChooser fileChooser = new JFileChooser();????????
fileChooser.setDialogTitle("请选择要传送的文件");???????
if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) {?????????
return;?????????
}???????
File file = fileChooser.getSelectedFile();????????
// 获取 ConnectionFactory?????????
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(???????????????
"tcp://10.80.38.10:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.80.38.10:8161/fileserver/");?????????
// 创建 Connection???????
Connection connection = connectionFactory.createConnection();????????
connection.start(); .????????
// 创建 Session?????
ActiveMQSession session = (ActiveMQSession) connection.createSession(????????????????
false, Session.AUTO_ACKNOWLEDGE);??????
// 创建 Destination??????
Destination destination = session.createQueue("File.Transport");???????
// 创建 Producer??????
MessageProducer producer = session.createProducer(destination);???????
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 设置为非持久性????????
// 设置持久性的话,文件也可以先缓存下来,接收端离线再连接也可以收到文件???????
// 构造 BlobMessage,用来传输文件???
BlobMessage blobMessage = session.createBlobMessage(file);??????
blobMessage.setStringProperty("FILE.NAME", file.getName());???????
blobMessage.setLongProperty("FILE.SIZE", file.length());???????
System.out.println("开始发送文件:" + file.getName() + ",文件大小:"????????????????
+ file.length() + " 字节");???????
// 7. 发送文件?????
producer.send(blobMessage);???????
System.out.println("完成文件发送:" + file.getName());????????
producer.close();????????
session.close();???????
connection.close(); // 不关闭 Connection, 程序则不退出?????
} }
3. 编写接收文件的程序 FileReceiver.java

public class FileReciever {?????
/**????
* @param args?
* @throws JMSException????
*/?
public static void main(String[] args) throws JMSException {????
// 获取 ConnectionFactory????????
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(?????????????????
"tcp://10.80.38.10:61616");????????
// 创建 Connection??????
Connection connection = connectionFactory.createConnection();?????????
connection.start();???????
// 创建 Session???????
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);???????
// 创建 Destinatione?????????
Destination destination = session.createQueue("File.Transport");???????
// 创建 Consumer????????
MessageConsumer consumer = session.createConsumer(destination);??????
// 注册消息监听器,当消息到达时被触发并处理消息???????
consumer.setMessageListener(new MessageListener() {???????????
// 监听器中处理消息?????????
public void onMessage(Message message) {????????????????
if (message instanceof BlobMessage) {????????????????????
BlobMessage blobMessage = (BlobMessage) message;??????????????????
try {???????????????????????
String fileName = blobMessage.getStringProperty("FILE.NAME");???????????????????????
System.out.println("文件接收请求处理:" + fileName + ",文件大小:"??????????????????????????????
+ blobMessage.getLongProperty("FILE.SIZE")+ " 字节");???????????????????????
JFileChooser fileChooser = new JFileChooser();????????????????????
fileChooser.setDialogTitle("请指定文件保存位置"); .????????????????????????
fileChooser.setSelectedFile(new File(fileName));????????????????????????
if (fileChooser.showSaveDialog(null) == JFileChooser.APPROVE_OPTION) {????????????????????????????
File file = fileChooser.getSelectedFile();????????????????????????????
OutputStream os = new FileOutputStream(file);???????????????????????????
System.out.println("开始接收文件:" + fileName);????????????????????????????
InputStream inputStream = blobMessage.getInputStream();????????????????????????????
//写文件,你也可以使用其他方式????????????????????????
byte[] buff = new byte[256];??????????????????????????
int len = 0;????????????????????????????
while ((len = inputStream.read(buff)) > 0) {????????????????????????????????
os.write(buff, 0, len);?????????????????????????????
}????????????????????????????
os.close();???????????????????????????
System.out.println("完成文件接收:" + fileName);??????????????????????
}???????????????????
} catch (Exception e) {????????????????????????
e.printStackTrace();????????????????????
}???????????????
}???????????
}????????
});???
} }

4. 运行程序

先执行 FileReceiver 程序来监听消息
再执行发送程序 FileSender,将会提示你选择一个要传送的文件(上图左边),确定后就会把文件发送到 ActiveMQ 服务器上
接收端 FileReceiver 监听到有文件传过来的消息后,会自动弹出保存文件的对话框,要你选择保存位置(上图右边),文件名能保持一致。确定后就开始接收文件,存到指定的位置。

如果设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 消息持久性的话,发送方传文件的时候,接收方可以不在线,文件会暂存在 ActiveMQ 服务器上,等到接收程序上线后仍然可以收到发过来的文件。

读书人网 >开源软件

热点推荐