手记

(五)Hystrix微服务容错

1. 背景

在微服务分布式环境下,服务被我们拆分成了许多服务单元,服务之间通过注册和订阅机制相互依赖。系统间的依赖十分的庞大和复杂,一个请求可能会经过多个依赖服务,最后完成调用。
分布式应用中存在错综复杂的相互依赖。

1.1 微服务面临的问题

当系统中某个服务出现延迟或者不可用时,那么整个用户请求都被阻塞,最终导致该用户功能不可用。依赖的服务越多,那么不可用的风险就越大。

高请求量情况下,由于网络原因或者是服务自身的不可用,导致出现故障或者延时。这些问题会导致服务调用方对外提供的服务出现延迟,此时调用方的外来请求不断增加,任务不断的积压,资源不断被占用,最后导致服务调用方自身服务瘫痪。

高并发情况下如果其中一个服务不可用,那么整个系统都可能会面临崩溃的风险。对比传统高可用,传统高可用相互独立,互不影响。

举个例子,在我们的模拟交易系统中,我们会将我们的系统拆分成学生、交易、股票、教学等一系列模块。学生端发出交易请求到交易中心,交易中心又会发请求到股票服务查询一些股票的基本信息。这个时候如果股票服务由于自身或者网络原因出现延迟,那么交易中心的请求就会阻塞等待股票服务的返回。漫长的等待之后,股票服务调用失败,返回信息给交易中心,交易中心又将失败结果返回给学生端。在高并发的情况下,这些阻塞的线程就会导致交易中心资源被大量占用,最终导致交易中心不可用。

1.2 断路器

在我们的家里,都会安装断路器或者是保险丝保护电路过载。当电流过大时自动跳闸或者熔断,避免设备损坏甚至火灾等严重后果。

那么在我们微服务体系中有没有这么一个“断路器”,当服务不可用时及时切断调用链路,快速响应失败,来保护我们服务的安全呢?

2.1 Hystrix简介

对于以上的问题,Spring Clould Hystrix实现了断路器、依赖隔离、监控等一系列功能。Hystrix是由Netflix开源的一个延迟和容错库,用于隔离访问远程系统、服务或者第三方库,防止级联失败,从而提升系统的可用性与容错性。Hystrix/test/advisor主要通过以下几点实现可用性与容错性。

  • 包裹请求:使用HystrixCommand(或HystrixObservableCommand)包裹对依赖的调用逻辑,每个命令在独立线程中执行。这里使用了设计模式中的“命令模式”。
  • 跳闸机制:当某服务的错误率超过一定阈值时,Hystrix可以自动或者手动跳闸,停止请求该服务一段时间。
  • 资源隔离:Hystrix为每个依赖都维护了一个小型的线程池(或者信号量)。如果该线程池已满,发往该依赖的请求就被立即拒绝,而不是排队等候,从而加速失败判定。
  • 监控:Hystrix可以近乎实时地监控运行指标和配置的变化,例如成功、失败、超时和被拒绝的请求等。
  • 回退机制:当请求失败、超时、被拒绝,或当断路器打开时,执行回退逻辑。回退逻辑可由开发人员自行提供,例如返回一个缺省值。

2.2 如何使用

2.2.1 单独使用

  1. 引入依赖
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
  1. 编码
@RestController
@RequestMapping("/stock")
public class StockController {

	@Autowired
	private StockService stockService;

	@GetMapping()
	public Stock getStock() {
		return StockService.getStock();
	}
}


@Service
public class StockService {

	@Autowired
	private RestTemplate restTemplate;

	@HystrixCommand(fallbackMethod = "getStockFallback")
	public Stock getStock() {
        // 如果调用多次调用失败,直接触发熔断,调用getStockFallback()
		return restTemplate.getForObeject("http://STOCK-SERVICE/stock", Stock.class);
	}

	@GetMapping()
	public Stock getStockFallback() {
        // 降级逻辑不依赖网络等其他有风险的渠道
		return new Stock();
	}
}
  1. 详细配置参考spring-cloud-netflix-hystrix-x.x.x.x.jar/MATA-INF/spring-configuration-metadata.json

2.2.2 结合Feign使用

  1. 引入依赖
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
  1. 编码
@RestController
@RequestMapping("/stock")
public class StockController {

	@Autowired
	private RestTemplate restTemplate;

	@GetMapping()
	public Stock getStock() {
		return StockService.getStock();
	}
}


@FeignClient(value = "STOCK-SERVICE", fallbackFactory = StockServiceFallback.class)
@RequestMapping("/stock")
public class StockService {

    @GetMapping()
	public Stock getStock();

}


@Component
public class StockServiceFallback implements StockService {

	public Stock getStock() {
		return new Stock();
	}
}
  1. 详细配置参考spring-cloud-openfeign-core-x.x.x.x.jar/MATA-INF/spring-configuration-metadata.json

3 原理分析

3.1 Hystrix在整个体系中的位置

3.2 集成Feign源码分析

在Feign源码分析中我们知道了,@EnableFeignClients注解中导入了FeignClientsRegistrar.classregisterBeanDefinitions()为入口函数,我们从registerBeanDefinitions()函数开始分析。

class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
	// 扫描到所有@FeignClient注解
	public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
		
        // 省略...
        
		for (String basePackage : basePackages) {
			Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
			for (BeanDefinition candidateComponent : candidateComponents) {
				if (candidateComponent instanceof AnnotatedBeanDefinition) {
                    
					// 省略...
                    
                    Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(FeignClient.class.getCanonicalName());
					registerFeignClient(registry, annotationMetadata, attributes);
				}
			}
		}
	}

	// 解析@FeignClient,生产FeignClient工厂
	private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
		// 省略...
        // 定义FeignClientFactoryBean
		BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class);
		// 解析@FeignClient,读取将其中的属性配置到FeignClientFactoryBean
		definition.addPropertyValue("url", getUrl(attributes));
		definition.addPropertyValue("path", getPath(attributes));
		
        // 省略...

		// 注册
		BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
	}
}

FeignClientsRegistrar中我们注册了FeignClientFactoryBean,在分析这个类之前我们先看一下配置类。

@Configuration
public class FeignClientsConfiguration {
	@Configuration
	@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
	protected static class HystrixFeignConfiguration {
		@Bean
		@Scope("prototype")
		@ConditionalOnMissingBean
		@ConditionalOnProperty(name = "feign.hystrix.enabled")
		public Feign.Builder feignHystrixBuilder() {
			return HystrixFeign.builder();
		}
	}
}

@Configuration
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({FeignClientProperties.class, FeignHttpClientProperties.class})
public class FeignAutoConfiguration {
	@Configuration
	@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
	protected static class HystrixFeignTargeterConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public Targeter feignTargeter() {
			return new HystrixTargeter();
		}
	}

	@Configuration
	@ConditionalOnMissingClass("feign.hystrix.HystrixFeign")
	protected static class DefaultFeignTargeterConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public Targeter feignTargeter() {
			return new DefaultTargeter();
		}
	}
}

看完配置之后我们分析一下FeignClientFactoryBean这个类。

FeignClientFactoryBean实现了FactoryBean接口。实现了FactoryBean接口代表他是一个工厂类,最终会通过getObject()方法返回真正的类。关于FactoryBean的知识大家可以自行去扩展学习。

class FeignClientFactoryBean implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
    @Override
	public Object getObject() throws Exception {
		return getTarget();
	}

	<T> T getTarget() {
		FeignContext context = applicationContext.getBean(FeignContext.class);
        // 此处的Feign.Builde类就是配置类中调用HystrixFeign.builder()函数生成
		Feign.Builder builder = feign(context);
        
		// 省略...
        // 配置了hystrix则返回HystrixTargeter
		Targeter targeter = get(context, Targeter.class);
		return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
				this.type, this.name, url));
	}
}

确认了Feign.Builderh和Targeter之后我们看一下HystrixTargeter这个类。

class HystrixTargeter implements Targeter {

	@Override
	public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
						Target.HardCodedTarget<T> target) {
        // 两种处理方式对应@FeignClient注解
		// 降级方法
		Class<?> fallback = factory.getFallback();
		if (fallback != void.class) {
			return targetWithFallback(factory.getName(), context, target, builder, fallback);
		}
        // 降级方法工厂
		Class<?> fallbackFactory = factory.getFallbackFactory();
		if (fallbackFactory != void.class) {
			return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
		}
		return feign.target(target);
	}

	private <T> T targetWithFallback(String feignClientName, FeignContext context,
									 Target.HardCodedTarget<T> target,
									 HystrixFeign.Builder builder, Class<?> fallback) {
		T fallbackInstance = getFromContext("fallback", feignClientName, context, fallback, target.type());
		return builder.target(target, fallbackInstance);
	}
}

最终builder.target(target, fallbackInstance)会生成一个HystrixInvocationHandler动态代理对象。HystrixInvocationHandler对象实现了InvocationHandler接口,该接口的invoke方法就是后续我们触发hystrix逻辑的入口。

3.3 Hystrix原理分析

  1. 创建HystrixCommand 或者 HystrixObservableCommand 对象,将需要的参数和其他信息包装成一个命令(命令模式)。

    • HystrixCommand:依赖服务返回单个对象。
  • HystrixObservableCommand :依赖服务返回多个对象。hystrix有一个组合请求的功能,它可以把某一个时间段内的请求组合成一次请求发送给服务提供者。减少请求次数,但是增加了部分请求的等待时间。

    // 此处沿着feign的流程走下来,我们只看HystrixCommand的执行
    final class HystrixInvocationHandler implements InvocationHandler {
      @Override
      public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
        // 省略...
        HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) {
              @Override
              protected Object run() throws Exception {
                try {
                  //  第6步,最终我们的方法会在这里执行
                  return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
                } catch (Exception e) {
                  throw e;
                } catch (Throwable t) {
                  throw (Error) t;
                }
              }
              // 省略...
            };
    	// 第9步,根据我们的使用方式会有下列的一些返回情况
        if (Util.isDefault(method)) {
          return hystrixCommand.execute();
        } else if (isReturnsHystrixCommand(method)) {
          return hystrixCommand;
        } else if (isReturnsObservable(method)) {
          return hystrixCommand.toObservable();
        } else if (isReturnsSingle(method)) {
          return hystrixCommand.toObservable().toSingle();
        } else if (isReturnsCompletable(method)) {
          return hystrixCommand.toObservable().toCompletable();
        }
        return hystrixCommand.execute();
      }
    }
    
  1. 执行命令。四种执行方式。

    • execute():同步执行,返回单个结果对象。
    • queue() :返回一个Future对象
    • observe():返回一个可订阅的 Observable 对象。hot observable,不管是否有订阅者都会发布事件。
    • toObservable():返回一个可订阅的 Observable 对象。cold observable,等待直到有订阅者才会发布事件。详见RxJava。
    abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
        public Observable<R> observe() {
            // us a ReplaySubject to buffer the eagerly subscribed-to Observable
            ReplaySubject<R> subject = ReplaySubject.create();
            // eagerly kick off subscription
            final Subscription sourceSubscription = toObservable().subscribe(subject);
            // return the subject that can be subscribed to later while the execution has already started
            return subject.doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    sourceSubscription.unsubscribe();
                }
            });
        }
        public Observable<R> toObservable() {
            // 省略...
        }
    }
    
    public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
        public R execute() {
            try {
                return queue().get();
            } catch (Exception e) {
                throw Exceptions.sneakyThrow(decomposeException(e));
            }
        }
        public Future<R> queue() {
            // 省略...
        }
    }
    
  2. 缓存功能是否启用。如果启用并且命中缓存,那么直接返回缓存的Observable 对象。

    abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
        public Observable<R> toObservable() {
    		// 省略...
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
    				// 省略...
                    final boolean requestCacheEnabled =isRequestCachingEnabled();
                    final String cacheKey = getCacheKey();
                    // 先从缓存拿
                    if (requestCacheEnabled) {
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                        if (fromCache != null) {
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        }
                    }
                    // 执行
                    Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
                    Observable<R> afterCache;
                    // 放入缓存
                    if (requestCacheEnabled && cacheKey != null) {
                        // wrap it for caching
                        HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                        // 省略...
                    } else {
                        afterCache = hystrixObservable;
                    }
                    return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
                }
            });
        }
    }
    
  3. 检查断路器是否打开。如果打开则执行fallback逻辑,否则进入下一步。

    abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
        private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
            // mark that we're starting execution on the ExecutionHook
            // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
            executionHook.onStart(_cmd);
            // 判断是否短路
            if (circuitBreaker.allowRequest()) {
    			// 省略...
                if (executionSemaphore.tryAcquire()) {
                    try {
                        /* used to track userThreadExecutionTime */
                        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                        return executeCommandAndObserve(_cmd)
                                .doOnError(markExceptionThrown)
                                .doOnTerminate(singleSemaphoreRelease)
                                .doOnUnsubscribe(singleSemaphoreRelease);
                    } catch (RuntimeException e) {
                        return Observable.error(e);
                    }
                } else {
                    return handleSemaphoreRejectionViaFallback();
                }
            } else {
                // 拒绝请求
                return handleShortCircuitViaFallback();
            }
        }
    }
    
  4. 线程池、队列、信号量是否已满。如果已满则执行fallback逻辑,否则执行命令。注意hystrix会为每个依赖服务单独创建线程池,线程池之间相互独立,互不干扰。

    public class HystrixContextScheduler extends Scheduler {
        private class HystrixContextSchedulerWorker extends Worker {
            @Override
            public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
                if (threadPool != null) {
                    // 判断线程池是否已满
                    if (!threadPool.isQueueSpaceAvailable()) {
                        // 拒绝提交
                        throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                    }
                }
                return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
            }
        }
    }
    
  5. 请求服务。该处执行的方式取决于我们编写的方法,HystrixCommand.run()方法对应单个结果,HystrixObservableCommand.construct()对应多个结果。此处请求失败或者超时都会执行fallback逻辑,请求成功则返回结果。

    public class HystrixContextScheduler extends Scheduler {
        private static class ThreadPoolWorker extends Worker {
            @Override
            public Subscription schedule(final Action0 action) {
    			// 省略...
                ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
                // 提交command
                FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
                sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
                return sa;
            }
        }
    }
    
  6. Hystrix向断路器报告成功、失败、拒绝和超时等信息。断路器维护一组计数器来统计这些数据。断路器会根据这些数据来控制开闭。

    public class HystrixCommandMetrics extends HystrixMetrics {
        public static class HealthCounts {
            private final long totalCount;
            private final long errorCount;
            private final int errorPercentage;
    		// 省略...
            // 统计数据信息
            public HealthCounts plus(long[] eventTypeCounts) {
                long updatedTotalCount = totalCount;
                long updatedErrorCount = errorCount;
    
                long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
                long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
                long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
                long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
                long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
    
                updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
                updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
                return new HealthCounts(updatedTotalCount, updatedErrorCount);
            }
        }
    }
    
  7. 执行失败时进入fallback逻辑。fallback逻辑为我们手动定义的逻辑,确保最终能够稳定的返回数据(不依赖网络等其他有风险的渠道)。如果降级逻辑抛出异常,那么呢对应第2布的四种执行方式会有不同的处理逻辑,一般为抛出异常或者通知调用者终止(订阅onError())。

    abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
        private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    		// 省略...
            // 失败回退
            final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {
                    Exception e = getExceptionFromThrowable(t);
                    executionResult = executionResult.setExecutionException(e);
                    // 各种失败回退情况
                    if (e instanceof RejectedExecutionException) {
                        return handleThreadPoolRejectionViaFallback(e);
                    } else if (t instanceof HystrixTimeoutException) {
                        return handleTimeoutViaFallback();
                    } else if (t instanceof HystrixBadRequestException) {
                        return handleBadRequestByEmittingError(e);
                    } else {
                        if (e instanceof HystrixBadRequestException) {
                            eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                            return Observable.error(e);
                        }
                        return handleFailureViaFallback(e);
                    }
                }
            };
            // 省略...
            return execution.doOnNext(markEmits)
                    .doOnCompleted(markOnCompleted)
                    .onErrorResumeNext(handleFallback)
                    .doOnEach(setRequestContext);
        }
    }
    
  8. 根据第2步中定义的命令返回不同的对象。

3.4 断路器原理分析

下面是图中对应的断路器接口,我们会按流程图来依次分析。

/**
 * 断路器的逻辑最原始调用对象为HystrixCommand类,如果失败数量超过阈值就会阻挡请求。每经过一个时间窗口后会有一次重试,直到重试成功后关闭断路器。
 */
public interface HystrixCircuitBreaker {

    /**
     * 通过配置文件中的配置,和`isOpen()`判断是否允许请求。
     * 调用isOpen()判断是否打开,如果没有返回true。如果打开,判断是否已经过了休眠时间,如果是返回true,否则返回false。
     */
    public boolean allowRequest();

    /**
     * 根据统计数据判断断路器是否打开。
     * 拿到最新一个时间窗口的统计数据判断断路器是否打开。
     */
    public boolean isOpen();

    /**
     * 增加成功数据,默认时间窗口为10。如果断路器打开,则关闭断路器。
     * 判断断路器是否打开,如果打开则关闭断路器,然后重置计数器。如果关闭则增加一次成功调用。
     */
    /* package */void markSuccess();
}

原理章节开头我们讲到toObservable()方法会返回一个Observable对象。Observable对象包含了命令执行过程中的回调函数,我们来看下这个对象。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    public Observable<R> toObservable() {
        // 所有工作完成后这个方法会被回调
        final Action0 terminateCommandCleanup = new Action0() {
            @Override
            public void call() {
                if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                    handleCommandEnd(false); //user code never ran
                } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                    // 注意这个方法会统计调用数据,我们后面再分析
                    handleCommandEnd(true); //user code did run
                }
            }
        };
        
        // 执行入口
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };
    }
}

原理分析章节第4步,我们讲到执行命令会判断断路器是否打开,其实是我们执行入口中调用的applyHystrixSemantics函数。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // 判断是否短路
        if (circuitBreaker.allowRequest()) {
			// 省略...
            // 执行
            return executeCommandAndObserve(_cmd).doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);
        } else {
            // 拒绝请求
            return handleShortCircuitViaFallback();
        }
    }
}

HystrixCircuitBreaker具体的实现在HystrixCircuitBreakerImpl中。我们先看allowRequest()isOpen()函数。

static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    
    // 断路器是否打开
    private AtomicBoolean circuitOpen = new AtomicBoolean(false);
    // 短路器打开一段时间后允许单个请求做一次测试,该字段记录上次断路器打开或者上次测试的时间
    private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
    
    @Override
    public boolean allowRequest() {
        // 配置文件中断路器是否强制打开
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // 配置文件中断路器是否强制关闭
        if (properties.circuitBreakerForceClosed().get()) {
            isOpen();
            return true;
        }
        // 判断断路器是否打开,打开情况下根据时间判断是否可做测试
        return !isOpen() || allowSingleTest();
    }
    
    @Override
    public boolean isOpen() {
        // 打开直接返回
        if (circuitOpen.get()) {
            return true;
        }
        // 关闭的情况下获取统计数据
        HealthCounts health = metrics.getHealthCounts();
        // 一个时间窗口(默认10秒)内访问的次数是否小于阈值(默认20次)。请求次数少的短路器就不会打开。
        if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
            return false;
        }
        // 判断失败率,默认为50%
        if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
            return false;
        } else {
            // 失败率过高时打开断路器
            if (circuitOpen.compareAndSet(false, true)) {
                // 记录打开时间
                circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                return true;
            } else {
                // 这个地方考虑多线程的因素
                return true;
            }
        }
    }
    
    public boolean allowSingleTest() {
        // 上次断路器打开的时间或者是上次测试的时间
        long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
        // 1) 断路器打开
        // 2) 上次断路器打开的时间或者是上次测试的时间大于阈值(默认5秒)
        if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
            // 更新时间,允许做一个测试
            if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                return true;
            }
        }
        return false;
    }
}

接下来再看markSuccess()

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
		// 成功执行会回调此方法
        final Action0 markOnCompleted = new Action0() {
            @Override
            public void call() {
                if (!commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    // 标记执行成功
                    circuitBreaker.markSuccess();
                }
            }
        };
    }
}

static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private AtomicBoolean circuitOpen = new AtomicBoolean(false);
    public void markSuccess() {
        if (circuitOpen.get()) {
            // 关闭断路器
            if (circuitOpen.compareAndSet(true, false)) {
                metrics.resetStream();
            }
        }
    }
}

到这为止我们已经分析了短路器的所有接口,那么断路器是如何做统计的?

我们来看一下第一步中的handleCommandEnd(true)方法,通过handleCommandEnd(true) -> markCommandDone() -> executionDone()我们最终到executionDone()方法。

public class HystrixThreadEventStream {
	public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
        // 此处会发送一个通知
        writeOnlyCommandCompletionSubject.onNext(event);
    }
}

public class HealthCountsStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], HystrixCommandMetrics.HealthCounts> {
	// 该处会接收到通知
    private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new 			Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
        @Override
        public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
            // 统计执行情况
            return healthCounts.plus(bucketEventCounts);
        }
    };
}

我们找到了统计入口,接下来我们看一下HealthCounts类。

    public static class HealthCounts {
        private final long totalCount;
        private final long errorCount;
        private final int errorPercentage;

        public HealthCounts plus(long[] eventTypeCounts) {
            long updatedTotalCount = totalCount;
            long updatedErrorCount = errorCount;
			// eventTypeCounts数组,不同的位置代表不同的统计数值。[EMIT(false), SUCCESS(true), FAILURE(false), TIMEOUT(false)]
            long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
            long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
            long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
            long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
            long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
			// 计算总请求数量
            updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
            // 计算总错误数量
            updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
            return new HealthCounts(updatedTotalCount, updatedErrorCount);
        }
    }

上面一秒代表一格,每个格子中是这一秒里的统计数据。总共10个格子,代表10秒。我们会以这10秒里的数据作为断路器是否打开的依据。

滑动窗口

每过一秒,最后一个桶被丢弃(黑色方块),白色方块为当前这一秒新建的一个桶。

3.3依赖隔离

hystrix会为每一个依赖的服务创建一个独立的线程池,线程池之间互不影响。如果其中一个服务出现故障,那么只会对这一个服务产生影响,不会影响其他依赖的服务。

  • THREAD (线程隔离) : 使用该方式,HystrixCommand将在单独的线程上执行,并发请求受到线程池中的线程数量的限制。

  • SEMAPHORE (信号量隔离) :使用该方式, HystrixCommand 将在调用线程上执行,
    开销相对较小,并发请求受到信号量个数的限制。

Hystrix 中默认并且推荐使用线程隔离(THREAD),因为这种方式有一个除网络超时以外的额外保护层。

指标 使用线程池(ms) 未使用线程池(ms) 差距(ms)
中位数 2 2 2
90% 5 8 3
99% 28 37 9

使用线程池会有性能损失,大部分的情况下9ms的延迟可以忽略。

一般来说, 只有当调用负载非常高时(例如每个实例每秒调用数百次)或者延迟要求非常低才需要使用信号
量隔离, 因为在这种场景下使用THREAD 开销会比较高。信号量隔离一般仅适用于非
网络调用的隔离。

4 监控

  • Hystrix暴露/actuator/hystrix.stream 端点开放监控数据。

  • 集成hystrix-dashboard 模块,将数据图形化显示。

  • 集成turbine模块聚合多个微服务的监控数据。

  • 可通过mq中间件收集监控数据。

数据解释

声明

本博客所有内容仅供学习,不为商用,如有侵权,请联系博主谢谢。

参考文献

[5] 《Spring Cloud与Docker微服务架构实战(第2版)》 周立 著

0人推荐
随时随地看视频
慕课网APP