读书人

ActiveMQ中ActiveMQBytesMessage类型可

发布时间: 2013-12-02 12:00:40 作者: rapoo

ActiveMQ中ActiveMQBytesMessage类型可能会丢失数据的问题及解决

ActiveMQBytesMessage类型的消息在特殊情况下会丢失数据,就是在被拷贝前设置消息的某个属性。下面是测试代码:

?producer代码

MessageProducer producer;//initialize Connection, Session, Producer......byte[] bs = "bytes message".getBytes();BytesMessage message = session.createBytesMessage();message.writeBytes(bs);for(int i=0; i< 0; i++){    message.setLongProperty("sendTime", System.currentTimeMillis());    try{        producer.send(message);    }catch(){         e.printStackTrace();    }    }

?Consumer代码

MessageConsumer consumer//initailize Connection, Session, MessageConsumerfor(int i=0; i<10; i++){    ActiveMQBytesMessage msg = (ActiveMQBytesMessage)consumer.receive(60*1000);    long sendTime = message.getLongProperty("sendTime");    System.out.println("sendtime:" + sendTime);    ByteSequence bs = message.getMessage().getContent();    System.out.println("bytes data:" + new String(bs.getData()));}

?

期待的结果:
consumer在所有接收到的消息中得到bytes数据

?

实际结果:
只有第一条消息有bytes数据,其他消息都丢失了bytes数据,而long property的值都没有丢失。

?

分析:
ActiveMQ在发送消息的时候会拷贝消息,实际发送的是消息的拷贝。在拷贝之前会调用storeContent() ,ActiveMQBytesMessage中的属性DataOutputStream dataOut?会被关闭,dataOut 会被置为null,dataOut中的值会被set到content属性中。如果不设置消息的属性,这个逻辑是没有问题的。当设置消息属性时,setLongProperty 方法中会调用setObjectProperty() ,然后调用initializeWriting(),在initializeWriting()中DataOutputStream dataOut会再次被创建。这样,当消息第二次被拷贝的时候,DataOutputStream dataOut不是null,而是EMPTY。由于不是null,dataOut的值会被set到content,这样content的值就被清空了。

?

根据JMS规范的要求:

 3.9 Access to Sent Messages After sending a message, a client may retain and modify it without affecting the message that has been sent. The same message object may be sent multiple times. During the execution of its sending method, the message must not be changed by the client. If it is modified, the result of the send is undefined.

?

消息发送后,原消息不应该受到影响。根据这个要求,目前的实现是个bug

建议:

?

在initializeWriting()中把content的数据设置到DataOutputStream dataOut中,这样就保持消息中bytes数据不变。

?

我的fix代码

?

ActiveMQBytesMessage :private void initializeWriting() throws JMSException {669 The original code......701 //fix codeif(this.content !=null && this.content.length >0){try{ this.dataOut.write(this.content.getData()); }catch(IOException ioe){ throw JMSExceptionSupport.create(ioe); }}702 }

?

?这个bug?已经提交给APACHE,还没有被修复。

读书人网 >开源软件

热点推荐