06--【重点】Dubbo源码【Dubbo的SPI机制】

tech2023-02-26  106

Java的SPI

java提供的SPI(service provider interface)在java运行时动态加载配置接口的实现类。

SPI的标准

1、在classpath目录下创建MATE-INF/service文件夹

2、创建properties文件,其中该文件的编码为UTF-8,文件名为要实现接口的全路径。文件内容为实现接口的类的全路径,实现类可为一个或多个。

3、调用java.util.ServiceLoader的加载机制

ServiceLoader<IService> serviceLoader  = ServiceLoader.load(IService.class);

for(IService service : serviceLoader) {

  System.out.println(service.getScheme()+"="+service.sayHello());

}

在写Dubbo的SPI时,先写下我个人对于系统软件的设计思路

如果让你写一个可扩展的功能,比如写一个网络协议,根据不同的参数去实现不同的网络协议,你要怎么写?

第一步:

先定义一个接口,用工厂模型,写很多的不同协议的类实现接口。调用的时候根据不同的参数,生产不同的协议类。

实现了可扩展功能,如果出现新的网络协议直接在不修改源码的基础上,在写一个新的类便可以扩展。【开闭原则】

第二步:

在第一步的完善。

如果有不同的工厂我要去怎么实现【动态工厂】

第三步:

在第二步的完成

如果在抽闲,其实说白了就是实现接口的方法,那能不能将这个接口的方法或接口作为一个扩展点去不断的完善呢?

者就是SPI规范。

Dubbo的SPI

@SPI注解在接口上表示类是扩展点

1、静态扩展

编写一个扩展接口的实现类,在resources文件夹下配置META-INF/dubbo/接口的全路径文件,在文件里面编写key=实现类的全路径

简单理解就是加载SPI实现扩展的配置文件,并实例化后存入EXTENSION_INSTANCES的Map缓存

关键代码如下:

第一步加载

ClassLoader classLoader = findClassLoader();//得到ClassLoader

Enumeration<java.net.URL> urls = classLoader.getResources(fileName);

while (urls.hasMoreElements()) {          java.net.URL resourceURL = urls.nextElement();     loadResource(extensionClasses, classLoader, resourceURL);      }

loadResource(extensionClasses, classLoader, resourceURL){

    BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))     String line;         while ((line = reader.readLine()) != null){         int i = line.indexOf('=');         name = line.substring(0, i).trim();                 line = line.substring(i + 1).trim();                 loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);     } }

loadClass(){//封装     

}

第二步放入缓存

Class<?> clazz = getExtensionClasses().get(name); T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); }

依赖注入:

如果当前的扩展点依赖其他的扩展点,则需要依赖注入

如:

public Xxxx{

    private OtherInterface yyy;

    public setOtherInterface(OtherInterface yyy){

       this.yyy=yyy

    }

}

ZookeeperDynamicConfigurationFactory类依赖注入扩展点案例

2、自适应扩展点

注解标识:@Adaptive

分两种:一种是@Adaptive在类上,二种@Adaptive在方法上

第一种@Adaptive在类上,以AdaptiveCompiler类为案例

@SPI("javassist") public interface Compiler { /** * Compile java source code. * * @param code Java source code * @param classLoader classloader * @return Compiled class */ Class<?> compile(String code, ClassLoader classLoader); }

@Adaptive public class AdaptiveCompiler implements Compiler {

    private static volatile String DEFAULT_COMPILER;

    public static void setDefaultCompiler(String compiler) {         DEFAULT_COMPILER = compiler;     }

    @Override     public Class<?> compile(String code, ClassLoader classLoader) {         Compiler compiler;         ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);

       //通过DEFAULT_COMPILER传入的值去静态加载,其中DEFAULT_COMPILER为@SPI(值)-->>ExtensionLoader#cachedDefaultName         String name = DEFAULT_COMPILER; // copy reference         if (name != null && name.length() > 0) {             compiler = loader.getExtension(name);         } else {             compiler = loader.getDefaultExtension();         }         return compiler.compile(code, classLoader);     }

}

在ExtensionLoader类的代码中体现

private void cacheDefaultExtensionName() { final SPI defaultAnnotation = type.getAnnotation(SPI.class); if (defaultAnnotation != null) { String value = defaultAnnotation.value(); if ((value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if (names.length > 1) { throw new IllegalStateException("More than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names)); } if (names.length == 1) { cachedDefaultName = names[0]; } } } }

第二种@Adaptive在方法上【重点】

为什么要加载在方法上?

根据用户的配置去灵活的处理

以Protocol类为案例

按照常理要配置一个扩展点的实例类,去动态的选择要加载的实现类,但是发现没有这样的类,所以动态的生成一个代理类,如

Protocol$Adaptive的类

在ExtensionLoader类中调用getAdaptiveExtension()方法里面的createAdaptiveExtension方法去实现

private T createAdaptiveExtension() { try { //依赖注入 //获得一个自适应扩展点的实例 return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e); } } private Class<?> getAdaptiveExtensionClass() { getExtensionClasses(); //表示在类上@Adaptive if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } //表示在方法上@Adaptive return cachedAdaptiveClass = createAdaptiveExtensionClass(); } private Class<?> createAdaptiveExtensionClass() { //拼接一个类的字符串,也就是代理类采用javassite方式 String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); ClassLoader classLoader = findClassLoader(); org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); }

如:WebServiceProtocol类

3、激活扩展点

根据一些参数直接找到实现类如:

@SPI public interface Filter { /** * Does not need to override/implement this method. */ Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; /** * Filter itself should only be response for passing invocation, all callbacks has been placed into {@link Listener} * * @param appResponse * @param invoker * @param invocation * @return */ @Deprecated default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { return appResponse; } interface Listener { void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation); void onError(Throwable t, Invoker<?> invoker, Invocation invocation); } }

@Activate(group = CommonConstants.PROVIDER, order = -30000) public class ClassLoaderFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { ClassLoader ocl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader()); try { return invoker.invoke(invocation); } finally { Thread.currentThread().setContextClassLoader(ocl); } } }

 

实现:

URL url=new URL("","",0); url=url.addParameter("cache","cache"); List<Filter> list=ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(url,"cache"); System.out.println(list.size());

ExtensionLoader类的实现如下

public List<T> getActivateExtension(URL url, String[] values, String group) { List<T> exts = new ArrayList<>(); List<String> names = values == null ? new ArrayList<>(0) : Arrays.asList(values); if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) { getExtensionClasses(); for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) { String name = entry.getKey(); Object activate = entry.getValue(); String[] activateGroup, activateValue; if (activate instanceof Activate) { activateGroup = ((Activate) activate).group(); activateValue = ((Activate) activate).value(); } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) { activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group(); activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value(); } else { continue; } if (isMatchGroup(group, activateGroup)) { T ext = getExtension(name); if (!names.contains(name) && !names.contains(REMOVE_VALUE_PREFIX + name) && isActive(activateValue, url)) { exts.add(ext); } } } exts.sort(ActivateComparator.COMPARATOR); } List<T> usrs = new ArrayList<>(); for (int i = 0; i < names.size(); i++) { String name = names.get(i); if (!name.startsWith(REMOVE_VALUE_PREFIX) && !names.contains(REMOVE_VALUE_PREFIX + name)) { if (DEFAULT_KEY.equals(name)) { if (!usrs.isEmpty()) { exts.addAll(0, usrs); usrs.clear(); } } else { T ext = getExtension(name); usrs.add(ext); } } } if (!usrs.isEmpty()) { exts.addAll(usrs); } return exts; }

可以借鉴这遍博文 :https://blog.csdn.net/wangchengming1/article/details/105978182 

最新回复(0)