Fork me on GitHub

Dubbo扩展机制实现(三)之扩展点特性

前言

上一篇简单分析了Dubbo的扩展点机制的实现,以及其与java spi的区别与改进。

本篇文章准备从扩展点特性的角度分析一下源码。

扩展点自动装配

加载扩展点时,自动注入依赖的扩展点。加载扩展点时,扩展点实现类的成员如果为其它扩展点类型,ExtensionLoader 在会自动注入依赖的扩展点。ExtensionLoader 通过扫描扩展点实现类的所有 setter 方法来判定其成员。即 ExtensionLoader 会执行扩展点的拼装操作。

上一篇提到了Dubbo的ExtensionLoader提供了三种获取扩展点实现类的方式,其中的一种是根据名字获取扩展点实现:

public T getExtension(String name) {
    if (name == null || name.length() == 0)
        throw new IllegalArgumentException("Extension name == null");
    if ("true".equals(name)) { // 判断是否是获取默认实现
        return getDefaultExtension();
    }
    Holder<Object> holder = cachedInstances.get(name); // 从缓存中取
    if (holder == null) {
        cachedInstances.putIfAbsent(name, new Holder<Object>());
        holder = cachedInstances.get(name);
    }
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                instance = createExtension(name); // 创建缓存实例
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

还是用到了缓存,先是判断是否取默认实例,再是从缓存取和设置缓存。接下来看一下创建扩展点的方法createExtension(name):

private T createExtension(String name) {
    Class<?> clazz = getExtensionClasses().get(name); // 加载当前Extension的所有实现, 并从中获取指定name的Extension
    if (clazz == null) {
        throw findException(name);
    }
    try {
        T instance = (T) EXTENSION_INSTANCES.get(clazz); // 从Extension实例缓存中获取实例
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        injectExtension(instance); // 注入扩展点信息
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                type + ")  could not be instantiated: " + t.getMessage(), t);
    }
}

如上代码所示,createExtension(name)创建一个扩展点实例大致做了两件事:

  • 调用getExtensionClasses()加载了扩展点类,并通过名字获取到扩展点的Class类对象
  • 创建扩展点的Class类对象的实例,并调用injectExtension(instance) 注入扩展点信息
  1. getExtensionClasses()暂且不说,比较重要的一个方法,在获取自适应扩展点的时候也会用到此方法。
  2. injectExtension(instance) 注入扩展点信息,这里便展示了扩展点自动装配的特性。
    private T injectExtension(T instance) {
     try {
         if (objectFactory != null) {
             for (Method method : instance.getClass().getMethods()) {
                 // 处理所有set方法
                 if (method.getName().startsWith("set")
                         && method.getParameterTypes().length == 1
                         && Modifier.isPublic(method.getModifiers())) {
                     Class<?> pt = method.getParameterTypes()[0];
                     try {
                         // 获取setter对应的property名称
                         String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                         // 根据参数类型和属性名称,从 ExtensionFactory 里获取扩展点
                         Object object = objectFactory.getExtension(pt, property);
                         if (object != null) { // 如果不为空,则 setter 方法的参数是扩展点类型,那么进行注入
                             method.invoke(instance, object);
                         }
                     } catch (Exception e) {
                         logger.error("fail to inject via method " + method.getName()
                                 + " of interface " + type.getName() + ": " + e.getMessage(), e);
                     }
                 }
             }
         }
     } catch (Exception e) {
         logger.error(e.getMessage(), e);
     }
     return instance;
    }
    
    这里可以看到,扩展点自动注入就是根据setter方法对应的参数类型和property名称从ExtensionFactory中查询,如果有返回扩展点实例,那么就进行注入操作。

扩展点自适应

Dubbo扩展点有一个非常重要的概念:Adaptive.

ExtensionLoader 注入的依赖扩展点是一个 Adaptive 实例,直到扩展点方法执行时才决定调用是一个扩展点实现。

Dubbo 使用 URL 对象(包含了Key-Value)传递配置信息。

扩展点方法调用会有URL参数(或是参数有URL成员)

这样依赖的扩展点也可以从URL拿到配置信息,所有的扩展点自己定好配置的Key后,配置信息从URL上从最外层传入。URL在配置传递上即是一条总线。

来看看Dubbo获取自适应扩展点的方法:getAdaptiveExtension():

public T getAdaptiveExtension() {
    Object instance = cachedAdaptiveInstance.get(); // 从缓存中获取自适应实例
    if (instance == null) {
        if (createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension(); // 创建自适应实例并缓存
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        } else {
            throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }

    return (T) instance;
}

又是缓存,看看如何创建一个自适应扩展点:createAdaptiveExtension()

private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}

injectExtension(instance)方法上面说明过了,是对扩展点自动装配。主要看getAdaptiveExtensionClass()方法:

private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses(); // 加载当前Extension的所有实现
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass(); // 动态创建自适应扩展类 Class 对象
}

getExtensionClasses()是个很重要的方法,三种获取扩展点实现的方法都会用到这个方法,这个稍后说明一下。这里主要看createAdaptiveExtensionClass():

private Class<?> createAdaptiveExtensionClass() {
    String code = createAdaptiveExtensionClassCode(); // 自适应扩展类拼装代码
    ClassLoader classLoader = findClassLoader();
    com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader); // 动态编译
}

这里也是获取了Compiler接口的自适应扩展点AdaptiveCompiler的实现,由于实现里又compiler = loader.getDefaultExtension()获取了默认的扩展点,即JavassistCompiler的实例,来实现了动态编译。通过断点看一下通过 javassist 生成的实现类长啥样,以Protocol的自适应扩展点来看(debug打印后格式化并做了注释方便看):

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        // 从url中获取扩展点名称,如果没有就赋值为默认的值
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        // 通过名字获取扩展点实现
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) 
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        // 从url中获取扩展点名称,如果没有就赋值为默认的值
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        // 通过名字获取扩展点实现
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

从上可以看到,确实正如Dubbo所描述的那样,通过url传递配置信息。

扩展点自动包装

在官方文档中有如下说明:

自动包装扩展点的 Wrapper 类。ExtensionLoader 在加载扩展点时,如果加载到的扩展点有拷贝构造函数,则判定为扩展点 Wrapper 类。

在源码里的体现,在加载扩展点文件 loadFile(Map<String, Class<?>> extensionClasses, String dir) 里会根据构造器函数进行判断是否是 Wrapper类:

try {
    clazz.getConstructor(type); // 判断是否 Wrapper 类型
    Set<Class<?>> wrappers = cachedWrapperClasses;
    if (wrappers == null) {
        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
        wrappers = cachedWrapperClasses;
    }
    wrappers.add(clazz);
} catch (NoSuchMethodException e) { // 非 Wrapper 类型
    ...
}

其次在创建扩展点实例的时候也会根据是否是 Wrapper 类来创建相应的扩展点,这在createExtension(String name)中的体现:

Set<Class<?>> wrapperClasses = cachedWrapperClasses; // 获取缓存的Wrapper类集合
// 如果是包装类则创建包装类扩展点实例
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
    for (Class<?> wrapperClass : wrapperClasses) {
        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
    }
}
return instance;

那么,为什么要使用 Wrapper 类呢?

Wrapper 类同样实现了扩展点接口,但是 Wrapper 不是扩展点的真正实现。它的用途主要是用于从 ExtensionLoader返回扩展点时,包装在真正的扩展点实现外。即从 ExtensionLoader 中返回的实际上是 Wrapper 类的实例,Wrapper 持有了实际的扩展点实现类。

扩展点的 Wrapper 类可以有多个,也可以根据需要新增。

通过 Wrapper 类可以把所有扩展点公共逻辑移至 Wrapper 中。新加的 Wrapper 在所有的扩展点上添加了逻辑,有些类似 AOP,即 Wrapper 代理了扩展点。

从这里看 Dubbo 的aop实际上是装饰者设计模式 + 自适应特性的动态代理。

举个例子,在之前写Dubbo暴露过程的源码中对Protocol接口的调用过程进行了分析,在ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();获取Protocol自适应扩展点的时序是ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol

在真正暴露服务之前,对此进行了一些额外的扩展操作,通过这些层层包装使得各个类逻辑分明,代码维护性高。

扩展点自动激活

先看官方文档:

对于集合类扩展点,比如:Filter, InvokerListener, ExportListener, TelnetHandler, StatusChecker 等,可以同时加载多个实现,此时,可以用自动激活来简化配置。

Dubbo实现自动激活的核心关键词:Activate

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
    /**
     * 根据group匹配当前扩展点
     *
     * @return 匹配的group名称
     */
    String[] group() default {};

    /**
     * 当URL上的参数包含指定的keys时,激活当前的扩展点
     * <p>
     * 举个栗子, 当使用 <code>@Activate("cache, validation")</code>, 当URL参数上包含<code>cache</code> 或 <code>validation</code> 时,当前扩展点才会被激活
     * </p>
     *
     * @return URL 参数上的key值
     */
    String[] value() default {};

    /**
     * 相对排序信息, 可选
     *
     * @return 应当放在当前扩展点之前的扩展点列表
     */
    String[] before() default {};

    /**
     * 相对排序信息, 可选
     *
     * @return 应当放在当前扩展点之前的扩展点列表
     */
    String[] after() default {};

    /**
     * 绝对排序信息, 可选
     *
     * @return 绝对排序信息
     */
    int order() default 0;
}

总之就是根据注解的 valuegroup 两个属性来决定是否激活。
比如CacheFilter

@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {}

当满足条件:

  • 服务提供者 和 服务消费者
  • url上的参数包含cache

则激活 CacheFilter

那么,Dubbo如何使用 Activate 呢?
没错,一定还记得那个方法:getActivateExtensiongetActivateExtension有多个重写的方法,但实际最终会调用到如下:

public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> exts = new ArrayList<T>();
    List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
    if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
        getExtensionClasses(); // 此处缓存了 cachedActivates
        for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
            String name = entry.getKey(); // 获取 可激活的扩展点的spi扩展名
            Activate activate = entry.getValue();
            if (isMatchGroup(group, activate.group())) { // 如果group匹配
                T ext = getExtension(name); // 根据扩展点名称获取扩展点实例
                // name不在 values 指定的列,且没排除name,且url上有activate的value,则激活
                if (!names.contains(name)
                        && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
                        && isActive(activate, url)) {
                    exts.add(ext);
                }
            }
        }
        Collections.sort(exts, ActivateComparator.COMPARATOR); // 排序
    }
    List<T> usrs = new ArrayList<T>();
    for (int i = 0; i < names.size(); i++) { // 指定使用values的时候
        String name = names.get(i);
        // 所有未被排除的扩展名
        if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
            if (Constants.DEFAULT_KEY.equals(name)) {
                if (!usrs.isEmpty()) {
                    exts.addAll(0, usrs);
                    usrs.clear();
                }
            } else {
                T ext = getExtension(name);
                usrs.add(ext);
            }
        }
    }
    if (!usrs.isEmpty()) {
        exts.addAll(usrs);
    }
    return exts;
}

关于Activate的使用场景:当需要提供一组需要指定条件的扩展点并使用的时候。比如在ProtocolFilterWrapperbuildInvokerChain里构建一组Filter时,Dubbo是这么处理的:

List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

扩展点的一些核心方法

在整个扩展点源码里,有一些核心的方法贯穿整个ExtensionLoader。

首先是之前常见得getExtensionClasses() 这个方法里进行了:

  • 缓存扩展点Class
  • 从指定文件路径加载扩展点文件
  • 创建扩展点

getExtensionClasses()里的核心方法是loadExtensionClasses.

private Map<String, Class<?>> loadExtensionClasses() {
    final SPI defaultAnnotation = type.getAnnotation(SPI.class); // 先获取 @SPI 注解中的默认值
    if (defaultAnnotation != null) {
        // 如果 @SPI 注解存在 value 默认值, 赋值给 cachedDefaultName 属性
        String value = defaultAnnotation.value();
        if (value != null && (value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if (names.length > 1) { // 每个扩展点实现只能配置一个名字
                throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                        + ": " + Arrays.toString(names));
            }
            if (names.length == 1) cachedDefaultName = names[0];
        }
    }

    // 从配置路径中加载扩展实现类
    Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
    loadFile(extensionClasses, DUBBO_DIRECTORY);
    loadFile(extensionClasses, SERVICES_DIRECTORY);
    return extensionClasses;
}

真正开始加载扩展点文件的方法:loadFile.下面,简单粗暴的贴源码:

private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
    String fileName = dir + type.getName(); // 获取文件路径名
    try {
        Enumeration<java.net.URL> urls;
        ClassLoader classLoader = findClassLoader(); // 获取了类加载器
        if (classLoader != null) {
            urls = classLoader.getResources(fileName);
        } else {
            urls = ClassLoader.getSystemResources(fileName);
        }
        if (urls != null) {
            while (urls.hasMoreElements()) {
                java.net.URL url = urls.nextElement();
                try {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8")); // 也是以utf-8方式读取配置文件
                    try {
                        String line = null;
                        while ((line = reader.readLine()) != null) { // 解析每行数据
                            final int ci = line.indexOf('#');
                            if (ci >= 0) line = line.substring(0, ci);
                            line = line.trim();
                            if (line.length() > 0) { // 非注释内容
                                try {
                                    String name = null;
                                    int i = line.indexOf('=');
                                    if (i > 0) {
                                        name = line.substring(0, i).trim(); // 配置中的 key
                                        line = line.substring(i + 1).trim(); // 配置中的 value
                                    }
                                    if (line.length() > 0) {
                                        Class<?> clazz = Class.forName(line, true, classLoader); // 获取class对象
                                        if (!type.isAssignableFrom(clazz)) {
                                            throw new IllegalStateException("Error when load extension class(interface: " +
                                                    type + ", class line: " + clazz.getName() + "), class "
                                                    + clazz.getName() + "is not subtype of interface.");
                                        }
                                        if (clazz.isAnnotationPresent(Adaptive.class)) { // 如果注解了@Adaptive
                                            if (cachedAdaptiveClass == null) {
                                                cachedAdaptiveClass = clazz; // 缓存 cachedAdaptiveClass
                                            } else if (!cachedAdaptiveClass.equals(clazz)) { // 只允许一个 Adaptive 实现
                                                throw new IllegalStateException("More than 1 adaptive class found: "
                                                        + cachedAdaptiveClass.getClass().getName()
                                                        + ", " + clazz.getClass().getName());
                                            }
                                        } else {
                                            try {
                                                clazz.getConstructor(type); // 判断是否 Wrapper 类型
                                                Set<Class<?>> wrappers = cachedWrapperClasses;
                                                if (wrappers == null) {
                                                    cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                    wrappers = cachedWrapperClasses;
                                                }
                                                wrappers.add(clazz);
                                            } catch (NoSuchMethodException e) { // 非 Wrapper 类型
                                                clazz.getConstructor(); // 获取class对象的无参构造器
                                                if (name == null || name.length() == 0) {
                                                    name = findAnnotationName(clazz);
                                                    if (name == null || name.length() == 0) {
                                                        if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                            name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                        } else {
                                                            throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                        }
                                                    }
                                                }
                                                String[] names = NAME_SEPARATOR.split(name);
                                                if (names != null && names.length > 0) {
                                                    Activate activate = clazz.getAnnotation(Activate.class);
                                                    if (activate != null) {
                                                        cachedActivates.put(names[0], activate); // 缓存 cachedActivates
                                                    }
                                                    for (String n : names) {
                                                        if (!cachedNames.containsKey(clazz)) {
                                                            cachedNames.put(clazz, n); // 缓存 cachedNames ,每个class只对应一个名称
                                                        }
                                                        Class<?> c = extensionClasses.get(n);
                                                        if (c == null) {
                                                            extensionClasses.put(n, clazz); // 放入到extensionClasses中,多个 name 可能对应一个Class
                                                        } else if (c != clazz) { // 重复抛异常
                                                            throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                } catch (Throwable t) {
                                    IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                    exceptions.put(line, e);
                                }
                            }
                        } // end of while read lines
                    } finally {
                        reader.close();
                    }
                } catch (Throwable t) {
                    logger.error("Exception when load extension class(interface: " +
                            type + ", class file: " + url + ") in " + url, t);
                }
            } // end of while urls
        }
    } catch (Throwable t) {
        logger.error("Exception when load extension class(interface: " +
                type + ", description file: " + fileName + ").", t);
    }
}

文件加载完成后,几个实例被缓存了:

  • cachedAdaptiveClass(自适应扩展点)
  • cachedWrapperClasses(扩展点包装类)
  • cachedActivates(扩展点激活类)
  • cachedNames(Class->name映射)

在获取扩展点的三个方法中会常使用缓存了的数据,由此可见Dubbo在这里的缓存优化。

结束语

至此,扩展点机制大致介绍完毕,自己从源码中也体会了在扩展设计上原来还可以这么玩,收益匪浅了。

最后做个总结:

  • 从代码层面上对扩展点的四个特性进行了分析
  • 三种获取扩展点的方式相互结合,分别体现了不同的扩展点特点
  • 代码层面合理的设计模式(装饰器模式,动态代理)对代码分层解耦
-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%