SPI in dubbo

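Yesterday I finished a set of notes from a rough read-through of the dubbo source code. One part I felt I had not fully understood was SPI. Having worked on Android for a long time, I knew little about SPI: only that ServiceLoader exists and that slf4j uses it (as does Meituan's componentization, haha). Today I spent some time reading the SPI-related source in dubbo, and this post is a record of that.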

First, let's look at a concrete use in dubbo: the protocol injection in ServiceConfig:

private static final Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

As you can see, protocol is obtained through ExtensionLoader.

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if(type == null) {
        throw new IllegalArgumentException("Extension type == null");
    } else if(!type.isInterface()) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    } else if(!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    } else {
        ExtensionLoader<T> loader = (ExtensionLoader)EXTENSION_LOADERS.get(type);
        if(loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type));
            loader = (ExtensionLoader)EXTENSION_LOADERS.get(type);
        }

        return loader;
    }
}

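getExtensionLoader first validates the type (it must be non-null, an interface, and annotated with @SPI), then creates an ExtensionLoader for it and stores it in the EXTENSION_LOADERS cache map, so later calls for the same type get the cached loader back.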
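To make the caching step concrete, here is a minimal sketch of the same "one loader per type" pattern. LoaderCacheSketch is a hypothetical stand-in written for this note, not dubbo's class; it assumes the cache is a ConcurrentMap (as the putIfAbsent call suggests) and leaves out the @SPI annotation check.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for ExtensionLoader: only the per-type caching is modelled.
final class LoaderCacheSketch<T> {
    // One shared cache for all types, keyed by the extension interface.
    private static final ConcurrentMap<Class<?>, LoaderCacheSketch<?>> LOADERS =
            new ConcurrentHashMap<>();

    private final Class<T> type;

    private LoaderCacheSketch(Class<T> type) {
        this.type = type;
    }

    @SuppressWarnings("unchecked")
    static <T> LoaderCacheSketch<T> of(Class<T> type) {
        if (type == null) {
            throw new IllegalArgumentException("Extension type == null");
        }
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        LoaderCacheSketch<T> loader = (LoaderCacheSketch<T>) LOADERS.get(type);
        if (loader == null) {
            // Two threads may both construct a loader here, but putIfAbsent
            // keeps only the first one, and both threads then read that one back.
            LOADERS.putIfAbsent(type, new LoaderCacheSketch<>(type));
            loader = (LoaderCacheSketch<T>) LOADERS.get(type);
        }
        return loader;
    }

    Class<T> type() {
        return type;
    }
}
```

Calling LoaderCacheSketch.of(Runnable.class) twice returns the same object, while passing a concrete class such as String.class is rejected with an IllegalArgumentException.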

private ExtensionLoader(Class<?> type) {
    this.type = type;
    this.objectFactory = type == ExtensionFactory.class?null:(ExtensionFactory)getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension();
}

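The constructor checks whether the type is ExtensionFactory. If it is not, it first obtains the adaptive ExtensionFactory and assigns it to the objectFactory field. For ExtensionFactory itself, objectFactory stays null, which keeps the constructor from recursing into itself.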

public T getAdaptiveExtension() {
    Object instance = this.cachedAdaptiveInstance.get();
    if(instance == null) {
        if(this.createAdaptiveInstanceError != null) {
            throw new IllegalStateException("fail to create adaptive instance: " + this.createAdaptiveInstanceError.toString(), this.createAdaptiveInstanceError);
        }

        Holder var2 = this.cachedAdaptiveInstance;
        synchronized(this.cachedAdaptiveInstance) {
            instance = this.cachedAdaptiveInstance.get();
            if(instance == null) {
                try {
                    instance = this.createAdaptiveExtension();
                    this.cachedAdaptiveInstance.set(instance);
                } catch (Throwable var5) {
                    this.createAdaptiveInstanceError = var5;
                    throw new IllegalStateException("fail to create adaptive instance: " + var5.toString(), var5);
                }
            }
        }
    }

    return instance;
}

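The most important step in getAdaptiveExtension is calling createAdaptiveExtension to create the adaptive instance and cache it. The lookup is a double-checked lock on the cachedAdaptiveInstance holder: check, lock, check again, and only then create.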
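The holder-plus-lock idiom used here can be sketched on its own. HolderSketch and LazySketch below are hypothetical names for this note; the sketch assumes the holder's field is volatile, which the unlocked first read needs in order to be safe, and it omits the error caching that getAdaptiveExtension does with createAdaptiveInstanceError.

```java
import java.util.function.Supplier;

// Hypothetical minimal holder: a mutable box whose value is safely published.
final class HolderSketch<T> {
    private volatile T value; // assumption: volatile, so the unlocked read sees a fully built object

    T get() {
        return value;
    }

    void set(T value) {
        this.value = value;
    }
}

// Lazily creates one instance and caches it, using double-checked locking.
final class LazySketch<T> {
    private final HolderSketch<T> cached = new HolderSketch<>();
    private final Supplier<T> factory;

    LazySketch(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T instance = cached.get();          // first check, no lock
        if (instance == null) {
            synchronized (cached) {         // lock on the holder itself
                instance = cached.get();    // second check, under the lock
                if (instance == null) {
                    instance = factory.get();
                    cached.set(instance);
                }
            }
        }
        return instance;
    }
}
```

With this shape the factory runs at most once, and every later call takes the lock-free fast path.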

private T createAdaptiveExtension() {
    try {
        return this.injectExtension(this.getAdaptiveExtensionClass().newInstance());
    } catch (Exception var2) {
        throw new IllegalStateException("Can not create adaptive extenstion " + this.type + ", cause: " + var2.getMessage(), var2);
    }
}

createAdaptiveExtension first gets the class through getAdaptiveExtensionClass, then instantiates it and calls injectExtension to perform injection.

private Class<?> getAdaptiveExtensionClass() {
    this.getExtensionClasses();
    return this.cachedAdaptiveClass != null?this.cachedAdaptiveClass:(this.cachedAdaptiveClass = this.createAdaptiveExtensionClass());
}

getAdaptiveExtensionClass is the key method. It has only two lines, so let's go through them one at a time, starting with the first.

private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = (Map)this.cachedClasses.get();
    if(classes == null) {
        Holder var2 = this.cachedClasses;
        synchronized(this.cachedClasses) {
            classes = (Map)this.cachedClasses.get();
            if(classes == null) {
                classes = this.loadExtensionClasses();
                this.cachedClasses.set(classes);
            }
        }
    }

    return classes;
}

private Map<String, Class<?>> loadExtensionClasses() {
    SPI defaultAnnotation = (SPI)this.type.getAnnotation(SPI.class);
    if(defaultAnnotation != null) {
        String value = defaultAnnotation.value();
        if(value != null && (value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if(names.length > 1) {
                throw new IllegalStateException("more than 1 default extension name on extension " + this.type.getName() + ": " + Arrays.toString(names));
            }

            if(names.length == 1) {
                this.cachedDefaultName = names[0];
            }
        }
    }

    Map<String, Class<?>> extensionClasses = new HashMap();
    this.loadFile(extensionClasses, "META-INF/dubbo/internal/");
    this.loadFile(extensionClasses, "META-INF/dubbo/");
    this.loadFile(extensionClasses, "META-INF/services/");
    return extensionClasses;
}

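So how does getExtensionClasses find the implementations of an interface (Protocol in our example)? Reading loadExtensionClasses shows that it scans three directories: META-INF/dubbo/internal/, META-INF/dubbo/, and META-INF/services/. Before that, it reads the default extension name from the @SPI annotation and caches it in cachedDefaultName.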

If we look inside the META-INF/dubbo/internal/ folder, there are many files, each named after the fully qualified name of a type. Next, let's look at the loadFile method.

private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
    String fileName = dir + this.type.getName();

    try {
        ClassLoader classLoader = findClassLoader();
        Enumeration urls;
        if(classLoader != null) {
            urls = classLoader.getResources(fileName);
        } else {
            urls = ClassLoader.getSystemResources(fileName);
        }

        if(urls != null) {
            label269:
            while(urls.hasMoreElements()) {
                java.net.URL url = (java.net.URL)urls.nextElement();

                try {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));

                    try {
                        String line = null;

                        while(true) {
                            do {
                                if((line = reader.readLine()) == null) {
                                    continue label269;
                                }

                                // 35 is '#': everything after it is a comment
                                int ci = line.indexOf(35);
                                if(ci >= 0) {
                                    line = line.substring(0, ci);
                                }

                                line = line.trim();
                            } while(line.length() <= 0);

                            try {
                                String name = null;
                                // 61 is '=': a line has the form name=class
                                int i = line.indexOf(61);
                                if(i > 0) {
                                    name = line.substring(0, i).trim();
                                    line = line.substring(i + 1).trim();
                                }

                                if(line.length() > 0) {
                                    Class<?> clazz = Class.forName(line, true, classLoader);
                                    if(!this.type.isAssignableFrom(clazz)) {
                                        throw new IllegalStateException("Error when load extension class(interface: " + this.type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + "is not subtype of interface.");
                                    }

                                    if(clazz.isAnnotationPresent(Adaptive.class)) {
                                        if(this.cachedAdaptiveClass == null) {
                                            this.cachedAdaptiveClass = clazz;
                                        } else if(!this.cachedAdaptiveClass.equals(clazz)) {
                                            throw new IllegalStateException("More than 1 adaptive class found: " + this.cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName());
                                        }
                                    } else {
                                        try {
                                            clazz.getConstructor(new Class[]{this.type});
                                            Set<Class<?>> wrappers = this.cachedWrapperClasses;
                                            if(wrappers == null) {
                                                this.cachedWrapperClasses = new ConcurrentHashSet();
                                                wrappers = this.cachedWrapperClasses;
                                            }

                                            wrappers.add(clazz);
                                        } catch (NoSuchMethodException var27) {
                                            clazz.getConstructor(new Class[0]);
                                            if(name == null || name.length() == 0) {
                                                name = this.findAnnotationName(clazz);
                                                if(name == null || name.length() == 0) {
                                                    if(clazz.getSimpleName().length() <= this.type.getSimpleName().length() || !clazz.getSimpleName().endsWith(this.type.getSimpleName())) {
                                                        throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                    }

                                                    name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - this.type.getSimpleName().length()).toLowerCase();
                                                }
                                            }

                                            String[] names = NAME_SEPARATOR.split(name);
                                            if(names != null && names.length > 0) {
                                                Activate activate = (Activate)clazz.getAnnotation(Activate.class);
                                                if(activate != null) {
                                                    this.cachedActivates.put(names[0], activate);
                                                }

                                                String[] arr$ = names;
                                                int len$ = names.length;

                                                for(int i$ = 0; i$ < len$; ++i$) {
                                                    String n = arr$[i$];
                                                    if(!this.cachedNames.containsKey(clazz)) {
                                                        this.cachedNames.put(clazz, n);
                                                    }

                                                    Class<?> c = (Class)extensionClasses.get(n);
                                                    if(c == null) {
                                                        extensionClasses.put(n, clazz);
                                                    } else if(c != clazz) {
                                                        throw new IllegalStateException("Duplicate extension " + this.type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            } catch (Throwable var28) {
                                IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + this.type + ", class line: " + line + ") in " + url + ", cause: " + var28.getMessage(), var28);
                                this.exceptions.put(line, e);
                            }
                        }
                    } finally {
                        reader.close();
                    }
                } catch (Throwable var30) {
                    logger.error("Exception when load extension class(interface: " + this.type + ", class file: " + url + ") in " + url, var30);
                }
            }
        }
    } catch (Throwable var31) {
        logger.error("Exception when load extension class(interface: " + this.type + ", description file: " + fileName + ").", var31);
    }

}

The method is long, but let's start with the first line:

String fileName = dir + this.type.getName();

The file name is the directory name plus the type name. In our example the type is Protocol, so the file is META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol. We can find this file under the META-INF/dubbo/internal/ folder:

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
pb=com.alibaba.dubbo.rpc.protocol.pb.PbProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol

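This looks very much like the ServiceLoader loading approach mentioned at the start of the article. The difference is that each line may carry a name in front of the class, in name=class form.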
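The line handling in loadFile can be tried out in isolation. The sketch below is a simplified, string-only reading of it written for this note: DescriptorSketch is a hypothetical name, no classes are loaded, and the annotation lookup (findAnnotationName) and comma-separated names are left out. It keeps the three steps visible in the source: strip everything after '#', split at '=', and, when a line has no name, derive one from the class's simple name by dropping the interface's simple name and lower-casing the rest.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper: parses descriptor lines into name -> class name pairs.
final class DescriptorSketch {
    static Map<String, String> parse(Iterable<String> lines, String typeSimpleName) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw;
            int hash = line.indexOf('#');          // comments start at '#'
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            if (line.isEmpty()) {
                continue;                           // blank or comment-only line
            }
            String name = null;
            int eq = line.indexOf('=');             // optional "name=" prefix
            if (eq > 0) {
                name = line.substring(0, eq).trim();
                line = line.substring(eq + 1).trim();
            }
            if (line.isEmpty()) {
                continue;
            }
            if (name == null || name.isEmpty()) {
                // No explicit name: HttpProtocol for interface Protocol becomes "http".
                String simple = line.substring(line.lastIndexOf('.') + 1);
                if (simple.length() <= typeSimpleName.length() || !simple.endsWith(typeSimpleName)) {
                    throw new IllegalStateException("No such extension name for the class " + line);
                }
                name = simple.substring(0, simple.length() - typeSimpleName.length())
                        .toLowerCase(Locale.ROOT);
            }
            result.put(name, line);
        }
        return result;
    }
}
```

Fed the Protocol file above with "Protocol" as the simple name, the two lines without a name (HttpProtocol and WebServiceProtocol) would be registered under "http" and "webservice".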

Next, look at this part of loadFile:

if(clazz.isAnnotationPresent(Adaptive.class)) {
    if(this.cachedAdaptiveClass == null) {
        this.cachedAdaptiveClass = clazz;
    } else if(!this.cachedAdaptiveClass.equals(clazz)) {
        throw new IllegalStateException("More than 1 adaptive class found: " + this.cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName());
    }
}

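If a class carries the Adaptive annotation, it is stored in the cachedAdaptiveClass cache. A second, different annotated class causes an IllegalStateException.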
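As a rough illustration of this check, the sketch below scans a few candidate classes for a marker annotation. Everything in it is hypothetical and written for this note: AdaptiveMark stands in for dubbo's Adaptive annotation, and Greeter and its two implementations are toy types. Returning null corresponds to the case where cachedAdaptiveClass stays empty.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker; it must be retained at runtime to be visible via reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface AdaptiveMark {}

interface Greeter {
    String greet();
}

final class EnglishGreeter implements Greeter {
    public String greet() {
        return "hello";
    }
}

@AdaptiveMark
final class RoutingGreeter implements Greeter {
    public String greet() {
        return "routed";
    }
}

final class AdaptiveScanSketch {
    // Returns the single annotated class, or null when none of the candidates is marked.
    static Class<?> findAdaptive(Class<?>... candidates) {
        Class<?> found = null;
        for (Class<?> candidate : candidates) {
            if (!candidate.isAnnotationPresent(AdaptiveMark.class)) {
                continue;
            }
            if (found == null) {
                found = candidate;
            } else if (!found.equals(candidate)) {
                throw new IllegalStateException("More than 1 adaptive class found: "
                        + found.getName() + ", " + candidate.getName());
            }
        }
        return found;
    }
}
```

Note that the sketch prints found.getName() in the error message; the decompiled source calls getClass().getName() on a Class object, which would always yield java.lang.Class.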

Now that loadFile is done, let's return to the second line of the key getAdaptiveExtensionClass method above:

return this.cachedAdaptiveClass != null?this.cachedAdaptiveClass:(this.cachedAdaptiveClass = this.createAdaptiveExtensionClass());

This is where the cachedAdaptiveClass cache comes in: if it is not null it is returned directly, and if it is null, createAdaptiveExtensionClass is called.

private Class<?> createAdaptiveExtensionClass() {
    String code = this.createAdaptiveExtensionClassCode();
    ClassLoader classLoader = findClassLoader();
    Compiler compiler = (Compiler)getExtensionLoader(Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader);
}

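In createAdaptiveExtensionClass, createAdaptiveExtensionClassCode generates the Java source of the class as a String, and a Compiler then compiles it into a class at runtime. The default Compiler here is JavassistCompiler. Let's see what the generated class looks like: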

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}

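I paused in the debugger and copied out the String code as is, without formatting it. You can see that it is essentially a proxy layer: export and refer both take the extension name from the URL (falling back to "dubbo" when the URL has no protocol) and then call ExtensionLoader's getExtension to get the concrete implementation. This Protocol$Adpative is basically a factory.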
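Written by hand, the dispatch that the generated class performs looks roughly like the sketch below. The names are hypothetical and chosen for this note: TransportSketch stands in for Protocol, java.net.URI for dubbo's URL, and a plain Map lookup for ExtensionLoader.getExtension(extName).

```java
import java.net.URI;
import java.util.Map;

// Hypothetical extension interface with a single URL-driven method.
interface TransportSketch {
    String export(URI url);
}

// Hand-written equivalent of a generated adaptive class: it holds no logic of
// its own and only picks the real implementation by the name found in the URL.
final class AdaptiveTransportSketch implements TransportSketch {
    private final Map<String, TransportSketch> extensions; // stands in for getExtension(name)
    private final String defaultName;                      // stands in for the @SPI default

    AdaptiveTransportSketch(Map<String, TransportSketch> extensions, String defaultName) {
        this.extensions = extensions;
        this.defaultName = defaultName;
    }

    @Override
    public String export(URI url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        // Same rule as the generated code: use the URL's protocol, else the default.
        String extName = url.getScheme() == null ? defaultName : url.getScheme();
        TransportSketch extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension " + extName);
        }
        return extension.export(url);
    }
}
```

Because the implementation is resolved on every call, one adaptive instance can sit in a static field (like protocol in ServiceConfig) and still route each URL to a different implementation.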

public T getExtension(String name) {
    if(name != null && name.length() != 0) {
        if("true".equals(name)) {
            return this.getDefaultExtension();
        } else {
            Holder<Object> holder = (Holder)this.cachedInstances.get(name);
            if(holder == null) {
                this.cachedInstances.putIfAbsent(name, new Holder());
                holder = (Holder)this.cachedInstances.get(name);
            }

            Object instance = holder.get();
            if(instance == null) {
                synchronized(holder) {
                    instance = holder.get();
                    if(instance == null) {
                        instance = this.createExtension(name);
                        holder.set(instance);
                    }
                }
            }

            return instance;
        }
    } else {
        throw new IllegalArgumentException("Extension name == null");
    }
}

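getExtension needs little explanation: it fetches the instance from the cache, and if there is none it creates one and puts it in the cache. Passing the name "true" returns the default extension.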

That completes a quick tour of dubbo's SPI mechanism. A few points are worth noting:

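  1. dubbo's SPI mechanism can be seen as an extension of Java's traditional SPI. It caches heavily internally and achieves thread safety through Holder, ConcurrentHashMap, and similar means.
  2. dubbo's SPI has an Adaptive annotation, used to mark a factory class that creates the concrete implementations. If an interface has no implementation annotated with Adaptive, an xxx$Adpative class (spelled this way in the generated code) is created dynamically, by default through Javassist.
  3. dubbo loads all classes from three directories: META-INF/dubbo/internal/, META-INF/dubbo/, and META-INF/services/.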