客户端同步调用和异步调用的实现

首先设想一下我们目前的通信方式:使用 Netty、Mina 等异步事件驱动的通信框架时,Channel 中的消息都被分发到 Handler 中去处理,Handler 中的 send 方法只负责不断地发送消息,receive 方法只负责不断地接收消息。这时候就产生一个问题:发送和接收是分离的,客户端如何把收到的响应与之前发出的请求对应起来,并在此基础上实现同步调用和异步调用?先来看客户端发起调用的 doInvoke 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); // Client的子接口 ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
// Synchronous call: the return value of the call is the result.
String result1 = helloService.hello("World");
// Asynchronous call: the result is obtained from the future held by the RpcContext.
RpcContext.getContext().getFuture().get();
异步请求的整个实现过程

接下来看异步请求的完整实现过程,即上述 ExchangeClient 的 request 方法的具体内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 // currentClient的实现类为HeaderExchangeClient // HeaderExchangeClient的channel成员变量 private final ExchangeChannel channel; // HeaderExchangeClient的request()方法 public ResponseFuture request(Object request) throws RemotingException { return channel.request(request); } // ExchangeChannel接口的实现类为HeaderExchangeChannel // HeaderExchangeChannel类的request()方法 public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { // 此处channel的为com.alibaba.dubbo.remoting.Channel // 其实现为NettyChannel channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
// A Request object is created; its request id is generated automatically during
// construction, as shown below.

// No-arg constructor of Request: assigns a freshly generated id to mId.
public Request() {
    mId = newId();
}

// Counter backing newId(); starts at 0.
private static final AtomicLong INVOKE_ID = new AtomicLong(0);

// newId(): returns the current counter value and increments the counter.
private static long newId() {
    // getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
    return INVOKE_ID.getAndIncrement();
}
// Member of DefaultFuture: maps a request id (the key) to its DefaultFuture.
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
// Asynchronous branch: issue the request and keep the returned future.
ResponseFuture future = currentClient.request(inv, timeout);
// Store the future (wrapped in a FutureAdapter) in the RpcContext for later retrieval.
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// Return an empty result without calling get() on the future.
return new RpcResult();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 // DefaultFuture类get()方法 public Object get() throws RemotingException { return get(timeout); } // 锁的条件 private final Condition done = lock.newCondition(); // 存放服务端响应对象 private volatile Response response; public boolean isDone() { return response != null; } public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } // 1. isDone()中判读response != null if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { // 2. 阻塞等待超时后while循环 // 直至isDone()为true,即response != null done.await(timeout, TimeUnit.MILLISECONDS); <1> if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); }
/**
 * HeaderExchangeHandler.received(): dispatches an incoming message by its runtime type.
 *
 * <p>The read timestamp is recorded on the channel first. Requests are handled as events,
 * two-way requests (a response is sent back) or passed to the handler; responses go to
 * {@code handleResponse}; strings are logged as an error on the client side and otherwise
 * passed to {@code handler.telnet}; anything else is passed to the handler unchanged.
 * {@code HeaderExchangeChannel.removeChannelIfDisconnected} is always called at the end.
 */
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    // Two-way request: compute the response and send it back on the channel.
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // 1. The message is a Response: hand it to handleResponse.
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                // String messages are not handled on the client side; only an error is logged.
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

// Forwards a non-null, non-heartbeat response to DefaultFuture; other responses are ignored here.
static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        // 2. Pass the Response object to DefaultFuture.
        DefaultFuture.received(channel, response);
    }
}

/**
 * DefaultFuture.received(): removes the future registered under the response id from FUTURES
 * and completes it; if no future is found, a warning is logged instead. The CHANNELS entry
 * for the response id is removed in either case.
 */
public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            // 3. Continue in doReceived().
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

// 4. Under the lock, assign the response object to the response member variable.
private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            // 5. With response assigned, signal the done condition to release a thread
            // blocked in get(), so that get() can return the server's result.
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    // The callback, if any, is invoked after the lock has been released.
    if (callback != null) {
        invokeCallback(callback);
    }
}
// Two calls are issued first; the future is read from the RpcContext only afterwards.
// NOTE(review): the async branch of doInvoke calls setFuture(...) on every call, so presumably
// both getFuture() calls below return the future of the second call — confirm against RpcContext.
String result1 = helloService.hello("World");
String result2 = helloService.hello("java");
System.out.println("result :"+result1);
System.out.println("result :"+result2);
System.out.println("result : "+RpcContext.getContext().getFuture().get());
System.out.println("result : "+RpcContext.getContext().getFuture().get());
// The future is captured from the RpcContext immediately after each call, before the next
// call is issued, so each call keeps a reference to its own future.
String result1 = helloService.hello("World");
Future<String> result1Future=RpcContext.getContext().getFuture();
String result2 = helloService.hello("java");
Future<String> result2Future=RpcContext.getContext().getFuture();
System.out.println("result :"+result1);
System.out.println("result :"+result2);
// The actual results are obtained from the captured futures.
System.out.println("result : "+result1Future.get());
System.out.println("result : "+result2Future.get());