通过上一章的分析,spring cloud 定义了LoadBalancerClient作为负载均衡器的通用接口,并且针对Ribbon实现了RibbonLoadBalancerClient,但是在具体实现客户端负载均衡还是通过Ribbon的接口ILoadBalancer实现的。下面我们看下ILoadBalancer接口的实现类是如何实现负载均衡的。
AbstractLoadBalancer
/**
* AbstractLoadBalancer contains features required for most loadbalancing
* implementations.
* 译文:AbstractLoadBalancer包含大多数负载平衡所需的功能实现
*
* An anatomy of a typical LoadBalancer consists of 1. A List of Servers (nodes)
* that are potentially bucketed based on a specific criteria. 2. A Class that
* defines and implements a LoadBalacing Strategy via <code>IRule</code> 3. A
* Class that defines and implements a mechanism to determine the
* suitability/availability of the nodes/servers in the List.
* 典型LoadBalancer的解剖结构包括
* 1.服务器列表(节点) 根据特定标准可能会被删除。
* 2.通过<code> IRule </ code>定义和实现LoadBalacing策略的类
* 3.定义和实现确定机制的类列表中节点/服务器的适用性/可用性
*
* @author stonse
*
*/
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
/**
* delegate to {@link #chooseServer(Object)} with parameter null.
*/
public Server chooseServer() {
return chooseServer(null);
}
/**
* List of servers that this Loadbalancer knows about
*
* @param serverGroup Servers grouped by status, e.g., {@link ServerGroup#STATUS_UP}
*/
public abstract List<Server> getServerList(ServerGroup serverGroup);
/**
* Obtain LoadBalancer related Statistics
*/
public abstract LoadBalancerStats getLoadBalancerStats();
}
AbstractLoadBalancer是ILoadBalancer接口的抽象实现类,在里面定义了关于服务实例分组的枚举类ServerGroup,包含三种类型:
- ALL 所有服务实例
- STATUS_UP 正在运行的服务实例
- STATUS_NOT_UP 非正在运行的实例
还定义了一个chooseServer()函数,这个函数调用的是接口中的chooseServer(Object key),固定传参为null,代表忽略key的影响。
还定义了两个抽象函数:
- getServerList(ServerGroup serverGroup):根据分组类型还取不同的服务实例列表
- getLoadBalancerStats():该函数返回了一个LoadBalancerStats 对象,该对象是用来存储负载均衡器中的各个实例当前的统计信息及属性。我们可以用这些统计信息来观察负载均衡器的运行情况,然后及时调整运行策略。
BaseLoadBalancer
BaseLoadBalancer 是负载均衡器的基础实现类,它定义了许多基础的内容:
- 定义了两个存储服务实例server的list,一个存储了所有服务实例,一个存储了正常使用的服务实例。
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
-
定义了存储负载均衡器中的各个实例的LoadBalancerStats对象。
-
定义了坚持服务是否正常的IPing对象,在BaseLoadBalancer中默认为null,需要构造时注入它的实现
-
定义了检查服务实例的执行策略对象IPingStrategy,默认使用了本类的静态内部类SerialPingStrategy实现,根据源码,我们可以看到该策略是使用线性轮询
ping服务实例的方法实现检查。但是如果IPing的实现速度不理想,或是server列表过大,都可能影响服务性能,这时候就需要通过重写pingServers函数扩展ping的执行策略。
/**
* Default implementation for <c>IPingStrategy</c>, performs ping
* serially, which may not be desirable, if your <c>IPing</c>
* implementation is slow, or you have large number of servers.
*/
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
// NOTE: IFF we were doing a real ping
// assuming we had a large set of servers (say 15)
// the logic below will run them serially
// hence taking 15 times the amount of time it takes
// to ping each server
// A better method would be to put this in an executor
// pool
// But, at the time of this writing, we dont REALLY
// use a Real Ping (its mostly in memory eureka call)
// hence we can afford to simplify this design and run
// this
// serially
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
- 定义了负载均衡规则IRule对象,而BaseLoadBalancer的chooseServer函数正是由IRule的实现类RoundRobinRule实现的,而且是常规的线性负载均衡。
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
/**
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
- 启动ping任务,在BaseLoadBalancer默认构造函数中会默认启动一个定时检查server是否正常的任务,默认十秒钟一次。
protected int pingIntervalSeconds = 10;
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
String ruleClassName = (String) clientConfig
.getProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName);
String pingClassName = (String) clientConfig
.getProperty(CommonClientConfigKey.NFLoadBalancerPingClassName);
IRule rule;
IPing ping;
try {
rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(
ruleClassName, clientConfig);
ping = (IPing) ClientFactory.instantiateInstanceWithClientConfig(
pingClassName, clientConfig);
} catch (Exception e) {
throw new RuntimeException("Error initializing load balancer", e);
}
initWithConfig(clientConfig, rule, ping);
}
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping) {
this.config = clientConfig;
String clientName = clientConfig.getClientName();
this.name = clientName;
int pingIntervalTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerPingInterval,
Integer.parseInt("30")));
int maxTotalPingTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
Integer.parseInt("2")));
setPingInterval(pingIntervalTime);
setMaxTotalPingTime(maxTotalPingTime);
// cross associate with each other
// i.e. Rule,Ping meet your container LB
// LB, these are your Ping and Rule guys ...
setRule(rule);
setPing(ping);
setLoadBalancerStats(new LoadBalancerStats(clientName));
rule.setLoadBalancer(this);
if (ping instanceof AbstractLoadBalancerPing) {
((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
}
logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
boolean enablePrimeConnections = clientConfig.get(
CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);
if (enablePrimeConnections) {
this.setEnablePrimingConnections(true);
PrimeConnections primeConnections = new PrimeConnections(
this.getName(), clientConfig);
this.setPrimeConnections(primeConnections);
}
init();
}
public void setPing(IPing ping) {
if (ping != null) {
if (!ping.equals(this.ping)) {
this.ping = ping;
setupPingTask(); // since ping data changed
}
} else {
this.ping = null;
// cancel the timer task
lbTimer.cancel();
}
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
-
因为此类实现了ILoadBalancer接口 所以也实现了它的接口方法。
- addServers(List newServers),向负载均衡器添加新的服务实例,该函数会把新的服务实例和已经存在的服务实例都加到一个新的list里面然后通过setServersList对服务统一处理,
而后面将要介绍的几个扩展实现类都是重写setServersList方法对服务更新进行优化
/** * Add a list of servers to the 'allServer' list; does not verify * uniqueness, so you could give a server a greater share by adding it more * than once */ @Override public void addServers(List<Server> newServers) { if (newServers != null && newServers.size() > 0) { try { ArrayList<Server> newList = new ArrayList<Server>(); newList.addAll(allServerList); newList.addAll(newServers); setServersList(newList); } catch (Exception e) { logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e); } } }
- chooseServer(Object key),通过具体的IRule实例,对服务进行负载均衡
/* * Get the alive server dedicated to key * * @return the dedicated server */ public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } } }
- markServerDown(Server server),markServerDown(String id),标记具体的服务不可用,不是关闭。
public void markServerDown(Server server) { if (server == null || !server.isAlive()) { return; } logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId()); server.setAlive(false); // forceQuickPing(); notifyServerStatusChangeListener(singleton(server)); } public void markServerDown(String id) { boolean triggered = false; id = Server.normalizeId(id); if (id == null) { return; } Lock writeLock = upServerLock.writeLock(); writeLock.lock(); try { final List<Server> changedServers = new ArrayList<Server>(); for (Server svr : upServerList) { if (svr.isAlive() && (svr.getId().equals(id))) { triggered = true; svr.setAlive(false); changedServers.add(svr); } } if (triggered) { logger.error("LoadBalancer [{}]: markServerDown called for server [{}]", name, id); notifyServerStatusChangeListener(changedServers); } } finally { writeLock.unlock(); } }
- List getReachableServers(),获取可用实例列表。
@Override public List<Server> getReachableServers() { return Collections.unmodifiableList(upServerList); }
- List getAllServers(),获取所有服务实例
@Override public List<Server> getAllServers() { return Collections.unmodifiableList(allServerList); }
- addServers(List newServers),向负载均衡器添加新的服务实例,该函数会把新的服务实例和已经存在的服务实例都加到一个新的list里面然后通过setServersList对服务统一处理,
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer继承BaseLoadBalancer,它对基础负载均衡器进一步的扩展,它实现了服务在运行的时间可以更新状态的功能,还增加了服务实例的过滤功能,所以我们可以根据筛选条件选择哪些服务实例。
ServerList
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
volatile ServerList<T> serverListImpl;
}
我们可以看到,DynamicServerListLoadBalancer使用了泛型,泛型T继承server,是server的一个子类,代表是服务实例的扩展类。我们看下ServerList接口定义:
//定义用于获取服务器列表的方法的接口
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* 返回更新的服务器列表。 每30秒返回一次
* (configurable) by the Loadbalancer's Ping cycle
*
*/
public List<T> getUpdatedListOfServers();
}
- getInitialListOfServers:获取初始化服务实例
- getUpdatedListOfServers:获取更新之后的服务实例
根据上图可以看到ServiceList有多个实现,我们看下DynamicServerListLoadBalancer究竟用的哪一种实现,现在Ribbon和Eureka整合,如果想要实现负载均衡肯定需要
有获取服务实例的能力,我们去整合的包org.springframework.cloud.netflix.ribbon.eureka下面看看,发现一个配置类,叫EurekaRibbonClientConfiguration,看到如下该方法:
@Value("${ribbon.client.name}")
private String serviceId = "client";
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
该类默认了一个ServerList的分组,也可以通过注入覆盖,如果PropertiesFactory已经存在相应的ServerList就返回,否则自己通过Eureka客户端重新创建一个,而DomainExtractingServerList是ServerList的一个实现,并且实现了getInitialListOfServers和getUpdatedListOfServers方法。
/**
* @author Dave Syer
*/
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
private ServerList<DiscoveryEnabledServer> list;
private IClientConfig clientConfig;
private boolean approximateZoneFromHostname;
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
this.list = list;
this.clientConfig = clientConfig;
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getInitialListOfServers());
return servers;
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getUpdatedListOfServers());
return servers;
}
}
我们看到这两个函数都是list也就是刚才通过构造方法传入的DiscoveryEnabledNIWSServerList调用自己的函数来获取不同的集合,我们掉头看一下DiscoveryEnabledNIWSServerList类。
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
我们发现这两个方法竟然同时调用obtainServersViaDiscovery函数,我们看下该函数的处理过程。
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
该函数主要依靠eurekaClient从服务注册中心获取到的服务实例InstanceInfo列表,然后对这些服务进行遍历,讲状态为UP的实例转换成DiscoveryEnabledServer对象,生成List返回。
返回的List通过DomainExtractingServerList类的setZones函数进行处理,把List转化为了DiscoveryEnabledServer类的子类DomainExtractingServer,通过该类对象的构造函数设置了Id、状态等服务信息。
private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
List<DiscoveryEnabledServer> result = new ArrayList<>();
boolean isSecure = this.clientConfig.getPropertyAsBoolean(
CommonClientConfigKey.IsSecure, Boolean.TRUE);
boolean shouldUseIpAddr = this.clientConfig.getPropertyAsBoolean(
CommonClientConfigKey.UseIPAddrForServer, Boolean.FALSE);
for (DiscoveryEnabledServer server : servers) {
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
this.approximateZoneFromHostname));
}
return result;
}
ServerListUpdater
通过上面的分析,我们可出Ribbon和Eureka 整合之后是从Eureka Server里面拿到服务实例列表,然后通过自己实现的算法实现负载均衡,那么它是如何触发获取实例的动作呢?我们继续看下本节的负载均衡器DynamicServerListLoadBalancer,看到最上面定义了一个属性,
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
我们可以看出来这是实现了ServerListUpdater里面一个接口,从名字就可以看出来,这是一个更新服务列表的实现,这也从侧面体现出规范命名的重要性,我们看下ServerListUpdater接口:
public interface ServerListUpdater {
/**
* an interface for the updateAction that actually executes a server list update
* 一个用于执行服务器列表更新动作的接口
*/
public interface UpdateAction {
void doUpdate();
}
/**
* start the serverList updater with the given update action
* This call should be idempotent.
* 开始服务更新,根据UpdateAction具体实现更新逻辑
* @param updateAction
*/
void start(UpdateAction updateAction);
/**
* stop the serverList updater. This call should be idempotent
* 停止更新接口
*/
void stop();
/**
* @return the last update timestamp as a {@link java.util.Date} string
* 返回上次更新的时间
*/
String getLastUpdate();
/**
* @return the number of ms that has elapsed since last update
* 获取上次更新时间到现在的间隔
*/
long getDurationSinceLastUpdateMs();
/**
* @return the number of update cycles missed, if valid
* 返回无效的更新周期数
*/
int getNumberMissedCycles();
/**
* @return the number of threads used, if vaid
* 获取核心线程数
*/
int getCoreThreads();
}
我们可以看到,除了对服务列表进行更新,还定义了一大堆其他操作。
该类有两个实现类
- PollingServerListUpdater:动态服务列表更新的默认策略,也就是DynamicServerListLoadBalancer的默认实现,他通过定时任务定时更新
- EurekaNotificationServerListUpdater:该更新策略与PollingServerListUpdater不同,它需要利用Eureka的时间监听器来催动服务列表的更新。
我们先看下默认策略实现 PollingServerListUpdater,我们先看下如何启动更新的:
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
我们可以看到新建了Runnable的接口实现,再为这个线程启动一个定时任务进行更新。我们看到启动定时任务用到了两个参数
- initialDelayMs 查看构造函数调用链发现 默认1000
- refreshIntervalMs 查看构造函数调用链发现 30 * 1000;
说明start函数在初始化一秒之后,以每三十秒的间隔重复执行。其他函数就不在一一介绍。
ServerListFilter
刚才在DynamicServerListLoadBalancer实现了UpdateAction接口,调用了updateListOfServers函数:
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
我们看到该函数使用了之前介绍的ServerList的getUpdatedListOfServers,这个就是通过EurekaServer来获取可以使用的服务实例列表。
里面还使用到了filter,发现DynamicServerListLoadBalancer类里面还定义了一个泛型的ServerListFilter,而这个接口里面之有一个方法:
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}
它主要实现了对服务列表的过滤,根据特定的规则返回特定的服务列表,通过idea还能看到他有五个实现,除了ZonePreferenceServerListFilter在Spring Cloud包下面,其他都在Netflix Ribbon包下面,
说明除了ZonePreferenceServerListFilter其他都是使用的原生实现。下面我们分析一下这五个过滤器。
- AbstractServerListFilter:这是一个抽象的过滤器
public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
private volatile LoadBalancerStats stats;
public void setLoadBalancerStats(LoadBalancerStats stats) {
this.stats = stats;
}
public LoadBalancerStats getLoadBalancerStats() {
return stats;
}
}
- ZoneAffinityServerListFilter :该过滤器是基于“区域感知”的方法进行实例过滤,他会根据提供服务的实例所处的区域(Zone)和消费者所处区域比较,过滤掉不是同一个区域的实例。
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}
从上面源码看到,对服务列表的过滤是通过谷歌的一个迭代器方法Iterables.filter( servers, this.zoneAffinityPredicate.getServerOnlyPredicate())来实现的,他通过zoneAffinityPredicate实现的服务实例和传入的消费者服务实例的Zone比较,得到需要的服务实例,
而且得到之后并不会返回,而是加了一个是否启动“区域感知”的判断,我们看下该函数:
private boolean shouldEnableZoneAffinity(List<T> filtered) {
if (!zoneAffinity && !zoneExclusive) {
return false;
}
if (zoneExclusive) {
return true;
}
LoadBalancerStats stats = getLoadBalancerStats();
if (stats == null) {
return zoneAffinity;
} else {
logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
double loadPerServer = snapshot.getLoadPerServer();
int instanceCount = snapshot.getInstanceCount();
int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
|| loadPerServer >= activeReqeustsPerServerThreshold.get()
|| (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
return false;
} else {
return true;
}
}
}
我们从上面函数可以看出,它是通过LoadBalancerStats的getZoneSnapshot函数来过去过滤后的服务实例一些基本信息(实例个数、断路器断开处、活动请求数、实例平均负载数等),根据设置的规则进行判断是否不启用“区域感知”的列表,这一算法实现当集群出现区域故障时,可以依靠其他
区域的实例提供服务,提升了高可用性。
- blackOutServerPercentage 故障实例百分比(断路器断开数/实例数量)>= 0.8 - blackOutServerPercentage 故障实例百分比(断路器断开数/实例数量)>= 0.8
- blackOutServerPercentage 故障实例百分比(断路器断开数/实例数量)>= 0.8 - activeReqeustsPerServer 实例平均负载 >= 0.6
- blackOutServerPercentage 故障实例百分比(断路器断开数/实例数量)>= 0.8 - availableServers 可用实例数(实例数量 - 断路器断开数) < 2
- DefaultNIWSServerListFilter 它完全继承于ZoneAffinityServerListFilter。
- ServerListSubsetFilter 该过滤器也继承与ZoneAffinityServerListFilter,但是他对功能做了一些强化和拓展,该过滤器非常适合于拥有大规模服务器集群(上百或更多)的系统,因为它可以产生一个“区域感知”的子集合,他还可以通过比较服务的通讯失败数量和并发连接数来判断服务的状态,
从而剔除那些服务相对不健康的实例。它实现过滤分为以下三步:
-
获取“区域感知”的过滤结果,作为候选的服务实例清单。
-
从当前消费者维护的服务列表中剔除相对不够健康的实例,同时也从候选清单中剔除,防止第三步选入,不够健康标准如下:
a. 服务实例的并发数超过客户端的配置的值,默认为0,配置参数为:ribbon.ServerListSubsetFilter.eliminationConnectionThresold
b. 服务实例的失败数超过客户端的配置的值,默认为0,配置参数为:ribbon.ServerListSubsetFilter.eliminationFailureThresold
c. 如果按照上面的规则剔除的实例比率小于客户端配置默认的百分比,就从剩下的实例列表进行健康排序,再把最不健康的剔除,直至达到配置的剔除百分比,默认百分之十,配置参数为:ribbon.ServerListSubsetFilter.forceEliminatePercent
-
在完成剔除后,清单已经少了百分之十(如果是默认配置)的实例,然后通过随机选择的方式筛选一批实例放到清单中,保证服务实例子集与原来的数量一致,默认的实例子集是20个,配置参数为:ribbon.ServerListSubsetFilter.size
- ZonePreferenceServerListFilter 该类继承与ZoneAffinityServerListFilter,这是SpringCloud整合Ribbon时新加的过滤器,如果使用SpringCloud整合Eureka和Ribbon会自动使用该过滤器,它实现了通过配置或者根据Eureka实例元数据的所属区域来筛选出同区域的服务实例:
@Override
public List<Server> getFilteredListOfServers(List<Server> servers) {
List<Server> output = super.getFilteredListOfServers(servers);
if (this.zone != null && output.size() == servers.size()) {
List<Server> local = new ArrayList<Server>();
for (Server server : output) {
if (this.zone.equalsIgnoreCase(server.getZone())) {
local.add(server);
}
}
if (!local.isEmpty()) {
return local;
}
}
return output;
}
根据源码显示,首先通过父类ZoneAffinityServerListFilter的过滤器获取服务实例列表,然后遍历,根据消费者配置预设的区域进行过滤,如果过滤后结果为空直接返回父类的结果,
如果不为空,则返回自己过滤的结果。
未完待续。。。。