3.2 gRPC

3.2.1 gRPC介绍

Jupiter微服务目前支持gRPCJupitergRPC服务提供了很多可观察性的手段。

内置了多个中间件,可以采集请求日志、采集trace、采集监控、采集慢日志,更加方便我们对gRPC服务的可观测。

通过govern的治理端口,能够查看监控、HTTP实时信息

3.2.2 配置规范

配置说明

3.2.3 直连的gRPC

参考gRPC直连示例3.2 gRPC - 图1 (opens new window)

3.2.3.1 启动gRPC服务

配置项

  1. [jupiter.server.grpc]
  2. port = 9091

代码

  1. func main() {
  2. eng := NewEngine()
  3. eng.SetGovernor("127.0.0.1:9092")
  4. if err := eng.Run(); err != nil {
  5. xlog.Panic(err.Error())
  6. }
  7. }
  8. type Engine struct {
  9. jupiter.Application
  10. }
  11. func NewEngine() *Engine {
  12. eng := &Engine{}
  13. if err := eng.Startup(
  14. eng.serveGRPC,
  15. ); err != nil {
  16. xlog.Panic("startup", xlog.Any("err", err))
  17. }
  18. return eng
  19. }
  20. func (eng *Engine) serveGRPC() error {
  21. server := xgrpc.StdConfig("grpc").Build()
  22. helloworld.RegisterGreeterServer(server.Server, new(Greeter))
  23. return eng.Serve(server)
  24. }
  25. type Greeter struct{}
  26. func (g Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
  27. return &helloworld.HelloReply{
  28. Message: "Hello Jupiter",
  29. }, nil
  30. }

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 从图中可以看到,我们启动了一个gRPC服务运行在9091端口,接下来我们启动客户端

3.2.3.2 启动gRPC客户端

配置项

  1. [jupiter.client.directserver]
  2. address = "127.0.0.1:9091"
  3. balancerName = "round_robin" # 默认值
  4. block = false # 默认值
  5. dialTimeout = "0s" # 默认值

代码

  1. func main() {
  2. eng := NewEngine()
  3. if err := eng.Run(); err != nil {
  4. xlog.Error(err.Error())
  5. }
  6. }
  7. type Engine struct {
  8. jupiter.Application
  9. }
  10. func NewEngine() *Engine {
  11. eng := &Engine{}
  12. if err := eng.Startup(
  13. eng.consumer,
  14. ); err != nil {
  15. xlog.Panic("startup", xlog.Any("err", err))
  16. }
  17. return eng
  18. }
  19. func (eng *Engine) consumer() error {
  20. conn := grpc.StdConfig("directserver").Build()
  21. client := helloworld.NewGreeterClient(conn)
  22. for {
  23. resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
  24. Name: "jupiter",
  25. })
  26. if err != nil {
  27. xlog.Error(err.Error())
  28. } else {
  29. xlog.Info("receive response", xlog.String("resp", resp.Message))
  30. }
  31. time.Sleep(1 * time.Second)
  32. }
  33. return nil
  34. }

我们的gRPC客户端通过配置里的地址和负载均衡算法,可以请求刚才我们启动的gRPC服务端。运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 我们定时1s,发送hellogRPC服务端,可以收到服务端响应的Hello Jupiter

3.2.4 注册ETCD的gRPC服务

参考gRPC注册ETCD示例3.2 gRPC - 图4 (opens new window)

3.2.4.2 启动gRPC服务

配置项

  1. [jupiter.server.grpc]
  2. port = 9091 # 服务端grpc绑定端口
  3. [jupiter.registry.wh] # 注册grpc到etcd的配置
  4. connectTimeout = "1s"
  5. endpoints=["127.0.0.1:2379"] # grpc注册到目标etcd中
  6. secure = false
  7. prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致

代码

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/douyu/jupiter"
  6. compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
  7. etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
  8. "github.com/douyu/jupiter/pkg/server/xgrpc"
  9. "github.com/douyu/jupiter/pkg/xlog"
  10. "google.golang.org/grpc/examples/helloworld/helloworld"
  11. )
  12. func main() {
  13. eng := NewEngine()
  14. eng.SetRegistry(
  15. compound_registry.New(
  16. etcdv3_registry.StdConfig("wh").Build(),
  17. ),
  18. )
  19. //eng.SetGovernor("0.0.0.0:0")
  20. if err := eng.Run(); err != nil {
  21. xlog.Error(err.Error())
  22. }
  23. }
  24. type Engine struct {
  25. jupiter.Application
  26. }
  27. func NewEngine() *Engine {
  28. eng := &Engine{}
  29. if err := eng.Startup(
  30. eng.serveGRPC,
  31. ); err != nil {
  32. xlog.Panic("startup", xlog.Any("err", err))
  33. }
  34. return eng
  35. }
  36. func (eng *Engine) serveGRPC() error {
  37. server := xgrpc.StdConfig("grpc").Build()
  38. helloworld.RegisterGreeterServer(server.Server, new(Greeter))
  39. return eng.Serve(server)
  40. }
  41. type Greeter struct {
  42. server *xgrpc.Server
  43. }
  44. func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
  45. sd := &helloworld.HelloReply{
  46. Message: "返回信息给client",
  47. }
  48. fmt.Println(fmt.Sprintf("name:%s",request.Name))
  49. return sd,nil
  50. }

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 从图中可以看到,我们启动了一个gRPC服务运行在9091端口,在命令行的第四行,展示了我们注册的keyvalue信息。接下来我们在启动客户端。

3.2.4.2 启动gRPC客户端

配置项

  1. [jupiter.registry.wh]
  2. connectTimeout = "1s"
  3. endpoints=["127.0.0.1:2379"]
  4. secure = false
  5. prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
  6. [jupiter.client.etcdserver]
  7. address = "etcd:///main" #etcd:/// 默认前缀, main 指的是应用执行二进制文件名称(发布平台将它默认为应用名称). 框架内部会把 main解析出来跟前缀做拼接,去etcd找到对应grpc服务端注册key
  8. balancerName = "round_robin" # 默认值,grpc客户端调用服务端采用的 轮训模式
  9. block = false # 默认值
  10. dialTimeout = "0s" # 默认值

grpc客户端代码demo

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/douyu/jupiter"
  7. "github.com/douyu/jupiter/pkg/client/grpc"
  8. "github.com/douyu/jupiter/pkg/client/grpc/balancer"
  9. "github.com/douyu/jupiter/pkg/client/grpc/resolver"
  10. "github.com/douyu/jupiter/pkg/registry/etcdv3"
  11. "github.com/douyu/jupiter/pkg/xlog"
  12. "google.golang.org/grpc/examples/helloworld/helloworld"
  13. )
  14. func main() {
  15. eng := NewEngine()
  16. if err := eng.Run(); err != nil {
  17. xlog.Error(err.Error())
  18. }
  19. fmt.Printf("111 = %+v\n", 111)
  20. }
  21. type Engine struct {
  22. jupiter.Application
  23. }
  24. func NewEngine() *Engine {
  25. eng := &Engine{}
  26. if err := eng.Startup(
  27. eng.initResolver,
  28. eng.consumer,
  29. ); err != nil {
  30. xlog.Panic("startup", xlog.Any("err", err))
  31. }
  32. return eng
  33. }
  34. func (eng *Engine) initResolver() error {
  35. resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
  36. return nil
  37. }
  38. func (eng *Engine) consumer() error {
  39. config := grpc.StdConfig("etcdserver")
  40. //config.BalancerName = balancer.NameSmoothWeightRoundRobin
  41. client := helloworld.NewGreeterClient(config.Build())
  42. go func() {
  43. i:=0
  44. ghj := map[string]int{}
  45. for {
  46. i++
  47. resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
  48. Name: fmt.Sprintf("jupiter:%d",i),
  49. })
  50. if err != nil {
  51. fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
  52. xlog.Error(err.Error())
  53. } else {
  54. ghj[resp.Message] = ghj[resp.Message] + 1
  55. fmt.Printf("resp.Message = %+v\n", ghj)
  56. xlog.Info("receive response", xlog.String("resp", resp.Message))
  57. }
  58. time.Sleep(1 * time.Second)
  59. }
  60. }()
  61. return nil
  62. }

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 我们的gRPC客户端通过应用名称mainETCD中获取到服务地址,并监听了/wsd-reg/main,用于后续更新服务地址。

客户端会定时1s,发送hellogRPC服务端,可以收到服务端响应的``Jupiter 1类似信息到服务端

3.2.4.3 从零开始配置

环境要求 jupiter(commit:8b67ebec1ae6dc07e8df27d7240aa9b4d954671b)如果用的minerva就用1.8

生成potobuf

  1. syntax = "proto3";
  2. package pb;
  3. service Hello {
  4. // SayHello
  5. rpc SayHello(SayHelloReq) returns (SayHelloRes);
  6. }
  7. message SayHelloReq{
  8. string name = 1;
  9. }
  10. message SayHelloRes{
  11. string resp = 1;
  12. }

执行命令: protoc -I . --go_out=plugins=grpc:. ./hello.proto 拿到对应pb包放入对应项目的路径

go build -o jupiter-demo main.go 记得把下面的demo编译成jupiter-demo,再执行 jupiter 客户端 使用demo

  1. # 配置
  2. [jupiter.registry.wh]
  3. connectTimeout = "1s"
  4. endpoints=["127.0.0.1:2379"]
  5. secure = false
  6. prefix = "wsd-reg" # 前缀
  7. [jupiter.client.etcdserver]
  8. address = "etcd:///jupiter-demo" # jupiter-demo 指的是执行文件名称,在发布平台执行文件名称跟应用名称是一样的
  9. block = true # 默认值
  10. dialTimeout = "0s" # 默认值
  11. package main
  12. import (
  13. "clientb/pb"
  14. "context"
  15. "fmt"
  16. "time"
  17. "github.com/douyu/jupiter"
  18. "github.com/douyu/jupiter/pkg/client/grpc"
  19. "github.com/douyu/jupiter/pkg/client/grpc/balancer"
  20. "github.com/douyu/jupiter/pkg/client/grpc/resolver"
  21. "github.com/douyu/jupiter/pkg/registry/etcdv3"
  22. "github.com/douyu/jupiter/pkg/xlog"
  23. )
  24. func main() {
  25. eng := NewEngine()
  26. if err := eng.Run(); err != nil {
  27. xlog.Error(err.Error())
  28. }
  29. fmt.Printf("111 = %+v\n", 111)
  30. }
  31. type Engine struct {
  32. jupiter.Application
  33. }
  34. func NewEngine() *Engine {
  35. eng := &Engine{}
  36. if err := eng.Startup(
  37. eng.initResolver,
  38. eng.consumer,
  39. ); err != nil {
  40. xlog.Panic("startup", xlog.Any("err", err))
  41. }
  42. return eng
  43. }
  44. func (eng *Engine) initResolver() error {
  45. resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
  46. return nil
  47. }
  48. func (eng *Engine) consumer() error {
  49. config := grpc.StdConfig("etcdserver")
  50. config.BalancerName = balancer.NameSmoothWeightRoundRobin
  51. client := pb.NewHelloClient(config.Build())
  52. go func() {
  53. i:=0
  54. ghj := map[string]int{}
  55. for {
  56. i++
  57. resp, err := client.SayHello(context.Background(), &pb.SayHelloReq{
  58. Name: fmt.Sprintf("jupiter:%d",i),
  59. })
  60. if err != nil {
  61. fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
  62. xlog.Error(err.Error())
  63. } else {
  64. ghj[resp.Resp] = ghj[resp.Resp] + 1
  65. fmt.Printf("resp.Message = %+v\n", ghj)
  66. xlog.Info("receive response", xlog.String("resp", resp.Resp))
  67. }
  68. time.Sleep(1 * time.Second)
  69. }
  70. }()
  71. return nil
  72. }

minerva 客户端 demo

  1. # 配置
  2. [app]
  3. [app.registry.etcd]# 使用etcd作为服务发现
  4. endpoints=["127.0.0.1:2379"] #etcd的地址,grpc的服务端须已经注册到这个etcd
  5. timeout="2s"
  6. [minerva]
  7. [minerva.grpc]
  8. [minerva.grpc.wsg-reg] # wsg-reg 这里得注意,这是服务端的前缀
  9. debug = true # Debug开关
  10. enableMetric = true # 指标采集开关
  11. enableAccessLog = true # 访问日志开关
  12. addr = "jupiter-demo" #目标地址。direct=true,该值设为服务ip:port, direct=false,则为服务注册名,其实就是执行文件名称,也就是应用名称
  13. dialTimeout = "1s" # 拨超时
  14. readTimeout = "1s" # 读超时
  15. enableTrace = false # 链路追踪开关
  16. balancerName = "round_robin" # 默认为round_robin
  17. level = "panic" # 创建时的告警等级,level=panic创建Client失败时panic
  18. wait = true # 默认:true 是否一直等待直到连接建立,wait=true时,dialTimeout失效。注意Wait可能会导致创建过程阻塞
  19. direct = false # 直连服务,不经过负载均衡器
  20. slowThreshold = "1s" # slow日志门限值
  21. package main
  22. import (
  23. "context"
  24. "demomimi/pb"
  25. "fmt"
  26. "time"
  27. )
  28. import "git.xxxx.com/vega/minerva/client/gusty"
  29. // 新建demo客户端
  30. var (
  31. DemoClient pb.HelloClient
  32. )
  33. // 读取demo的grpc配置,并初始化
  34. func init() {
  35. DemoClient = pb.NewHelloClient(gusty.Invoker("wsg-reg"))
  36. }
  37. func main() {
  38. sdfg := map[string]int{}
  39. for{
  40. resp,err:=SayHello()
  41. if err !=nil{
  42. fmt.Println(err,"uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu")
  43. continue
  44. }
  45. sdfg[resp.Resp] = sdfg[resp.Resp] + 1
  46. fmt.Println("uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu",sdfg)
  47. time.Sleep(time.Second * 1)
  48. }
  49. }
  50. // SayHello 调用该grpc的方法
  51. func SayHello() (*pb.SayHelloRes, error) {
  52. ctx := context.Background()
  53. ctx, grpcTimeOut := context.WithTimeout(ctx, 1*time.Second)
  54. defer grpcTimeOut()
  55. helloRes, err := DemoClient.SayHello(
  56. ctx,
  57. &pb.SayHelloReq{
  58. Name: fmt.Sprintf("word"),
  59. },
  60. )
  61. if err != nil {
  62. return &pb.SayHelloRes{}, err
  63. }
  64. return helloRes, nil
  65. }

jupiter 服务端调用

  1. [jupiter.server.grpc]
  2. port = 20102
  3. [jupiter.registry.wh]
  4. connectTimeout = "1s"
  5. endpoints=["127.0.0.1:2379"]
  6. secure = false
  7. prefix = "wsd-reg" # 这个前缀记得跟client保持一致
  8. package main
  9. import (
  10. "context"
  11. "fmt"
  12. "github.com/douyu/jupiter"
  13. compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
  14. etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
  15. "github.com/douyu/jupiter/pkg/server/xgrpc"
  16. "github.com/douyu/jupiter/pkg/xlog"
  17. "google.golang.org/grpc/examples/helloworld/helloworld"
  18. )
  19. func main() {
  20. eng := NewEngine()
  21. eng.SetRegistry(
  22. compound_registry.New(
  23. etcdv3_registry.StdConfig("wh").Build(),
  24. ),
  25. )
  26. //eng.SetGovernor("0.0.0.0:0")
  27. if err := eng.Run(); err != nil {
  28. xlog.Error(err.Error())
  29. }
  30. }
  31. type Engine struct {
  32. jupiter.Application
  33. }
  34. func NewEngine() *Engine {
  35. eng := &Engine{}
  36. if err := eng.Startup(
  37. eng.serveGRPC,
  38. ); err != nil {
  39. xlog.Panic("startup", xlog.Any("err", err))
  40. }
  41. return eng
  42. }
  43. func (eng *Engine) serveGRPC() error {
  44. fmt.Println("YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY")
  45. server := xgrpc.StdConfig("grpc").Build()
  46. helloworld.RegisterGreeterServer(server.Server, new(Greeter))
  47. return eng.Serve(server)
  48. }
  49. type Greeter struct {
  50. server *xgrpc.Server
  51. }
  52. func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
  53. sd := &helloworld.HelloReply{
  54. Message: "我是client_b",
  55. }
  56. fmt.Println(fmt.Sprintf("name:%s",request.Name))
  57. return sd,nil
  58. }

minerva 服务端 demo

  1. [app]
  2. mode="local"
  3. [app.registry]
  4. [app.registry.etcd]
  5. endPoints = ["127.0.0.1:2379"]
  6. timeout = "2s"
  7. [server.grpc]
  8. port=9001
  9. name="demo"
  10. [server.grpc.labels]
  11. group = "default" # default: default
  12. weight = "10" # default: 100
  13. enable = "true" # default: true
  14. package main
  15. import (
  16. "context"
  17. "fmt"
  18. "git.xxxx.com/vega/minerva"
  19. "git.xxxx.com/vega/minerva/application"
  20. "git.xxxx.com/vega/minerva/server"
  21. "git.xxxx.com/vega/minerva/server/yell"
  22. "mimi/pb"
  23. )
  24. // 定义handler
  25. type HelloHandler struct {
  26. yell.Handler
  27. }
  28. // 定义方法
  29. func (s *HelloHandler) SayHello(ctx context.Context, in *pb.SayHelloReq) (out *pb.SayHelloRes, err error) {
  30. fmt.Println(in.Name,"nnnnnnnn")
  31. return &pb.SayHelloRes{Resp:"hello1"},nil
  32. }
  33. // 定义一个grpc server,应用微服务的理念,一个服务只能定义一个server
  34. type GrpcServer struct {
  35. *yell.Server
  36. }
  37. // 将HelloServer注册到server
  38. func (s *GrpcServer) Mux() {
  39. s.Register(pb.RegisterHelloServer, new(HelloHandler))
  40. // 这里可以继续注册其他的Handler
  41. }
  42. // main函数中启动grpc server
  43. func main() {
  44. fmt.Println(application.BuildFlags())
  45. app := minerva.NewAPP()
  46. app.Serve(
  47. new(GrpcServer),
  48. server.StdConfig("grpc"),
  49. server.Host("127.0.0.1"),
  50. )
  51. app.Run()
  52. }