限流的实现

与”熔断”类似,”限流”也是一种降级手段,但限流的思路更简单、直观: 它直接拒绝部分请求。

在微服务架构下,若大量请求超过微服务的处理能力时,可能会将服务打跨,甚至产生雪崩效应、影响系统的整体稳定性。

孙子兵法有一计”李代桃僵”,说的是当局势发展到必然有所损失时,应当舍得局部弱小兵力,以保全大局优势。

我们可以将这种战略应用到微服务中,在流量超出承受阈值时,直接进行”限流”、拒绝部分请求,从而保证系统的整体稳定性。

有的业务场景中,系统压力并不大,但也需要限制用户每秒的操作次数,例如:验证码的发送接口。

进行限流的方案有很多种,本节这里讨论两个层面上的限流:负载均衡器和微服务。

负载均衡层的限流

Nginx是一款高性能的反向代理服务器,是用户请求进入系统的第一道关卡。

在Nginx上配置限流策略,不仅可以保护系统稳定性,也能防范一部分恶意攻击。

我们来看一组最常见的策略。

(1) 按照IP地址, 限制每秒请求数量:

  1. limit_req_zone $binary_remote_addr zone=limit1:1m rate=20r/s;

如上所述,配置的是一个限流区域:

  • 区域名字是limit1,分配1MB的内存,大致可以追踪1.5万个IP地址
  • 每个IP地址,每秒钟的访问上限是20次。

(2) 支持突发缓存队列

  1. limit_req zone=limit1 burst=10 nodelay;

如上所述,细化了limit1这个区域上的具体策略:

  • burst建立了一个长度为10的缓冲区,若突发流量导致限流会先放到缓冲区中
  • nodelay当缓冲区已满了,丢弃请求,返回503

上面提到的缓存策略可以应用于全局,也可以应用于不同的url路径下。

此外,Nginx还提供了多种高级的限流配置手段,可以参考这篇博客Nginx Rate Limiting

微服务层的基础限流

由于Nginx无法解析业务逻辑,只能在IP层面进行”较为粗犷的限流”。

如果想结合业务逻辑或更复杂的策略,可以在微服务层面进行限流。

Guava是谷歌开源的Java库,其中提供了基于令牌桶算法的RateLimiter。我们将以此为基础,实现微服务层面的限流。

首先,来定义一个注解类

  1. package com.coder4.lmsia.ratelimit;
  2. import java.lang.annotation.Documented;
  3. import java.lang.annotation.ElementType;
  4. import java.lang.annotation.Retention;
  5. import java.lang.annotation.RetentionPolicy;
  6. import java.lang.annotation.Target;
  7. /**
  8. * 对方法限流,超限会抛出HTTP 429异常
  9. * @author coder4
  10. */
  11. @Retention(RetentionPolicy.RUNTIME)
  12. @Target({ElementType.METHOD})
  13. @Documented
  14. public @interface MethodRateLimit {
  15. // 每秒允许多少次请求
  16. double permitsPerSecond();
  17. }

如上所述,注解定义了一个参数permitsPerSecond,即每秒允许几次请求,支持非整数配置。

为了让注解生效,我们需要配合AOP使用:

  1. package com.coder4.lmsia.ratelimit.aspect;
  2. import com.coder4.lmsia.commons.http.exception.Http429TooManyRequestsException;
  3. import com.coder4.lmsia.ratelimit.MethodRateLimit;
  4. import com.coder4.lmsia.ratelimit.RateLimiterProvider;
  5. import com.google.common.util.concurrent.RateLimiter;
  6. import org.aspectj.lang.ProceedingJoinPoint;
  7. import org.aspectj.lang.annotation.Around;
  8. import org.aspectj.lang.annotation.Aspect;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.stereotype.Component;
  12. import java.util.Optional;
  13. /**
  14. * @author coder4
  15. */
  16. @Component
  17. @Aspect
  18. public class MethodRateLimitAspect {
  19. protected Logger LOG = LoggerFactory.getLogger(getClass());
  20. @Around(value = "(execution(* com.coder4..*(..))) && @annotation(methodLimit)", argNames = "joinPoint, methodLimit")
  21. public Object methodAround(ProceedingJoinPoint joinPoint, MethodRateLimit methodLimit)
  22. throws Throwable {
  23. // Get RateLimiter
  24. Optional<RateLimiter> rateLimiterOp = RateLimiterProvider.getInstance()
  25. .getRateLimiter(
  26. joinPoint.getSignature().toLongString(), methodLimit.permitsPerSecond());
  27. if (!rateLimiterOp.isPresent() || rateLimiterOp.get().tryAcquire()) {
  28. // allow
  29. return joinPoint.proceed();
  30. } else {
  31. // deny
  32. throw new Http429TooManyRequestsException();
  33. }
  34. }
  35. }

如上所述,我们对所有添加了MethodRateLimit注解的方法进行AOP注入:

  • 根据方法名获取一个RateLimiter,RateLimiterProvider稍后会介绍
  • 若可以获得令牌,则执行方法,否则抛出HTTP429(Too Mangy Requests)异常

再来看一下RateLimiterProvider:

  1. package com.coder4.lmsia.ratelimit;
  2. import com.google.common.cache.Cache;
  3. import com.google.common.cache.CacheBuilder;
  4. import com.google.common.util.concurrent.RateLimiter;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.util.Optional;
  8. import java.util.concurrent.ExecutionException;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * @author coder4
  12. */
  13. public class RateLimiterProvider {
  14. private Logger LOG = LoggerFactory.getLogger(getClass());
  15. private static final RateLimiterProvider instance = new RateLimiterProvider();
  16. private static final int CAPACITY = 2000;
  17. private static final int TTL_SECS = 60;
  18. private Cache<String, RateLimiter> rateLimiterCache;
  19. private RateLimiterProvider() {
  20. rateLimiterCache = CacheBuilder.newBuilder()
  21. .maximumSize(CAPACITY)
  22. .expireAfterAccess(TTL_SECS, TimeUnit.SECONDS)
  23. .build();
  24. }
  25. public static RateLimiterProvider getInstance() {
  26. return instance;
  27. }
  28. public Optional<RateLimiter> getRateLimiter(String key, double permitsPerSecond) {
  29. // 未测试线程安全,但影响不大
  30. try {
  31. return Optional.ofNullable(
  32. rateLimiterCache.get(key, () -> RateLimiter.create(permitsPerSecond)));
  33. } catch (ExecutionException e) {
  34. LOG.error("getRateLimiter exception", e);
  35. return Optional.empty();
  36. }
  37. }
  38. }

如上所述,Provider的内部使用Guava的Cache机制:

  • 根据字符串key从Cache中尝试获取RateLimiter,获取不到则新建一个
  • Cache最高存储2000个、过期时间为60秒,以防不断膨胀导致过高的内存开销。

有了上述注解,在微服务中进行限流将异常简单:

  1. @MethodRateLimit(permitsPerSecond = 2.0)
  2. @GetMapping(value = "/")
  3. public String hello() {
  4. return new BaseHystrixCommend<String>("abc", this::helloReal, this::helloFallback).execute();
  5. }

如上,只需要一行代码即可搞定。

微服务层的高级限流

在一些复杂的业务场景下,我们可能希望根据不同用户或其他字段进行限流。

我们提供了另一款MethodParamRateLimit来满足这类需求:

  1. package com.coder4.lmsia.ratelimit;
  2. import java.lang.annotation.Documented;
  3. import java.lang.annotation.ElementType;
  4. import java.lang.annotation.Retention;
  5. import java.lang.annotation.RetentionPolicy;
  6. import java.lang.annotation.Target;
  7. /**
  8. * 根据方法+参数限流,超限会抛出HTTP 429异常
  9. *
  10. * @author coder4
  11. */
  12. @Retention(RetentionPolicy.RUNTIME)
  13. @Target({ElementType.METHOD})
  14. @Documented
  15. public @interface MethodParamRateLimit {
  16. // 每秒允许多少词请求
  17. double permitsPerSecond();
  18. // 参数下标(0开始)
  19. int paramIndex();
  20. }

新增的参数paramIndex稍后会作出解释,我们看一下AOP的Aspect:

  1. package com.coder4.lmsia.ratelimit.aspect;
  2. import com.coder4.lmsia.commons.http.exception.Http429TooManyRequestsException;
  3. import com.coder4.lmsia.ratelimit.MethodParamRateLimit;
  4. import com.coder4.lmsia.ratelimit.RateLimiterProvider;
  5. import com.google.common.util.concurrent.RateLimiter;
  6. import org.aspectj.lang.ProceedingJoinPoint;
  7. import org.aspectj.lang.annotation.Around;
  8. import org.aspectj.lang.annotation.Aspect;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.stereotype.Component;
  12. import java.util.Optional;
  13. /**
  14. * @author coder4
  15. */
  16. @Component
  17. @Aspect
  18. public class MethodParamRateLimitAspect {
  19. protected Logger LOG = LoggerFactory.getLogger(getClass());
  20. @Around(value = "(execution(* com.coder4..*(..))) && @annotation(methodParamLimit)", argNames = "joinPoint, methodParamLimit")
  21. public Object methodAround(ProceedingJoinPoint joinPoint, MethodParamRateLimit methodParamLimit)
  22. throws Throwable {
  23. // Get RateLimiter
  24. Optional<RateLimiter> rateLimiterOp = RateLimiterProvider.getInstance()
  25. .getRateLimiter(getRateLimiterKey(joinPoint, methodParamLimit), methodParamLimit.permitsPerSecond());
  26. if (!rateLimiterOp.isPresent() || rateLimiterOp.get().tryAcquire()) {
  27. // allow
  28. return joinPoint.proceed();
  29. } else {
  30. // deny
  31. throw new Http429TooManyRequestsException();
  32. }
  33. }
  34. private String getRateLimiterKey(ProceedingJoinPoint joinPoint, MethodParamRateLimit methodParamLimit) {
  35. // Get Param Value
  36. String paramValue = getParamLimit(joinPoint, methodParamLimit.paramIndex());
  37. return String.format("%s-%s", joinPoint.getSignature().toString(), paramValue);
  38. }
  39. private String getParamLimit(ProceedingJoinPoint joinPoint, int paramIndex) {
  40. Object[] args = joinPoint.getArgs();
  41. if (paramIndex < 0 || paramIndex >= args.length) {
  42. LOG.warn("paramIndex exceed length, use default");
  43. return "default_param";
  44. }
  45. return args[paramIndex].toString();
  46. }
  47. }

如上所述,进行切面处理时:

  • 从用方法和第paramIndex参数的值拼接为key来获取RateLimit。这有些抽象,我们稍后会举个例子。
  • 其他处理策略同MethodLimitAspect

看一下用法:

  1. @MethodParamRateLimit(permitsPerSecond = 1, paramIndex = 0)
  2. @GetMapping(value = "/ids/{id}")
  3. public String helloWithId(@PathVariable int id) {
  4. return helloFallback(id);
  5. }

如上所述,MethodParamRateLimit应用在此处,实现了根据不同的id进行限流,每个id每秒只能访问1次,不同id之间不会相互影响。

阅读与思考

  1. Nginx进行限流时,容易发生误伤,例如来自内网或者监控系统的IP。请自行查找资料,实现白名单配置,避免这种情况。
  2. 除了负载均衡、微服务层面的限流,你还能想到其他层面的限流么?