熔断与Hystrix

在本节,我们将讨论”熔断”方案的思路及其在微服务架构下的落地。

“熔断”这个词来源于电路保护。如果一条线路上的的电压过高,就会将保险丝烧断,从而切断该条线路上的电流,防止其影响其他线路。

我们将上述场景对应到微服务上,当调用某个微服务频繁发生故障(相当于电压过高),会触发熔断(相当于保险丝烧断),微服务将直接返回一个降级的结果,防止影响其他业务。

故障的类型可能有很多种,最常见的是抛出了异常或者调用超时。

你可能会有疑问:返回一个降级的结果,不就是错误了么?

是的,降级结果是错误的。但你可以降低错误的影响范围,例如,返回上一次成功执行的结果。

Hystrix的基本用法

Hystrix是由Netflex开源的一款开源组件,提供了基础的熔断功能。

Hystrix将降级策略封装在Commend中,不同的Commend根据group分割开。Commend内置了run和fallback两个方法,内置方法。

正常情况下,会先执行run方法(正常执行逻辑),若发生了故障,再执行fallback方法并返回其结果。若发生多次故障会在一定时间范围内触发短路,即跳过run方法,直接执行fallback方法。

关于Hystrix的更详细的原理,可以参考Hystrix工作原理(官方文档翻译),这里不再赘述。

有几个涉及Hystrix的关键参数,这里做一些简要介绍:首先是几个key

  • groupKey: 区分不同降级环境,相同的groupKey下处在相同的降级环境。
  • commendKey: 区分不同命令。
  • threadPoolKey: 相同的key将运行在相同的线程池下。然后是几个通用配置
  • executionTimeoutInMilliseconds: 执行超时时间,单位毫秒
  • circuitBreakerEnabled: 发生多次故障后,是否会触发短路。默认是false,即总是先执行run方法,不会主动跳过。最后是线程池
  • coreSize: 线程池常驻线程数量
  • maximumSize: 线程池最大线程数量
  • allowMaximumSizeToDivergeFromCoreSize: 仅当设置为true时,上述maxiumSize才生效。
  • maxQueueSize: 最多允许多少个任务堆积
  • queueSizeRejectionThreshold: 多少个任务堆积会处罚降级。堆积的任务太多,说明处理速度跟不上需求了,也会被认为是故障并处罚降级。

Hystrix的基本封装

上述概念固然重要,但每次做熔断时如果都要仔细考虑,未免太过繁琐,为此,我们做了一个抽象的基类,实现了上述的默认的配置:

  1. package com.coder4.lmsia.hystrix;
  2. import com.coder4.sbmvt.trace.TraceIdContext;
  3. import com.coder4.sbmvt.trace.TraceIdUtils;
  4. import com.netflix.hystrix.HystrixCommand;
  5. import com.netflix.hystrix.HystrixCommandGroupKey;
  6. import com.netflix.hystrix.HystrixCommandKey;
  7. import com.netflix.hystrix.HystrixCommandProperties;
  8. import com.netflix.hystrix.HystrixThreadPoolKey;
  9. import com.netflix.hystrix.HystrixThreadPoolProperties;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.util.StringUtils;
  13. import java.util.function.Supplier;
  14. /**
  15. * @author coder4
  16. */
  17. public class BaseHystrixCommend<R> extends HystrixCommand<R> {
  18. private Logger LOG = LoggerFactory.getLogger(getClass());
  19. private final Supplier<R> realSupplier;
  20. private final Supplier<R> fallbackSupplier;
  21. public BaseHystrixCommend(String key, Supplier<R> realSupplier, Supplier<R> fallbackSupplier) {
  22. this(key, new BaseHytrixConfig(), realSupplier, fallbackSupplier);
  23. }
  24. public BaseHystrixCommend(String key, BaseHytrixConfig config, Supplier<R> realSupplier, Supplier<R> fallbackSupplier) {
  25. super(Setter
  26. // 3个key合一
  27. .withGroupKey(HystrixCommandGroupKey.Factory.asKey(key))
  28. .andCommandKey(HystrixCommandKey.Factory.asKey(key))
  29. .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(key))
  30. .andCommandPropertiesDefaults(
  31. HystrixCommandProperties.Setter()
  32. .withExecutionTimeoutInMilliseconds(config.getExecutionTimeoutInMilliseconds())
  33. .withCircuitBreakerEnabled(config.isCircuitBreakerEnabled())
  34. .withFallbackIsolationSemaphoreMaxConcurrentRequests(config.getFallbackIsolationSemaphoreMaxConcurrentRequests())
  35. )
  36. .andThreadPoolPropertiesDefaults(
  37. HystrixThreadPoolProperties.defaultSetter()
  38. .withAllowMaximumSizeToDivergeFromCoreSize(config.isAllowMaximumSizeToDivergeFromCoreSize())
  39. .withCoreSize(config.getCorePoolSize())
  40. .withMaximumSize(config.getMaxPoolSize())
  41. .withMaxQueueSize(config.getMaxQueueSize())
  42. .withQueueSizeRejectionThreshold(config.getMaxQueueSize())
  43. ));
  44. this.realSupplier = realSupplier;
  45. this.fallbackSupplier = fallbackSupplier;
  46. }
  47. @Override
  48. protected R run() throws Exception {
  49. if (StringUtils.isEmpty(TraceIdContext.getTraceId())) {
  50. TraceIdContext.setTraceId(TraceIdUtils.getTraceId());
  51. }
  52. R r = this.realSupplier.get();
  53. TraceIdContext.removeTraceId();
  54. return r;
  55. }
  56. @Override
  57. protected R getFallback() {
  58. try {
  59. LOG.error("enter fallback because ", getExecutionException());
  60. return this.fallbackSupplier.get();
  61. } finally {
  62. TraceIdContext.removeTraceId();
  63. }
  64. }
  65. }

如上所示,在构造函数中,对上述参数进行了配置,此外还结合了分布式追踪中介绍的TraceIdContext,注入并销毁traceId。

Hystrix的默认值在另一个文件中进行了配置:

  1. package com.coder4.lmsia.hystrix;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Builder;
  4. import lombok.Data;
  5. import lombok.NoArgsConstructor;
  6. /**
  7. * @author coder4
  8. */
  9. @Data
  10. @Builder
  11. @NoArgsConstructor
  12. @AllArgsConstructor
  13. public class BaseHytrixConfig {
  14. private static int DEFAULT_EXECUTION_TIMEOUT_IN_MILLISECONDS = 1000;
  15. private static int DEFAULT_FALL_BACK_ISOLATION_SEMAPHORE_MAX_CON_CURRENT_REQUESTS = 512;
  16. private static int DEFAULT_CORE_POOL_SIZE = 64;
  17. private static int DEFAULT_MAX_POOL_SIZE = 512;
  18. private static int DEFAULT_MAX_QUEUE_SIZE = 32;
  19. // 执行时限(毫秒)
  20. private int executionTimeoutInMilliseconds
  21. = DEFAULT_EXECUTION_TIMEOUT_IN_MILLISECONDS;
  22. // 启动断路器
  23. private boolean circuitBreakerEnabled = true;
  24. // (信号量隔离时)降级调用最大并发数
  25. private int fallbackIsolationSemaphoreMaxConcurrentRequests
  26. = DEFAULT_FALL_BACK_ISOLATION_SEMAPHORE_MAX_CON_CURRENT_REQUESTS;
  27. // 允许线程数峰值超过coreSize
  28. private boolean allowMaximumSizeToDivergeFromCoreSize = true;
  29. // 核心线程数
  30. private int corePoolSize = DEFAULT_CORE_POOL_SIZE;
  31. // 最大线程数量
  32. private int maxPoolSize = DEFAULT_MAX_POOL_SIZE;
  33. // 最大队列等待数量
  34. private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
  35. }

有了上述默认设置后,我们将上述两个类封装成独立的项目lmsia-hystrix中,方便其他微服务的调用。

Hystrix在微服务中的用法

最后,我们来看一下如何在微服务中使用hystrix。

首先在依赖中引入lmsia-hystrix:

  1. compile 'com.github.liheyuan:lmsia-hystrix:0.0.4'

然后定义一个Commend,并execute

  1. @GetMapping(value = "/")
  2. public String hello() {
  3. return new BaseHystrixCommend<String>("abc", this::helloReal, this::helloFallback).execute();
  4. }
  5. private String helloReal() {
  6. LOG.info("hello real");
  7. if (true) {
  8. throw new RuntimeException("haha");
  9. }
  10. return abcLogic.getHello();
  11. }
  12. private String helloFallback() {
  13. LOG.info("hello fb");
  14. return "Hello, fallback";
  15. }

如上所示,我们定义了两个函数helloReal是正常逻辑,但这个方法会抛异常并触发降级。helloFallback是降级逻辑。

我们构造的Commend会根据情况执行上述两个方法。

在未启用Commend前,rest请求会直接500错误,因为抛出了异常。

启用Commend后,返回总是200。前几次,会发现日志先打印”hello real”,再打印”hello fb”,这说明只是出发降级未触发短路。当多执行几次,就会发现不再输出”hello real”,这时就是真正触发了短路。

至此,我们已经借助Hystrix实现了微服务的降级功能。

拓展与思考

  1. 返回一个固定的降级结果,可能会影响产品体验。如果想返回上一次执行成功的结果,该如何进行修改呢,Hystrix中有没有内置这个功能呢?
  2. Hystrix中默认触发短路的阈值是多少,默认短路时间又是多少呢, 如果想进行修改,需要如何进行配置呢?