前言
上一篇简单分析了Dubbo的扩展点机制的实现,以及其与java spi的区别与改进。
本篇文章准备从扩展点特性的角度分析一下源码。
扩展点自动装配
加载扩展点时,自动注入依赖的扩展点。加载扩展点时,扩展点实现类的成员如果为其它扩展点类型,ExtensionLoader 在会自动注入依赖的扩展点。ExtensionLoader 通过扫描扩展点实现类的所有 setter 方法来判定其成员。即 ExtensionLoader 会执行扩展点的拼装操作。
上一篇提到了Dubbo的ExtensionLoader提供了三种获取扩展点实现类的方式,其中的一种是根据名字获取扩展点实现:
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) { // 判断是否是获取默认实现
return getDefaultExtension();
}
Holder<Object> holder = cachedInstances.get(name); // 从缓存中取
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name); // 创建缓存实例
holder.set(instance);
}
}
}
return (T) instance;
}
还是用到了缓存,先是判断是否取默认实例,再是从缓存取和设置缓存。接下来看一下创建扩展点的方法createExtension(name)
:
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name); // 加载当前Extension的所有实现, 并从中获取指定name的Extension
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz); // 从Extension实例缓存中获取实例
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance); // 注入扩展点信息
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
如上代码所示,createExtension(name)
创建一个扩展点实例大致做了两件事:
- 调用
getExtensionClasses()
加载了扩展点类,并通过名字获取到扩展点的Class类对象 - 创建扩展点的Class类对象的实例,并调用
injectExtension(instance)
注入扩展点信息
getExtensionClasses()
暂且不说,比较重要的一个方法,在获取自适应扩展点的时候也会用到此方法。injectExtension(instance)
注入扩展点信息,这里便展示了扩展点自动装配的特性。
这里可以看到,扩展点自动注入就是根据private T injectExtension(T instance) { try { if (objectFactory != null) { for (Method method : instance.getClass().getMethods()) { // 处理所有set方法 if (method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())) { Class<?> pt = method.getParameterTypes()[0]; try { // 获取setter对应的property名称 String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : ""; // 根据参数类型和属性名称,从 ExtensionFactory 里获取扩展点 Object object = objectFactory.getExtension(pt, property); if (object != null) { // 如果不为空,则 setter 方法的参数是扩展点类型,那么进行注入 method.invoke(instance, object); } } catch (Exception e) { logger.error("fail to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e); } } } } } catch (Exception e) { logger.error(e.getMessage(), e); } return instance; }
setter
方法对应的参数类型和property名称从ExtensionFactory
中查询,如果有返回扩展点实例,那么就进行注入操作。
扩展点自适应
Dubbo扩展点有一个非常重要的概念:Adaptive
.
ExtensionLoader
注入的依赖扩展点是一个Adaptive
实例,直到扩展点方法执行时才决定调用是一个扩展点实现。Dubbo 使用 URL 对象(包含了Key-Value)传递配置信息。
扩展点方法调用会有URL参数(或是参数有URL成员)
这样依赖的扩展点也可以从URL拿到配置信息,所有的扩展点自己定好配置的Key后,配置信息从URL上从最外层传入。URL在配置传递上即是一条总线。
来看看Dubbo获取自适应扩展点的方法:getAdaptiveExtension()
:
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get(); // 从缓存中获取自适应实例
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension(); // 创建自适应实例并缓存
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
又是缓存,看看如何创建一个自适应扩展点:createAdaptiveExtension()
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
injectExtension(instance)
方法上面说明过了,是对扩展点自动装配。主要看getAdaptiveExtensionClass()
方法:
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses(); // 加载当前Extension的所有实现
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass(); // 动态创建自适应扩展类 Class 对象
}
getExtensionClasses()
是个很重要的方法,三种获取扩展点实现的方法都会用到这个方法,这个稍后说明一下。这里主要看createAdaptiveExtensionClass()
:
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode(); // 自适应扩展类拼装代码
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader); // 动态编译
}
这里也是获取了Compiler
接口的自适应扩展点AdaptiveCompiler
的实现,由于实现里又compiler = loader.getDefaultExtension()
获取了默认的扩展点,即JavassistCompiler
的实例,来实现了动态编译。通过断点看一下通过 javassist 生成的实现类长啥样,以Protocol
的自适应扩展点来看(debug打印后格式化并做了注释方便看):
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
// 从url中获取扩展点名称,如果没有就赋值为默认的值
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
// 通过名字获取扩展点实现
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
// 从url中获取扩展点名称,如果没有就赋值为默认的值
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
// 通过名字获取扩展点实现
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
从上可以看到,确实正如Dubbo所描述的那样,通过url传递配置信息。
扩展点自动包装
在官方文档中有如下说明:
自动包装扩展点的 Wrapper 类。
ExtensionLoader
在加载扩展点时,如果加载到的扩展点有拷贝构造函数,则判定为扩展点 Wrapper 类。
在源码里的体现,在加载扩展点文件 loadFile(Map<String, Class<?>> extensionClasses, String dir)
里会根据构造器函数进行判断是否是 Wrapper类:
try {
clazz.getConstructor(type); // 判断是否 Wrapper 类型
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} catch (NoSuchMethodException e) { // 非 Wrapper 类型
...
}
其次在创建扩展点实例的时候也会根据是否是 Wrapper 类来创建相应的扩展点,这在createExtension(String name)
中的体现:
Set<Class<?>> wrapperClasses = cachedWrapperClasses; // 获取缓存的Wrapper类集合
// 如果是包装类则创建包装类扩展点实例
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
那么,为什么要使用 Wrapper 类呢?
Wrapper 类同样实现了扩展点接口,但是 Wrapper 不是扩展点的真正实现。它的用途主要是用于从
ExtensionLoader
返回扩展点时,包装在真正的扩展点实现外。即从ExtensionLoader
中返回的实际上是 Wrapper 类的实例,Wrapper 持有了实际的扩展点实现类。扩展点的 Wrapper 类可以有多个,也可以根据需要新增。
通过 Wrapper 类可以把所有扩展点公共逻辑移至 Wrapper 中。新加的 Wrapper 在所有的扩展点上添加了逻辑,有些类似 AOP,即 Wrapper 代理了扩展点。
从这里看 Dubbo 的aop
实际上是装饰者设计模式 + 自适应特性的动态代理。
举个例子,在之前写Dubbo暴露过程的源码中对Protocol
接口的调用过程进行了分析,在ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
获取Protocol自适应扩展点的时序是ProtocolListenerWrapper
->ProtocolFilterWrapper
->DubboProtocol
。
在真正暴露服务之前,对此进行了一些额外的扩展操作,通过这些层层包装使得各个类逻辑分明,代码维护性高。
扩展点自动激活
先看官方文档:
对于集合类扩展点,比如:Filter, InvokerListener, ExportListener, TelnetHandler, StatusChecker 等,可以同时加载多个实现,此时,可以用自动激活来简化配置。
Dubbo实现自动激活的核心关键词:Activate
。
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* 根据group匹配当前扩展点
*
* @return 匹配的group名称
*/
String[] group() default {};
/**
* 当URL上的参数包含指定的keys时,激活当前的扩展点
* <p>
* 举个栗子, 当使用 <code>@Activate("cache, validation")</code>, 当URL参数上包含<code>cache</code> 或 <code>validation</code> 时,当前扩展点才会被激活
* </p>
*
* @return URL 参数上的key值
*/
String[] value() default {};
/**
* 相对排序信息, 可选
*
* @return 应当放在当前扩展点之前的扩展点列表
*/
String[] before() default {};
/**
* 相对排序信息, 可选
*
* @return 应当放在当前扩展点之前的扩展点列表
*/
String[] after() default {};
/**
* 绝对排序信息, 可选
*
* @return 绝对排序信息
*/
int order() default 0;
}
总之就是根据注解的 value
和 group
两个属性来决定是否激活。
比如CacheFilter
:
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {}
当满足条件:
- 服务提供者 和 服务消费者
- url上的参数包含
cache
则激活 CacheFilter
那么,Dubbo如何使用 Activate
呢?
没错,一定还记得那个方法:getActivateExtension
。 getActivateExtension
有多个重写的方法,但实际最终会调用到如下:
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
getExtensionClasses(); // 此处缓存了 cachedActivates
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey(); // 获取 可激活的扩展点的spi扩展名
Activate activate = entry.getValue();
if (isMatchGroup(group, activate.group())) { // 如果group匹配
T ext = getExtension(name); // 根据扩展点名称获取扩展点实例
// name不在 values 指定的列,且没排除name,且url上有activate的value,则激活
if (!names.contains(name)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR); // 排序
}
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i++) { // 指定使用values的时候
String name = names.get(i);
// 所有未被排除的扩展名
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
if (Constants.DEFAULT_KEY.equals(name)) {
if (!usrs.isEmpty()) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;
}
关于Activate
的使用场景:当需要提供一组需要指定条件的扩展点并使用的时候。比如在ProtocolFilterWrapper
的buildInvokerChain
里构建一组Filter
时,Dubbo是这么处理的:
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
扩展点的一些核心方法
在整个扩展点源码里,有一些核心的方法贯穿整个ExtensionLoader。
首先是之前常见得getExtensionClasses()
这个方法里进行了:
- 缓存扩展点Class
- 从指定文件路径加载扩展点文件
- 创建扩展点
getExtensionClasses()
里的核心方法是loadExtensionClasses
.
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class); // 先获取 @SPI 注解中的默认值
if (defaultAnnotation != null) {
// 如果 @SPI 注解存在 value 默认值, 赋值给 cachedDefaultName 属性
String value = defaultAnnotation.value();
if (value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) { // 每个扩展点实现只能配置一个名字
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}
// 从配置路径中加载扩展实现类
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
真正开始加载扩展点文件的方法:loadFile
.下面,简单粗暴的贴源码:
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
String fileName = dir + type.getName(); // 获取文件路径名
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader(); // 获取了类加载器
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL url = urls.nextElement();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8")); // 也是以utf-8方式读取配置文件
try {
String line = null;
while ((line = reader.readLine()) != null) { // 解析每行数据
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) { // 非注释内容
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim(); // 配置中的 key
line = line.substring(i + 1).trim(); // 配置中的 value
}
if (line.length() > 0) {
Class<?> clazz = Class.forName(line, true, classLoader); // 获取class对象
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) { // 如果注解了@Adaptive
if (cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz; // 缓存 cachedAdaptiveClass
} else if (!cachedAdaptiveClass.equals(clazz)) { // 只允许一个 Adaptive 实现
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
} else {
try {
clazz.getConstructor(type); // 判断是否 Wrapper 类型
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} catch (NoSuchMethodException e) { // 非 Wrapper 类型
clazz.getConstructor(); // 获取class对象的无参构造器
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name == null || name.length() == 0) {
if (clazz.getSimpleName().length() > type.getSimpleName().length()
&& clazz.getSimpleName().endsWith(type.getSimpleName())) {
name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
} else {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
}
}
}
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate); // 缓存 cachedActivates
}
for (String n : names) {
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n); // 缓存 cachedNames ,每个class只对应一个名称
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz); // 放入到extensionClasses中,多个 name 可能对应一个Class
} else if (c != clazz) { // 重复抛异常
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + url + ") in " + url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}
文件加载完成后,几个实例被缓存了:
- cachedAdaptiveClass(自适应扩展点)
- cachedWrapperClasses(扩展点包装类)
- cachedActivates(扩展点激活类)
- cachedNames(Class->name映射)
在获取扩展点的三个方法中会常使用缓存了的数据,由此可见Dubbo在这里的缓存优化。
结束语
至此,扩展点机制大致介绍完毕,自己从源码中也体会了在扩展设计上原来还可以这么玩,收益匪浅了。
最后做个总结:
- 从代码层面上对扩展点的四个特性进行了分析
- 三种获取扩展点的方式相互结合,分别体现了不同的扩展点特点
- 代码层面合理的设计模式(装饰器模式,动态代理)对代码分层解耦