Rate Limiting (Concurrency Control)

Overview

Rate limiting is a service governance capability that caps the number of concurrent requests a service handles in order to protect its stability. This document only describes how to use rate limiting in rest and grpc services; it does not cover rate limiting algorithms.

Rate limiting generally comes in three forms: single-node limiting, cluster limiting (the cluster-wide limit is divided evenly across the nodes, so in essence it is still single-node limiting), and distributed limiting.

This document covers single-node limiting, which is more accurately described as concurrency control.
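To make the idea of single-node concurrency control concrete, the following is a minimal sketch of a limiter built on a buffered channel. The Limiter type, NewLimiter and ErrReturn are hypothetical names used for illustration and are not go-zero's implementation; the TryBorrow and Return method names simply mirror the calls made on go-zero's limiter in the grpc example later in this document.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrReturn is reported when Return is called more often than TryBorrow succeeded.
var ErrReturn = errors.New("limiter: returned more slots than borrowed")

// Limiter caps the number of in-flight operations on a single node.
// Hypothetical type for illustration only.
type Limiter struct {
	slots chan struct{}
}

// NewLimiter creates a limiter that admits at most n concurrent holders.
func NewLimiter(n int) *Limiter {
	return &Limiter{slots: make(chan struct{}, n)}
}

// TryBorrow takes a slot without blocking and reports whether it succeeded.
func (l *Limiter) TryBorrow() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// Return gives a slot back so that another caller can borrow it.
func (l *Limiter) Return() error {
	select {
	case <-l.slots:
		return nil
	default:
		return ErrReturn
	}
}

func main() {
	l := NewLimiter(2)
	fmt.Println(l.TryBorrow(), l.TryBorrow(), l.TryBorrow()) // true true false
	_ = l.Return()
	fmt.Println(l.TryBorrow()) // true
}
```

The limiter does not count requests per second; it only tracks how many callers currently hold a slot, which is why "concurrency control" is the more precise name.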

Preparation

  1. Create a go mod project

    $ mkdir -p ~/workspace/demo && cd ~/workspace/demo
    $ go mod init demo

rest service

Limit Configuration

    RestConf struct {
        ...
        MaxConns int `json:",default=10000"` // maximum number of concurrent connections, default 10000
        ...
    }

Sample

We use a demo to show how to use the limiter.

1 Under the demo project, create a new directory rest-limit-demo

    $ cd ~/workspace/demo
    $ mkdir rest-limit-demo && cd rest-limit-demo

2 Create a limit.api file and copy the following content into it

limit.api

    syntax = "v1"

    service limit {
        @handler ping
        get /ping
    }

3 Generate rest code

    $ cd ~/workspace/demo/rest-limit-demo
    $ goctl api go -api limit.api -dir .

4 View directory structure

    $ tree
    .
    ├── etc
    │   └── limit.yaml
    ├── internal
    │   ├── config
    │   │   └── config.go
    │   ├── handler
    │   │   ├── pinghandler.go
    │   │   └── routes.go
    │   ├── logic
    │   │   └── pinglogic.go
    │   ├── svc
    │   │   └── servicecontext.go
    │   └── types
    │       └── types.go
    ├── limit.api
    └── limit.go

Next, we modify the configuration to limit the maximum number of concurrent connections to 100, and then add blocking code to the logic.

  1. Edit the configuration

    Set MaxConns in ~/workspace/demo/rest-limit-demo/etc/limit.yaml to 100.

  2. Add the logic code

    Add blocking code to the Ping method in ~/workspace/demo/rest-limit-demo/internal/logic/pinglogic.go.

The final code is as follows.

limit.yaml

    Name: limit
    Host: 0.0.0.0
    Port: 8888
    MaxConns: 100

pinglogic.go

    package logic

    import (
        "context"
        "time"

        "demo/rest-limit-demo/internal/svc"

        "github.com/zeromicro/go-zero/core/logx"
    )

    type PingLogic struct {
        logx.Logger
        ctx    context.Context
        svcCtx *svc.ServiceContext
    }

    func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic {
        return &PingLogic{
            Logger: logx.WithContext(ctx),
            ctx:    ctx,
            svcCtx: svcCtx,
        }
    }

    // Ping sleeps for 50 ms to simulate a slow handler, so that concurrent requests overlap.
    func (l *PingLogic) Ping() error {
        time.Sleep(50 * time.Millisecond)
        return nil
    }

Let's first run this minimal rest service, then use the hey tool to run a simple load test against the endpoint.

Start service

    $ cd ~/workspace/demo/rest-limit-demo
    $ go run limit.go

Run the load test in a separate terminal

    # Load test with hey: 90 concurrent workers, running for 1 second
    $ hey -z 1s -c 90 -q 1 'http://localhost:8888/ping'

    Summary:
      Total: 1.1084 secs
      Slowest: 0.1066 secs
      Fastest: 0.0607 secs
      Average: 0.0890 secs
      Requests/sec: 81.1980

    Response time histogram:
      0.061 [1] |■
      0.065 [2] |■■■
      0.070 [8] |■■■■■■■■■■■
      0.074 [13] |■■■■■■■■■■■■■■■■■
      0.079 [5] |■■■■■■■
      0.084 [0] |
      0.088 [0] |
      0.093 [2] |■■■
      0.097 [23] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
      0.102 [30] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
      0.107 [6] |■■■■■■■■

    Latency distribution:
      10% in 0.0682 secs
      25% in 0.0742 secs
      50% in 0.0961 secs
      75% in 0.0983 secs
      90% in 0.0996 secs
      95% in 0.1039 secs
      0% in 0.0000 secs

    Details (average, fastest, slowest):
      DNS+dialup: 0.0054 secs, 0.0607 secs, 0.1066 secs
      DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
      req write: 0.0002 secs, 0.0000 secs, 0.0011 secs
      resp wait: 0.0832 secs, 0.0576 secs, 0.0942 secs
      resp read: 0.0001 secs, 0.0000 secs, 0.0012 secs

    Status code distribution:
      [200] 90 responses

The load test shows that all 90 requests succeeded. Let's increase the concurrency and see what happens.

    # Load test with hey: 110 concurrent workers, running for 1 second
    $ hey -z 1s -c 110 -q 1 'http://127.0.0.1:8888/ping'

    Summary:
      Total: 1.0833 secs
      Slowest: 0.0756 secs
      Fastest: 0.0107 secs
      Average: 0.0644 secs
      Requests/sec: 101.5403

    Response time histogram:
      0.011 [1] |■
      0.017 [9] |■■■■■■■
      0.024 [0] |
      0.030 [0] |
      0.037 [0] |
      0.043 [0] |
      0.050 [0] |
      0.056 [0] |
      0.063 [2] |■■
      0.069 [45] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
      0.076 [53] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■

    Latency distribution:
      10% in 0.0612 secs
      25% in 0.0665 secs
      50% in 0.0690 secs
      75% in 0.0726 secs
      90% in 0.0737 secs
      95% in 0.0740 secs
      99% in 0.0756 secs

    Details (average, fastest, slowest):
      DNS+dialup: 0.0040 secs, 0.0107 secs, 0.0756 secs
      DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
      req write: 0.0001 secs, 0.0000 secs, 0.0025 secs
      resp wait: 0.0602 secs, 0.0064 secs, 0.0741 secs
      resp read: 0.0000 secs, 0.0000 secs, 0.0001 secs

    Status code distribution:
      [200] 100 responses
      [503] 10 responses

The results show that our service supports at most 100 concurrent requests; requests beyond that limit are rejected with a 503 status code. The corresponding rate limiting errors also appear in the service log:

    {"@timestamp":"2023-02-09T18:41:55.500+08:00","caller":"internal/log.go:62","content":"(/ping - 127.0.0.1:64163) concurrent connections over 100, rejected with code 503","level":"error","span":"08fa61d49b694e63","trace":"622b2287f32ff2d45f4dfce3eec8e62c"}
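The rejection behavior observed above can be approximated with a plain net/http middleware. This is a minimal sketch under stated assumptions, not go-zero's actual handler: limitConns and simulate are hypothetical names, and the simulated handler holds each slot until every request has been issued so that the outcome does not depend on timing.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// limitConns rejects requests with 503 once limit requests are already in flight.
// Hypothetical middleware for illustration only.
func limitConns(limit int, next http.Handler) http.Handler {
	slots := make(chan struct{}, limit)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case slots <- struct{}{}:
			defer func() { <-slots }() // free the slot when the request finishes
			next.ServeHTTP(w, r)
		default:
			http.Error(w, "concurrent connections over limit", http.StatusServiceUnavailable)
		}
	})
}

// simulate sends n overlapping requests through a limiter of the given size
// and returns how many were served (200) and how many were rejected (503).
func simulate(limit, n int) (served, rejected int) {
	entered := make(chan struct{}, n)
	release := make(chan struct{})
	h := limitConns(limit, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		entered <- struct{}{}
		<-release // hold the slot until all requests have been issued
	}))

	codes := make(chan int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			rec := httptest.NewRecorder()
			h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/ping", nil))
			codes <- rec.Code
		}()
	}

	admitted := limit
	if n < limit {
		admitted = n
	}
	for i := 0; i < admitted; i++ {
		<-entered // wait until every available slot is taken
	}
	for i := 0; i < n-admitted; i++ {
		if <-codes == http.StatusServiceUnavailable {
			rejected++
		}
	}
	close(release)
	wg.Wait()
	close(codes)
	for code := range codes {
		if code == http.StatusOK {
			served++
		}
	}
	return served, rejected
}

func main() {
	served, rejected := simulate(100, 110)
	fmt.Println(served, rejected) // 100 10
}
```

With a limit of 100 and 110 overlapping requests, the sketch serves 100 and rejects 10, matching the status code distribution reported by hey.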

gRPC service

grpc services are typically internal: they serve other services inside the system rather than external clients, so by default we do not need to rate limit them.

If you really want to limit them, you can do so with a grpc interceptor (middleware), as in the example below.

We use a minimal grpc server to demonstrate.

Under the demo project, create a new directory grpc-limit-demo

    $ cd ~/workspace/demo
    $ mkdir grpc-limit-demo && cd grpc-limit-demo

Create a limit.proto file and copy the following content into it

limit.proto

    syntax = "proto3";

    package proto;

    option go_package = "./proto";

    message PingReq{}
    message PingResp{}

    service limit{
        rpc Ping(PingReq) returns (PingResp);
    }

Generate grpc code

    $ cd ~/workspace/demo/grpc-limit-demo
    $ goctl rpc protoc limit.proto --go_out=. --go-grpc_out=. --zrpc_out=.

View Directory

    $ tree
    .
    ├── etc
    │   └── limit.yaml
    ├── internal
    │   ├── config
    │   │   └── config.go
    │   ├── logic
    │   │   └── pinglogic.go
    │   ├── server
    │   │   └── limitserver.go
    │   └── svc
    │       └── servicecontext.go
    ├── limit
    │   └── limit.go
    ├── limit.go
    ├── limit.proto
    └── proto
        ├── limit.pb.go
        └── limit_grpc.pb.go

    8 directories, 10 files

We implement Ping in ~/workspace/demo/grpc-limit-demo/internal/logic/pinglogic.go as follows:

pinglogic.go

    package logic

    import (
        "context"
        "time"

        "demo/grpc-limit-demo/internal/svc"
        "demo/grpc-limit-demo/proto"

        "github.com/zeromicro/go-zero/core/logx"
    )

    type PingLogic struct {
        ctx    context.Context
        svcCtx *svc.ServiceContext
        logx.Logger
    }

    func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic {
        return &PingLogic{
            ctx:    ctx,
            svcCtx: svcCtx,
            Logger: logx.WithContext(ctx),
        }
    }

    // Ping sleeps for 50 ms to simulate a slow call, so that concurrent requests overlap.
    func (l *PingLogic) Ping(in *proto.PingReq) (*proto.PingResp, error) {
        time.Sleep(50 * time.Millisecond)
        return &proto.PingResp{}, nil
    }

Add middleware in ~/workspace/demo/grpc-limit-demo/limit.go

limit.go

    package main

    import (
        "context"
        "flag"
        "fmt"
        "net/http"

        "demo/grpc-limit-demo/internal/config"
        "demo/grpc-limit-demo/internal/server"
        "demo/grpc-limit-demo/internal/svc"
        "demo/grpc-limit-demo/proto"

        "github.com/zeromicro/go-zero/core/conf"
        "github.com/zeromicro/go-zero/core/logx"
        "github.com/zeromicro/go-zero/core/service"
        "github.com/zeromicro/go-zero/core/syncx"
        "github.com/zeromicro/go-zero/zrpc"
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/reflection"
        "google.golang.org/grpc/status"
    )

    var configFile = flag.String("f", "etc/limit.yaml", "the config file")

    func main() {
        flag.Parse()

        var c config.Config
        conf.MustLoad(*configFile, &c)
        ctx := svc.NewServiceContext(c)

        s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
            proto.RegisterLimitServer(grpcServer, server.NewLimitServer(ctx))

            if c.Mode == service.DevMode || c.Mode == service.TestMode {
                reflection.Register(grpcServer)
            }
        })

        // Allow at most n calls to be in flight at the same time.
        n := 100
        l := syncx.NewLimit(n)
        s.AddUnaryInterceptors(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
            if !l.TryBorrow() {
                // No free slot: reject the call immediately.
                logx.Errorf("concurrent connections over %d, rejected with code %d",
                    n, http.StatusServiceUnavailable)
                return nil, status.Error(codes.Unavailable, "concurrent connections over limit")
            }

            // Give the slot back once the call has finished.
            defer func() {
                if err := l.Return(); err != nil {
                    logx.Error(err)
                }
            }()
            return handler(ctx, req)
        })
        defer s.Stop()

        fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
        s.Start()
    }

Remove the etcd configuration from the generated config file ~/workspace/demo/grpc-limit-demo/etc/limit.yaml so that the grpc server starts in direct-connection mode.

limit.yaml

    Name: limit.rpc
    ListenOn: 0.0.0.0:8080

Start service

    $ cd ~/workspace/demo/grpc-limit-demo
    $ go run limit.go

Now that the grpc server is running, we use ghz to load test it.

    $ cd ~/workspace/demo/grpc-limit-demo
    # Load test with 90 concurrent workers, 110 requests in total
    $ ghz --insecure --proto=limit.proto --call=proto.limit.Ping -d '{}' -c 90 -n 110 127.0.0.1:8080

    Summary:
      Count: 110
      Total: 108.94 ms
      Slowest: 60.51 ms
      Fastest: 50.24 ms
      Average: 55.73 ms
      Requests/sec: 1009.75

    Response time histogram:
      50.240 [1] |∎∎
      51.266 [14] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      52.293 [9] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      53.320 [3] |∎∎∎∎∎∎
      54.347 [9] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      55.374 [5] |∎∎∎∎∎∎∎∎∎∎
      56.401 [9] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      57.427 [20] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      58.454 [19] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      59.481 [14] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      60.508 [7] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎

    Latency distribution:
      10 % in 50.88 ms
      25 % in 53.40 ms
      50 % in 56.81 ms
      75 % in 57.96 ms
      90 % in 58.93 ms
      95 % in 59.52 ms
      99 % in 60.26 ms

    Status code distribution:
      [OK] 110 responses

All requests succeeded. Now let's run the test again with a concurrency of 110.

    $ cd ~/workspace/demo/grpc-limit-demo
    # Load test with 110 concurrent workers, 110 requests in total
    $ ghz --insecure --proto=limit.proto --call=proto.limit.Ping -d '{}' -c 110 -n 110 127.0.0.1:8080

    Summary:
      Count: 110
      Total: 68.35 ms
      Slowest: 67.53 ms
      Fastest: 58.76 ms
      Average: 58.03 ms
      Requests/sec: 1609.33

    Response time histogram:
      58.763 [1] |∎∎
      59.640 [7] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      60.517 [12] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      61.394 [7] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      62.272 [10] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      63.149 [15] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      64.026 [14] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      64.903 [19] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      65.780 [9] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
      66.657 [3] |∎∎∎∎∎∎
      67.534 [3] |∎∎∎∎∎∎

    Latency distribution:
      10 % in 59.81 ms
      25 % in 60.96 ms
      50 % in 62.96 ms
      75 % in 64.52 ms
      90 % in 65.29 ms
      95 % in 66.09 ms
      99 % in 67.03 ms

    Status code distribution:
      [Unavailable] 10 responses
      [OK] 100 responses

    Error distribution:
      [10] rpc error: code = Unavailable desc = concurrent connections over limit

As shown, once the number of concurrent calls exceeds 100, the server returns rpc error: code = Unavailable desc = concurrent connections over limit.

References

  1. hey, a lightweight HTTP load testing tool
  2. ghz, a gRPC load testing tool