前言
在Dubbo服务引用之ReferenceConfig中看到在引用协议订阅远程服务的过程中涉及到这么一个接口Directory
。这个东东是做什么的呢?
回顾时序图
之前说到了拿到引用配置ReferenceConfig
类,并且调用了RegistryProtocol
的refer
方法,接下来在手撕源码前,先超前的看看后面还要经历哪些环节:Directory
,Cluster
,Protocol
,Invoker
。。。(省略)。到这里可以根据源码结构找到关键的接口Directory
,Cluster
,这俩都是dubbo-cluster
下的接口,一起理解会比较方便,在细嚼源码前看看开发者文档有没有给我们一些帮助。
可以找到官网介绍图:
从图中大致了解到各个接口的作用,今天要分析的Directory
主要作用是服务目录列表。
Directory
先看官方文档怎么说:
Directory
代表多个Invoker
,可以把它看成List<Invoker>
,但与List
不同的是,它的值可能是动态变化的,比如注册中心推送变更
看下接口设计:
public interface Directory<T> extends Node {
/**
* get service type.
*
* @return service type.
*/
Class<T> getInterface();
/**
* list invokers.
*
* @return invokers
*/
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
就俩方法:
- 获取服务类型
- 获取invoker列表
看一下其实现的子类: 再结合官网介绍图可知主要实现有StaticDirectory
,RegistryDirectory
。模板类是AbstractDirectory
.
AbstractDirectory
看名便知,提供了抽象的Directory
实现,也是模板方法模式的体现。 doList
方法交由具体的子类实现。父类(即此类)实现了模板方法list
。
list方法:
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
// 从子类的实现中获取调用者列表
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
// 路由过滤,获取匹配的调用者列表
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
可见主要作用就是找到真的匹配的invoker
列表。 此处涉及接口Router
,之后分析。
StaticDirectory
主要还是看doList
的实现,这是区分不同Directory
的根本方法。
StaticDirectory#doList:
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
return invokers;
}
惊不惊喜?意不意外?没错,就是这么简单,因此也和这个实现的名称也想对应了,“静态的服务目录”,调用者列表不会动态改变。它的invokers
主要还是通过构造器传入:
public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
if (invokers == null || invokers.isEmpty())
throw new IllegalArgumentException("invokers == null");
this.invokers = invokers;
}
此类的应用本人才疏学浅,暂时没有看到,存在即合理,之后看到再补充吧。
RegistryDirectory
这个子类实现应该是应用最广泛的一个了。
RegistryDirectory#doList:
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 下面这个异常简直不要太熟悉了
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
}
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation); // 获取服务提供者方法名称
Object[] args = RpcUtils.getArguments(invocation); // 获取调用参数
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
工作流程: 可以看到主要是从methodInvokerMap
中获取。 先尝试从map中获取key为方法名.参数0
的invokers,没找到再尝试从map中获取key为方法名
的invokers。没找到再尝试从map中获取key为*
的invokers。都找不到就遍历map找到最后的一个invokers。
那么问题来了,获取invokers
是从methodInvokerMap
中获取,那么methodInvokerMap
是怎么来的呢。
借助IDE找到methodInvokerMap
的赋值处,是在refreshInvoker
方法中。refreshInvoker
又是在notify
方法里调用。现在问题来了,notify
方法何时何地调用?
我们可以注意到:
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener`。
RegistryDirectory
实现了NotifyListener
,并实现了notify
方法。方便起见,利用IDE生成一个类图:
关于在哪里调用又借助一下IDE: 可以确定的是在注册服务处调用的,这里先不细看。
到这里再细看一下notify
方法:
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
// 清除不存在注册中心的数据
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
// 空协议禁止访问
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
// invokerUrls为空,因为通知的url可能只改变了router或者configurator,提供者并没有变化,但是对应invoker配置还是需要被更改的
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls); // 使用缓存的url
} else {
// 更新缓存
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
// 是否存在group?是则对method对应的invoker进行cluster伪装
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
notify
方法主要就是对url进行分类处理,分为provider
,configurators
,routers
三类url。然后再使用provider
类的url调用refreshInvoker
进行增量刷新.
refreshInvoker
主要根据url协议过滤不匹配的提供者url,然后对过滤后的提供者url生成远程对等调用invoker,并且这些invoker会利用缓存防止重复创建。
总结
到这里,基本Directory的核心方法都已经研究完,尤其是RegistryDirectory,它涉及动态更新invokerUrls
,核心在于注册中心的通知与监听。dubbo的路由层还有Cluster
,Router
以及LoadBalance
。留到明天再看!不知不觉已经凌晨,一看源码就容易上头哈~