GoのgRPC ServerのInterceptor(recovery/auth/zap/prometheus)

golanggrpc

grpc-goはInterceptor(Middleware)でhandlerの前後で処理を行うことができる。 UnaryとStreamでシグネチャが異なる。

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        resp, err := handler(newCtx, req)
        fmt.Println(resp)
        return resp, err
	}
}

今回は良く使うgo-grpc-middleware

Interceptorの挙動を確認する。

proto

UnaryなRPCとBidirectional streaming(client, server共にstream)なRPCを一つずつ用意する。

$ cat protos/sample/service.proto
syntax = "proto3";

package sample;

message SampleRequest {
  string sampleInput = 1;
}

message SampleResponse {
  string sampleOutput = 1;
}

service SampleService {
  rpc SampleUnary (SampleRequest) returns (SampleResponse) {}
  rpc SampleStream (stream SampleRequest) returns (stream SampleResponse) {}
}

$ go get -u github.com/golang/protobuf/protoc-gen-go
$ mkdir go
$ protoc --go_out=plugins=grpc:./go --proto_path=./protos protos/*/*.proto
$ ls go/sample
service.pb.go

Service

Unaryの方でcontextからユーザー名を取り出しているのと、 Streamの方では"panic"という入力が来たときにpanicになるようにしている。

type sampleService struct{}

type ctxKey string

var ctxKeyUserName = "userNames"

func (s *sampleService) SampleUnary(ctx context.Context, in *sample.SampleRequest) (*sample.SampleResponse, error) {
	return &sample.SampleResponse{SampleOutput: ctx.Value(ctxKeyUserName).(string)}, nil
}

func (s *sampleService) SampleStream(stream sample.SampleService_SampleStreamServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if in.SampleInput == "panic" {
			panic("received panic")
		}
		inputRunes := []rune(in.SampleInput)
		for i := 0; i < len(inputRunes); i++ {
			if err := stream.Send(&sample.SampleResponse{
				SampleOutput: string(inputRunes[i]),
			}); err != nil {
				return err
			}
		}
	}
}

Server

NewServerでUnary/StreamそれぞれのInterceptorを登録する。 prometheusはさらにEnableHandlingTimeHistogram()してRegisterしている。

zapにはNewProduction()のloggerをそのまま渡しているのでログはstderrに出力される。 これをReplaceGrpcLogger()でも渡しているのでgRPCライブラリ内部のエラーも出る。

Golangの高速なロガーzapとlumberjackでログを出力してrotateさせる - sambaiz-net

package main

import (
	"context"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"

	"github.com/grpc-ecosystem/go-grpc-middleware"
	"github.com/grpc-ecosystem/go-grpc-middleware/auth"
	"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
	"github.com/grpc-ecosystem/go-grpc-middleware/recovery"
	"github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/sambaiz/grpc-interceptors/go/sample"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

func main() {
	zapLogger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	grpc_zap.ReplaceGrpcLogger(zapLogger)
	grpcServer := grpc.NewServer(
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
			grpc_recovery.StreamServerInterceptor(),
			grpc_zap.StreamServerInterceptor(zapLogger),
			grpc_auth.StreamServerInterceptor(auth),
			grpc_prometheus.StreamServerInterceptor,
		)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			grpc_recovery.UnaryServerInterceptor(),
			grpc_zap.UnaryServerInterceptor(zapLogger),
			grpc_auth.UnaryServerInterceptor(auth),
			grpc_prometheus.UnaryServerInterceptor,
		)),
		grpc.MaxRecvMsgSize(math.MaxInt32))

	grpc_prometheus.EnableHandlingTimeHistogram()
	grpc_prometheus.Register(grpcServer)
	http.Handle("/metrics", promhttp.Handler())
	go func() {
		if err := http.ListenAndServe(":8081", nil); err != nil {
			panic(err)
		}
	}()

	sample.RegisterSampleServiceServer(grpcServer, &sampleService{})

	port := 8080
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return
	}
	fmt.Printf("GRPC server started on :%d\n", port)
	grpcServer.Serve(listener)
}

authに渡している関数ではAuthFromMD()でmetadataのauthorizationのトークンを取得しユーザー名に変換してcontextに入れている。

func auth(ctx context.Context) (context.Context, error) {
	token, err := grpc_auth.AuthFromMD(ctx, "basic")
	if err != nil {
		return nil, err
	}
	users := map[string]string{
		"aaaaaa": "sam",
	}
	userName, ok := users[token]
	if !ok {
		return nil, grpc.Errorf(codes.Unauthenticated, "invalid auth token")
	}
	newCtx := context.WithValue(ctx, ctxKeyUserName, userName)
	return newCtx, nil
}

Client

UnaryとStreamのRPCにそれぞれリクエストを送る。

WithPerRPCCredentials()で各RPCでauthorizationがmetadataとして渡るようにしている。 AppendToOutgoingContextで都度含めることもできる。

ctx = metadata.AppendToOutgoingContext(ctx, key, value)

メトリクスは出していないがprometheusのClientのInterceptorを付けている。 このようにIntercpetorによってはClient用のものも用意されている。

package main

import (
	"context"
	"fmt"

	"github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/sambaiz/grpc-interceptors/go/sample"
	"google.golang.org/grpc"
)

type cred struct{}

func (c *cred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"authorization": "basic aaaaaa",
	}, nil
}

func (c *cred) RequireTransportSecurity() bool {
	return false
}

func main() {
	conn, err := grpc.Dial(
		"localhost:8080",
		grpc.WithInsecure(),
		grpc.WithPerRPCCredentials(&cred{}),
		grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
		grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
	)
	if err != nil {
		panic(err)
	}
	client := sample.NewSampleServiceClient(conn)

	fmt.Println("- Unary -")
	unaryResult, err := client.SampleUnary(context.Background(), &sample.SampleRequest{
		SampleInput: "hello",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(unaryResult.SampleOutput)
	streamClient, err := client.SampleStream(context.Background())
	if err != nil {
		panic(err)
	}
	defer streamClient.CloseSend()

	fmt.Println("- Stream -")
	if err := streamClient.Send(&sample.SampleRequest{
		SampleInput: "he",
	}); err != nil {
		panic(err)
	}
	if err := streamClient.Send(&sample.SampleRequest{
		SampleInput: "llo",
	}); err != nil {
		panic(err)
	}
	if err := streamClient.Send(&sample.SampleRequest{
		SampleInput: "panic",
	}); err != nil {
		panic(err)
	}
	for {
		response, err := streamClient.Recv()
		if err != nil {
			fmt.Println(err)
			break
		} else {
			fmt.Println(response.SampleOutput)
		}
	}
}

実行結果

authによってユーザー名がcontextに入っている。 recoveryがrecoverしているのでpanicしてもサーバーは落ちていない。

$ go run client.go
- Unary -
sam
- Stream -
h
e
l
l
o
rpc error: code = Internal desc = received panic

$ go run main.go
- Unary -
sam
...

zapでログが出ている。

GRPC server started on :8080
{"level":"info","ts":1530020624.963142,"caller":"zap/server_interceptors.go:40","msg":"finished unary call with code OK","grpc.start_time":"2018-06-26T22:43:44+09:00","system":"grpc","span.kind":"server","grpc.service":"sample.SampleService","grpc.method":"SampleUnary","grpc.code":"OK","grpc.time_ms":0.27799999713897705}
{"level":"info","ts":1530020624.9652505,"caller":"zap/grpclogger.go:41","msg":"transport: loopyWriter.run returning. connection error: desc = \"transport is closing\"","system":"grpc","grpc_log":true}

prometheusのInterceptorでCounterのメトリクスが追加されている。

grpc_server_handled_total{grpc_code="OK",grpc_method="SampleUnary",grpc_service="sample.SampleService",grpc_type="unary"} 1
grpc_server_msg_sent_total{grpc_method="SampleStream",grpc_service="sample.SampleService",grpc_type="bidi_stream"} 5

EnableHandlingTimeHistogram()でHistogramsのメトリクスも追加されている。

grpc_server_handling_seconds_bucket{grpc_method="SampleUnary",grpc_service="sample.SampleService",grpc_type="unary",le="0.005"} 1
grpc_server_handling_seconds_bucket{grpc_method="SampleUnary",grpc_service="sample.SampleService",grpc_type="unary",le="0.01"} 1
grpc_server_handling_seconds_bucket{grpc_method="SampleUnary",grpc_service="sample.SampleService",grpc_type="unary",le="0.025"} 1
grpc_server_handling_seconds_sum{grpc_method="SampleStream",grpc_service="sample.SampleService",grpc_type="bidi_stream"} 271.267611998
grpc_server_handling_seconds_count{grpc_method="SampleStream",grpc_service="sample.SampleService",grpc_type="bidi_stream"} 1