dubbo源码阅读笔记

前言不得不说一下自己对dubbo源码的感想:著名的框架,其内在架构设计都非常漂亮并具有工程性

从调试的角度去看待源码

  • 代理工厂

无论是服务消费者还是服务提供者,都得对调用服务的interface进行proxy,代理工厂具有两个,分别为javassist和jdk proxy

1
2
3
org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory

org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory

其中它们都具有getProxy、getInvoker方法实现

getProxy:主要用于服务消费者对interface进行代理,生成实例提供程序调用。而InvokerInvocationHandler是实际调用对象,其对上层程序代码隐藏了远程调用的细节

1
2
3
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}

getInvoker:主要用于服务提供者对实际被调用实例进行代理包装,以实现实际对象方法被调用后,进行结果、异常的CompletableFuture的封装

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}

InvokerInvocationHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
if ("$destroy".equals(methodName) && parameterTypes.length == 0) {
invoker.destroy();
}

RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
rpcInvocation.setTargetServiceUniqueName(invoker.getUrl().getServiceKey());

return invoker.invoke(rpcInvocation).recreate();
}

AbstractProxyInvoker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public Result invoke(Invocation invocation) throws RpcException {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
  • 服务提供者启动时,先创建相应选择的协议对象(Protocol),然后通过代理工厂创建Invoker对象,接着使用协议对象对Invoker进行服务注册至注册中心。
  • 服务消费者启动时,先创建相应选择的协议对象(Protocol),然后通过协议对象引用到服务提供者,得到Invoker对象,接着通过代理工厂创建proxy对象。
  • 服务消费者spring配置了dubbo:reference,在spring容器创建bean时,先创建FactoryBean(ReferenceBean extends ReferenceConfig),在getObject时调用ReferenceConfig的get方法,在get方法中init初始化,接着调用createProxy并传入相应参数创建代理对象,紧接着使用SPI方式得到代理工厂适配器、注册协议对象,默认获得javassist代理工厂创建代理,然后使用协议对象(ProtocolFilterWrapper-ProtocolListenerWrapper-QosProtocolWrapper-RegisterProtocol)调用doRefer,并且根据protocol得到spring配置的dubbo:registry使用的是zookeeper协议,通过SPI得到ZookeeperRegister的注册中心去注册订阅,接着生成RegisterDirectory封装服务相关数据,然后通过SPI加载FailoverCluster,生成Invoker对象,得到服务引用(MockClusterInvoker包装着FailoverCluster),最后通过代理工厂创建代理实例,个人认为Cluster是对服务提供者集群的高可用设计,在某个服务不可用时可以故障转移。
  • 服务消费者调用代理对象时,先判断是否一些Object方法,若是Object方法,则直接本地调用,否则创建RpcInvocation封装调用目标方法、接口、参数等信息,然后调用Invoker的invoke方法,一般情况下,invoker对象是一系列invoker的包装(默认MockClusterInvoker-FailoverClusterInvoker-InvokerWrapper-ListenerInvokerWrapper-ProtocolFilterWrapper-DubboInvoker)。
  • 服务集群,当使用服务集群时,在invoker执行远程调用时,会进行负载均衡选择集群中的某个invoker进行远程调用
  • FailoverClusterInvoker,故障转移集群invoker中会根据配置中的失败重试次数进行失败重试

从简单阅读的角度去看待源码

dubbo-container容器

  • dubbo-container:dubbo容器实现,它的容器并不是沙盒形式的容器。从dubbo-container-api可以看出,其具有main启动方法,根据系统变量dubbo.container或java启动参数读取配置指定需要启动的容器(英文逗号分隔),并使用系统变量dubbo.shutdown.hook去配置是否对启动的容器进行jvm shutdown的hook,以实现shutdown时对容器进行关停
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class Main {

public static final String CONTAINER_KEY = "dubbo.container";

public static final String SHUTDOWN_HOOK_KEY = "dubbo.shutdown.hook";

private static final Logger logger = LoggerFactory.getLogger(Main.class);

private static final ExtensionLoader<Container> loader = ExtensionLoader.getExtensionLoader(Container.class);

private static final ReentrantLock LOCK = new ReentrantLock();

private static final Condition STOP = LOCK.newCondition();

public static void main(String[] args) {
try {
if (ArrayUtils.isEmpty(args)) {
String config = ConfigUtils.getProperty(CONTAINER_KEY, loader.getDefaultExtensionName());
args = COMMA_SPLIT_PATTERN.split(config);
}

final List<Container> containers = new ArrayList<Container>();
for (int i = 0; i < args.length; i++) {
containers.add(loader.getExtension(args[i]));
}
logger.info("Use container type(" + Arrays.toString(args) + ") to run dubbo serivce.");

if ("true".equals(System.getProperty(SHUTDOWN_HOOK_KEY))) {
Runtime.getRuntime().addShutdownHook(new Thread("dubbo-container-shutdown-hook") {
@Override
public void run() {
for (Container container : containers) {
try {
container.stop();
logger.info("Dubbo " + container.getClass().getSimpleName() + " stopped!");
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
LOCK.lock();
STOP.signal();
} finally {
LOCK.unlock();
}
}
}
});
}

for (Container container : containers) {
container.start();
logger.info("Dubbo " + container.getClass().getSimpleName() + " started!");
}
System.out.println(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss]").format(new Date()) + " Dubbo service server started!");
} catch (RuntimeException e) {
logger.error(e.getMessage(), e);
System.exit(1);
}
try {
LOCK.lock();
STOP.await();
} catch (InterruptedException e) {
logger.warn("Dubbo service server stopped, interrupted by other thread!", e);
} finally {
LOCK.unlock();
}
}

}
  • dubbo-container-spring:spring容器实现包,系统变量dubbo.spring.config读取spring配置文件位置或使用默认位置classpath:META-INF/spring/.xml读取,主要是读取spring文件并启动spring容器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SpringContainer implements Container {

public static final String SPRING_CONFIG = "dubbo.spring.config";
public static final String DEFAULT_SPRING_CONFIG = "classpath*:META-INF/spring/*.xml";
private static final Logger logger = LoggerFactory.getLogger(SpringContainer.class);
static ClassPathXmlApplicationContext context;

public static ClassPathXmlApplicationContext getContext() {
return context;
}

@Override
public void start() {
String configPath = ConfigUtils.getProperty(SPRING_CONFIG);
if (StringUtils.isEmpty(configPath)) {
configPath = DEFAULT_SPRING_CONFIG;
}
context = new ClassPathXmlApplicationContext(configPath.split("[,\\s]+"), false);
context.refresh();
context.start();
}

@Override
public void stop() {
try {
if (context != null) {
context.stop();
context.close();
context = null;
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}

}
  • dubbo-container-log4j:log4j容器,主要是使用dubbo容器时,可以独立加入使用log4j打印日志,系统变量dubbo.log4j.file配置日志输出文件,系统变量dubbo.log4j.level配置日志输出level,系统变量dubbo.log4j.subdirectory设置日志输出目录其会修改当前所以logger appender的输出目录为此目录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class Log4jContainer implements Container {

public static final String LOG4J_FILE = "dubbo.log4j.file";

public static final String LOG4J_LEVEL = "dubbo.log4j.level";

public static final String LOG4J_SUBDIRECTORY = "dubbo.log4j.subdirectory";

public static final String DEFAULT_LOG4J_LEVEL = "ERROR";

@Override
@SuppressWarnings("unchecked")
public void start() {
String file = ConfigurationUtils.getProperty(LOG4J_FILE);
if (file != null && file.length() > 0) {
String level = ConfigurationUtils.getProperty(LOG4J_LEVEL);
if (StringUtils.isEmpty(level)) {
level = DEFAULT_LOG4J_LEVEL;
}
Properties properties = new Properties();
properties.setProperty("log4j.rootLogger", level + ",application");
properties.setProperty("log4j.appender.application", "org.apache.log4j.DailyRollingFileAppender");
properties.setProperty("log4j.appender.application.File", file);
properties.setProperty("log4j.appender.application.Append", "true");
properties.setProperty("log4j.appender.application.DatePattern", "'.'yyyy-MM-dd");
properties.setProperty("log4j.appender.application.layout", "org.apache.log4j.PatternLayout");
properties.setProperty("log4j.appender.application.layout.ConversionPattern", "%d [%t] %-5p %C{6} (%F:%L) - %m%n");
PropertyConfigurator.configure(properties);
}
String subdirectory = ConfigurationUtils.getProperty(LOG4J_SUBDIRECTORY);
if (subdirectory != null && subdirectory.length() > 0) {
Enumeration<org.apache.log4j.Logger> ls = LogManager.getCurrentLoggers();
while (ls.hasMoreElements()) {
org.apache.log4j.Logger l = ls.nextElement();
if (l != null) {
Enumeration<Appender> as = l.getAllAppenders();
while (as.hasMoreElements()) {
Appender a = as.nextElement();
if (a instanceof FileAppender) {
FileAppender fa = (FileAppender) a;
String f = fa.getFile();
if (f != null && f.length() > 0) {
int i = f.replace('\\', '/').lastIndexOf('/');
String path;
if (i == -1) {
path = subdirectory;
} else {
path = f.substring(0, i);
if (!path.endsWith(subdirectory)) {
path = path + "/" + subdirectory;
}
f = f.substring(i + 1);
}
fa.setFile(path + "/" + f);
fa.activateOptions();
}
}
}
}
}
}
}

@Override
public void stop() {
}

}
  • dubbo-container-logback:logback容器,与log4j容器差不多,系统变量dubbo.logback.file、dubbo.logback.level都分别是对输出文件、输出level的配置,而系统变量dubbo.logback.maxhistory配置log文件最大保存历史(文件以”.%d{yyyy-MM-dd}”结尾)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class LogbackContainer implements Container {

public static final String LOGBACK_FILE = "dubbo.logback.file";

public static final String LOGBACK_LEVEL = "dubbo.logback.level";

public static final String LOGBACK_MAX_HISTORY = "dubbo.logback.maxhistory";

public static final String DEFAULT_LOGBACK_LEVEL = "ERROR";

@Override
public void start() {
String file = ConfigUtils.getProperty(LOGBACK_FILE);
if (file != null && file.length() > 0) {
String level = ConfigUtils.getProperty(LOGBACK_LEVEL);
if (StringUtils.isEmpty(level)) {
level = DEFAULT_LOGBACK_LEVEL;
}
// maxHistory=0 Infinite history
int maxHistory = StringUtils.parseInteger(ConfigUtils.getProperty(LOGBACK_MAX_HISTORY));

doInitializer(file, level, maxHistory);
}
}

@Override
public void stop() {
}

/**
* Initializer logback
*
* @param file
* @param level
* @param maxHistory
*/
private void doInitializer(String file, String level, int maxHistory) {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
Logger rootLogger = loggerContext.getLogger(Logger.ROOT_LOGGER_NAME);
rootLogger.detachAndStopAllAppenders();

// appender
RollingFileAppender<ILoggingEvent> fileAppender = new RollingFileAppender<ILoggingEvent>();
fileAppender.setContext(loggerContext);
fileAppender.setName("application");
fileAppender.setFile(file);
fileAppender.setAppend(true);

// policy
TimeBasedRollingPolicy<ILoggingEvent> policy = new TimeBasedRollingPolicy<ILoggingEvent>();
policy.setContext(loggerContext);
policy.setMaxHistory(maxHistory);
policy.setFileNamePattern(file + ".%d{yyyy-MM-dd}");
policy.setParent(fileAppender);
policy.start();
fileAppender.setRollingPolicy(policy);

// encoder
PatternLayoutEncoder encoder = new PatternLayoutEncoder();
encoder.setContext(loggerContext);
encoder.setPattern("%date [%thread] %-5level %logger (%file:%line\\) - %msg%n");
encoder.start();
fileAppender.setEncoder(encoder);

fileAppender.start();

rootLogger.addAppender(fileAppender);
rootLogger.setLevel(Level.toLevel(level));
rootLogger.setAdditive(false);
}

}

dubbo-registry注册中心

  • registry注册中心:在dubbo中主要负责服务暴露、引用,服务暴露主要是服务提供者启动时,告诉注册中心其要对外暴露一个服务,服务消费者可通过注册中心获取到所有服务提供者的信息(并实时监听数据变动),然后通过得到的信息完成远程调用。
  • RegistryProtocol:在服务提供端或服务消费端启动时,都会通过其进行暴露或引用服务。若使用了registry=zookeeper&protocol=dubbo,则在启动时,提供者会先使用ZookeeperRegistry把当前服务的信息注册至zookeeper,然后使用DubboProtocol对其服务进行暴露处理(若使用remoting实现为netty,则会建立端口监听)得到ExchangeServer,而消费者则先使用ZookeeperRegistry通过zookeeper订阅并获取提供者服务信息,然后使用DubboProtocol对其服务进行引用得到一个或多个ExchangeClient,接着用cluster进行包装,对外层隐藏细节,从而实现集群、故障转移等特性

dubbo-remoting远程通讯包

  • remoting远程通讯包:其封装了buffer、exchange、telnet、transport层,对上层隐藏细节,默认使用netty实现远程通讯
  • Exchanger:交换层,隐藏下一层传输层的细节,对下层包装Request、Response,下层仅需关心是请求还是响应。bind方法为服务提供者在Protocol中对服务的bind,得到ExchangeServer。connect方法为服务消费者在Protocol中对服务的connect,得到ExchangeClient。而无论是服务的提供者还是消费者,在bind、connect时,都需要提供ExchangeHandler,其被用于解码请求,对于提供者,其为请求接收处理,其在Protocol层,不关心remoting实现细节,只管接收到Request对象,然后invoke服务实现并返回Result。
  • ExchangeServer:服务提供者对服务暴露时,使用Protocol对象进行export,export中对其进行Exchangers.bind得到ExchangeServer,其重点为第二个参数ExchangeHandler,其被多个handler进行包装,进行了多层的处理,其为最外层,进行实际实例方法的调用invoke,然后返回Result
  • ExchangeClient:服务消费者对服务引用时,使用Protocol对象进行refer,refer中中对其进行Exchangers.connect得到ExchangeClient,然后把其封装在Invoker中,接着Invoker被proxy,当消费者执行Proxy对象方法时,其会通过InvokeInvocationHandler对Invoker进行invoke,然后Invoker调用ExchangeClient进行request,其重点为第二个参数ExchangeHandler,其被多个handler进行包装,进行了多层的处理,其为最外层,对响应进行处理DefaultFuture.received
  • DefaultFuture:在消费者Invoker对ExchangeClient进行request时,都会创建DefaultFuture并放置在其内部static集合FUTURES中以表示正在进行的请求,然后立即返回,当有响应Response返回时调用received,会将其在FUTURES集合中删除,然后把相应结果放到CompletableFuture
  • Transporters:交换层Exchangers的bind、connect最终都交由传输层Transporters处理
  • HeaderExchanger:无论是bind还是connect都传入了多层handler,DecodeHandler-HeaderExchangeHandler-ExchangeHandler,对于服务双方来讲,都是Handler都是对接收消息的处理。DecodeHandler对接收消息进行判断,若是请求(即提供方),则对Data进行decode,若是响应(即消费者),则对Result进行decode。HeaderExchangeHandler对请求进行实际方法调用,此时区分request.isEvent、request.isTwoWay等,最终调用最外层的ExchangeHandler。
  • HeaderExchangeClient:默认ExchangeClient的实现,其由HeaderExchanger创建,构造方法中创建HeaderExchangeChannel,而其Client由Transporters(SPI可选,NettyTransporter)创建,当Invoker进行invoke时,通过其进行request并传入RpcInvocation,然后由HeaderExchangeChannel对其包装成Request对象,接着通过client进行send
  • HeaderExchangeServer:默认ExchangeServer的实现,请求经过DecodeHandler-HeaderExchangeHandler-ExchangeHandler多层处理后调用实际实例方法,然后调用其bind时由Transporters(SPI可选,NettyTransporter)创建的server的send方法进行发送响应结果。

dubbo-rpc

  • rpc上进行功能扩展的module
  • filter、interceptors、listener:功能扩展