读书人

rabbitmq 学习-7- 官方rabbitmq+sprin

发布时间: 2013-02-24 17:58:56 作者: rapoo

rabbitmq 学习-7- 官方rabbitmq+spring进行远程接口调用
到http://github.com/momania/spring-rabbitmq下载其示例程序

实行远程接口调用,主要在com.rabbitmq.spring.remoting下几个类:
发布服务端(Server):RabbitInvokerServiceExporter.java
接口调用客户端(Client):RabbitInvokerProxyFactoryBean.java,RabbitInvokerClientInterceptor.java,
RabbitRpcClient.java(对RpcClient的简单封装,添加了发送消息时的选项:
mandatory--是否强制发送,immediate--是否立即发送,timeOutMs--超时时间)


发布服务端(Server)——RabbitInvokerServiceExporter.java说明:
package com.rabbitmq.spring.remoting;

import static java.lang.String.format;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationBasedExporter;
import org.springframework.remoting.support.RemoteInvocationResult;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.RpcServer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.spring.ExchangeType;
import com.rabbitmq.spring.InvalidRoutingKeyException;
import com.rabbitmq.spring.channel.RabbitChannelFactory;

public class RabbitInvokerServiceExporter extends RemoteInvocationBasedExporter
implements InitializingBean, DisposableBean, ShutdownListener {

private final Log log = LogFactory
.getLog(RabbitInvokerServiceExporter.class);

private RabbitChannelFactory channelFactory;
private String exchange;
private ExchangeType exchangeType;
private String queueName;
private String routingKey;

private Object proxy;
private List<RpcServer> rpcServerPool;
private int poolsize = 1;

public void afterPropertiesSet() {
// 检查exchange type类型不能为fanout
if (exchangeType.equals(ExchangeType.FANOUT)) {
throw new InvalidRoutingKeyException(String.format(
"Exchange type %s not allowed for service exporter",
exchangeType));
}
exchangeType.validateRoutingKey(routingKey);

// 调用org.springframework.remoting.support.RemoteExporter的getProxyForService(),得到代理对象
proxy = getProxyForService();

// 初始化rpcServer池
rpcServerPool = new ArrayList<RpcServer>(poolsize);

// 初始化RpcServer,并开始接收请求
startRpcServer();
}

// 初始化RpcServer,并开始接收请求
private void startRpcServer() {
try {
log.info("Creating channel and rpc server");

// 创建临时的channel,用来定义queue,exchange,并进行bind
// 这里有两个用处:
// 1:在服务端也定义queue,避免因为先开服务端而出现queue没被定义的错误
// 2:这里先用一个channel定义一下qeueue,后面的for循环里面就不用每个都去定义了
Channel tmpChannel = channelFactory.createChannel();
tmpChannel.getConnection().addShutdownListener(this);
tmpChannel.queueDeclare(queueName, false, false, false, true, null);
if (exchange != null) {
tmpChannel.exchangeDeclare(exchange, exchangeType.toString());
tmpChannel.queueBind(queueName, exchange, routingKey);
}

// 创建poolsize个RpcServer,每个RpcServer使用一个单独的channel,并且分别使用单独的线程去接收请求,提升接收速度
for (int i = 1; i <= poolsize; i++) {
try {
// 每次都创建一个新的channel,因为一个channel在多个线程中使用是会有问题的(官方文档和channel的JavaDoc上是这样说的)
Channel channel = channelFactory.createChannel();
String format = "Starting rpc server %d on exchange [%s(%s)] - queue [%s] - routingKey [%s]";
log.info(String.format(format, i, exchange, exchangeType,
queueName, routingKey));

// 使用当前的channel创建一个RpcServer去处理请求
final RpcServer rpcServer = createRpcServer(channel);
rpcServerPool.add(rpcServer);

// 创建一个线程让当前的RpcServer去处理请求
Runnable main = new Runnable() {
@Override
public void run() {
try {
// rpcServer开始处理请求
throw rpcServer.mainloop();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
// 线程开始
new Thread(main).start();
} catch (IOException e) {
log.warn("Unable to create rpc server", e);
}
}
} catch (Exception e) {
log.error("Unexpected error trying to start rpc servers", e);
}
}

// 创建RpcServer对象
private RpcServer createRpcServer(Channel channel) throws IOException {
return new RpcServer(channel, queueName) {

// 重写处理接收到的消息的方法
public byte[] handleCall(byte[] requestBody,
AMQP.BasicProperties replyProperties) {
// 因为在客户端调用方法的时候,是将客户端调用的方法的信息封装成一个RemoteInvocation对象,然后序列化成一个byte数据再使用RpcClient发送到服务端的
// 所以在这里(服务端接收消息),将消息(requestBody)反序列化成RemoteInvocation对象
RemoteInvocation invocation = (RemoteInvocation) SerializationUtils
.deserialize(requestBody);

// 根据RemoteInvocation的信息,服务端使用代理对象执行相应的方法,并得到执行结果
RemoteInvocationResult result = invokeAndCreateResult(
invocation, proxy);

// 将执行结果序列化为byte数据,然后返回给客户端
return SerializationUtils.serialize(result);

}
};
}

public void setChannelFactory(RabbitChannelFactory channelFactory) {
this.channelFactory = channelFactory;
}

@Required
public void setQueueName(String queueName) {
this.queueName = queueName;
}

public Object getProxy() {
return proxy;
}

@Override
public void destroy() throws Exception {
clearRpcServers();
}

// 清除所有的RpcServer
private void clearRpcServers() {
if (log.isInfoEnabled()) {
log.info(format("Closing %d rpc servers", rpcServerPool.size()));
}

for (RpcServer rpcServer : rpcServerPool) {
try {
// 中止处理请求
rpcServer.terminateMainloop();
rpcServer.close();
} catch (Exception e) {
log.warn("Error termination rpcserver loop", e);
}
}
rpcServerPool.clear();
if (log.isInfoEnabled()) {
log.info("Rpc servers closed");
}

}

@Override
public void shutdownCompleted(ShutdownSignalException cause) {
if (log.isInfoEnabled()) {
log.info(String.format("Channel connection lost for reason [%s]",
cause.getReason()));
log.info(String.format("Reference [%s]", cause.getReference()));
}

if (cause.isInitiatedByApplication()) {
if (log.isInfoEnabled()) {
log.info("Sutdown initiated by application");
}
} else if (cause.isHardError()) {
log
.error("Shutdown is a hard error, trying to restart the RPC server...");
startRpcServer();
}
}

public void setExchange(String exchange) {
this.exchange = exchange;
}

@Required
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}

public void setPoolsize(int poolsize) {
this.poolsize = poolsize;
}

@Required
public void setExchangeType(ExchangeType exchangeType) {
this.exchangeType = exchangeType;
}
}
----------------------------------
服务端,发布接口
/**
* 获取连接的工厂类
*/
package com.sun.study.spring.rabbitmq;

import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.springframework.beans.factory.InitializingBean;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.ShutdownSignalException;

/**
* @author sunjun 2010-5-9 下午01:49:40
*/
public class RabbitConnectionFactory implements InitializingBean {

private final static Log logger = org.apache.commons.logging.LogFactory
.getLog(RabbitConnectionFactory.class);

private Map<String, Connection> CONNECTION_POOL = new HashMap<String, Connection>();
private Address[] address;
private ConnectionFactory connectionFactory;

private String hosts;
private String username = "guest";
private String password = "guest";
private String vhost = "/";

/**
* init address
*
* @return
*/
private void initAddress() {
String[] hostArray = hosts.split(";");
List<Address> addressList = new LinkedList<Address>();
for (int i = 0; i < hostArray.length; i++) {
if (!StringUtils.isBlank(hostArray[i]))
addressList.add(new Address(hostArray[i].trim(),
AMQP.PROTOCOL.PORT));
}
address = addressList.toArray(new Address[] {});
}

/**
* init connectionFactory
*/
private void initConnectionFactory() {
ConnectionParameters connectionParameters = new ConnectionParameters();
connectionParameters.setUsername(username);
connectionParameters.setPassword(password);
connectionParameters.setVirtualHost(vhost);
connectionFactory = new ConnectionFactory(connectionParameters);
}

public void afterPropertiesSet() throws Exception {
initAddress();
initConnectionFactory();
}

/**
* get connection
*
* @param name
* @return
* @throws Exception
*/
public Connection getConnection(String name) throws Exception {
synchronized (CONNECTION_POOL) {
Connection connection = CONNECTION_POOL.get(name);
if (connection == null || !connection.isOpen()) {
connection = connectionFactory.newConnection(address);
if (logger.isInfoEnabled()) {
logger.info("new rabbitmq connection sucess with host: "
+ connection.getHost() + " at time " + new Date());
}
CONNECTION_POOL.put(name, connection);
}
return connection;
}
}

public void shutdownCompleted(ShutdownSignalException cause) {
if (logger.isInfoEnabled()) {
logger.info(String.format(
"Channel connection lost for reason [%s]", cause
.getReason()));
logger.info(String.format("Reference [%s]", cause.getReference()));
}
if (cause.isInitiatedByApplication()) {
if (logger.isInfoEnabled()) {
logger.info("Sutdown initiated by application");
}
} else if (cause.isHardError()) {
logger
.error("Shutdown is a hard error, you can trying to reconnect...");
}
}

public String getHosts() {
return hosts;
}

public void setHosts(String hosts) {
this.hosts = hosts;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getVhost() {
return vhost;
}

public void setVhost(String vhost) {
this.vhost = vhost;
}
}
/**
* 处理客户端的请求,服务端核心处理类
*/
package com.sun.study.spring.rabbitmq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.SerializationUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.remoting.support.RemoteExporter;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationResult;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.RpcServer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
* @author sunjun
* @create 2010-5-9 下午03:11:07
*/
public class RabbitmqInvokerExporter extends RemoteExporter implements
InitializingBean, DisposableBean, ShutdownListener {

private final static String PREFIX_QUEUE = "queue_";

private RabbitConnectionFactory rabbitConnectionFactory;
private int poolsize = 1;

private List<RpcServer> rpcServerPool;
private Object proxy;
private String queueName;

/**
* create a channel
*
* @return
* @throws Exception
*/
private Channel createChannel() throws Exception {
Connection connection = rabbitConnectionFactory
.getConnection(getServiceInterface().getName());
connection.addShutdownListener(this);
Channel channel = connection.createChannel();
channel.addShutdownListener(this);
return channel;
}

/**
* create a RpcServer
*
* @param channel
* @return
* @throws IOException
*/
private RpcServer createRpcServer(Channel channel) throws IOException {
return new RpcServer(channel, queueName) {

public byte[] handleCall(byte[] requestBody,
BasicProperties replyProperties) {
RemoteInvocation remoteInvocation = (RemoteInvocation) SerializationUtils
.deserialize(requestBody);
RemoteInvocationResult result = null;
try {
result = new RemoteInvocationResult(remoteInvocation
.invoke(proxy));
} catch (Exception e) {
logger.error("handle request error...", e);
result = new RemoteInvocationResult(e);
}
return SerializationUtils.serialize(result);
}

};
}

/**
* init a RpcServer
*
* @throws Exception
* @throws Exception
*/
private void initRpcServer(boolean queueDeclare) throws Exception {
Channel channel = createChannel();
if (queueDeclare)
channel.queueDeclare(queueName);
final RpcServer rpcServer = createRpcServer(channel);
rpcServerPool.add(rpcServer);
Runnable thread = new Runnable() {

@Override
public void run() {
try {
throw rpcServer.mainloop();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

};
new Thread(thread).start();
}

/**
* init RpcServers
*
* @throws IOException
*/
private void initRpcServers() {
try {
Channel channel = createChannel();
// queueDeclare(queue, false, false, false, false, null);
channel.queueDeclare(queueName);
for (int i = 0; i < poolsize; i++)
initRpcServer(false);
if (logger.isInfoEnabled())
logger.info("init " + rpcServerPool.size()
+ " RpcServer success for " + queueName);
} catch (Exception e) {
logger.error("init RpcServers error.", e);
}
}

/**
* check RpcServer
*/
private void checkRpcServers() {
Thread checkThread = new Thread() {
public void run() {
long checkTime = 2000;
while (true) {
try {
Thread.sleep(checkTime);
} catch (InterruptedException e) {
}
if (rpcServerPool.isEmpty()) {
initRpcServers();
continue;
}
boolean error = false;
try {
for (RpcServer rpcServer : rpcServerPool) {
Channel channel = rpcServer.getChannel();
if (channel == null || !channel.isOpen()) {
try {
rpcServer.terminateMainloop();
rpcServer.close();
} catch (Exception e) {
error = true;
}
rpcServerPool.remove(rpcServer);
initRpcServer(true);
}
}
for (int i = 0; i < poolsize - rpcServerPool.size(); i++)
initRpcServer(true);
} catch (Exception e) {
error = true;
}
checkTime = error ? 2000 : 5000;
}
}
};
checkThread.setDaemon(true);
checkThread.start();
}

public void afterPropertiesSet() {
proxy = getProxyForService();
rpcServerPool = new ArrayList<RpcServer>(poolsize);
queueName = PREFIX_QUEUE + getServiceInterface().getName();
initRpcServers();
checkRpcServers();
}

public void shutdownCompleted(ShutdownSignalException cause) {
if (logger.isInfoEnabled()) {
logger.info(String.format(
"Channel connection lost for reason [%s]", cause
.getReason()));
logger.info(String.format("Reference [%s]", cause.getReference()));
}
if (cause.isInitiatedByApplication()) {
if (logger.isInfoEnabled()) {
logger.info("Sutdown initiated by application");
}
} else if (cause.isHardError()) {
logger
.error("Shutdown is a hard error, you can trying to reconnect...");
}
}

public void destroy() throws Exception {
for (RpcServer rpcServer : rpcServerPool) {
rpcServer.terminateMainloop();
rpcServer.close();
}
rpcServerPool.clear();
}

public void setPoolsize(int poolsize) {
this.poolsize = poolsize;
}

public void setRabbitConnectionFactory(
RabbitConnectionFactory rabbitConnectionFactory) {
this.rabbitConnectionFactory = rabbitConnectionFactory;
}

}
-----------------------------
客户端,获得接口代理对象,进行远程调用
/**
* 包装RpcClient,进行远程调用后,需要同步等待结果
*/
package com.sun.study.spring.rabbitmq;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.MethodArgumentReader;
import com.rabbitmq.client.impl.MethodArgumentWriter;
import com.rabbitmq.utility.BlockingCell;

/**
* @author sunjun
* @create 2010-5-16 下午09:56:55
*/
class RabbitRpcClient {

private final Channel channel;
private final String exchange;
private final String routingKey;

private final Map<String, BlockingCell<Object>> continuationMap = new HashMap<String, BlockingCell<Object>>();
private int correlationId;

private final String replyQueue;
private DefaultConsumer consumer;

private final boolean mandatory;
private final boolean immediate;
private final int timeOutMs;

public RabbitRpcClient(Channel channel, String exchange, String routingKey,
int timeOutMs) throws IOException {
this(channel, exchange, routingKey, timeOutMs, false, false);
}

@SuppressWarnings( { "ConstructorWithTooManyParameters" })
public RabbitRpcClient(Channel channel, String exchange, String routingKey,
int timeOutMs, boolean mandatory, boolean immediate)
throws IOException {
this.channel = channel;
this.exchange = exchange;
this.routingKey = routingKey;
this.timeOutMs = timeOutMs;
this.mandatory = mandatory;
this.immediate = immediate;
correlationId = 0;

replyQueue = setupReplyQueue();
consumer = setupConsumer();
}

void checkConsumer() throws IOException {
if (consumer == null) {
throw new EOFException("RpcClient is closed");
}
}

public void close() throws IOException {
if (consumer != null) {
channel.basicCancel(consumer.getConsumerTag());
consumer = null;
}
}

private String setupReplyQueue() throws IOException {
return channel.queueDeclare("", false, false, true, true, null)
.getQueue();
}

private DefaultConsumer setupConsumer() throws IOException {
DefaultConsumer consumer = new DefaultConsumer(channel) {

@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException signal) {

synchronized (continuationMap) {
for (Map.Entry<String, BlockingCell<Object>> entry : continuationMap
.entrySet()) {
entry.getValue().set(signal);
}
RabbitRpcClient.this.consumer = null;
}
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
synchronized (continuationMap) {
String replyId = properties.getCorrelationId();
BlockingCell<Object> blocker = continuationMap.get(replyId);
continuationMap.remove(replyId);
blocker.set(body);
}
}
};
channel.basicConsume(replyQueue, true, consumer);
return consumer;
}

void publish(AMQP.BasicProperties props, byte[] message) throws IOException {
channel.basicPublish(exchange, routingKey, mandatory, immediate, props,
message);

}

public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
throws IOException, ShutdownSignalException, TimeoutException {
AMQP.BasicProperties localProps = props;
checkConsumer();
BlockingCell<Object> k = new BlockingCell<Object>();
synchronized (continuationMap) {
correlationId++;
String replyId = "" + correlationId;
if (localProps != null) {
localProps.setCorrelationId(replyId);
localProps.setReplyTo(replyQueue);
} else {
localProps = new AMQP.BasicProperties(null, null, null, null,
null, replyId, replyQueue, null, null, null, null,
null, null, null);
}
continuationMap.put(replyId, k);
}
publish(localProps, message);
Object reply = k.uninterruptibleGet(timeOutMs);
if (reply instanceof ShutdownSignalException) {
ShutdownSignalException sig = (ShutdownSignalException) reply;
ShutdownSignalException wrapper = new ShutdownSignalException(sig
.isHardError(), sig.isInitiatedByApplication(), sig
.getReason(), sig.getReference());
wrapper.initCause(sig);
throw wrapper;
} else {
return (byte[]) reply;
}
}

public byte[] primitiveCall(byte[] message) throws IOException,
ShutdownSignalException, TimeoutException {
return primitiveCall(null, message);
}

public String stringCall(String message) throws IOException,
ShutdownSignalException, TimeoutException {
return new String(primitiveCall(message.getBytes()));
}

@SuppressWarnings( { "IOResourceOpenedButNotSafelyClosed" })
public Map<String, Object> mapCall(Map<String, Object> message)
throws IOException, ShutdownSignalException, TimeoutException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
MethodArgumentWriter writer = new MethodArgumentWriter(
new DataOutputStream(buffer));
writer.writeTable(message);
writer.flush();
byte[] reply = primitiveCall(buffer.toByteArray());
MethodArgumentReader reader = new MethodArgumentReader(
new DataInputStream(new ByteArrayInputStream(reply)));
return reader.readTable();
}

public Map<String, Object> mapCall(Object[] keyValuePairs)
throws IOException, ShutdownSignalException, TimeoutException {
Map<String, Object> message = new HashMap<String, Object>();
for (int i = 0; i < keyValuePairs.length; i += 2) {
message.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
}
return mapCall(message);
}

public Channel getChannel() {
return channel;
}

public String getExchange() {
return exchange;
}

public String getRoutingKey() {
return routingKey;
}

public Map<String, BlockingCell<Object>> getContinuationMap() {
return Collections.unmodifiableMap(continuationMap);
}

public int getCorrelationId() {
return correlationId;
}

public String getReplyQueue() {
return replyQueue;
}

public Consumer getConsumer() {
return consumer;
}
}
/**
* 生成接口代理对象,进行远程调用
*/
package com.sun.study.spring.rabbitmq;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.lang.SerializationUtils;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.remoting.support.RemoteAccessor;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationResult;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

/**
* @author sunjun
* @create 2010-5-9 下午04:49:58
*/
public class RabbitmqProxyFactoryBean extends RemoteAccessor implements
FactoryBean, MethodInterceptor, InitializingBean, DisposableBean,
ShutdownListener {

private final static String PREFIX_QUEUE = "queue_";
private final static String PREFIX_EXCHANGE = "exchange_";
private final static String PREFIX_ROUTINGKEY = "routingkey_";
private final static String EXCHANGE_TYPE = "topic";

private String exchange;
private String routingKey;
private String queueName;

private RabbitConnectionFactory rabbitConnectionFactory;
private String exchangeType = EXCHANGE_TYPE;
private int poolsize = 1;
private int timeoutMs = 0;
private boolean mandatory;
private boolean immediate;

private Object serviceProxy;
private final BlockingQueue<RabbitRpcClient> rpcClientPool = new LinkedBlockingQueue<RabbitRpcClient>();

/**
* create a channel
*
* @return
* @throws Exception
*/
private Channel createChannel() throws Exception {
Connection connection = rabbitConnectionFactory
.getConnection(getServiceInterface().getName());
connection.addShutdownListener(this);
Channel channel = connection.createChannel();
channel.addShutdownListener(this);
return channel;
}

/**
* init many RpcClient
*/
private void initRpcClients() {
try {
Channel temChannel = createChannel();

temChannel.queueDeclare(queueName);
temChannel.exchangeDeclare(exchange, EXCHANGE_TYPE);
temChannel.queueBind(queueName, exchange, routingKey);

for (int i = 0; i < poolsize; i++) {
RabbitRpcClient rpcClient = createRpcClient(exchange,
routingKey, false);
rpcClientPool.add(rpcClient);
}
} catch (Exception e) {
logger.error("create many RpcClient error.", e);
for (int i = 0; i < poolsize - rpcClientPool.size(); i++)
rpcClientPool.add(null);
}
}

/**
* create a RpcClient
*
* @throws Exception
*/
private RabbitRpcClient createRpcClient(String exchange, String routingKey,
boolean declareAndBind) throws Exception {
Channel channel = createChannel();
if (declareAndBind) {
channel.queueDeclare(queueName);
channel.exchangeDeclare(exchange, EXCHANGE_TYPE);
channel.queueBind(queueName, exchange, routingKey);
}
final RabbitRpcClient rpcClient = new RabbitRpcClient(channel,
exchange, routingKey, timeoutMs, mandatory, immediate);
channel.setReturnListener(new ReturnListener() {
public void handleBasicReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
// call handle result here, so uninterruptable cal will
// be interrupted
Throwable resultException;
String msg;
switch (replyCode) {
case AMQP.NO_CONSUMERS:
msg = "No consumers for message [%s] - [%s] - [%s]";
resultException = new RuntimeException(String.format(msg,
SerializationUtils.deserialize(body), exchange,
routingKey));
break;
case AMQP.NO_ROUTE:
msg = "Unroutable message [%s] - [%s] - [%s]";
resultException = new RuntimeException(String.format(msg,
SerializationUtils.deserialize(body), exchange,
routingKey));
break;
default:
msg = "Message returned [%s] - [%s] - [%s] - [%d] - [%s]";
resultException = new RuntimeException(String.format(msg,
SerializationUtils.deserialize(body), exchange,
routingKey, replyCode, replyText));
}
RemoteInvocationResult remoteInvocationResult = new RemoteInvocationResult(
resultException);
rpcClient.getConsumer().handleDelivery(null, null, properties,
SerializationUtils.serialize(remoteInvocationResult));
}
});
if (logger.isInfoEnabled()) {
String str = "Started rpc client on exchange [%s(%s)] - routingKey [%s]";
logger.info(String.format(str, exchange, exchangeType, routingKey));
}
return rpcClient;
}

/**
* check RpcClient
*
* @param rpcClient
* @return
* @throws Exception
*/
private RabbitRpcClient checkRpcClient(RabbitRpcClient rpcClient)
throws Exception {
boolean create = false;
if (rpcClient == null)
create = true;
else {
Channel channel = rpcClient.getChannel();
if (channel == null || !channel.isOpen())
create = true;
}
if (!create)
return rpcClient;
return createRpcClient(exchange, routingKey, true);
}

public void afterPropertiesSet() {
String serviceInterfaceName = getServiceInterface().getName();
exchange = PREFIX_EXCHANGE + serviceInterfaceName;
routingKey = PREFIX_ROUTINGKEY + serviceInterfaceName;
queueName = PREFIX_QUEUE + serviceInterfaceName;

initRpcClients();

serviceProxy = new ProxyFactory(getServiceInterface(), this)
.getProxy(getBeanClassLoader());
}

public Object invoke(MethodInvocation methodInvocation) throws Throwable {
RemoteInvocation remoteInvocation = new RemoteInvocation(
methodInvocation);
RabbitRpcClient rpcClient = rpcClientPool.poll(timeoutMs,
TimeUnit.MILLISECONDS);
rpcClient = checkRpcClient(rpcClient);
if (rpcClient != null) {
byte[] response;
try {
byte[] message = SerializationUtils.serialize(remoteInvocation);
response = rpcClient.primitiveCall(message);
} finally {
rpcClientPool.put(rpcClient);
}
RemoteInvocationResult remoteInvocationResult = (RemoteInvocationResult) SerializationUtils
.deserialize(response);
return remoteInvocationResult.recreate();
}
throw new TimeoutException(
"Timed out while waiting for available rpc client");
}

public void destroy() throws Exception {

}

public void shutdownCompleted(ShutdownSignalException cause) {

}

public Object getObject() throws Exception {
return serviceProxy;
}

public Class getObjectType() {
return getServiceInterface();
}

public boolean isSingleton() {
return true;
}

public void setRabbitConnectionFactory(
RabbitConnectionFactory rabbitConnectionFactory) {
this.rabbitConnectionFactory = rabbitConnectionFactory;
}

public void setPoolsize(int poolsize) {
this.poolsize = poolsize;
}

public void setTimeoutMs(int timeoutMs) {
this.timeoutMs = timeoutMs;
}

public void setMandatory(boolean mandatory) {
this.mandatory = mandatory;
}

public void setImmediate(boolean immediate) {
this.immediate = immediate;
}

public void setExchangeType(String exchangeType) {
this.exchangeType = exchangeType;
}

}
---------------------------------
前面两步后,进行Spring的配置
src/main/resources/rabbitmq/spring-rabbitmq-base.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

<bean id="rabbitConnectionFactory" value="localhost" />
<property name="username" value="guest" />
<property name="password" value="guest" />
<property name="vhost" value="/" />
</bean>

</beans>
src/main/resources/rabbitmq/spring-rabbitmq-server.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

<import resource="spring-rabbitmq-base.xml" />

<bean id="userService" />

<bean ref="userService" />
<property name="serviceInterface" value="com.sun.study.service.UserService" />
<property name="poolsize" value="10" />
<property name="rabbitConnectionFactory" ref="rabbitConnectionFactory" />
</bean>

</beans>
src/main/resources/rabbitmq/spring-rabbitmq-client.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

<import resource="spring-rabbitmq-base.xml" />

<bean id="userService" value="com.sun.study.service.UserService" />
<property name="rabbitConnectionFactory" ref="rabbitConnectionFactory" />
<property name="timeoutMs" value="5000" />
<property name="mandatory" value="true" />
<property name="immediate" value="false" />
</bean>

</beans>
--------------------------------
测试:
Server:
/**
* 开启服务端
*/
package rabbitmq;

import org.testng.annotations.Test;
import org.unitils.UnitilsTestNG;
import org.unitils.spring.annotation.SpringApplicationContext;

/**
* @author Administrator
*
*/
@Test
@SpringApplicationContext(value = { "classpath:/rabbitmq/spring-rabbitmq-server.xml" })
public class RpcServerTest extends UnitilsTestNG {

public void test() {
while (true) {
}
}

}
Client:
/**
* Client调用,测试
*/
package rabbitmq;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

import com.sun.study.service.UserService;

/**
* @author sunjun
* @create 2010-5-15 下午08:01:05
*/
public class Test {

/**
* @param args
*/
public static void main(String[] args) {
ApplicationContext context = new FileSystemXmlApplicationContext(
"classpath:/rabbitmq/spring-rabbitmq-client.xml");
UserService userService = (UserService) context.getBean("userService");
System.out.println(userService.save(11));
}
}

读书人网 >软件架构设计

热点推荐