手记

微博motan客户端调用服务

通过阅读源码,可以了解开源框架的一些内部实现,也方便后续真正使用这些开源框架时候出现问题可以快速的定位。

还是从官方的一个demo入手

RefererConfig<MotanDemoService> motanDemoServiceReferer = new RefererConfig<MotanDemoService>();
        // 设置接口及实现类
        motanDemoServiceReferer.setInterface(MotanDemoService.class);
        // 配置服务的group以及版本号
        motanDemoServiceReferer.setGroup("motan-demo-rpc");
        motanDemoServiceReferer.setVersion("1.0");
        motanDemoServiceReferer.setRequestTimeout(1000);
        // 配置注册中心为zk
        List<RegistryConfig>registryConfigList=new ArrayList<>();
        RegistryConfig registry1 = new RegistryConfig();
        registry1.setRegProtocol("zookeeper");
        registry1.setAddress("192.168.88.129:2181");
        registryConfigList.add(registry1);
        motanDemoServiceReferer.setRegistries(registryConfigList);
        // 配置RPC协议
        ProtocolConfig protocol = new ProtocolConfig();
        protocol.setId("motan");
        protocol.setName("motan");
		//设置负载策略和Ha策略,使用默认
        /*protocol.setHaStrategy();
        protocol.setLoadbalance();*/
        motanDemoServiceReferer.setProtocol(protocol);
        // 请求服务
        MotanDemoService service = motanDemoServiceReferer.getRef();
        System.out.println(service.hello("motan"));
		
调用过程很简单,注释已经描述了整个的过程。

motanDemoServiceReferer.getRef()

在上面的demo中,大部分代码都是做一些配置,于是当我们看到这行代码,就明白是它没错!!!
根据动态代理的知识,可以感觉到此处返回的是一个代理类。
 public T getRef() {
        if (ref == null) {
            initRef();
        }
        return ref;
 }

initRef()

 public synchronized void initRef() {
 
    ......
	 clusterSupports = new ArrayList<>(protocols.size());
        List<Cluster<T>> clusters = new ArrayList<>(protocols.size());
        String proxy = null;

        //根据SPI机制获取到 SimpleConfigHandler
        ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);

        //获取注册中心的URL
        List<URL> registryUrls = loadRegistryUrls();
        String localIp = getLocalHostAddress(registryUrls);

        for (ProtocolConfig protocol : protocols) {
            Map<String, String> params = new HashMap<>();
            params.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_REFERER);
            params.put(URLParamType.version.getName(), URLParamType.version.getValue());
            params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));

            collectConfigParams(params, protocol, basicReferer, extConfig, this);
            collectMethodConfigParams(params, this.getMethods());

            //获取路径
            String path = StringUtils.isBlank(serviceInterface) ? interfaceClass.getName() : serviceInterface;
            //设置调用服务的URL  motan  ip 0 interfaceClass.getName() params
            URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, path, params);

            //ClusterSupport 构造 Cluster
            ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);

            clusterSupports.add(clusterSupport);
            clusters.add(clusterSupport.getCluster());

			//获取代理类型,默认是JDK动态代理
            if (proxy == null) {
                String defaultValue = StringUtils.isBlank(serviceInterface) ? URLParamType.proxy.getValue() : MotanConstants.PROXY_COMMON;
                proxy = refUrl.getParameter(URLParamType.proxy.getName(), defaultValue);
            }
        }

        ref = configHandler.refer(interfaceClass, clusters, proxy);
 }

createClusterSupport()

         ......
        //给注册中心的URL新增属性embed:请求的refUrl
        for (URL url : regUrls) {
            url.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(refUrl.toFullStr()));
        }
        return configHandler.buildClusterSupport(interfaceClass, regUrls);

         ......
		 
configHandler.buildClusterSupport(interfaceClass, regUrls)
参数:
接口全类名称
注册中心的URL+embed属性

到SimpleConfigHandler 里面的buildClusterSupport方法
 public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
        //初始化ClusterSupport,会根据embed属性获取请求的url,并且会获取protocol的具体实现类DefaultRpcProtocol
        ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);
        //执行init方法
        clusterSupport.init();

        return clusterSupport;
}

public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
        this.registryUrls = registryUrls;
        this.interfaceClass = interfaceClass;
        String urlStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
        this.url = URL.valueOf(urlStr);
        //获取protocol的具体实现类DefaultRpcProtocol
        protocol = getDecorateProtocol(url.getProtocol());
    }

我们来看init()方法里面的 prepareCluster()都做了什么
private void prepareCluster() {
        //根据url获取到一些配置,包括负载策略和Ha策略
        String clusterName = url.getParameter(URLParamType.cluster.getName(), URLParamType.cluster.getValue());
        String loadbalanceName = url.getParameter(URLParamType.loadbalance.getName(), URLParamType.loadbalance.getValue());
        String haStrategyName = url.getParameter(URLParamType.haStrategy.getName(), URLParamType.haStrategy.getValue());
        //通过SPI机制获取具体的实现类,根据SPI获取cluster的具体实现类:ClusterSpi
cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(clusterName);
        //根据SPI获取负载均衡策略和Ha策略的具体实现,在demo中我们设置的注释掉了,因此使用默认的配置
        LoadBalance<T> loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);
        HaStrategy<T> ha = ExtensionLoader.getExtensionLoader(HaStrategy.class).getExtension(haStrategyName);
        ha.setUrl(url);
        //装载到cluster中
        cluster.setLoadBalance(loadBalance);
        cluster.setHaStrategy(ha);
        cluster.setUrl(url);
    }

回到init()方法中

Registry registry = getRegistry(ru);
registry.subscribe(subUrl, this);

这里是进行服务的订阅,通过zk的watch机制,当服务发生变化,会调用ClusterSupport里面的notify方法进行刷新集群信息

让我们看一下notify方法做了什么
public synchronized void notify(URL registryUrl, List<URL> urls) {

......

List<Referer<T>> newReferers = new ArrayList<Referer<T>>();
       for (URL u : urls) {
           if (!u.canServe(url)) {
               continue;
           }
           Referer<T> referer = getExistingReferer(u, registryReferers.get(registryUrl));
           if (referer == null) {
               URL refererURL = u.createCopy();
               mergeClientConfigs(refererURL);
               //会去初始化netty客户端 client = endpointFactory.createClient(url);
               //先调用ProtocolFilterDecorator里面的方法,在调用到AbstractProtocol里面的refer方法
               referer = protocol.refer(interfaceClass, refererURL, u);
           }
           if (referer != null) {
               newReferers.add(referer);
           }
       }

       if (CollectionUtil.isEmpty(newReferers)) {
           onRegistryEmpty(registryUrl);
           return;
       }

       // 此处不销毁referers,由cluster进行销毁
       registryReferers.put(registryUrl, newReferers);
       //刷新最新的服务到cluster
       refreshCluster();

}

AbstractProtocol里面的refer方法:

 Referer<T> referer = createReferer(clz, url, serviceUrl);
 
 由于在初始化ClusterSupport的时候已经获取到具体的DefaultRpcProtocol实现
 因此我们去看DefaultRpcProtocol里面的Referer方法
 protected <T> Referer<T> createReferer(Class<T> clz, URL url, URL serviceUrl) {
        return new DefaultRpcReferer<T>(clz, url, serviceUrl);
 }

 public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) {
        super(clz, url, serviceUrl);

        endpointFactory =
                ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                        url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));

		//创建一个nettyClient				
        client = endpointFactory.createClient(url);
}
 
 可以看到初始化DefaultRpcReferer的时候会创建一个nettyClient	。
 通过上面我们指定其实每个Referer 都是一个DefaultRpcReferer对象。并且最终会被刷新到cluster中,
 被loadBalance使用,选择一个可用的Referer给我们进行请求调用。

createClusterSupport方法执行结束后回到initRef()方法

//获取JDK动态代理
String defaultValue = StringUtils.isBlank(serviceInterface) ? URLParamType.proxy.getValue() : MotanConstants.PROXY_COMMON;
proxy = refUrl.getParameter(URLParamType.proxy.getName(), defaultValue);

ref = configHandler.refer(interfaceClass, clusters, proxy);
参数:
接口全类名称
构造的clusters
jdk

 走到SimpleConfigHandler里面的refer方法
 public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
        ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
        return proxyFactory.getProxy(interfaceClass, clusters);
    }

可以看到是通过SPI机制获取到代理类JdkProxyFactory,并且传入了请求的接口信息和cluster

public class JdkProxyFactory implements ProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clz, List<Cluster<T>> clusters) {
        return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new RefererInvocationHandler<>(clz, clusters));
    }
}

根据代理的知识,我们知道后面的事情就要靠RefererInvocationHandler来完成了,OK让我们继续深入

接下来进行方法调用

RefererInvocationHandler.invoke方法
        //构造DefaultRequest请求信息
        DefaultRequest request = new DefaultRequest();
        request.setRequestId(RequestIdGenerator.getRequestId());
        request.setArguments(args);
        String methodName = method.getName();
        boolean async = false;
        if (methodName.endsWith(MotanConstants.ASYNC_SUFFIX) && method.getReturnType().equals(ResponseFuture.class)) {
            methodName = MotanFrameworkUtil.removeAsyncSuffix(methodName);
            async = true;
        }
        request.setMethodName(methodName);
        request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
        request.setInterfaceName(interfaceName);

        return invokeRequest(request, getRealReturnType(async, this.clz, method, methodName), async);

invokeRequest()方法
忽略掉一些降级的操作
我们可以看到真正的入口方法
response = cluster.call(request);

整个代理过程如下
创建和设置Request对象
代理给Cluster
处理异常

cluster.call() 其实是很重要的方法

他通过前面设置的Ha策略和LoadBalance策略对我们的服务进行调用。

实际上调用的是ClusterSpi里面的call方法:

public Response call(Request request) {
        if (available.get()) {
            try {
                return haStrategy.call(request, loadBalance);
            } catch (Exception e) {
                return callFalse(request, e);
            }
        }
        return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
    }

haStrategy在Motan实现了两种FailfastHaStrategy和FailoverHaStrategy
我们只看简单的一种FailfastHaStrategy,OK到这里我们可以看到
会通过负载策略获取一个可用的服务,然后进行调用
public class FailfastHaStrategy<T> extends AbstractHaStrategy<T> {

    @Override
    public Response call(Request request, LoadBalance<T> loadBalance) {
	    //获取一个可用的服务
        Referer<T> refer = loadBalance.select(request);
		//发起调用
        return refer.call(request);
    }
}

refer.call(request)
最终会走到DefaultRpcReferer类,这个类我们在前面看到已经进行了初始化了

protected Response doCall(Request request) {
        try {
            // 为了能够实现跨group请求,需要使用server端的group。
            request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup());
            return client.request(request);
        } catch (TransportException exception) {
            throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
        }
 }

这里就是通过nettyClient进行调用了。

总结:

客户端调用的大体流程
首先 会 根据 请求的配置参数构建一个cluster
包含了Ha和Loadbalance策略,并且通过注册中心监听服务的变化。
最后进行调用。
0人推荐
随时随地看视频
慕课网APP