服务端中接口实现到Invoker的转换源码解析
前面说过有三种类型的Invoker:
- 服务端本地执行的invoker
- 远程通信执行的invoker
- 集群版的远程执行invoker
服务端发布服务的第一个过程就是将接口实现转换成Invoker,此处的invoker就是服务端本地执行的invoker,即AbstractProxyInvoker抽象类。
1 | Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); |
proxyFactory采用SPI机制,此处为扩展代理类,以下以jdk代理为例说明转换过程,所以代理内部采用JdkProxyFactory实现。
1 | // 抽象父类AbstractProxyFactory中无相关实现代码 |
在JdkProxyFactory的getInvoker()方法中直接new了一个AbstractProxyInvoker返回,并实现了doInvoke()抽象方法,该方法在执行返回的invoker的invoke()方法时回调,AbstractProxyInvoker的invoke()方法源码如下:
1 | public Result invoke(Invocation invocation) throws RpcException { |
所以这样把该invoker的具体执行逻辑转移到了AbstractProxyInvoker的抽象方法中,在JdkProxyFactory的getInvoker()方法中new AbstractProxyInvoker时被实现,其实现如上,通过反射执行接口实现相应方法。
通过协议Protocol建立监听服务
以上使用ProxyFactory将接口实现HelloServiceImpl封装成一个本地执行的Invoker。
执行这个服务,即执行这个本地Invoker,即调用这个本地Invoker的invoke(Invocation invocation)方法,方法的执行过程就是通过反射执行了HelloServiceImpl的内容。现在的问题是:这个方法的参数 Invocation invocation的来源问题。
针对server端来说,Protocol要解决的问题就是:根据指定协议对外公布这个HelloService服务,当客户端根据协议调用这个服 务时,将客户端传递过来的Invocation参数交给上述的Invoker来执行。所以Protocol加入了远程通信协议的这一块,根据客户端的请求 来获取参数Invocation invocation。
服务发布的第二个过程就是将invoker转换成Exporter,并建立监听服务
1 | Exporter<?> exporter = protocol.export(invoker); |
protocol采用SPI机制,此处为protocol接口的扩展代理实现,export(Invoker invoker)的过程即根据Invoker中url的配置信息来最终选择的Protocol实现,默认实现是”dubbo”的扩展实现即 DubboProtocol,然后再对DubboProtocol进行依赖注入,进行wrap包装。
在返回DubboProtocol之前,经过了ProtocolFilterWrapper、ProtocolListenerWrapper、RegistryProtocol的包装。使用装饰器模式,类似AOP的功能。
下面主要讲解RegistryProtocol和DubboProtocol,先暂时忽略ProtocolFilterWrapper、ProtocolListenerWrapper。
protocol的export()干了哪些事呢?
- 先提一下Exporter, Exporter前面已经提过,负责维护invoker的生命周期。该接口定义如下:
1 | public interface Exporter<T> { |
- 会先经过RegistryProtocol的export()
- 利用内部的Protocol即DubboProtocol,将服务进行导出,如下
1 | final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); |
- 根据注册中心的registryUrl获取注册服务Registry,然后将serviceUrl注册到注册中心上,供客户端订阅注意区分registryUrl和serviceUrl,前者是注册中心url,后者服务提供者url
1
2
3
4//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
- 下面来详细看看上述DubboProtocol的服务导出功能:
- 创建一个DubboExporter,封装invoker。然后根据url的port、path(接口的名称)、版本号、分组号作为 key,将DubboExporter存至Map<String, Exporter<?>> exporterMap中
1
2
3
4
5
6
7// DubboProtocol中成员变量
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter); - 首先根据Invoker的url获取ExchangeServer通信对象(负责与客户端的通信模块),以url中的host和port作 为key存至Map<String, ExchangeServer> serverMap中。既可以采用全部服务的通信交给这一个ExchangeServer通信对象,也可以某些服务单独使用新的 ExchangeServer通信对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51// DubboProtocol中成员变量
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
// DubboProtocol中的export()方法中
openServer(url);
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
// ExchangeServer不存在,调用createServer创建
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 绑定监听,并设置接收请求处理器requestHandler,返回server
// ExchangeServer为通信层相关概念,后面单独开篇介绍
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
现在我们要搞清楚我们的目的:通过通信对象获取客户端传来的Invocation invocation参数,然后找到对应的DubboExporter(即能够获取到本地Invoker)就可以执行服务了。
上述每一个ExchangeServer通信对象都绑定了一个ExchangeHandler requestHandler对象,内容简略如下:
1 | // DubboProtocol类中成员变量 |
可以看到在获取到Invocation参数后,调用getInvoker(channel, inv)来获取本地Invoker。获取过程就是根据channel获取port,根据Invocation inv信息获取要调用的服务接口、版本号、分组号等,以此组装成key,从上述Map<String, Exporter<?>> exporterMap中获取Exporter,然后就可以找到对应的Invoker了,就可以顺利的调用服务了。
而对于通信这一块,接下来会专门来详细的说明。