在本章节中,我们将之前介绍 HTTP Client&Server 的示例修改为 GRPC 微服务,并演示如何使用 GoFrame 框架开发一个简单的 GRPC 服务端和客户端,并且为 GRPC 微服务增加链路跟踪特性。

本章节的示例代码位于: https://github.com/gogf/gf/tree/master/example/trace/grpc_with_db

目录结构

链路跟踪-GRPC示例 - 图1

Protobuf

  1. syntax = "proto3";
  2. package user;
  3. option go_package = "protobuf/user";
  4. // User service for tracing demo.
  5. service User {
  6. rpc Insert(InsertReq) returns (InsertRes) {}
  7. rpc Query(QueryReq) returns (QueryRes) {}
  8. rpc Delete(DeleteReq) returns (DeleteRes) {}
  9. }
  10. message InsertReq {
  11. string Name = 1; // v: required#Please input user name.
  12. }
  13. message InsertRes {
  14. int32 Id = 1;
  15. }
  16. message QueryReq {
  17. int32 Id = 1; // v: min:1#User id is required for querying.
  18. }
  19. message QueryRes {
  20. int32 Id = 1;
  21. string Name = 2;
  22. }
  23. message DeleteReq {
  24. int32 Id = 1; // v:min:1#User id is required for deleting.
  25. }
  26. message DeleteRes {}

使用 gf gen pb 命令编译该 proto 文件,将会生成对应的 grpc 接口文件和数据结构文件。

GRPC Server

  1. package main
  2. import (
  3. _ "github.com/gogf/gf/contrib/drivers/mysql/v2"
  4. _ "github.com/gogf/gf/contrib/nosql/redis/v2"
  5. "github.com/gogf/gf/contrib/registry/etcd/v2"
  6. "github.com/gogf/gf/example/trace/grpc_with_db/protobuf/user"
  7. "context"
  8. "fmt"
  9. "time"
  10. "github.com/gogf/gf/contrib/rpc/grpcx/v2"
  11. "github.com/gogf/gf/contrib/trace/otlpgrpc/v2"
  12. "github.com/gogf/gf/v2/database/gdb"
  13. "github.com/gogf/gf/v2/frame/g"
  14. "github.com/gogf/gf/v2/os/gcache"
  15. "github.com/gogf/gf/v2/os/gctx"
  16. )
  17. type Controller struct {
  18. user.UnimplementedUserServer
  19. }
  20. const (
  21. serviceName = "otlp-grpc-server"
  22. endpoint = "tracing-analysis-dc-bj.aliyuncs.com:8090"
  23. traceToken = "******_******"
  24. )
  25. func main() {
  26. grpcx.Resolver.Register(etcd.New("127.0.0.1:2379"))
  27. var ctx = gctx.New()
  28. shutdown, err := otlpgrpc.Init(serviceName, endpoint, traceToken)
  29. if err != nil {
  30. g.Log().Fatal(ctx, err)
  31. }
  32. defer shutdown()
  33. // Set ORM cache adapter with redis.
  34. g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(g.Redis()))
  35. s := grpcx.Server.New()
  36. user.RegisterUserServer(s.Server, &Controller{})
  37. s.Run()
  38. }
  39. // Insert is a route handler for inserting user info into database.
  40. func (s *Controller) Insert(ctx context.Context, req *user.InsertReq) (res *user.InsertRes, err error) {
  41. result, err := g.Model("user").Ctx(ctx).Insert(g.Map{
  42. "name": req.Name,
  43. })
  44. if err != nil {
  45. return nil, err
  46. }
  47. id, _ := result.LastInsertId()
  48. res = &user.InsertRes{
  49. Id: int32(id),
  50. }
  51. return
  52. }
  53. // Query is a route handler for querying user info. It firstly retrieves the info from redis,
  54. // if there's nothing in the redis, it then does db select.
  55. func (s *Controller) Query(ctx context.Context, req *user.QueryReq) (res *user.QueryRes, err error) {
  56. err = g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
  57. Duration: 5 * time.Second,
  58. Name: s.userCacheKey(req.Id),
  59. Force: false,
  60. }).WherePri(req.Id).Scan(&res)
  61. if err != nil {
  62. return nil, err
  63. }
  64. return
  65. }
  66. // Delete is a route handler for deleting specified user info.
  67. func (s *Controller) Delete(ctx context.Context, req *user.DeleteReq) (res *user.DeleteRes, err error) {
  68. err = g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
  69. Duration: -1,
  70. Name: s.userCacheKey(req.Id),
  71. Force: false,
  72. }).WherePri(req.Id).Scan(&res)
  73. return
  74. }
  75. func (s *Controller) userCacheKey(id int32) string {
  76. return fmt.Sprintf(`userInfo:%d`, id)
  77. }

服务端代码简要说明:

1、首先,服务端需要通过 jaeger.Init 方法初始化 Jaeger

2、可以看到,业务逻辑和之前HTTP示例项目完全一致,只是接入层修改为了GRPC协议。

3、我们仍然通过缓存适配器的方式注入Redis缓存:

  1. g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(g.Redis()))

5、这里也是通过 Cache 方法启用 ORM 的缓存特性,之前已经做过介绍,这里不再赘述。

GRPC Client

  1. package main
  2. import (
  3. "github.com/gogf/gf/contrib/registry/etcd/v2"
  4. "github.com/gogf/gf/contrib/rpc/grpcx/v2"
  5. "github.com/gogf/gf/contrib/trace/otlpgrpc/v2"
  6. "github.com/gogf/gf/example/trace/grpc_with_db/protobuf/user"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/net/gtrace"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. )
  11. const (
  12. serviceName = "otlp-grpc-client"
  13. endpoint = "tracing-analysis-dc-bj.aliyuncs.com:8090"
  14. traceToken = "******_******"
  15. )
  16. func main() {
  17. grpcx.Resolver.Register(etcd.New("127.0.0.1:2379"))
  18. var ctx = gctx.New()
  19. shutdown, err := otlpgrpc.Init(serviceName, endpoint, traceToken)
  20. if err != nil {
  21. g.Log().Fatal(ctx, err)
  22. }
  23. defer shutdown()
  24. StartRequests()
  25. }
  26. func StartRequests() {
  27. ctx, span := gtrace.NewSpan(gctx.New(), "StartRequests")
  28. defer span.End()
  29. client := user.NewUserClient(grpcx.Client.MustNewGrpcClientConn("demo"))
  30. // Baggage.
  31. ctx = gtrace.SetBaggageValue(ctx, "uid", 100)
  32. // Insert.
  33. insertRes, err := client.Insert(ctx, &user.InsertReq{
  34. Name: "john",
  35. })
  36. if err != nil {
  37. g.Log().Fatalf(ctx, `%+v`, err)
  38. }
  39. g.Log().Info(ctx, "insert id:", insertRes.Id)
  40. // Query.
  41. queryRes, err := client.Query(ctx, &user.QueryReq{
  42. Id: insertRes.Id,
  43. })
  44. if err != nil {
  45. g.Log().Errorf(ctx, `%+v`, err)
  46. return
  47. }
  48. g.Log().Info(ctx, "query result:", queryRes)
  49. // Delete.
  50. _, err = client.Delete(ctx, &user.DeleteReq{
  51. Id: insertRes.Id,
  52. })
  53. if err != nil {
  54. g.Log().Errorf(ctx, `%+v`, err)
  55. return
  56. }
  57. g.Log().Info(ctx, "delete id:", insertRes.Id)
  58. // Delete with error.
  59. _, err = client.Delete(ctx, &user.DeleteReq{
  60. Id: -1,
  61. })
  62. if err != nil {
  63. g.Log().Errorf(ctx, `%+v`, err)
  64. return
  65. }
  66. g.Log().Info(ctx, "delete id:", -1)
  67. }

客户端代码简要说明:

1、首先,客户端也是需要通过 jaeger.Init 方法初始化 Jaeger

2、客户端非常简单,内部初始化以及默认拦截器的设置已经由 Katyusha 框架封装好了,开发者只需要关心业务逻辑实现即可,

效果查看

启动服务端:

链路跟踪-GRPC示例 - 图2

启动客户端:

链路跟踪-GRPC示例 - 图3

这里客户端的执行最后报了一个错误,那是我们 故意为之,目的是演示 GRPC 报错时的链路信息展示。我们打开 jaeger 查看一下链路跟踪信息:

链路跟踪-GRPC示例 - 图4

可以看到本次请求涉及到两个服务: tracing-grpc-clienttracing-grpc-server,即客户端和服务端。整个请求链路涉及到 17span,客户端 5span,服务端 12span,并且产生了 2 个错误。我们点击查看详情:

链路跟踪-GRPC示例 - 图5

我们点击查看一下最后接口调用错误的 span 情况:

链路跟踪-GRPC示例 - 图6

看起来像个参数校验错误,点击查看 Events/Logs 中的请求参数:

链路跟踪-GRPC示例 - 图7

查看 Process 中的 Log 信息可以看到,是由于传递的参数为 -1,不满足校验规则,因此在数据校验的时候报错返回了。

GRPC Client

由于 ormredislogging 组件在之前的章节中已经介绍过链路信息,因此我们这里主要介绍 GRPC Client&Server 的链路信息。

Attributes

链路跟踪-GRPC示例 - 图8

Attribute/Tag说明
net.peer.ip请求的目标IP。
net.peer.port请求的目标端口。
rpc.grpc.status_codeGRPC 的内部状态码, 0 表示成功, 非0 表示失败。
rpc.serviceRPC 的服务名称,注意这里是 RPC 而不是 GRPC,因为这里是通用定义,客户端支持多种 RPC 通信协议, GRPC 只是其中一种。
rpc.methodRPC 的方法名称。
rpc.systemRPC 协议类型,如: grpc, thrift 等。

Events/Logs

链路跟踪-GRPC示例 - 图9

Event/Log说明
grpc.metadata.outgoingGRPC 客户端请求提交的 Metadata 信息,可能会比较大。
grpc.request.baggageGRPC 客户端请求提交的 Baggage 信息,用于服务间链路信息传递。
grpc.request.messageGRPC 客户端请求提交的 Message 数据,可能会比较大,最大只记录 512KB,如果超过该大小则忽略。仅对 Unary 请求类型有效。
grpc.response.messageGRPC 客户端请求接收返回的的 Message 信息,可能会比较大。仅对 Unary 请求类型有效。

GRPC Server

Attributes

链路跟踪-GRPC示例 - 图10

GRPC Server 端的 Attributes 含义同 GRPC Client,在同一请求中,打印的数据基本一致。

Events

链路跟踪-GRPC示例 - 图11

GRPC Server 端的 EventsGRPC Client 不同的是,在同一请求中,服务端接收到的 metadatagrpc.metadata.incoming,其他同 GRPC Client