上一篇我们介绍了暴露服务,这一篇我们来说引用服务。首先我们看下应用层引用服务所做的
@Bean public ReferenceBean<LogService> logService(){ ReferenceBean<LogService> referenceBean=new ReferenceBean<>(); referenceBean.setInterface(LogService.class); return referenceBean; }
在使用的地方,声明了一个ReferenceBean,然后指明了要引用的类型,就可以了,他内部其实封装了大量的内容,我们开始吧。ReferenceBean也是一个InitializingBean,所以看他的声明周期方法是afterPropertiesSet,基本上和ServiceBean是一致的,同时他也是FactoryBean类的实例,所以他的getObject方法也是在生命周期过程中被主动调用到的,他里面引用的是get()方法,他的实现也比较简单
public synchronized T get() { if (destroyed){ throw new IllegalStateException("Already destroyed!"); } if (ref == null) { init(); } return ref; }
那么看来最核心的是init(),他里面做的就是装配Dubbo所依赖的属性,最后我们着眼于
ref = createProxy(map);
基于上面提供的属性放到map中,根据这个map创建一个代理对象作为ref。
private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用 isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { //默认情况下如果本地有服务暴露,则引用本地服务. isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 通过注册中心配置拼装URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最后一个registry url } } if (registryURL != null) { // 有 注册中心协议的URL // 对有注册中心的Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // 不是 注册中心的URL invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } if (c && ! invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } // 创建服务代理 return (T) proxyFactory.getProxy(invoker); }
我们从List<URL> us = loadRegistries(false)
开始说起,从注册中心获取注册的URL,例如registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user&dubbo=2.5.3&pid=2836®istry=zookeeper×tamp=1525835334588
。这是原始的链接,随后增加REFER_KEY
,他的内容就是传入的数据组成的map。拼接之后是encode后的内容,为了方便观察,我们在文中列出的是decode后的内容registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user&dubbo=2.5.3&pid=2836&refer=application=user&default.check=false&default.retries=0&default.timeout=30000&default.version=LATEST&dubbo=2.5.3&interface=com.linyang.test.service.LogService&methods=modify,create&pid=2836&revision=1.0-SNAPSHOT&side=consumer×tamp=1525835306361®istry=zookeeper×tamp=1525835306402
.URL拼装完成后,我们就进入到我们的主角refprotocol.refer(interfaceClass, url)
,开始自己的引用逻辑。如果大家还有印象,就知道我们refprotocol的处理是经过了ProtocolFilterWrapper,ProtocolListenerWrapper,这两个类里的处理,和export是相同的,见到registry直接忽略,最后进入了RegistryProtocol中,为啥是这个协议,不是其他的协议,可以看下我们之前的文章,里面会有答案。RegistryProtocol中的refer方法
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0 ) { if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1 || "*".equals( group ) ) { return doRefer( getMergeableCluster(), registry, type, url ); } } return doRefer(cluster, registry, type, url); }
url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY)
第一件事就是替换协议,过来的时候的协议是registry协议,我们重新设置协议,变成注册中心的协议,例如我们例子中使用的zookeeper.替换协议后,删除了key为registry的内容。替换协议的目的是为了下一步,从registryFactory挑选合适的注册器,我们使用的是zk的协议,那么注册器就是zk的注册器。我们需要的数据都有了,我们就可以进行真正的引用逻辑了doRefer了
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); return cluster.join(directory); }
首先我们想一个问题,这个cluster是怎么来的?答案在我的上一篇文章依赖注入里面。随后我们看下subscribeUrl长啥样。consumer://172.16.197.200/com.linyang.test.service.LogService?application=user&category=providers,configurators,routers&default.check=false&default.retries=0&default.timeout=30000&default.version=LATEST&dubbo=2.5.3&interface=com.linyang.test.service.LogService&methods=modify,create&pid=2836&revision=1.0-SNAPSHOT&side=consumer×tamp=1525835306361
。随后进行了订阅registry.subscribe(url, this);
首先这个URL的内容就是上面那个,这是this就是指RegistryDirectory的实例,也就是说他也是NotifyListener的实例。我们知道这里的registry是ZookeeperRegistry,所以他的subscribe方法
@Override public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // 向服务器端发送订阅请求 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (urls != null && urls.size() > 0) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // 如果开启了启动时检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if(skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // 将失败的订阅请求记录到失败列表,定时重试 addFailedSubscribed(url, listener); } }
super.subscribe(url, listener);
往本地存储映射关系,key是url,value是一个Set<Listener>。最关键的无疑是doSubscribe(url, listener);
,下面是ZookeeperRegistry的doSubscribe的部分实现
List<URL> urls = new ArrayList<URL>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls);
ChildListener是对关注的路径的有他下一级的节点的增减,就会触发的监听器。所以我们看到listeners.putIfAbsent
里放置的是一个匿名类,他里面的实现就是notify。也就是说当关注的路径的下增减节点,就会触发回调,然后通过notify方法,进行业务数据的变更逻辑。当我们,都注册好了之后,自己触发了notify,他的具体的逻辑在doNotify里面。
protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.size() == 0) && ! Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); saveProperties(url); listener.notify(categoryList); } }
忽略里面的所有的细节,我们看下主干就是listener.notify。这里的listener就是我们之前在接口中传入的RegistryDirectory的实例对象,所以调用的就是他的notify方法。
public synchronized void notify(List<URL> urls) { List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); for (URL url : urls) { String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } } // configurators if (configuratorUrls != null && configuratorUrls.size() >0 ){ this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && routerUrls.size() >0 ){ List<Router> routers = toRouters(routerUrls); if(routers != null){ // null - do nothing setRouters(routers); } } List<Configurator> localConfigurators = this.configurators; // local reference // 合并override参数 this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && localConfigurators.size() > 0) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers refreshInvoker(invokerUrls); }
这里面的操作就是根据传入的url就行创建或者更新invoker.重点放在 refreshInvoker(invokerUrls)
,我们看一下传入的urldubbo://172.16.197.200:20880/com.linyang.test.service.LogService?anyhost=true&application=log&default.proxy=javassist&default.retries=0&default.timeout=30000&default.version=LATEST&dubbo=2.5.3&interface=com.linyang.test.service.LogService&methods=modify,create&pid=2747&side=provider&threads=100×tamp=1525835283678
这个就是provider的信息,后面会根据这个provider的url创建出来一个代理,给consumer端的项目使用。下面我们看看refreshInvoker。
private void refreshInvoker(List<URL> invokerUrls){ if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // 禁止访问 this.methodInvokerMap = null; // 置空列表 destroyAllInvokers(); // 关闭所有Invoker } else { this.forbidden = false; // 允许访问 Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){ invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<URL>(); this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比 } if (invokerUrls.size() ==0 ){ return; } Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表 // state change //如果计算错误,则不进行处理. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){ logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString())); return ; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try{ destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker }catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
其中有一个toInvokers的方法,就是讲传入的invokerUrls转成具体的invoker,也就是providerurl转成invoker的过程。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>(); if(urls == null || urls.size() == 0){ return newUrlInvokerMap; } Set<String> keys = new HashSet<String>(); String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { //如果reference端配置了protocol,则只选择匹配的protocol if (queryProtocols != null && queryProtocols.length() >0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; } URL url = mergeUrl(providerUrl); String key = url.toFullString(); // URL参数是排序的 if (keys.contains(key)) { // 重复URL continue; } keys.add(key); // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // 缓存中没有,重新refer try { boolean enabled = true; if (url.hasParameter(Constants.DISABLED_KEY)) { enabled = ! url.getParameter(Constants.DISABLED_KEY, false); } else { enabled = url.getParameter(Constants.ENABLED_KEY, true); } if (enabled) { invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t); } if (invoker != null) { // 将新的引用放入缓存 newUrlInvokerMap.put(key, invoker); } }else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }
划重点!protocol.refer(serviceType, url)
因为我们传过来的协议是dubbo,所以使用的最后处理的就是DubboProtocol,当然少不了的是warpper类的包裹,这个和export是一致的,可以自己进行回顾。下面是DubboProtocl的refer
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
会根据url来获得一个clients
private ExchangeClient[] getClients(URL url){ //是否共享连接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //如果connections不配置,则共享连接,否则每服务每连接 if (connections == 0){ service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }
不管是getSharedClient(url)
还是initClient(url)
最终都会调用initClient(url)
,所以我们看看他是怎么初始化的。
private ExchangeClient initClient(URL url) { // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); boolean compatible = (version != null && version.startsWith("1.0.")); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO存在严重性能问题,暂时不允许使用 if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client ; try { //设置连接应该是lazy的 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){ client = new LazyConnectExchangeClient(url ,requestHandler); } else { client = Exchangers.connect(url ,requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
出现了我们的Exchanges层client = Exchangers.connect(url ,requestHandler)
也就是说对于网络的封装都是通过Exchangers作为入口,剩下的代码基本就和export端的代码相同了,只不过,consumer端的操作一直就是连接上服务端的ip,端口,发送消息罢了。后面就是getExchanger(url).connect(url, handler)
getExchanger(url)获得的是HeaderExchanger,调用他的connect方法,他的内部是通过Transporters来连接的
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
那么他的操作也和exchange是相同的getTransporter().connect(url, handler);
getTransporter默认使用的是NettyTransporter,所以调用的NettyTransporter的connect方法进行的具体的链接,他的内部创建了一个NettyClient
public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); }
返回DubboProtocol的refer方法,将创建的NettyClient封装成DubboInvoker返回。
toInvokers要说的就结束了,我们看下他的下一个方法。toInvokers是将URL列表转成url和对应的invoker的映射关系,就是一个map。toMethodInvokers是将toInvokers的返回值作为入参,返回方法名称作为key,value是可以被执行的invoker的列表,这里说的可以被执行,不是指全部,而是只经过route过滤后的。我们看下toMethodInvokers
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) { Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>(); // 按提供者URL所声明的methods分类,兼容注册中心执行路由过滤掉的methods List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>(); if (invokersMap != null && invokersMap.size() > 0) { for (Invoker<T> invoker : invokersMap.values()) { String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY); if (parameter != null && parameter.length() > 0) { String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter); if (methods != null && methods.length > 0) { for (String method : methods) { if (method != null && method.length() > 0 && ! Constants.ANY_VALUE.equals(method)) { List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null) { methodInvokers = new ArrayList<Invoker<T>>(); newMethodInvokerMap.put(method, methodInvokers); } methodInvokers.add(invoker); } } } } invokersList.add(invoker); } } newMethodInvokerMap.put(Constants.ANY_VALUE, invokersList); if (serviceMethods != null && serviceMethods.length > 0) { for (String method : serviceMethods) { List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null || methodInvokers.size() == 0) { methodInvokers = invokersList; } newMethodInvokerMap.put(method, route(methodInvokers, method)); } } // sort and unmodifiable for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) { List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method); Collections.sort(methodInvokers, InvokerComparator.getComparator()); newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers)); } return Collections.unmodifiableMap(newMethodInvokerMap); }
从invoker中的url获取到provider提供的方法,根据方法名称作为key,可以为此方法提供远程调用的invoker的列表作为value,将上面的映射放入到methodInvokers,然后有一个invokersList是一个调用者的列表,可以看做是传入的invokersMap中的values,他也是newMethodInvokerMap中的value,key呢就是*
.填充完整个map后。目前为止是全量的数据,我们前面说的过滤呢,请看这一行newMethodInvokerMap.put(method, route(methodInvokers, method));
里面的这个route方法就是所谓的过滤。我们详细的看下
private List<Invoker<T>> route(List<Invoker<T>> invokers, String method) { Invocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]); List<Router> routers = getRouters(); if (routers != null) { for (Router router : routers) { if (router.getUrl() != null && ! router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) { invokers = router.route(invokers, getConsumerUrl(), invocation); } } } return invokers; }
getRouters
可以看做寻找所有的invoker的过滤器,随后遍历这些过滤器,将传入的所有的invokers进行过滤router.route
,过滤完了在返回过滤后的invokers.Router分为两种,一种是条件ConditionRouter,表示满足某种条件才是我们需要的Router,另外一种是ScriptRouter,通过具体的表达式判断,应用不广泛。
到目前为止Registry.doRefer中的directory.subscribe已经执行完了,后面就是cluster.join(directory)
返回一个invoker.我们再回到最初的地方ReferenceConfig中的createProxy中refprotocol.refer(interfaceClass, url)
创建的invokers,可以暴露成一个invoker给上层调用,凭借的就是invoker = cluster.join(new StaticDirectory(invokers))
,随后将这个集成的这个invoker进行代理,生产代理对象proxyFactory.getProxy(invoker)
,这个代理对象就是ref,也就是我们需要使用的接口的代理对象。
到目前为止,服务引用,基本上就说完了。我们再花少量的时间,说一下具体使用这个代理对象,发起远程调用的过程。现在我们有了一个动态代理产生的对象。
JDK的动态代理,创建代理的时候
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker)); }
Handler是InvokerInvocationHandler的实例,并且他的构造函数需要传入invoker。所以当应用被代理的类的接口的时候,实际上回走到InvokerInvocationHandler.invoke,这个invoker是MockClusterInvoker的实例,他里面做了真正的invoker和directory的封装,从directory中获取到url,从里面搜索要调用的方法是否是mock的类型,如果不是,就直接发起调用,如果是的话,就进行mock,这个设计主要是为了熔断。当被调用的服务已经不堪重负了,在调用也处理不过来,就不调用了,使用策略进行mock。当然,这不是我们关注的重点。真正的invoker是FailoverClusterInvoker的实例,所以看下他的invoke方法。
public Result invoke(final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
首先我们看下list(invocation)
做了什么
protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; }
将这个地址簿中所有可以提供调用的invoker都列出来了。不管我们有多少个invoker,最后我们使用的肯定只有一个,我们通过负载均衡LoadBalance选出。如果是多个,我们通过URL中的信息得到具体的负载均衡器,否则使用默认的,默认的是random,也就是随机。我们所有的组件都有了,我们看下下一个方法doInvoke(invocation, invokers, loadbalance)
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重试时,进行重新选择,避免重试时invoker列表已发生变化. //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变 if (i > 0) { checkWheatherDestoried(); copyinvokers = list(invocation); //重新检查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
我们最为关注的就是Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
看看选择出一个具体的invoker来进行后面的流程的。
/** * 使用loadbalance选择invoker.</br> * a)先lb选择,如果在selected列表中 或者 不可用且做检验时,进入下一步(重选),否则直接返回</br> * b)重选验证规则:selected > available .保证重选出的结果尽量不在select中,并且是可用的 * * @param availablecheck 如果设置true,在选择的时候先选invoker.available == true * @param selected 已选过的invoker.注意:输入保证不重复 * */ protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ; { //ignore overloaded method if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){ stickyInvoker = null; } //ignore cucurrent problem if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){ if (availablecheck && stickyInvoker.isAvailable()){ return stickyInvoker; } } } Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected); if (sticky){ stickyInvoker = invoker; } return invoker; }
这个方法的主体就是Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); // 如果只有两个invoker,退化成轮循 if (invokers.size() == 2 && selected != null && selected.size() > 0) { return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); } Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试. if( (selected != null && selected.contains(invoker)) ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){ try{ Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if(rinvoker != null){ invoker = rinvoker; }else{ //看下第一次选的位置,如果不是最后,选+1位置. int index = invokers.indexOf(invoker); try{ //最后在避免碰撞 invoker = index <invokers.size()-1?invokers.get(index+1) :invoker; }catch (Exception e) { logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e); } } }catch (Throwable t){ logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t); } } return invoker; }
这个方法中的关键点是Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
通过具体的负载均衡的算法得到一个invoker,最后调用invoker.invoke(invocation)发起远程请求。
测试源码
END
作者:数齐
链接:https://www.jianshu.com/p/9098bdef5448