这几天离职在家,正好没事可以疯狂的输出一下,本来想写DUBBO的源码解析的,但是发现写DUBBO源码的太多了,所以找一个写的不那么多的框架,所以就选中SOFARPC这个框架了。 SOFARPC是蚂
这几天离职在家,正好没事可以疯狂的输出一下,本来想写DUBBO的源码解析的,但是发现写DUBBO源码的太多了,所以找一个写的不那么多的框架,所以就选中SOFARPC这个框架了。
SOFARPC是蚂蚁金服开源的一个RPC框架,相比DUBBO它没有这么多历史的包袱,代码更加简洁,设计思路更加清晰,更加容易去理解其中的代码。
那么为什么要去重写原生的SPI呢?官方给出了如下解释:
- 按需加载
- 可以有别名
- 可以有优先级进行排序和覆盖
- 可以控制是否单例
- 可以在某些场景下使用编码
- 可以指定扩展配置位置
- 可以排斥其他扩展点
整个流程如下:
我们以ConsumerBootstrap为例:
先要有一个抽象类:
@Extensible(singleton = false) public abstract class ConsumerBootstrap<T> { .... }
指定扩展实现类:
@Extension("sofa") public class DefaultConsumerBootstrap<T> extends ConsumerBootstrap<T> { ... }
扩展描述文件META-INF/services/sofa-rpc/com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap
sofa=com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap
当这些准备完成后,直接调用即可。
ConsumerBootstrap sofa = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
接下来我们看看ExtensionLoaderFactory的源码
/** * All extension loader {Class : ExtensionLoader} * 这个map里面装的是所有ExtensionLoader */ private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>(); public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) { ExtensionLoader<T> loader = LOADER_MAP.get(clazz); if (loader == null) { //get不到则加上锁 synchronized (ExtensionLoaderFactory.class) { //防止其他线程操作再get一次 loader = LOADER_MAP.get(clazz); if (loader == null) { loader = new ExtensionLoader<T>(clazz, listener); LOADER_MAP.put(clazz, loader); } } } return loader; }
然后我们看一下ExtensionLoader这个类的构造器
protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) { //如果正在执行关闭,则将属性置空后直接返回 if (RpcRunningState.isShuttingDown()) { this.interfaceClass = null; this.interfaceName = null; this.listener = null; this.factory = null; this.extensible = null; this.all = null; return; } // 接口为空,既不是接口,也不是抽象类 if (interfaceClass == null || !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) { throw new IllegalArgumentException("Extensible class must be interface or abstract class!"); } //当前加载的接口类名 this.interfaceClass = interfaceClass; //接口名字 this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass); this.listener = listener; //接口上必须要有Extensible注解 Extensible extensible = interfaceClass.getAnnotation(Extensible.class); if (extensible == null) { throw new IllegalArgumentException( "Error when load extensible interface " + interfaceName + ", must add annotation @Extensible."); } else { this.extensible = extensible; } // 如果是单例,那么factory不为空 this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null; //这个属性里面是这个接口的所有实现类 this.all = new ConcurrentHashMap<String, ExtensionClass<T>>(); if (autoLoad) { //获取到扩展点加载的路径 List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH); for (String path : paths) { //根据路径加载文件 loadFromFile(path); } } }
拿到所有的扩展点加载的路径后进入到loadFromFile中进行文件的加载
protected synchronized void loadFromFile(String path) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Loading extension of extensible {} from path: {}", interfaceName, path); } // 默认如果不指定文件名字,就是接口名 String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim(); String fullFileName = path + file; try { ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass()); loadFromClassLoader(classLoader, fullFileName); } catch (Throwable t) { if (LOGGER.isErrorEnabled()) { LOGGER.error("Failed to load extension of extensible " + interfaceName + " from path:" + fullFileName, t); } } } protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable { Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName) : ClassLoader.getSystemResources(fullFileName); // 可能存在多个文件。 if (urls != null) { while (urls.hasMoreElements()) { // 读取一个文件 URL url = urls.nextElement(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Loading extension of extensible {} from classloader: {} and file: {}", interfaceName, classLoader, url); } BufferedReader reader = null; try { reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8")); String line; while ((line = reader.readLine()) != null) { readLine(url, line); } } catch (Throwable t) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("Failed to load extension of extensible " + interfaceName + " from classloader: " + classLoader + " and file:" + url, t); } } finally { if (reader != null) { reader.close(); } } } } }
接下来进入到readLine,这个方法主要是读取prop文件里面的每一行记录,并加载该实现类的类文件校验完后将文件添加到all属性中
protected void readLine(URL url, String line) { //读取文件里面的一行记录,并将这行记录用=号分割 String[] aliasAndClassName = parseAliasAndClassName(line); if (aliasAndClassName == null || aliasAndClassName.length != 2) { return; } //别名 String alias = aliasAndClassName[0]; //包名 String className = aliasAndClassName[1]; // 读取配置的实现类 Class tmp; try { tmp = ClassUtils.forName(className, false); } catch (Throwable e) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("Extension {} of extensible {} is disabled, cause by: {}", className, interfaceName, ExceptionUtils.toShortString(e, 2)); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Extension " + className + " of extensible " + interfaceName + " is disabled.", e); } return; } if (!interfaceClass.isAssignableFrom(tmp)) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", " + className + " is not subtype of interface."); } Class<? extends T> implClass = (Class<? extends T>) tmp; // 检查是否有可扩展标识 Extension extension = implClass.getAnnotation(Extension.class); if (extension == null) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", " + className + " must add annotation @Extension."); } else { String aliasInCode = extension.value(); if (StringUtils.isBlank(aliasInCode)) { // 扩展实现类未配置@Extension 标签 throw new IllegalArgumentException("Error when load extension of extensible " + interfaceClass + " from file:" + url + ", " + className + "'s alias of @Extension is blank"); } if (alias == null) { // spi文件里没配置,用代码里的 alias = aliasInCode; } else { // spi文件里配置的和代码里的不一致 if (!aliasInCode.equals(alias)) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", aliases of " + className + " are " + "not equal between " + aliasInCode + "(code) and " + alias + "(file)."); } } // 接口需要编号,实现类没设置 if (extensible.coded() && extension.code() < 0) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", code of @Extension must >=0 at " + className + "."); } } // 不可以是default和* if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", alias of @Extension must not \"default\" and \"*\" at " + className + "."); } // 检查是否有存在同名的 ExtensionClass old = all.get(alias); ExtensionClass<T> extensionClass = null; if (old != null) { // 如果当前扩展可以覆盖其它同名扩展 if (extension.override()) { // 如果优先级还没有旧的高,则忽略 if (extension.order() < old.getOrder()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Extension of extensible {} with alias {} override from {} to {} failure, " + "cause by: order of old extension is higher", interfaceName, alias, old.getClazz(), implClass); } } else { if (LOGGER.isInfoEnabled()) { LOGGER.info("Extension of extensible {} with alias {}: {} has been override to {}", interfaceName, alias, old.getClazz(), implClass); } // 如果当前扩展可以覆盖其它同名扩展 extensionClass = buildClass(extension, implClass, alias); } } // 如果旧扩展是可覆盖的 else { if (old.isOverride() && old.getOrder() >= extension.order()) { // 如果已加载覆盖扩展,再加载到原始扩展 if (LOGGER.isInfoEnabled()) { LOGGER.info("Extension of extensible {} with alias {}: {} has been loaded, ignore origin {}", interfaceName, alias, old.getClazz(), implClass); } } else { // 如果不能被覆盖,抛出已存在异常 throw new IllegalStateException( "Error when load extension of extensible " + interfaceClass + " from file:" + url + ", Duplicate class with same alias: " + alias + ", " + old.getClazz() + " and " + implClass); } } } else { extensionClass = buildClass(extension, implClass, alias); } if (extensionClass != null) { // 检查是否有互斥的扩展点 for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet()) { ExtensionClass existed = entry.getValue(); if (extensionClass.getOrder() >= existed.getOrder()) { // 新的优先级 >= 老的优先级,检查新的扩展是否排除老的扩展 String[] rejection = extensionClass.getRejection(); if (CommonUtils.isNotEmpty(rejection)) { for (String rej : rejection) { existed = all.get(rej); if (existed == null || extensionClass.getOrder() < existed.getOrder()) { continue; } ExtensionClass removed = all.remove(rej); if (removed != null) { if (LOGGER.isInfoEnabled()) { LOGGER.info( "Extension of extensible {} with alias {}: {} has been reject by new {}", interfaceName, removed.getAlias(), removed.getClazz(), implClass); } } } } } else { String[] rejection = existed.getRejection(); if (CommonUtils.isNotEmpty(rejection)) { for (String rej : rejection) { if (rej.equals(extensionClass.getAlias())) { // 被其它扩展排掉 if (LOGGER.isInfoEnabled()) { LOGGER.info( "Extension of extensible {} with alias {}: {} has been reject by old {}", interfaceName, alias, implClass, existed.getClazz()); return; } } } } } } loadSuccess(alias, extensionClass); } }
加载完文件后我们再回到
ConsumerBootstrap sofa = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
进入到getExtension方法中
public ExtensionClass<T> getExtensionClass(String alias) { return all == null ? null : all.get(alias); } public T getExtension(String alias) { //从all属性中拿到加载的class ExtensionClass<T> extensionClass = getExtensionClass(alias); if (extensionClass == null) { throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: \"" + alias + "\"!"); } else { //在加载class的时候,校验了是否是单例,如果是单例,那么factory不为null if (extensible.singleton() && factory != null) { T t = factory.get(alias); if (t == null) { synchronized (this) { t = factory.get(alias); if (t == null) { //实例化 t = extensionClass.getExtInstance(); //放入到factory,单例的class下次直接拿就好了,不需要重新创建 factory.put(alias, t); } } } return t; } else { //实例化 return extensionClass.getExtInstance(); } } }
我们进入到ExtensionClass看看getExtInstance方法
/** * 服务端实例对象(只在是单例的时候保留) * 用volatile修饰,保证了可见性 */ private volatile transient T instance; /** * 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象 * * @param argTypes 构造函数参数类型 * @param args 构造函数参数值 * @return 扩展点对象实例 ext instance */ public T getExtInstance(Class[] argTypes, Object[] args) { if (clazz != null) { try { if (singleton) { // 如果是单例 if (instance == null) { synchronized (this) { if (instance == null) { //通过反射创建实例 instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args); } } } return instance; // 保留单例 } else { //通过反射创建实例 return ClassUtils.newInstanceWithArgs(clazz, argTypes, args); } } catch (Exception e) { throw new SofaRpcRuntimeException("create " + clazz.getCanonicalName() + " instance error", e); } } throw new SofaRpcRuntimeException("Class of ExtensionClass is null"); }
看完了SOFARPC的扩展类实现后感觉代码写的非常的整洁,逻辑非常的清晰,里面有很多可以学习的地方,比如线程安全用到了双重检查锁和volatile保证可见性。