Alibaba Dubbo框架同步调用原理分析
?
2)?? 半双工
信息能双向传输,但不能同时双向传输
?

3)?? 全双工
能双向传输并且可以同时双向传输
?

n? Socket
Socket?是一种应用接口, TCP/IP?是网络传输协议,虽然接口相同,?但是不同的协议会有不同的服务性质。创建Socket?连接时,可以指定使用的传输层协议,Socket?可以支持不同的传输层协议(TCP?或UDP?),当使用TCP?协议进行连接时,该Socket?连接就是一个TCP?连接。Soket?跟TCP/IP?并没有必然的联系。Socket?编程接口在设计的时候,就希望也能适应其他的网络协议。所以,socket?的出现只是可以更方便的使用TCP/IP?协议栈而已。
引自:http://hi.baidu.com/lewutian/blog/item/b28e27fd446d641d09244d08.html
上一个通信理论其实是想说Socket(TCP)通信是全双工的方式
n? Dubbo远程同步调用原理分析
从Dubbo开源文档上了解到一个调用过程如下图
http://code.alibabatech.com/wiki/display/dubbo/User+Guide#UserGuide-APIReference
另外文档里有说明:Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。
?

Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。
连接个数:单连接连接方式:长连接传输协议:TCP传输方式:NIO异步传输序列化:Hessian二进制序列化适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。适用场景:常规远程服务方法调用?通常,一个典型的同步远程调用应该是这样的:

- client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字的将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)将ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁,再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。
正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?? ? ?答:使用一个ID,让其唯一,然后传递给服务端,再服务端又回传回来,这样就知道结果是原先哪个线程的了。?这种做法不是第一次见了,10年在上一公司里,也是远程接口调用,不过走的消息中间件rabbitmq,同步调用的原理跟这类似,详见:rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
com.taobao.remoting.impl.DefaultClient.java
//同步调用远程接口
public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {
??????? byte protocol = getProtocol(control);
??????? if (!TRConstants.isValidProtocol(protocol)) {
??????????? throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");
??????? }
??????? ResponseFuture future = invokeWithFuture(appRequest, control);
??????? return future.get(); ?//获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback
}
public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {
???????? byte protocol = getProtocol(control);
???????? long timeout = getTimeout(control);
???????? ConnectionRequest request = new ConnectionRequest(appRequest);
???????? request.setSerializeProtocol(protocol);
???????? Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);
???????? connection.sendRequestWithCallback(request, adapter, timeout);
???????? return adapter;
}
?
?
Callback2FutureAdapter implements ResponseFuture
public Object get() throws RemotingException, InterruptedException {
synchronized (this) { ?// 旋锁
? ?while (!isDone) { ?// 是否有结果了
wait(); //没结果是释放锁,让当前线程处于等待状态
? ?}
}
if (errorCode == TRConstants.RESULT_TIMEOUT) {
? ?throw new TimeoutException("Wait response timeout, request["
? ?+ connectionRequest.getAppRequest() + "].");
}
else if (errorCode > 0) {
? ?throw new RemotingException(errorMsg);
}
else {
? ?return appResp;
}
}
客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll()
public void handleResponse(Object _appResponse) {
???????? appResp = _appResponse; //将远程调用结果设置到callback中来
???????? setDone();
}
public void onRemotingException(int _errorType, String _errorMsg) {
???????? errorCode = _errorType;
???????? errorMsg = _errorMsg;
???????? setDone();
}
private void setDone() {
???????? isDone = true;
???????? synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了
???????? ??? notifyAll(); // 唤醒处于等待的线程
???????? }
}
?
com.taobao.remoting.impl.DefaultConnection.java
?
// 用来存放请求和回调的MAP
private final ConcurrentHashMap<Long, Object[]> requestResidents;
?
//发送消息出去
void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {
???????? long requestId = connRequest.getId();
???????? long waitBegin = System.currentTimeMillis();
???????? long waitEnd = waitBegin + timeoutMs;
???????? Object[] queue = new Object[4];
???????? int idx = 0;
???????? queue[idx++] = waitEnd;
???????? queue[idx++] = waitBegin;?? //用于记录日志
???????? queue[idx++] = connRequest; //用于记录日志
???????? queue[idx++] = callback;
???????? requestResidents.put(requestId, queue); // 记录响应队列
???????? write(connRequest);
?
???????? // 埋点记录等待响应的Map的大小
???????? StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),
?????????????????? 1L);
}
public void write(final Object connectionMsg) {
//mina里的IoSession.write()发送消息
???????? WriteFuture writeFuture = ioSession.write(connectionMsg);
???????? // 注册FutureListener,当请求发送失败后,能够立即做出响应
???????? writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));
}
?
/**
* 在得到响应后,删除对应的请求队列,并执行回调
* 调用者:MINA线程
*/
public void putResponse(final ConnectionResponse connResp) {
???????? final long requestId = connResp.getRequestId();
???????? Object[] queue = requestResidents.remove(requestId);
???????? if (null == queue) {
???????? ??? Object appResp = connResp.getAppResponse();
???????? ??? String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();
???????? ??? StringBuilder sb = new StringBuilder();
???????? ??? sb.append("Not found response receiver for requestId=[").append(requestId).append("],");
???????? ??? sb.append("from [").append(connResp.getHost()).append("],");
???????? ??? sb.append("response type [").append(appRespClazz).append("].");
???????? ??? LOGGER.warn(sb.toString());
???????? ??? return;
???????? }
???????? int idx = 0;
???????? idx++;
???????? long waitBegin = (Long) queue[idx++];
???????? ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];
???????? ResponseCallback callback = (ResponseCallback) queue[idx++];
???????? // ** 把回调任务交给业务提供的线程池执行 **
???????? Executor callbackExecutor = callback.getExecutor();
???????? callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));
?
???????? long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间
???????? logIfResponseError(connResp, duration, connRequest.getAppRequest());
}
?
CallbackExecutorTask
static private class CallbackExecutorTask implements Runnable {
???????? final ConnectionResponse resp;
???????? final ResponseCallback callback;
???????? final Thread createThread;
?
???????? CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) {
???????? ??? resp = _resp;
???????? ??? callback = _cb;
???????? ??? createThread = Thread.currentThread();
???????? }
?
???????? public void run() {
???????? ??? // 预防这种情况:业务提供的Executor,让调用者线程来执行任务
???????? ??? if (createThread == Thread.currentThread()
?????????????????? ??? && callback.getExecutor() != DIYExecutor.getInstance()) {
?????????????????? StringBuilder sb = new StringBuilder();
?????????????????? sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:");
?????????????????? sb.append("Can not callback task on the network io thhread.");
?????????????????? LOGGER.warn(sb.toString());
?????????????????? return;
???????? ??? }
?
???????? ??? if (TRConstants.RESULT_SUCCESS == resp.getResult()) {
?????????????????? callback.handleResponse(resp.getAppResponse()); //设置调用结果
???????? ??? }
???????? ??? else {
?????????????????? callback.onRemotingException(resp.getResult(), resp
??????????????????????????? .getErrorMsg());? //处理调用异常
???????? ??? }
???????? }
}
?
另外:
1,服务端在处理客户端的消息,然后再处理时,使用了线程池来并行处理,不用一个一个消息的处理
同样,客户端接收到服务端的消息,也是使用线程池来处理消息,再回调