
gRPC Series: A First Look at gRPC Route Registration and Forwarding


  1. This gRPC series will be covered in installments. This post gives a broad overview: route registration, route forwarding, and the differences between unary and stream RPCs
  2. Later posts will cover graceful shutdown, binlog, channelz, HTTP/2 features, interceptors, and proto (including encoding and code generation)

Summary

  1. gRPC is Google's RPC framework: developers define proto files, i.e. the interface files, and generate concrete code with a command-line tool
  2. Routing is identified by the method string: the string is split at the last /, with the service name before it and the method name after it
  3. For unary RPCs the whole request flow is already laid out; the business code only has to execute its own method
  4. For stream RPCs the method itself calls RecvMsg and SendMsg, which gives more autonomy and flexibility
  5. To see the interface definitions, search for RegisterService; the service is defined in *.pb.go (a sketch of that generated code follows this list)
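
For concreteness, here is a sketch of the generated registration code in a *_grpc.pb.go file, using the Greeter service from the standard grpc-go helloworld example (the Greeter/SayHello names come from that example, not from this post; your generated file will differ):

// Abridged shape of what protoc-gen-go-grpc emits.
var Greeter_ServiceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter",  // proto package + service name
    HandlerType: (*GreeterServer)(nil), // interface the implementation must satisfy
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler, // generated wrapper (see MethodDesc below)
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}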

Struct overview

type Server struct {
    opts serverOptions // parameters that can be customized via options

    // mu guards the fields below; it is also the Locker used to create cv
    mu       sync.Mutex // guards following
    // multiple listeners are supported
    lis      map[net.Listener]bool
    conns    map[transport.ServerTransport]bool
    // whether serving has started; calling Serve sets this to true
    serve    bool
    drain    bool // set to true during graceful shutdown to drain connections
    // used for graceful shutdown
    cv       *sync.Cond              // signaled when connections close for GracefulStop
    // registered services; RegisterService inserts entries into this map
    services map[string]*serviceInfo // service name -> service info
    // request tracing, used for logging; enabled by setting EnableTracing to true
    events   trace.EventLog

    // quit event; backed by sync.Once internally, so it fires exactly once
    quit               *grpcsync.Event
    // signals that shutdown has completed
    done               *grpcsync.Event
    // used in Stop and GracefulStop; ensures the channelz entry is removed exactly once
    channelzRemoveOnce sync.Once
    // tracks Serve goroutines: Serve calls Add, each exit calls Done, and
    // Stop/GracefulStop call Wait
    serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

    channelzID int64 // channelz unique identification number
    czData     *channelzData // per-server call counts, recorded when channelz is enabled
    // retrievable via the channelz RPC service (Channelz_ServiceDesc)

    // one channel per worker goroutine, used to hand streams off to the workers
    serverWorkerChannels []chan *serverWorkerData
}
// ServiceDesc is what external code registers; Server's RegisterService method
// converts it into the serviceInfo stored on the Server
type ServiceDesc struct {
    // the service name as defined in the proto
    ServiceName string
    // The pointer to the service interface. Used to check whether the user
    // provided implementation satisfies the interface requirements.
    // checked via reflection at registration time
    HandlerType interface{}
    // the methods and streams this service contains
    // unary methods
    Methods     []MethodDesc
    // streaming methods
    Streams     []StreamDesc
    // generic metadata
    Metadata    interface{}
}
// serviceInfo is the internal counterpart of ServiceDesc, stored in Server.services
type serviceInfo struct {
    // Contains the implementation for the methods in this service.
    serviceImpl interface{}
    methods     map[string]*MethodDesc
    streams     map[string]*StreamDesc
    mdata       interface{}
}
type MethodDesc struct {
    // method name
    MethodName string
    // the handler for this method
    Handler    methodHandler
}
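
The methodHandler held in MethodDesc is a generated wrapper. A sketch of the wrapper protoc-gen-go-grpc emits for the Greeter.SayHello example used above (abridged; note how it threads the dec closure and an optional interceptor):

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloRequest)
    // dec is the df closure built by processUnaryRPC; it unmarshals the payload
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(GreeterServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/helloworld.Greeter/SayHello",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
    }
    return interceptor(ctx, in, info, handler)
}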

Server options

type serverOptions struct {
    creds                 credentials.TransportCredentials
    codec                 baseCodec
    cp                    Compressor   // compression algorithm
    dc                    Decompressor // decompression algorithm
    unaryInt              UnaryServerInterceptor
    streamInt             StreamServerInterceptor
    chainUnaryInts        []UnaryServerInterceptor
    chainStreamInts       []StreamServerInterceptor
    inTapHandle           tap.ServerInHandle
    // invoked once on the way in and once on the way out; used for tracing/stats
    statsHandler          stats.Handler
    maxConcurrentStreams  uint32
    maxReceiveMessageSize int
    maxSendMessageSize    int
    unknownStreamDesc     *StreamDesc
    keepaliveParams       keepalive.ServerParameters
    keepalivePolicy       keepalive.EnforcementPolicy
    initialWindowSize     int32
    initialConnWindowSize int32
    writeBufferSize       int
    readBufferSize        int
    connectionTimeout     time.Duration
    maxHeaderListSize     *uint32
    headerTableSize       *uint32
    numServerWorkers      uint32
}
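
None of these fields are set directly; each maps to a ServerOption constructor passed to grpc.NewServer. A minimal sketch wiring a few of them (the option functions are real grpc-go APIs, the values are arbitrary examples; NumStreamWorkers and ConnectionTimeout are marked experimental upstream):

import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

func newServer() *grpc.Server {
    return grpc.NewServer(
        grpc.MaxRecvMsgSize(4*1024*1024),      // maxReceiveMessageSize
        grpc.MaxSendMsgSize(4*1024*1024),      // maxSendMessageSize
        grpc.ConnectionTimeout(5*time.Second), // connectionTimeout
        grpc.NumStreamWorkers(8),              // numServerWorkers
        grpc.KeepaliveParams(keepalive.ServerParameters{
            MaxConnectionIdle: 5 * time.Minute, // keepaliveParams
        }),
    )
}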

The representation at the transport layer

type Stream struct {
    id           uint32
    st           ServerTransport    // nil for client-side Stream; used on the server side
    ct           *http2Client       // nil for server-side Stream; used on the client side
    ctx          context.Context    // the associated context of the stream
    cancel       context.CancelFunc // always nil for client-side Stream
    done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
    doneFunc     func()             // invoked at the end of stream on client side.
    ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
    method       string             // the associated RPC method of the stream
    // the grpc-encoding value from the request header; possible values include
    // gzip, deflate and snappy. grpc-go's encoding directory ships gzip (a
    // compressor) and proto (a codec)
    recvCompress string

    sendCompress string
    buf          *recvBuffer
    trReader     io.Reader
    fc           *inFlow
    wq           *writeQuota

    // Callback to state application's intentions to read data. This
    // is used to adjust flow control, if needed.
    requestRead func(int)

    headerChan       chan struct{} // closed to indicate the end of header metadata.
    headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
    // headerValid indicates whether a valid header was received.  Only
    // meaningful after headerChan is closed (always call waitOnHeader() before
    // reading its value).  Not valid on server side.
    headerValid bool

    // hdrMu protects header and trailer metadata on the server-side.
    hdrMu sync.Mutex
    // On client side, header keeps the received header metadata.
    //
    // On server side, header keeps the header set by SetHeader(). The complete
    // header will be merged into this after t.WriteHeader() is called.
    header  metadata.MD
    trailer metadata.MD // the key-value map of trailer metadata.

    noHeaders bool // set if the client never received headers (set only after the stream is done).

    // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
    headerSent uint32

    state streamState

    // On client-side it is the status error received from the server.
    // On server-side it is unused.
    status *status.Status

    bytesReceived uint32 // indicates whether any bytes have been received on this stream
    unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream

    // contentSubtype is the content-subtype for requests.
    // this must be lowercase or the behavior is undefined.
    contentSubtype string
}
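
A side note on recvCompress/sendCompress: a named encoding is only usable if a compressor for it has been registered. grpc-go ships a gzip compressor that registers itself via a blank import, and the client opts in per call (both APIs below exist in grpc-go, though the gzip package documents itself as experimental):

import (
    "google.golang.org/grpc"
    _ "google.golang.org/grpc/encoding/gzip" // registers the gzip compressor in init()
)

// On the client side, pick the encoding per RPC; the server reads it back
// from the grpc-encoding header as described above.
var callOpt grpc.CallOption = grpc.UseCompressor("gzip")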

Route registration

The registration function

Server's RegisterService method exposes service registration to the outside.

  1. It checks via reflection that the passed-in service implements HandlerType
  2. It then registers the service and its methods; an internal mutex makes this safe
     for concurrent use (a usage sketch follows this list)
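
A typical registration flow, reusing the hypothetical Greeter service from earlier (pb is the generated package; greeterImpl is an assumed implementation of pb.GreeterServer):

s := grpc.NewServer()
// Generated convenience wrapper; internally it calls
// s.RegisterService(&pb.Greeter_ServiceDesc, impl).
pb.RegisterGreeterServer(s, &greeterImpl{})

lis, err := net.Listen("tcp", ":50051")
if err != nil {
    log.Fatal(err)
}
if err := s.Serve(lis); err != nil {
    log.Fatal(err)
}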

Route forwarding

  1. Multiple workers run in parallel, each with a channel for asynchronous hand-off:
func (s *Server) initServerWorkers() {
    s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
    for i := uint32(0); i < s.opts.numServerWorkers; i++ {
        s.serverWorkerChannels[i] = make(chan *serverWorkerData)
        go s.serverWorker(s.serverWorkerChannels[i])
    }
}
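
The worker side of each channel, roughly as it appears in the same grpc-go version (workers recycle themselves after a randomized number of requests so goroutine stacks occasionally get released; exact details vary by version):

const serverWorkerResetThreshold = 1 << 16

func (s *Server) serverWorker(ch chan *serverWorkerData) {
    // Randomize the reset point so all workers don't respawn at the same time.
    threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
    for completed := 0; completed < threshold; completed++ {
        data, ok := <-ch
        if !ok {
            return
        }
        s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
        data.wg.Done()
    }
    go s.serverWorker(ch) // respawn with a fresh stack
}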
  2. Server's handleStream function performs the route dispatch
    • It takes the method from the Stream and parses it. gRPC addresses routes as
      service/method: the string is split at the last /, with the service name in
      front and the method name behind (see the parsing sketch after this list)
    • Having extracted both names, it first looks up the service in services, then
      fetches the matching method from it
    • UnaryRPC is tried first (processUnaryRPC), then StreamingRPC (processStreamingRPC)
    • An unknownStreamDesc can be configured as the handler for requests that match
      no registered route
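
The parsing step, simplified from handleStream (variable names follow the grpc-go source, which also handles the pos == -1 unknown-route case; strings.LastIndex does the split):

sm := stream.Method() // e.g. "/helloworld.Greeter/SayHello"
if sm != "" && sm[0] == '/' {
    sm = sm[1:]
}
pos := strings.LastIndex(sm, "/")
service := sm[:pos]  // "helloworld.Greeter"
method := sm[pos+1:] // "SayHello"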

UnaryRPC vs. StreamRPC

Differences

  1. Unary wraps request parsing and response write-back; the business side only
     implements its own logic and never deals with decoding the request or writing
     the response
  2. Stream exposes request/response handling as the ServerStream interface; the
     business side calls those functions itself, and serverStream is the concrete
     implementation (see the echo-handler sketch after this list)
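
A sketch of what point 2 looks like in practice: a hypothetical bidirectional echo handler in which the business code drives Recv and Send itself (pb, the Echo service and its message types are illustrative, not from the post):

func (s *echoServer) Echo(stream pb.Echo_EchoServer) error {
    for {
        req, err := stream.Recv() // generated wrapper around ServerStream.RecvMsg
        if err == io.EOF {
            return nil // the client closed its send side
        }
        if err != nil {
            return err
        }
        // generated wrapper around ServerStream.SendMsg
        if err := stream.Send(&pb.EchoResponse{Message: req.Message}); err != nil {
            return err
        }
    }
}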

UnaryRPC

  1. channelz call tracking
  2. binlog logging
  3. obtaining the compressor and decompressor used to parse the request
  4. executing the method, i.e. the business logic
  5. writing back the response and status
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
    // Run the stats handler: once on entry and once on return (the return side runs via defer)
    sh := s.opts.statsHandler
    if sh != nil || trInfo != nil || channelz.IsOn() {
        if channelz.IsOn() {
            s.incrCallsStarted()
        }
        var statsBegin *stats.Begin
        if sh != nil {
            beginTime := time.Now()
            statsBegin = &stats.Begin{
                BeginTime: beginTime,
            }
            sh.HandleRPC(stream.Context(), statsBegin)
        }
        if trInfo != nil {
            trInfo.tr.LazyLog(&trInfo.firstLine, false)
        }
        // The deferred error handling for tracing, stats handler and channelz are
        // combined into one function to reduce stack usage -- a defer takes ~56-64
        // bytes on the stack, so overflowing the stack will require a stack
        // re-allocation, which is expensive.
        //
        // To maintain behavior similar to separate deferred statements, statements
        // should be executed in the reverse order. That is, tracing first, stats
        // handler second, and channelz last. Note that panics *within* defers will
        // lead to different behavior, but that's an acceptable compromise; that
        // would be undefined behavior territory anyway.
        defer func() {
            if trInfo != nil {
                if err != nil && err != io.EOF {
                    trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
                    trInfo.tr.SetError()
                }
                trInfo.tr.Finish()
            }

            if sh != nil {
                end := &stats.End{
                    BeginTime: statsBegin.BeginTime,
                    EndTime:   time.Now(),
                }
                if err != nil && err != io.EOF {
                    end.Error = toRPCErr(err)
                }
                sh.HandleRPC(stream.Context(), end)
            }

            if channelz.IsOn() {
                if err != nil && err != io.EOF {
                    s.incrCallsFailed()
                } else {
                    s.incrCallsSucceeded()
                }
            }
        }()
    }

    // Per-method binary logging, mainly of the client header information
    binlog := binarylog.GetMethodLogger(stream.Method())
    if binlog != nil {
        ctx := stream.Context()
        md, _ := metadata.FromIncomingContext(ctx)
        logEntry := &binarylog.ClientHeader{
            Header:     md,
            MethodName: stream.Method(),
            PeerAddr:   nil,
        }
        if deadline, ok := ctx.Deadline(); ok {
            logEntry.Timeout = time.Until(deadline)
            if logEntry.Timeout < 0 {
                logEntry.Timeout = 0
            }
        }
        if a := md[":authority"]; len(a) > 0 {
            logEntry.Authority = a[0]
        }
        if peer, ok := peer.FromContext(ctx); ok {
            logEntry.PeerAddr = peer.Addr
        }
        binlog.Log(logEntry)
    }

    // comp and cp are used for compression.  decomp and dc are used for
    // decompression.  If comp and decomp are both set, they are the same;
    // however they are kept separate to ensure that at most one of the
    // compressor/decompressor variable pairs are set for use later.
    var comp, decomp encoding.Compressor
    var cp Compressor
    var dc Decompressor
    // Select the compression algorithms (compressor and decompressor) based on the header
    // If dc is set and matches the stream's compression, use it.  Otherwise, try
    // to find a matching registered compressor for decomp.
    if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
        dc = s.opts.dc
    } else if rc != "" && rc != encoding.Identity {
        decomp = encoding.GetCompressor(rc)
        if decomp == nil {
            st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
            t.WriteStatus(stream, st)
            return st.Err()
        }
    }

    // If cp is set, use it.  Otherwise, attempt to compress the response using
    // the incoming message compression method.
    //
    // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
    if s.opts.cp != nil {
        cp = s.opts.cp
        stream.SetSendCompress(cp.Type())
    } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
        // Legacy compressor not specified; attempt to respond with same encoding.
        comp = encoding.GetCompressor(rc)
        if comp != nil {
            stream.SetSendCompress(rc)
        }
    }

    var payInfo *payloadInfo
    if sh != nil || binlog != nil {
        payInfo = &payloadInfo{}
    }
    // Read the raw payload; df below unmarshals it into v, so calling df yields the request
    d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
    if err != nil {
        if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
            channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
        }
        return err
    }
    if channelz.IsOn() {
        t.IncrMsgRecv()
    }
    df := func(v interface{}) error {
        if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
            return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
        }
        if sh != nil {
            sh.HandleRPC(stream.Context(), &stats.InPayload{
                RecvTime:   time.Now(),
                Payload:    v,
                WireLength: payInfo.wireLength + headerLen,
                Data:       d,
                Length:     len(d),
            })
        }
        if binlog != nil {
            binlog.Log(&binarylog.ClientMessage{
                Message: d,
            })
        }
        if trInfo != nil {
            trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
        }
        return nil
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    // Enter the handler flow:
    //   df: yields the request by unmarshalling the raw payload
    //   info.serviceImpl: the registered service implementation
    //   ctx: carries the stream; s.opts.unaryInt is the unary interceptor configured on the server
    reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
    if appErr != nil {
        // FromError's technique is worth borrowing: it asserts an anonymous interface
        // directly instead of going through reflection:
        // if se, ok := err.(interface {
        //     GRPCStatus() *Status
        // }); ok {
        //     return se.GRPCStatus(), true
        // }
        appStatus, ok := status.FromError(appErr)
        if !ok {
            // Convert appErr if it is not a grpc status error.
            appErr = status.Error(codes.Unknown, appErr.Error())
            appStatus, _ = status.FromError(appErr)
        }
        if trInfo != nil {
            trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
            trInfo.tr.SetError()
        }
        // Write back the status code
        if e := t.WriteStatus(stream, appStatus); e != nil {
            channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
        }
        if binlog != nil {
            if h, _ := stream.Header(); h.Len() > 0 {
                // Only log serverHeader if there was header. Otherwise it can
                // be trailer only.
                binlog.Log(&binarylog.ServerHeader{
                    Header: h,
                })
            }
            binlog.Log(&binarylog.ServerTrailer{
                Trailer: stream.Trailer(),
                Err:     appErr,
            })
        }
        return appErr
    }
    if trInfo != nil {
        trInfo.tr.LazyLog(stringer("OK"), false)
    }
    opts := &transport.Options{Last: true}

    // Write back the response
    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
        if err == io.EOF {
            // The entire stream is done (for unary RPC only).
            return err
        }
        if sts, ok := status.FromError(err); ok {
            if e := t.WriteStatus(stream, sts); e != nil {
                channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
            }
        } else {
            switch st := err.(type) {
            case transport.ConnectionError:
                // Nothing to do here.
            default:
                panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
            }
        }
        if binlog != nil {
            h, _ := stream.Header()
            binlog.Log(&binarylog.ServerHeader{
                Header: h,
            })
            binlog.Log(&binarylog.ServerTrailer{
                Trailer: stream.Trailer(),
                Err:     appErr,
            })
        }
        return err
    }
    if binlog != nil {
        h, _ := stream.Header()
        binlog.Log(&binarylog.ServerHeader{
            Header: h,
        })
        binlog.Log(&binarylog.ServerMessage{
            Message: reply,
        })
    }
    if channelz.IsOn() {
        t.IncrMsgSent()
    }
    if trInfo != nil {
        trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
    }
    // TODO: Should we be logging if writing status failed here, like above?
    // Should the logging be in WriteStatus?  Should we ignore the WriteStatus
    // error or allow the stats handler to see it?
    err = t.WriteStatus(stream, statusOK)
    if binlog != nil {
        binlog.Log(&binarylog.ServerTrailer{
            Trailer: stream.Trailer(),
            Err:     appErr,
        })
    }
    return err
}
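
The interface trick called out in the comment inside processUnaryRPC also works for application errors: any error type exposing GRPCStatus() *status.Status is recognized by status.FromError without reflection. A hypothetical example:

// quotaErr is a plain error, but because it has a GRPCStatus method,
// status.FromError extracts the status via a simple type assertion.
type quotaErr struct{ msg string }

func (e *quotaErr) Error() string { return e.msg }

func (e *quotaErr) GRPCStatus() *status.Status {
    return status.New(codes.ResourceExhausted, e.msg)
}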

processStreamingRPC

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
    // channelz accounting, same as unary
    if channelz.IsOn() {
        s.incrCallsStarted()
    }
    // stats handler, same as unary
    sh := s.opts.statsHandler
    var statsBegin *stats.Begin
    if sh != nil {
        beginTime := time.Now()
        statsBegin = &stats.Begin{
            BeginTime: beginTime,
        }
        sh.HandleRPC(stream.Context(), statsBegin)
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    // The difference from unary: unary obtains the request through the df function,
    // whereas streaming builds a serverStream (which implements the ServerStream
    // interface) and passes it to the method, so the business code calls the
    // ServerStream methods itself
    ss := &serverStream{
        ctx:                   ctx,
        t:                     t,
        s:                     stream,
        p:                     &parser{r: stream},
        codec:                 s.getCodec(stream.ContentSubtype()),
        maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
        maxSendMessageSize:    s.opts.maxSendMessageSize,
        trInfo:                trInfo,
        statsHandler:          sh,
    }

    // Same as unary: record metrics and invoke the stats handler's HandleRPC
    if sh != nil || trInfo != nil || channelz.IsOn() {
        // See comment in processUnaryRPC on defers.
        defer func() {
            if trInfo != nil {
                ss.mu.Lock()
                if err != nil && err != io.EOF {
                    ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
                    ss.trInfo.tr.SetError()
                }
                ss.trInfo.tr.Finish()
                ss.trInfo.tr = nil
                ss.mu.Unlock()
            }

            if sh != nil {
                end := &stats.End{
                    BeginTime: statsBegin.BeginTime,
                    EndTime:   time.Now(),
                }
                if err != nil && err != io.EOF {
                    end.Error = toRPCErr(err)
                }
                sh.HandleRPC(stream.Context(), end)
            }

            if channelz.IsOn() {
                if err != nil && err != io.EOF {
                    s.incrCallsFailed()
                } else {
                    s.incrCallsSucceeded()
                }
            }
        }()
    }

    // binary logging
    ss.binlog = binarylog.GetMethodLogger(stream.Method())
    if ss.binlog != nil {
        md, _ := metadata.FromIncomingContext(ctx)
        logEntry := &binarylog.ClientHeader{
            Header:     md,
            MethodName: stream.Method(),
            PeerAddr:   nil,
        }
        if deadline, ok := ctx.Deadline(); ok {
            logEntry.Timeout = time.Until(deadline)
            if logEntry.Timeout < 0 {
                logEntry.Timeout = 0
            }
        }
        if a := md[":authority"]; len(a) > 0 {
            logEntry.Authority = a[0]
        }
        if peer, ok := peer.FromContext(ss.Context()); ok {
            logEntry.PeerAddr = peer.Addr
        }
        ss.binlog.Log(logEntry)
    }

    // Select the compressor and decompressor
    // If dc is set and matches the stream's compression, use it.  Otherwise, try
    // to find a matching registered compressor for decomp.
    if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
        ss.dc = s.opts.dc
    } else if rc != "" && rc != encoding.Identity {
        ss.decomp = encoding.GetCompressor(rc)
        if ss.decomp == nil {
            st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
            t.WriteStatus(ss.s, st)
            return st.Err()
        }
    }

    // If cp is set, use it.  Otherwise, attempt to compress the response using
    // the incoming message compression method.
    //
    // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
    if s.opts.cp != nil {
        ss.cp = s.opts.cp
        stream.SetSendCompress(s.opts.cp.Type())
    } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
        // Legacy compressor not specified; attempt to respond with same encoding.
        ss.comp = encoding.GetCompressor(rc)
        if ss.comp != nil {
            stream.SetSendCompress(rc)
        }
    }

    if trInfo != nil {
        trInfo.tr.LazyLog(&trInfo.firstLine, false)
    }
    var appErr error
    var server interface{}
    if info != nil {
        server = info.serviceImpl
    }
    // Invoke the business-defined method. The difference with and without a stream
    // interceptor: without one the Handler is called directly; with one the Handler
    // is passed to the interceptor, which calls it
    if s.opts.streamInt == nil {
        appErr = sd.Handler(server, ss)
    } else {
        info := &StreamServerInfo{
            FullMethod:     stream.Method(),
            IsClientStream: sd.ClientStreams,
            IsServerStream: sd.ServerStreams,
        }
        appErr = s.opts.streamInt(server, ss, info, sd.Handler)
    }
    // Write back the status
    if appErr != nil {
        appStatus, ok := status.FromError(appErr)
        if !ok {
            appStatus = status.New(codes.Unknown, appErr.Error())
            appErr = appStatus.Err()
        }
        if trInfo != nil {
            ss.mu.Lock()
            ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
            ss.trInfo.tr.SetError()
            ss.mu.Unlock()
        }
        t.WriteStatus(ss.s, appStatus)
        if ss.binlog != nil {
            ss.binlog.Log(&binarylog.ServerTrailer{
                Trailer: ss.s.Trailer(),
                Err:     appErr,
            })
        }
        // TODO: Should we log an error from WriteStatus here and below?
        return appErr
    }
    if trInfo != nil {
        ss.mu.Lock()
        ss.trInfo.tr.LazyLog(stringer("OK"), false)
        ss.mu.Unlock()
    }
    err = t.WriteStatus(ss.s, statusOK)
    if ss.binlog != nil {
        ss.binlog.Log(&binarylog.ServerTrailer{
            Trailer: ss.s.Trailer(),
            Err:     appErr,
        })
    }
    return err
}

serverStream

serverStream implements the ServerStream interface (the interface itself is shown after the struct below).

type serverStream struct {
    ctx   context.Context
    t     transport.ServerTransport
    s     *transport.Stream
    p     *parser
    codec baseCodec

    cp     Compressor
    dc     Decompressor
    comp   encoding.Compressor
    decomp encoding.Compressor

    maxReceiveMessageSize int
    maxSendMessageSize    int
    trInfo                *traceInfo

    statsHandler stats.Handler

    binlog *binarylog.MethodLogger
    // serverHeaderBinlogged indicates whether server header has been logged. It
    // will happen when one of the following two happens: stream.SendHeader(),
    // stream.Send().
    //
    // It's only checked in send and sendHeader, doesn't need to be
    // synchronized.
    serverHeaderBinlogged bool

    mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
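
For reference, the grpc.ServerStream interface that serverStream satisfies, as declared in grpc-go's stream.go (comments abridged):

type ServerStream interface {
    // SetHeader sets the header metadata; it may be called multiple times
    // before the first SendMsg/SendHeader.
    SetHeader(metadata.MD) error
    // SendHeader sends the header metadata immediately.
    SendHeader(metadata.MD) error
    // SetTrailer sets the trailer metadata, sent when the RPC returns.
    SetTrailer(metadata.MD)
    Context() context.Context
    SendMsg(m interface{}) error
    RecvMsg(m interface{}) error
}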
Copyright notice: this post was created by [ase2014]; please include the original link when republishing: https://blog.csdn.net/u014763610/article/details/117175046