手记

etcd 源码系列之 gRPC proxy解读 (五)

grpc proxy是一个基于L7层的无状态的gRPC的etcd反向代理服务。这个L7指的是OSI模型中的第七层,会话层。它除了提供etcd client的基本功能之外,同样提供且优化了以下功能:

  • Watch API
    grpc Pproxy提供监听机制,客户端可以监听某个key或者某些key的变更(v2和v3的机制不同,参看后面文章)。用于监听和推送变更。
    它可以将多个客户端(c-watchers)对同一个key的监控合并到一个链接(s-watcher)到 etcd server的请求。同时它会广播从s-watcher收到的时间到所有的c-watchers。

    如上图所示, 3个client对同一个key A的watcher,注册到gRPC proxy中,gRPC proxy会合并生成一个s-watcher 注册到etcd server。

  • lease API
    lease API支持续约机制,客户端通过定时刷新(heartbean)来实现续约(v2和v3的实现机制也不一样)。用于集群监控以及服务注册发现。
    跟上图类似,为了减少etcd server的交互次数,gRPC proxy同样提供了合并功能:

    如上图所示, 3个client注册到gRPC proxy中(c-stream),通过心跳(heartbeat)来定时续约,gRPC proxy会合并生成一个s-stream 注册到etcd server。

  • 缓存请求
    gRPC proxy会缓存来自客户端的请求,保证etcd server 频繁的被客户端请求滥用。

1. 源码实现

1.1 startGRPCProxy解读

func startGRPCProxy(cmd *cobra.Command, args []string) {    //1. 校验参数的合法性
    checkArgs()    //2.判断是否校验 https
    tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)    if tlsinfo == nil && grpcProxyListenAutoTLS {
        host := []string{"https://" + grpcProxyListenAddr}
        dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
        autoTLS, err := transport.SelfCert(dir, host)        if err != nil {
            plog.Fatal(err)
        }
        tlsinfo = &autoTLS
    }    if tlsinfo != nil {
        plog.Infof("ServerTLS: %s", tlsinfo)
    }    //3.生成 cmux路由
    m := mustListenCMux(tlsinfo)    //4. grpc cmux
    grpcl := m.Match(cmux.HTTP2())    defer func() {
        grpcl.Close()
        plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
    }()    //5. 生成一个向etcd服务 注册的stream链接,前面提到的合并链接也就是它产生的。
    client := mustNewClient()    //6. 一些性能和资源监控的封装,如Prometheus,PProf等
    srvhttp, httpl := mustHTTPListener(m, tlsinfo, client)
    errc := make(chan error)    //7. grpc 服务
    go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()    //8. http 服务
    go func() { errc <- srvhttp.Serve(httpl) }()    //9. cmux serve
    go func() { errc <- m.Serve() }()    //10.下面这个代码,跟上面srvhttp有些重复,只是它把监控信息给通过grpcProxyMetricsListenAddr给独立了出来
    if len(grpcProxyMetricsListenAddr) > 0 {
        mhttpl := mustMetricsListener(tlsinfo)        go func() {
            mux := http.NewServeMux()
            etcdhttp.HandlePrometheus(mux)
            grpcproxy.HandleHealth(mux, client)
            plog.Fatal(http.Serve(mhttpl, mux))
        }()
    }    // grpc-proxy is initialized, ready to serve
    notifySystemd()

    fmt.Fprintln(os.Stderr, <-errc)
    os.Exit(1)
}

1.2 服务监控

在上面的代码的第6步中和第10步都是在监控服务器的性能,首先在5步中生成了一直指向etcd服务端口的client,然后启动了两个服务:

  • Prometheus:监控服务的资源

  • health: 监控etcd服务的健康状态

以上两个服务在第10步同样能生成,只要指定参数 metrics-addr
大概的效果,我们分别启动两个服务 $ etcd ,这个服务会默认暴露一个2379的服务端口,然后启动:./etcd grpc-proxy start --metrics-addr=http://127.0.0.1:6061 --enable-pprof=true ,这个也会默认的链接2379这个端口。

  • metrics资源监控

    然后我们通过web浏览器分别打开 http://127.0.0.1:23790/metricshttp://127.0.0.1:6061/metrics 出现两个一样的网页,大概的内容是:

    # HELP etcd_debugging_disk_backend_commit_rebalance_duration_seconds The latency distributions of commit.rebalance called by bboltdb backend.
    # TYPE etcd_debugging_disk_backend_commit_rebalance_duration_seconds histogram
    etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.001"} 0
    etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.002"} 0
    .........
    .....
  • health监控(etcd服务健康状况)

    再打开地址 http://127.0.0.1:6061/healthhttp://127.0.0.1:23790/health,会显示以下内容:

    {"health":"true"}

    这个是显示在 --endpoints--discovery-srv中所指定的etcd server是否正常存活。当终止 etcd 服务之后,再次调用地址会返回 false

  • pprof 调试

    上面有个参数是 --enable-pprof=true,当指定该参数的时候,可以打开地址 http://127.0.0.1:23790/debug/pprof/,来分析程序性能。

    /debug/pprof/
    
    profiles:
    0   block
    26  goroutine
    3   heap
    0   mutex
    12  threadcreate
    
    full goroutine stack dump

总结一下,gRPC proxy 默认会在 --listen-addr 监控etcd服务的状态是否正常,同时也可以指定一个 metrics-addr端口来监控服务。强调一下, --enable-pprof 这个参数只有在 listen-addr 这个地址打开才有效。

1.3 gRPC proxy 服务

到这个地方,就到了这个章节最重要的地方,开始介绍整个gRPC proxy所提供的服务API.

通过第7步进入到具体的代码流程中,newGRPCProxyServer,代码如下:

func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {    if grpcProxyEnableOrdering {
        vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
        client.KV = ordering.NewKV(client.KV, vf)
        lg.Info("waiting for linearized read from cluster to recover ordering")        for {
            _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())            if err == nil {                break
            }
            lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
            time.Sleep(time.Second)
        }
    }    if len(grpcProxyNamespace) > 0 {
        client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
        client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
        client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
    }    if len(grpcProxyLeasing) > 0 {
        client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
    }

    kvp, _ := grpcproxy.NewKvProxy(client)
    watchp, _ := grpcproxy.NewWatchProxy(client)    if grpcProxyResolverPrefix != "" {
        grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
    }
    clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
    leasep, _ := grpcproxy.NewLeaseProxy(client)
    mainp := grpcproxy.NewMaintenanceProxy(client)
    authp := grpcproxy.NewAuthProxy(client)
    electionp := grpcproxy.NewElectionProxy(client)
    lockp := grpcproxy.NewLockProxy(client)

    server := grpc.NewServer(
        grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
        grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
        grpc.MaxConcurrentStreams(math.MaxUint32),
    )

    pb.RegisterKVServer(server, kvp)
    pb.RegisterWatchServer(server, watchp)
    pb.RegisterClusterServer(server, clusterp)
    pb.RegisterLeaseServer(server, leasep)
    pb.RegisterMaintenanceServer(server, mainp)
    pb.RegisterAuthServer(server, authp)
    v3electionpb.RegisterElectionServer(server, electionp)
    v3lockpb.RegisterLockServer(server, lockp)    // set zero values for metrics registered for this grpc server
    grpc_prometheus.Register(server)    return server
}

上面主要是通过封装 etcd的client 提供各种服务,当接受到来自用户的请求时,通过复用client连接到etcd服务,后面我们来看一下gRPC proxy所封装的各种服务。
这里提一下grpcProxyEnableOrdering和grpcProxyNamespace两个参数的意义:

  • grpcProxyEnableOrdering: experimental-serializable-ordering
    保证grpc proxy的Revision(版本号)小于或等于etcd服务器之间的Revision(版本号),后面再解释这个版本号

  • grpcProxyNamespace:
    为所有的key请求加上前缀空间

1.4 KvProxy

这个是对etcd client的 kv的封装,通过对其结构体创建方法 Newc(c *clientv3.Client)的,我们可以看出:

func Newc(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
    kv := &kvProxy{
        kv:    c.KV,
        cache: cache.NewCache(cache.DefaultMaxEntries),
    }
    donec := make(chan struct{})    close(donec)    return kv, donec
}

方法上面通过一个 cache 的封装来缓存客户端请求,这个正好印证前面所说的gRPC proxy可以缓存客户端请求。我们来具体的看一下这个cache是如何处理的。

type Cache interface {    ////添加查询请求到缓存中
    Add(req *pb.RangeRequest, resp *pb.RangeResponse)    //从缓存中获取请求结果
    Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
    Compact(revision int64)    //判断缓存是否失效
    Invalidate(key []byte, endkey []byte)    //缓存长度
    Size() int
    Close()
}

cache接口主要提供以上的方法来缓存来子客户端的请求信息,看一下具体的cache类:

// cache implements Cachetype cache struct {
    mu  sync.RWMutex
    lru *lru.Cache    // a reverse index for cache invalidation
    cachedRanges adt.IntervalTree

    compactedRev int64}

上面有个lru的缓存信息(算法为最近最少未使用),同时实现了一个IntervalTree(线段树),用来缓存范围查询,具体的算法可以查看对应的源码。
总的来说就是在client的kv上面封装了一层代码,加上了一层cache。

1.5 WatchProxy

同上,这个也是封装了client的watch API,如上面所示,gRPC proxy 支持watch的链接复用,

type watchProxy struct {
    cw  clientv3.Watcher
    ctx context.Context

    leader *leader

    ranges *watchRanges    // mu protects adding outstanding watch servers through wg.
    mu sync.Mutex    // wg waits until all outstanding watch servers quit.
    wg sync.WaitGroup    // kv is used for permission checking
    kv clientv3.KV
}

上面这个结构体实际上很明显能够告诉我们,如果watchProxy能够复用连接,那一定是在watchRanges中实现的。
我们先看一下watchProxy.Watch方法 由于方法太长,所以我简略的说下步骤:

1. 检查watchProxy是否退出2. wp.wg.Add(1): watch servers +1
3.生成一个watchProxyStream结构体
4.再次判断leader是否丢失链接
5.循环判断watchProxyStream的recvLoop以及sendLoop 方法

首先我们要知道leader的作用是什么?
打开 leader.go 文件中的发现有一个叫 recvLoop() 的方法,**这个方法的作用实际上就是通过对一个key(__lostleader)的监视来定时的判断client是否失效。**
所以实际上我们的真正的业务逻辑在watchProxyStream这个结构体中中。

先来看一下 watchProxyStream.recvLoop()这个方法:

func (wps *watchProxyStream) recvLoop() error {    for {
        req, err := wps.stream.Recv()        if err != nil {            return err
        }        switch uv := req.RequestUnion.(type) {        case *pb.WatchRequest_CreateRequest:
            cr := uv.CreateRequest            if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {                // Return WatchResponse which is caused by permission checking if and only if
                // the error is permission denied. For other errors (e.g. timeout or connection closed),
                // the permission checking mechanism should do nothing for preserving error code.
                wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}                continue
            }

            w := &watcher{
                wr:  watchRange{string(cr.Key), string(cr.RangeEnd)},
                id:  wps.nextWatcherID,
                wps: wps,

                nextrev:  cr.StartRevision,
                progress: cr.ProgressNotify,
                prevKV:   cr.PrevKv,
                filters:  v3rpc.FiltersFromRequest(cr),
            }            if !w.wr.valid() {
                w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})                continue
            }
            wps.nextWatcherID++
            w.nextrev = cr.StartRevision
            wps.watchers[w.id] = w
            wps.ranges.add(w)        case *pb.WatchRequest_CancelRequest:
            wps.delete(uv.CancelRequest.WatchId)        default:            panic("not implemented")
        }
    }
}

首先我们来明确一下 wps.stream这个流是属于与gRPC proxy连接的客户端。所以当收到来自客户端 WatchRequest_CreateRequest 请求时,会创建一个 watcher,同时会在 wps.watchers 以及 wps.ranges 中添加该watcher,并且在收到这个请求的时候用 checkPermissionForWatch 会向etcd server 同时发起一个服务,判断是否允许接入链接。在收到来自etcd server的正确回答之后,会在 rangesadd 这个方法在加载这个watcher之后同样会向etcd server 发起请求,并且得到应答之后会广播,这里面实现了一个broadcast的结构体用来做广播。

上面说的这个ranges实际上是来源于watchProxy这个结构体,而watchers来源于同一个stream中,这说明,对于客户端来说,它同样可以复用stream流来处理watch。

我们总结一下,对一来自于同一个客户端的的watch是它的stream可以复用,不同客户端的链接都会被同一链接复用,gRPC只有在收到来之客户端stream的 WatchRequest_CreateRequest请求的时候才会向 etcd server 发起请求。

1.6 lease proxy

我门先来看一下lease proxy的源码:

type leaseProxy struct {    // leaseClient handles req from LeaseGrant() that requires a lease ID.
    leaseClient pb.LeaseClient

    lessor clientv3.Lease

    ctx context.Context

    leader *leader    // mu protects adding outstanding leaseProxyStream through wg.
    mu sync.RWMutex    // wg waits until all outstanding leaseProxyStream quit.
    wg sync.WaitGroup
}

实际上大致的内容大同小异,leader的作用跟上面类似,leaseClient 和 lessor继承于clientv3,后面介绍。在接收到lease请求的时候,会生成一个 leaseProxyStream 结构体,这个结构体有三个方法 recvLoopsendLoop 和上面类似,同样有一个 keepAliveLoop,该方法是 核心方法,它是通过的一个 TTL的时间定时去跟etcd server 续约。

1.6 其它proxy

gRPC proxy 基本上实现etcd client的那一套API,通过对clientv3.Client的封装

2 使用场景

gRPC proxy 通过对 etcd client 的封装,实现了与etcd连接,不同客户端请求复用同一个client,其目的是为了加少etcd server的负载。这个目的也决定了proxy的使用场景,即降低 etcd 负载。

原文出处

3人推荐
随时随地看视频
慕课网APP