读书人

rabbitmq 学习-5

发布时间: 2013-02-24 17:58:56 作者: rapoo

rabbitmq 学习-5-
RpcClient发送消息和同步接收消息原理

本身使用RpcClient发送消息与同步接收消息的代码是很简单的,如下:

RpcClient client = new RpcClient(channel, exchange, routingKey);

String msg = "hello world!";

byte[] result = client.primitiveCall(msg.getBytes());

这里的primitiveCall调用后,当前线程会进行同步等待,等待消息接收端给自己的回复消息

一个完整的发送消息与接收回复消息的图例:

整个流程详解:

l 创建RpcClient实例
RpcClient client = new RpcClient(channel, exchange, routingKey);

创建RpcClient时会做两件事:

A:创建一个回复queue,接收当前RpcClient发送的消息的消息接收人会将回复消息发到这个replyQueue上供当前RpcClient去接收回复消息

_replyQueue = setupReplyQueue();



protected String setupReplyQueue() throws IOException {

return _channel.queueDeclare("", false, false, true, true, null).getQueue();

//这里实际上是由rabbitmq server去定义一个唯一的queue(因为queueName是空的,所以是由server去生成queueName),最后返回这个queueName,queueName是由server生成的,使用的是以下这个方法:

Queue.DeclareOk queueDeclare(String queueName, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,

Map<String, Object> arguments)

}



B:创建一个接收回复消息的consumer

_consumer = setupConsumer();



protected DefaultConsumer setupConsumer() throws IOException {

//创建一个接收消息的DefaultConsumer实例

DefaultConsumer consumer = new DefaultConsumer(_channel) {

@Override //发生shutdown的时候回调

public void handleShutdownSignal(String consumerTag,

ShutdownSignalException signal) {

synchronized (_continuationMap) {

for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {

entry.getValue().set(signal);

}

_consumer = null;

}

}



@Override //处理消息交付

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body)

throws IOException {

//这部分就是和下面的代码一起协作来实现将异步接收强制变成同步接收

synchronized (_continuationMap) {

String replyId = properties.getCorrelationId();

BlockingCell<Object> blocker = _continuationMap.get(replyId);

_continuationMap.remove(replyId);

blocker.set(body);

}

}

};

//让接收消息的consumer去replyQueue上去接收消息,这个过程对于主线程来说是异步进行的,只要replyQueue上有消息了,consumer就会去replyQueue上去接收消息,并回调它的handleDelivery方法

_channel.basicConsume(_replyQueue, true, consumer);

return consumer;

}



l 发送消息
byte[] result = rpcClient.primitiveCall(msg.getBytes());

使用rpcClient的primitiveCall发送消息,看看是怎么做的

public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException {

return primitiveCall(null, message);

}

继续跟踪,核心方法是这个

public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) throws IOException, ShutdownSignalException{

//检查consumer是否为空,若为空,抛出异常

checkConsumer();



BlockingCell<Object> k = new BlockingCell<Object>();

synchronized (_continuationMap) {

_correlationId++;

String replyId = "" + _correlationId;

//如果props不为空,则将上一步骤创建的replyQueue设置到props上去,还有replyId

if (props != null) {

props.setCorrelationId(replyId);

props.setReplyTo(_replyQueue);

}

else {

//如果props为空,则创建一个,并将replyId和replyQueue都设置到props上

props = new AMQP.BasicProperties(null, null, null, null,

null, replyId,

_replyQueue, null, null, null,

null, null, null, null);

}

_continuationMap.put(replyId, k);

}

//使用上面的props发送消息,这样replyQueue和replyId就跟着传递到了接收消息的那一方去了,接收消息的client去props上去取到replyQueue,它就知道了它接收的消息的回复queue,然后它会将回复消息发送到replyQueue上去,而在上一步骤我们已经指定了一个consumer去replyQueue上去取消息,所以整个发送和接收消息的所有client是有条不紊的进行着

publish(props, message); //这行代码执行完后,只是将消息发送出去了,接收回复消息是异步的,由上一步骤的consumer去接收回复消息

//这里就是进行同步等待接收回复消息,将异步接收变成同步回复接收的核心就在这里

Object reply = k.uninterruptibleGet();

if (reply instanceof ShutdownSignalException) {

ShutdownSignalException sig = (ShutdownSignalException) reply;

ShutdownSignalException wrapper =

new ShutdownSignalException(sig.isHardError(),

sig.isInitiatedByApplication(),

sig.getReason(),

sig.getReference());

wrapper.initCause(sig);

throw wrapper;

} else {

return (byte[]) reply;

}

}


完整描述
创建RpcClient实例:
1,定义一个Map,用于存放每个消息的相关信息:
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
Key是一个correlationId,相当于当前rpcClient实例发送消息的一个计数器,初始化时是0,每发送一个消息时,加1
Value是一个com.rabbitmq.utility.BlockingCell对象,它是在发送消息前创建,并和当前的correlationId进行关联,放进来
_continuationMap.put(correlationId, blockingCell);
2,correlationId初始化为0
3,创建一个回复queue,replyQueue=channel.queueDeclare("", false, false, true, true, null).getQueue();
4,创建一个接收回复消息的consumer
5,指定consumer接收replyQueue上的消息,channel.basicConsume(replyQueue, true, consumer);

RpcClient发送消息:
1,创建一个BlockingCell<Object>对象blockingCell
1,correlationId++
2,创建BasicProperties对象,并将correlationId,replyQueue设置到它上面,发送消息时,它会被传递到接收方
3,以correlationId为Key,将blockingCell放入到_continuationMap中
4,发送消息:channel.basicPublish(exchange, routingKey, 上面 步骤得到的BasicProperties对象, message);
5,获取回复消息,Object reply = blockingCell.uninterruptibleGet();这里就是同步等待回复消息

RpcServer接收消息:
1,接收消息
2,从request中获取BasicProperties对象requestProperties,requestProperties=request.getProperties()
3,从requestProperties中得到correlationId,replyQueue
4,创建一个回复消息用的BasicProperties对象replyProperties,并将correlationId设置到它上面
4,发送回复消息:channel.basicPublish("", replyQueue, replyProperties, replyMessage);

RpcClient接收回复:
1,replyQueue一有消息,consumer就会接收到并回调consumer的handleDelivery方法
2,获取传递过来的BasicProperties获取correlationId
3,根据correlationId去continuationMap中取BlockingCell对象,BlockingCell<Object> blocker = continuationMap.get(correlationId);
4,从continuationMap中删除,continuationMap.remove(correlationId);
5,将回复消息设置到blocker对象里面,blocker.set(replyMessage);

同步等待回复消息:
1,【RpcClient发送消息】第4步主线程,发送消息后,第5步就去获取回复消息
2,【RpcClient发送消息】第5步主线程,blockingCell.uninterruptibleGet(),如果blockingCell没有被set(value)过,那么让当前主线程处于等待wait(),等待状态
3,【RpcClient接收回复】第5步blocker.set(replyMessage);这里的blocker其实就是上面主线程创建的blockingCell,因为它是根据correlationId去continuationMap中取的,set(replyMessage),blocker会用一个属性将replyMessage保存起来,供get的时候去返回这个属性,然后调用notify();唤醒处于等待的主线程(当前这步所在的线程和上一步主线程是在两个线程,所以主线程的等待是可以被这个线程唤醒的),主线程被唤醒后,get()就会取到replyMessage,最终整个步骤实现了将异步接收强制转换为同步等待接收

BlockingCell类
public class BlockingCell<T> {

private boolean _filled = false;
private T _value;

private static final long NANOS_IN_MILLI = 1000 * 1000;
private static final long INFINITY = -1;

public BlockingCell() {
}

public synchronized T get() throws InterruptedException {
while (!_filled) { //如果value没有被设置过
wait(); //让当前线程处于等待,直到其它线程调用当前对象的notify()或notifyAll()为止
}
return _value;
}

//带超时的get
public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
if (timeout < 0 && timeout != INFINITY)
throw new AssertionError("Timeout cannot be less than zero");
if (!_filled && timeout != 0) {
wait(timeout == INFINITY ? 0 : timeout);
}
if (!_filled)
throw new TimeoutException();
return _value;
}

//无限制的等待,直到取到值为止
public synchronized T uninterruptibleGet() {
while (true) {
try {
return get();
} catch (InterruptedException ex) {
}
}
}

public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
long now = System.nanoTime() / NANOS_IN_MILLI;
long runTime = now + timeout;
do {
try {
return get(runTime - now);
} catch (InterruptedException e) {
}
} while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
throw new TimeoutException();
}

public synchronized void set(T newValue) {
if (_filled) {
throw new AssertionError("BlockingCell can only be set once");
}
_value = newValue;
_filled = true;
notify(); //唤醒当前线程(处于等待状态)
}

//保证只能被set(value)一次
public synchronized boolean setIfUnset(T newValue) {
if (_filled) {
return false;
}
set(newValue);
_filled = true;
return true;
}
}

---------------------------------
channel 说明

rabbitmq java api 关于消息处理的一个重要的类是channel
channel 主要进行相关定义,发送消息,获取消息,事务处理等。
channel可以在多线程中使用,但是在任何时候保证只有一个线程执行命令是很重要的,这在前面 rabbitmq 学习-6-rabbitmq基础 已经说的很清楚了。

public interface Channel extends ShutdownNotifier {

// 重新得到channel number
int getChannelNumber();

//得到当前channel的connection
Connection getConnection();

//关闭 channel,closeCode=com.rabbitmq.client.AMQP#REPLY_SUCCESS,closeMessage='OK'
void close() throws IOException;

//指定code和message关闭channel
void close(int closeCode, String closeMessage) throws IOException;

//中止 channel,closeCode=com.rabbitmq.client.AMQP#REPLY_SUCCESS,closeMessage='OK'
//此操作中的所有异常将被丢弃
void abort() throws IOException;

//指定code和message中止channel
//此操作中的所有异常将被丢弃
void abort(int closeCode, String closeMessage) throws IOException;

//得到当前channel的ReturnListener
ReturnListener getReturnListener();

//设置当前channel的ReturnListener
void setReturnListener(ReturnListener listener);

/**
* Request specific "quality of service" settings.
*
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring the receipt of
* acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @see com.rabbitmq.client.AMQP.Basic.Qos
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire connection rather than just the current channel
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

/**
* Request a specific prefetchCount "quality of service" settings
* for this channel.
*
* @see #basicQos(int, int, boolean)
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchCount) throws IOException;

//发送消息,"mandatory" and "immediate" 都是 false
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

/**
* 发送消息
* @param exchange名称
* @param routingKey名称
* @param mandatory 是否强制发送
* @param immediate 是否立即发送
* @param props other properties for the message - routing headers etc
* @param body 消息
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;

/**
* 删除exchange,不管是否在使用
* @param exchange名称
* @return a deletion-confirm method to indicate the exchange was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;

/**
* 删除exchange
* @param exchange名称
* @param ifUnused 设置是否只删除没有使用的
* @return a deletion-confirm method to indicate the exchange was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;

/**
* 定义exchange,non-autodelete, non-durable
* @param exchange名称
* @param exchange类型
* @return a deletion-confirm method to indicate the exchange was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

/**
* 定义exchange,non-autodelete
* @param exchange名称
* @param exchange类型
* @param durable 是否持续存在(持续存在,即使server重启也会存在)
* @throws java.io.IOException if an error is encountered
* @return a declaration-confirm method to indicate the exchange was successfully declared
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

/**
* 定义exchange
* @param exchange名称
* @param exchange类型
* @param passive true if we are passively declaring a exchange (asserting the exchange already exists)
* @param durable 是否持续存在(持续存在,即使server重启也会存在)
* @param autoDelete 是否自动删除,自动删除-server会在它不在使用的时候将其删除
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean passive, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

/**
* 定义一个queue,由server去命名,exclusive, autodelete, non-durable
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare() throws IOException;

/**
* 定义一个queue,non-exclusive, non-autodelete, non-durable
* @param queue 名称
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue) throws IOException;

/**
* 定义一个queue,non-exclusive, non-autodelete
* @param queue 名称
* @param durable 是否持续存在(true:server重启也会存在)
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable) throws IOException;

/**
* 定义一个queue
* @param queue 名称
* @param passive true if we are passively declaring a queue (asserting the queue already exists)
* @param durable 是否持续存在
* @param exclusive true if we are declaring an exclusive queue
* @param autoDelete 是否自动删除,true:不在使用了server将会自动删除它
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

/**
* 删除queue,不管它是否在使用
* @param queue 名称
* @return a deletion-confirm method to indicate the queue was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Queue.DeleteOk queueDelete(String queue) throws IOException;

/**
* 删除queue
* @param queue 名称
* @param ifUnused 是否只删除没有被使用的queue
* @param ifEmpty 是否只删除消息是空的queue
* @return a deletion-confirm method to indicate the queue was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

/**
* 使用routingKey将queue绑定到exchange上
* @param queue 名称
* @param exchange the name of the exchange
* @param routingKey the routine key to use for the binding
* @return a binding-confirm method if the binding was successfully created
* @throws java.io.IOException if an error is encountered
*/
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

/**
* 使用routingKey将queue绑定到exchange上,带参数
* @param queue the name of the queue
* @param exchange the name of the exchange
* @param routingKey the routine key to use for the binding
* @param arguments other properties (binding parameters)
* @return a binding-confirm method if the binding was successfully created
* @throws java.io.IOException if an error is encountered
*/
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

/**
* 解除绑定
* @param queue the name of the queue
* @param exchange the name of the exchange
* @param routingKey the routine key to use for the binding
* @return an unbinding-confirm method if the binding was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

/**
* 解除绑定,带参数
* @param queue the name of the queue
* @param exchange the name of the exchange
* @param routingKey the routine key to use for the binding
* @param arguments other properties (binding parameters)
* @return an unbinding-confirm method if the binding was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

/**
* Purges the contents of the given queue and awaits a completion.
* @see com.rabbitmq.client.AMQP.Queue.Purge
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
* @param queue the name of the queue
* @return a purge-confirm method if the purge was executed succesfully
* @throws java.io.IOException if an error is encountered
*/
Queue.PurgeOk queuePurge(String queue) throws IOException;

/**
* Purges the contents of the given queue.
* @see com.rabbitmq.client.AMQP.Queue.Purge
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
* @param queue the name of the queue
* @param nowait whether to await completion of the purge
* @return a purge-confirm method if the purge was executed succesfully
* @throws java.io.IOException if an error is encountered
*/
Queue.PurgeOk queuePurge(String queue, boolean nowait) throws IOException;

/**
* 从queue上取消息
* @param queue the name of the queue
* @param noAck true if no handshake is required
* @return a {@link GetResponse} containing the retrieved message data
* @throws java.io.IOException if an error is encountered
*/
GetResponse basicGet(String queue, boolean noAck) throws IOException;

/**
* Acknowledge one or several received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @see com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true if we are acknowledging multiple messages with the same delivery tag
* @throws java.io.IOException if an error is encountered
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer, with
* explicit acknowledgements required and a server-generated consumerTag.
* @param queue the name of the queue
* @param callback an interface to the consumer object
* @return the consumerTag generated by the server
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicAck
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
*/
String basicConsume(String queue, Consumer callback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
* @param queue the name of the queue
* @param noAck true if no handshake is required
* @param callback an interface to the consumer object
* @return the consumerTag generated by the server
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
*/
String basicConsume(String queue, boolean noAck, Consumer callback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer.
* @param queue the name of the queue
* @param noAck true if no handshake is required
* @param consumerTag a client-generated consumer tag to establish context
* @param callback an interface to the consumer object
* @return the consumerTag associated with the new consumer
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
*/
String basicConsume(String queue, boolean noAck, String consumerTag, Consumer callback) throws IOException;

/**
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
* method before returning.
* @param queue the name of the queue
* @param noAck true if no handshake is required
* @param consumerTag a client-generated consumer tag to establish context
* @param noLocal flag set to true unless server local buffering is required
* @param exclusive true if this is an exclusive consumer
* @param callback an interface to the consumer object
* @return the consumerTag associated with the new consumer
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
*/
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Consumer callback) throws IOException;

/**
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
* method before returning.
* @param consumerTag a client- or server-generated consumer tag to establish context
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Cancel
* @see com.rabbitmq.client.AMQP.Basic.CancelOk
*/
void basicCancel(String consumerTag) throws IOException;

/**
* Ask the broker to resend unacknowledged messages. In 0-8
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
* the new, deprecated method basic.recover_async is asynchronous.
* To avoid this API changing, this is named for the latter, and
* will be deprecated.
* @param requeue If true, messages will be requeued and possibly
* delivered to a different consumer. If false, messages will be
* redelivered to the same consumer.
*/
void basicRecoverAsync(boolean requeue) throws IOException;

/**
* 启用事务模式
* @return a transaction-selection method to indicate the transaction was successfully initiated
* @throws java.io.IOException if an error is encountered
*/
Tx.SelectOk txSelect() throws IOException;

/**
* 提交事务
* @return a transaction-commit method to indicate the transaction was successfully committed
* @throws java.io.IOException if an error is encountered
*/
Tx.CommitOk txCommit() throws IOException;

/**
* 回流事务
* @return a transaction-rollback method to indicate the transaction was successfully rolled back
* @throws java.io.IOException if an error is encountered
*/
Tx.RollbackOk txRollback() throws IOException;
}

读书人网 >软件架构设计

热点推荐