grpc-goはInterceptor(Middleware)でhandlerの前後で処理を行うことができる。 UnaryとStreamでシグネチャが異なる。
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
resp, err := handler(newCtx, req)
fmt.Println(resp)
return resp, err
}
}
今回は良く使うgo-grpc-middlewareの
Interceptorの挙動を確認する。
proto
UnaryなRPCとBidirectional streaming(client, server共にstream)なRPCを一つずつ用意する。
$ cat protos/sample/service.proto
syntax = "proto3";
package sample;
message SampleRequest {
string sampleInput = 1;
}
message SampleResponse {
string sampleOutput = 1;
}
service SampleService {
rpc SampleUnary (SampleRequest) returns (SampleResponse) {}
rpc SampleStream (stream SampleRequest) returns (stream SampleResponse) {}
}
$ go get -u github.com/golang/protobuf/protoc-gen-go
$ mkdir go
$ protoc --go_out=plugins=grpc:./go --proto_path=./protos protos/*/*.proto
$ ls go/sample
service.pb.go
Service
Unaryの方でcontextからユーザー名を取り出しているのと、 Streamの方では"panic"という入力が来たときにpanicになるようにしている。
type sampleService struct{}
type ctxKey string
var ctxKeyUserName = "userNames"
func (s *sampleService) SampleUnary(ctx context.Context, in *sample.SampleRequest) (*sample.SampleResponse, error) {
return &sample.SampleResponse{SampleOutput: ctx.Value(ctxKeyUserName).(string)}, nil
}
func (s *sampleService) SampleStream(stream sample.SampleService_SampleStreamServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
if in.SampleInput == "panic" {
panic("received panic")
}
inputRunes := []rune(in.SampleInput)
for i := 0; i < len(inputRunes); i++ {
if err := stream.Send(&sample.SampleResponse{
SampleOutput: string(inputRunes[i]),
}); err != nil {
return err
}
}
}
}
Server
NewServerでUnary/StreamそれぞれのInterceptorを登録する。 prometheusはさらにEnableHandlingTimeHistogram()してRegisterしている。
zapにはNewProduction()のloggerをそのまま渡しているのでログはstderrに出力される。 これをReplaceGrpcLogger()でも渡しているのでgRPCライブラリ内部のエラーも出る。
Golangの高速なロガーzapとlumberjackでログを出力してrotateさせる - sambaiz-net
package main
import (
"context"
"fmt"
"io"
"math"
"net"
"net/http"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/auth"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/sambaiz/grpc-interceptors/go/sample"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func main() {
zapLogger, err := zap.NewProduction()
if err != nil {
panic(err)
}
grpc_zap.ReplaceGrpcLogger(zapLogger)
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_recovery.StreamServerInterceptor(),
grpc_zap.StreamServerInterceptor(zapLogger),
grpc_auth.StreamServerInterceptor(auth),
grpc_prometheus.StreamServerInterceptor,
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_recovery.UnaryServerInterceptor(),
grpc_zap.UnaryServerInterceptor(zapLogger),
grpc_auth.UnaryServerInterceptor(auth),
grpc_prometheus.UnaryServerInterceptor,
)),
grpc.MaxRecvMsgSize(math.MaxInt32))
grpc_prometheus.EnableHandlingTimeHistogram()
grpc_prometheus.Register(grpcServer)
http.Handle("/metrics", promhttp.Handler())
go func() {
if err := http.ListenAndServe(":8081", nil); err != nil {
panic(err)
}
}()
sample.RegisterSampleServiceServer(grpcServer, &sampleService{})
port := 8080
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return
}
fmt.Printf("GRPC server started on :%d\n", port)
grpcServer.Serve(listener)
}
authに渡している関数ではAuthFromMD()でmetadataのauthorizationのトークンを取得しユーザー名に変換してcontextに入れている。
func auth(ctx context.Context) (context.Context, error) {
token, err := grpc_auth.AuthFromMD(ctx, "basic")
if err != nil {
return nil, err
}
users := map[string]string{
"aaaaaa": "sam",
}
userName, ok := users[token]
if !ok {
return nil, grpc.Errorf(codes.Unauthenticated, "invalid auth token")
}
newCtx := context.WithValue(ctx, ctxKeyUserName, userName)
return newCtx, nil
}
Client
UnaryとStreamのRPCにそれぞれリクエストを送る。
WithPerRPCCredentials()で各RPCでauthorizationがmetadataとして渡るようにしている。 AppendToOutgoingContextで都度含めることもできる。
ctx = metadata.AppendToOutgoingContext(ctx, key, value)
メトリクスは出していないがprometheusのClientのInterceptorを付けている。 このようにIntercpetorによってはClient用のものも用意されている。
package main
import (
"context"
"fmt"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/sambaiz/grpc-interceptors/go/sample"
"google.golang.org/grpc"
)
type cred struct{}
func (c *cred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "basic aaaaaa",
}, nil
}
func (c *cred) RequireTransportSecurity() bool {
return false
}
func main() {
conn, err := grpc.Dial(
"localhost:8080",
grpc.WithInsecure(),
grpc.WithPerRPCCredentials(&cred{}),
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
)
if err != nil {
panic(err)
}
client := sample.NewSampleServiceClient(conn)
fmt.Println("- Unary -")
unaryResult, err := client.SampleUnary(context.Background(), &sample.SampleRequest{
SampleInput: "hello",
})
if err != nil {
panic(err)
}
fmt.Println(unaryResult.SampleOutput)
streamClient, err := client.SampleStream(context.Background())
if err != nil {
panic(err)
}
defer streamClient.CloseSend()
fmt.Println("- Stream -")
if err := streamClient.Send(&sample.SampleRequest{
SampleInput: "he",
}); err != nil {
panic(err)
}
if err := streamClient.Send(&sample.SampleRequest{
SampleInput: "llo",
}); err != nil {
panic(err)
}
if err := streamClient.Send(&sample.SampleRequest{
SampleInput: "panic",
}); err != nil {
panic(err)
}
for {
response, err := streamClient.Recv()
if err != nil {
fmt.Println(err)
break
} else {
fmt.Println(response.SampleOutput)
}
}
}
実行結果
authによってユーザー名がcontextに入っている。 recoveryがrecoverしているのでpanicしてもサーバーは落ちていない。
$ go run client.go
- Unary -
sam
- Stream -
h
e
l
l
o
rpc error: code = Internal desc = received panic
$ go run main.go
- Unary -
sam
...
zapでログが出ている。
GRPC server started on :8080
{"level":"info","ts":1530020624.963142,"caller":"zap/server_interceptors.go:40","msg":"finished unary call with code OK","grpc.start_time":"2018-06-26T22:43:44+09:00","system":"grpc","span.kind":"server","grpc.service":"sample.SampleService","grpc.method":"SampleUnary","grpc.code":"OK","grpc.time_ms":0.27799999713897705}
{"level":"info","ts":1530020624.9652505,"caller":"zap/grpclogger.go:41","msg":"transport: loopyWriter.run returning. connection error: desc = \"transport is closing\"","system":"grpc","grpc_log":true}
prometheusのInterceptorでCounterのメトリクスが追加されている。
grpc_server_handled_total{grpc_code="OK",grpc_method="SampleUnary",grpc_service="sample.SampleService",grpc_type="unary"} 1
grpc_server_msg_sent_total{grpc_method="SampleStream",grpc_service="sample.SampleService",grpc_type="bidi_stream"} 5
EnableHandlingTimeHistogram()
でHistogramsのメトリクスも追加されている。
grpc_server_handling_seconds_bucket{grpc_method="SampleUnary",grpc_service="sample.SampleService",grpc_type="unary",le="0.005"} 1
grpc_server_handling_seconds_bucket{grpc_method="SampleUnary",grpc_service="sample.SampleService",grpc_type="unary",le="0.01"} 1
grpc_server_handling_seconds_bucket{grpc_method="SampleUnary",grpc_service="sample.SampleService",grpc_type="unary",le="0.025"} 1
grpc_server_handling_seconds_sum{grpc_method="SampleStream",grpc_service="sample.SampleService",grpc_type="bidi_stream"} 271.267611998
grpc_server_handling_seconds_count{grpc_method="SampleStream",grpc_service="sample.SampleService",grpc_type="bidi_stream"} 1