gRPC
Our transporter/grpc is built on top of gRPC and implements the Transporter
interface. You can use it for communication between services over the gRPC protocol.
Server
Options
Network()
To set the network protocol, such as tcp.
Address()
To set the server's listening address.
Timeout()
To set the server-side timeout.
Logger()
To set logger.
Middleware()
To set middleware for gRPC server.
UnaryInterceptor()
To set unary interceptors for the gRPC server.
Options()
To set some extra grpc.ServerOption values.
Implementation Details
NewServer()
// NewServer builds a gRPC server from the given options.
//
// Defaults, each overridable through opts: network "tcp", address ":0",
// a one-second timeout, a fresh health server and a log helper wrapping
// log.DefaultLogger. The health, metadata and reflection services are
// registered on the underlying grpc.Server before it is returned.
func NewServer(opts ...ServerOption) *Server {
	// Start from the defaults, then let every option mutate the server.
	srv := &Server{
		network: "tcp",
		address: ":0",
		timeout: 1 * time.Second,
		health:  health.NewServer(),
		log:     log.NewHelper(log.DefaultLogger),
	}
	for _, apply := range opts {
		apply(srv)
	}

	// The middleware-backed interceptor always comes first in the chain;
	// interceptors supplied through options follow it.
	interceptors := append(
		[]grpc.UnaryServerInterceptor{srv.unaryServerInterceptor()},
		srv.ints...,
	)

	// The interceptor chain is the first server option; extra options
	// supplied by the caller are appended after it.
	serverOpts := append(
		[]grpc.ServerOption{grpc.ChainUnaryInterceptor(interceptors...)},
		srv.grpcOpts...,
	)

	// Create the gRPC server and the metadata server that wraps it.
	srv.Server = grpc.NewServer(serverOpts...)
	srv.metadata = apimd.NewServer(srv.Server)

	// Register the built-in services: health, metadata and reflection.
	grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
	apimd.RegisterMetadataServer(srv.Server, srv.metadata)
	reflection.Register(srv.Server)
	return srv
}
unaryServerInterceptor()
// unaryServerInterceptor returns the unary interceptor that prepares the
// request context (merged context, transport information, timeout) and
// runs the server's middleware chain around the gRPC handler.
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// Merge the request context with the server's own context.
		ctx, cancelMerge := ic.Merge(ctx, s.ctx)
		defer cancelMerge()

		// Read the incoming metadata (the presence flag is discarded) and
		// expose it, together with endpoint and method, through the
		// transport stored in the context.
		md, _ := grpcmd.FromIncomingContext(ctx)
		tr := &Transport{
			endpoint:  s.endpoint.String(),
			operation: info.FullMethod,
			header:    headerCarrier(md),
		}
		ctx = transport.NewServerContext(ctx, tr)

		// Apply the server-side timeout only when one is configured.
		if s.timeout > 0 {
			timeoutCtx, cancelTimeout := context.WithTimeout(ctx, s.timeout)
			defer cancelTimeout()
			ctx = timeoutCtx
		}

		// The gRPC handler is the innermost step; middleware, if any,
		// is wrapped around it.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return handler(ctx, req)
		}
		if len(s.middleware) > 0 {
			next = middleware.Chain(s.middleware...)(next)
		}
		return next(ctx, req)
	}
}
Usage
These are some basic usage examples of gRPC; refer to the gRPC docs for more advanced examples.
Register gRPC Server
gs := grpc.NewServer()
app := kratos.New(
kratos.Name("kratos"),
kratos.Version("v1.0.0"),
kratos.Server(gs),
)
Set middleware in gRPC Server
// Set the server address to ":9000" and install the logging middleware.
grpcSrv := grpc.NewServer(
	grpc.Address(":9000"),
	grpc.Middleware(logging.Server()),
)
Process Request in gRPC Middleware
// Read the transport kind and the operation from the request context,
// when transport information is present in it.
if tr, ok := transport.FromServerContext(ctx); ok {
	kind = tr.Kind().String()
	operation = tr.Operation()
}
Client
Options
WithEndpoint()
To set the endpoint which the client will connect to.
WithTimeout()
To set the client-side timeout.
WithMiddleware()
To set middleware.
WithDiscovery()
To set the discovery for gRPC client.
WithUnaryInterceptor()
To set interceptors for gRPC client.
WithOptions()
To set some extra grpc.DialOption values.
Implementation Details
dial()
// dial opens a gRPC client connection to the configured endpoint.
//
// The client timeout defaults to 500ms and can be overridden through opts.
// When insecure is true, transport security is disabled for the connection.
// It returns whatever grpc.DialContext returns.
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
	// Start from the defaults, then apply the caller's options.
	cfg := clientOptions{
		timeout: 500 * time.Millisecond,
	}
	for _, apply := range opts {
		apply(&cfg)
	}

	// The middleware-backed interceptor comes first; interceptors supplied
	// through options follow it in the chain.
	interceptors := append(
		[]grpc.UnaryClientInterceptor{unaryClientInterceptor(cfg.middleware, cfg.timeout)},
		cfg.ints...,
	)

	dialOpts := []grpc.DialOption{
		// Client-side balancer: round robin.
		grpc.WithBalancerName(roundrobin.Name),
		grpc.WithChainUnaryInterceptor(interceptors...),
	}
	if cfg.discovery != nil {
		// Resolve the endpoint through service discovery.
		dialOpts = append(dialOpts, grpc.WithResolvers(discovery.NewBuilder(cfg.discovery)))
	}
	if insecure {
		// Disable transport security for the connection.
		dialOpts = append(dialOpts, grpc.WithInsecure())
	}
	// Extra dial options from the caller go last.
	dialOpts = append(dialOpts, cfg.grpcOpts...)

	return grpc.DialContext(ctx, cfg.endpoint, dialOpts...)
}
unaryClientInterceptor()
// unaryClientInterceptor returns the unary interceptor that binds transport
// information to the call context, applies the client-side timeout and runs
// the middleware chain ms around the actual gRPC invocation.
//
// A timeout of zero or less leaves the context deadline untouched.
func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		// Bind the target, the method and an empty header carrier to ctx.
		ctx = transport.NewClientContext(ctx, &Transport{
			endpoint:  cc.Target(),
			operation: method,
			header:    headerCarrier{},
		})
		if timeout > 0 {
			// Set the timeout.
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, timeout)
			defer cancel()
		}
		// Innermost handler: copies the transport header into the outgoing
		// gRPC metadata and then performs the call.
		h := func(ctx context.Context, req interface{}) (interface{}, error) {
			if tr, ok := transport.FromClientContext(ctx); ok {
				header := tr.Header()
				keys := header.Keys()
				// Two entries (key and value) are appended per header key,
				// so reserve twice the number of keys.
				keyvals := make([]string, 0, 2*len(keys))
				for _, k := range keys {
					keyvals = append(keyvals, k, header.Get(k))
				}
				ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
			}
			return reply, invoker(ctx, method, req, reply, cc, opts...)
		}
		if len(ms) > 0 {
			h = middleware.Chain(ms...)(h)
		}
		// Only the error matters here: the reply is filled in place.
		_, err := h(ctx, req)
		return err
	}
}
Usage
Client Connection
// Dial the endpoint 127.0.0.1:9000 with DialInsecure.
conn, err := grpc.DialInsecure(context.Background(), grpc.WithEndpoint("127.0.0.1:9000"))
Middleware
// Dial with client-side middleware. The client options come from the same
// grpc transport package as DialInsecure.
conn, err := grpc.DialInsecure(
	context.Background(),
	grpc.WithEndpoint("127.0.0.1:9000"),
	grpc.WithMiddleware(
		recovery.Recovery(),
	),
)
Service Discovery
// Dial a service by name: the endpoint uses the discovery:/// scheme and
// r is the discovery passed to WithDiscovery.
conn, err := grpc.DialInsecure(
	context.Background(),
	grpc.WithEndpoint("discovery:///helloworld"),
	grpc.WithDiscovery(r),
)