Fork me on GitHub

Dubbo路由层之Directory

前言

Dubbo服务引用之ReferenceConfig中看到在引用协议订阅远程服务的过程中涉及到这么一个接口Directory。这个东东是做什么的呢?

回顾时序图

之前说到了拿到引用配置ReferenceConfig类,并且调用了RegistryProtocolrefer方法,接下来在手撕源码前,先超前的看看后面还要经历哪些环节:Directory,Cluster,Protocol,Invoker。。。(省略)。到这里可以根据源码结构找到关键的接口Directory,Cluster,这俩都是dubbo-cluster下的接口,一起理解会比较方便,在细嚼源码前看看开发者文档有没有给我们一些帮助。

可以找到官网介绍图:

从图中大致了解到各个接口的作用,今天要分析的Directory主要作用是服务目录列表。

Directory

先看官方文档怎么说:

Directory 代表多个 Invoker,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更

看下接口设计:

public interface Directory<T> extends Node {

    /**
     * get service type.
     *
     * @return service type.
     */
    Class<T> getInterface();

    /**
     * list invokers.
     *
     * @return invokers
     */
    List<Invoker<T>> list(Invocation invocation) throws RpcException;

}

就俩方法:

  • 获取服务类型
  • 获取invoker列表

看一下其实现的子类: 再结合官网介绍图可知主要实现有StaticDirectory,RegistryDirectory。模板类是AbstractDirectory.

AbstractDirectory

看名便知,提供了抽象的Directory实现,也是模板方法模式的体现。 doList方法交由具体的子类实现。父类(即此类)实现了模板方法list

list方法:

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    // 从子类的实现中获取调用者列表
    List<Invoker<T>> invokers = doList(invocation);
    List<Router> localRouters = this.routers; // local reference
    // 路由过滤,获取匹配的调用者列表
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

可见主要作用就是找到真的匹配的invoker列表。 此处涉及接口Router,之后分析。

StaticDirectory

主要还是看doList的实现,这是区分不同Directory的根本方法。

StaticDirectory#doList:

@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {

    return invokers;
}

惊不惊喜?意不意外?没错,就是这么简单,因此也和这个实现的名称也想对应了,“静态的服务目录”,调用者列表不会动态改变。它的invokers主要还是通过构造器传入:

public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
    super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
    if (invokers == null || invokers.isEmpty())
        throw new IllegalArgumentException("invokers == null");
    this.invokers = invokers;
}

此类的应用本人才疏学浅,暂时没有看到,存在即合理,之后看到再补充吧。

RegistryDirectory

这个子类实现应该是应用最广泛的一个了。

RegistryDirectory#doList:

public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 下面这个异常简直不要太熟悉了
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
            "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                    + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
    }
    List<Invoker<T>> invokers = null;
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation); // 获取服务提供者方法名称
        Object[] args = RpcUtils.getArguments(invocation); // 获取调用参数
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

工作流程: 可以看到主要是从methodInvokerMap中获取。 先尝试从map中获取key为方法名.参数0的invokers,没找到再尝试从map中获取key为方法名的invokers。没找到再尝试从map中获取key为*的invokers。都找不到就遍历map找到最后的一个invokers。

那么问题来了,获取invokers是从methodInvokerMap中获取,那么methodInvokerMap是怎么来的呢。

借助IDE找到methodInvokerMap的赋值处,是在refreshInvoker方法中。refreshInvoker又是在notify方法里调用。现在问题来了,notify方法何时何地调用?

我们可以注意到:

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener`。

RegistryDirectory实现了NotifyListener,并实现了notify方法。方便起见,利用IDE生成一个类图:

关于在哪里调用又借助一下IDE: 可以确定的是在注册服务处调用的,这里先不细看。

到这里再细看一下notify方法:

public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators
    if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // merge override parameters
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && !localConfigurators.isEmpty()) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
    // 清除不存在注册中心的数据
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // 空协议禁止访问
        this.forbidden = true; // Forbid to access
        this.methodInvokerMap = null; // Set the method invoker map to null
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        // invokerUrls为空,因为通知的url可能只改变了router或者configurator,提供者并没有变化,但是对应invoker配置还是需要被更改的
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls); // 使用缓存的url
        } else {
            // 更新缓存
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
        // state change
        // If the calculation is wrong, it is not processed.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        // 是否存在group?是则对method对应的invoker进行cluster伪装
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

notify方法主要就是对url进行分类处理,分为providerconfiguratorsrouters三类url。然后再使用provider类的url调用refreshInvoker进行增量刷新.

refreshInvoker主要根据url协议过滤不匹配的提供者url,然后对过滤后的提供者url生成远程对等调用invoker,并且这些invoker会利用缓存防止重复创建。

总结

到这里,基本Directory的核心方法都已经研究完,尤其是RegistryDirectory,它涉及动态更新invokerUrls,核心在于注册中心的通知与监听。dubbo的路由层还有Cluster,Router以及LoadBalance。留到明天再看!不知不觉已经凌晨,一看源码就容易上头哈~

-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%