3.2 gRPC
3.2.1 gRPC介绍
Jupiter
微服务目前支持gRPC
,Jupiter
对gRPC
服务提供了很多可观察性的手段。
内置了多个中间件,可以采集请求日志、采集trace、采集监控、采集慢日志,更加方便我们对gRPC
服务的可观测。
通过govern
的治理端口,能够查看监控、HTTP实时信息
3.2.2 配置规范
3.2.3 直连的gRPC
3.2.3.1 启动gRPC服务
配置项
[jupiter.server.grpc]
port = 9091
代码
func main() {
eng := NewEngine()
eng.SetGovernor("127.0.0.1:9092")
if err := eng.Run(); err != nil {
xlog.Panic(err.Error())
}
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.serveGRPC,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) serveGRPC() error {
server := xgrpc.StdConfig("grpc").Build()
helloworld.RegisterGreeterServer(server.Server, new(Greeter))
return eng.Serve(server)
}
type Greeter struct{}
func (g Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
return &helloworld.HelloReply{
Message: "Hello Jupiter",
}, nil
}
运行指令go run main.go --config=config.toml
,可以看到以下运行结果 从图中可以看到,我们启动了一个gRPC
服务运行在9091
端口,接下来我们启动客户端
3.2.3.2 启动gRPC客户端
配置项
[jupiter.client.directserver]
address = "127.0.0.1:9091"
balancerName = "round_robin" # 默认值
block = false # 默认值
dialTimeout = "0s" # 默认值
代码
func main() {
eng := NewEngine()
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.consumer,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) consumer() error {
conn := grpc.StdConfig("directserver").Build()
client := helloworld.NewGreeterClient(conn)
for {
resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
Name: "jupiter",
})
if err != nil {
xlog.Error(err.Error())
} else {
xlog.Info("receive response", xlog.String("resp", resp.Message))
}
time.Sleep(1 * time.Second)
}
return nil
}
我们的gRPC
客户端通过配置里的地址和负载均衡算法,可以请求刚才我们启动的gRPC
服务端。运行指令go run main.go --config=config.toml
,可以看到以下运行结果 我们定时1s,发送hello
给gRPC
服务端,可以收到服务端响应的Hello Jupiter
3.2.4 注册ETCD的gRPC服务
参考gRPC注册ETCD示例 (opens new window)
3.2.4.2 启动gRPC服务
配置项
[jupiter.server.grpc]
port = 9091 # 服务端grpc绑定端口
[jupiter.registry.wh] # 注册grpc到etcd的配置
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"] # grpc注册到目标etcd中
secure = false
prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
代码
package main
import (
"context"
"fmt"
"github.com/douyu/jupiter"
compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/server/xgrpc"
"github.com/douyu/jupiter/pkg/xlog"
"google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
eng := NewEngine()
eng.SetRegistry(
compound_registry.New(
etcdv3_registry.StdConfig("wh").Build(),
),
)
//eng.SetGovernor("0.0.0.0:0")
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.serveGRPC,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) serveGRPC() error {
server := xgrpc.StdConfig("grpc").Build()
helloworld.RegisterGreeterServer(server.Server, new(Greeter))
return eng.Serve(server)
}
type Greeter struct {
server *xgrpc.Server
}
func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
sd := &helloworld.HelloReply{
Message: "返回信息给client",
}
fmt.Println(fmt.Sprintf("name:%s",request.Name))
return sd,nil
}
运行指令go run main.go --config=config.toml
,可以看到以下运行结果 从图中可以看到,我们启动了一个gRPC
服务运行在9091
端口,在命令行的第四行,展示了我们注册的key
和value
信息。接下来我们在启动客户端。
3.2.4.2 启动gRPC客户端
配置项
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
[jupiter.client.etcdserver]
address = "etcd:///main" #etcd:/// 默认前缀, main 指的是应用执行二进制文件名称(发布平台将它默认为应用名称). 框架内部会把 main解析出来跟前缀做拼接,去etcd找到对应grpc服务端注册key
balancerName = "round_robin" # 默认值,grpc客户端调用服务端采用的 轮训模式
block = false # 默认值
dialTimeout = "0s" # 默认值
grpc客户端代码demo
package main
import (
"context"
"fmt"
"time"
"github.com/douyu/jupiter"
"github.com/douyu/jupiter/pkg/client/grpc"
"github.com/douyu/jupiter/pkg/client/grpc/balancer"
"github.com/douyu/jupiter/pkg/client/grpc/resolver"
"github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/xlog"
"google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
eng := NewEngine()
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
fmt.Printf("111 = %+v\n", 111)
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.initResolver,
eng.consumer,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) initResolver() error {
resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
return nil
}
func (eng *Engine) consumer() error {
config := grpc.StdConfig("etcdserver")
//config.BalancerName = balancer.NameSmoothWeightRoundRobin
client := helloworld.NewGreeterClient(config.Build())
go func() {
i:=0
ghj := map[string]int{}
for {
i++
resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
Name: fmt.Sprintf("jupiter:%d",i),
})
if err != nil {
fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
xlog.Error(err.Error())
} else {
ghj[resp.Message] = ghj[resp.Message] + 1
fmt.Printf("resp.Message = %+v\n", ghj)
xlog.Info("receive response", xlog.String("resp", resp.Message))
}
time.Sleep(1 * time.Second)
}
}()
return nil
}
运行指令go run main.go --config=config.toml
,可以看到以下运行结果 我们的gRPC
客户端通过应用名称main
从ETCD
中获取到服务地址,并监听了/wsd-reg/main
,用于后续更新服务地址。
客户端会定时1s,发送hello
给gRPC
服务端,可以收到服务端响应的``Jupiter 1类似信息到服务端
3.2.4.3 从零开始配置
环境要求 jupiter(commit:8b67ebec1ae6dc07e8df27d7240aa9b4d954671b)如果用的minerva就用1.8
生成potobuf
syntax = "proto3";
package pb;
service Hello {
// SayHello
rpc SayHello(SayHelloReq) returns (SayHelloRes);
}
message SayHelloReq{
string name = 1;
}
message SayHelloRes{
string resp = 1;
}
执行命令: protoc -I . --go_out=plugins=grpc:. ./hello.proto
拿到对应pb包放入对应项目的路径
go build -o jupiter-demo main.go
记得把下面的demo编译成jupiter-demo,再执行 jupiter 客户端 使用demo
# 配置
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 前缀
[jupiter.client.etcdserver]
address = "etcd:///jupiter-demo" # jupiter-demo 指的是执行文件名称,在发布平台执行文件名称跟应用名称是一样的
block = true # 默认值
dialTimeout = "0s" # 默认值
package main
import (
"clientb/pb"
"context"
"fmt"
"time"
"github.com/douyu/jupiter"
"github.com/douyu/jupiter/pkg/client/grpc"
"github.com/douyu/jupiter/pkg/client/grpc/balancer"
"github.com/douyu/jupiter/pkg/client/grpc/resolver"
"github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/xlog"
)
func main() {
eng := NewEngine()
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
fmt.Printf("111 = %+v\n", 111)
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.initResolver,
eng.consumer,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) initResolver() error {
resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
return nil
}
func (eng *Engine) consumer() error {
config := grpc.StdConfig("etcdserver")
config.BalancerName = balancer.NameSmoothWeightRoundRobin
client := pb.NewHelloClient(config.Build())
go func() {
i:=0
ghj := map[string]int{}
for {
i++
resp, err := client.SayHello(context.Background(), &pb.SayHelloReq{
Name: fmt.Sprintf("jupiter:%d",i),
})
if err != nil {
fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
xlog.Error(err.Error())
} else {
ghj[resp.Resp] = ghj[resp.Resp] + 1
fmt.Printf("resp.Message = %+v\n", ghj)
xlog.Info("receive response", xlog.String("resp", resp.Resp))
}
time.Sleep(1 * time.Second)
}
}()
return nil
}
minerva 客户端 demo
# 配置
[app]
[app.registry.etcd]# 使用etcd作为服务发现
endpoints=["127.0.0.1:2379"] #etcd的地址,grpc的服务端须已经注册到这个etcd
timeout="2s"
[minerva]
[minerva.grpc]
[minerva.grpc.wsg-reg] # wsg-reg 这里得注意,这是服务端的前缀
debug = true # Debug开关
enableMetric = true # 指标采集开关
enableAccessLog = true # 访问日志开关
addr = "jupiter-demo" #目标地址。direct=true,该值设为服务ip:port, direct=false,则为服务注册名,其实就是执行文件名称,也就是应用名称
dialTimeout = "1s" # 拨超时
readTimeout = "1s" # 读超时
enableTrace = false # 链路追踪开关
balancerName = "round_robin" # 默认为round_robin
level = "panic" # 创建时的告警等级,level=panic创建Client失败时panic
wait = true # 默认:true 是否一直等待直到连接建立,wait=true时,dialTimeout失效。注意Wait可能会导致创建过程阻塞
direct = false # 直连服务,不经过负载均衡器
slowThreshold = "1s" # slow日志门限值
package main
import (
"context"
"demomimi/pb"
"fmt"
"time"
)
import "git.xxxx.com/vega/minerva/client/gusty"
// 新建demo客户端
var (
DemoClient pb.HelloClient
)
// 读取demo的grpc配置,并初始化
func init() {
DemoClient = pb.NewHelloClient(gusty.Invoker("wsg-reg"))
}
func main() {
sdfg := map[string]int{}
for{
resp,err:=SayHello()
if err !=nil{
fmt.Println(err,"uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu")
continue
}
sdfg[resp.Resp] = sdfg[resp.Resp] + 1
fmt.Println("uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu",sdfg)
time.Sleep(time.Second * 1)
}
}
// SayHello 调用该grpc的方法
func SayHello() (*pb.SayHelloRes, error) {
ctx := context.Background()
ctx, grpcTimeOut := context.WithTimeout(ctx, 1*time.Second)
defer grpcTimeOut()
helloRes, err := DemoClient.SayHello(
ctx,
&pb.SayHelloReq{
Name: fmt.Sprintf("word"),
},
)
if err != nil {
return &pb.SayHelloRes{}, err
}
return helloRes, nil
}
jupiter 服务端调用
[jupiter.server.grpc]
port = 20102
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 这个前缀记得跟client保持一致
package main
import (
"context"
"fmt"
"github.com/douyu/jupiter"
compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/server/xgrpc"
"github.com/douyu/jupiter/pkg/xlog"
"google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
eng := NewEngine()
eng.SetRegistry(
compound_registry.New(
etcdv3_registry.StdConfig("wh").Build(),
),
)
//eng.SetGovernor("0.0.0.0:0")
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.serveGRPC,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) serveGRPC() error {
fmt.Println("YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY")
server := xgrpc.StdConfig("grpc").Build()
helloworld.RegisterGreeterServer(server.Server, new(Greeter))
return eng.Serve(server)
}
type Greeter struct {
server *xgrpc.Server
}
func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
sd := &helloworld.HelloReply{
Message: "我是client_b",
}
fmt.Println(fmt.Sprintf("name:%s",request.Name))
return sd,nil
}
minerva 服务端 demo
[app]
mode="local"
[app.registry]
[app.registry.etcd]
endPoints = ["127.0.0.1:2379"]
timeout = "2s"
[server.grpc]
port=9001
name="demo"
[server.grpc.labels]
group = "default" # default: default
weight = "10" # default: 100
enable = "true" # default: true
package main
import (
"context"
"fmt"
"git.xxxx.com/vega/minerva"
"git.xxxx.com/vega/minerva/application"
"git.xxxx.com/vega/minerva/server"
"git.xxxx.com/vega/minerva/server/yell"
"mimi/pb"
)
// 定义handler
type HelloHandler struct {
yell.Handler
}
// 定义方法
func (s *HelloHandler) SayHello(ctx context.Context, in *pb.SayHelloReq) (out *pb.SayHelloRes, err error) {
fmt.Println(in.Name,"nnnnnnnn")
return &pb.SayHelloRes{Resp:"hello1"},nil
}
// 定义一个grpc server,应用微服务的理念,一个服务只能定义一个server
type GrpcServer struct {
*yell.Server
}
// 将HelloServer注册到server
func (s *GrpcServer) Mux() {
s.Register(pb.RegisterHelloServer, new(HelloHandler))
// 这里可以继续注册其他的Handler
}
// main函数中启动grpc server
func main() {
fmt.Println(application.BuildFlags())
app := minerva.NewAPP()
app.Serve(
new(GrpcServer),
server.StdConfig("grpc"),
server.Host("127.0.0.1"),
)
app.Run()
}