grpc proxy是一个基于L7层的无状态的gRPC的etcd反向代理服务。这个L7指的是OSI模型中的第七层,会话层。它除了提供etcd client的基本功能之外,同样提供且优化了以下功能:
Watch API
grpc Pproxy提供监听机制,客户端可以监听某个key或者某些key的变更(v2和v3的机制不同,参看后面文章)。用于监听和推送变更。
它可以将多个客户端(c-watchers)对同一个key的监控合并到一个链接(s-watcher)到 etcd server的请求。同时它会广播从s-watcher收到的时间到所有的c-watchers。如上图所示, 3个client对同一个key A的watcher,注册到gRPC proxy中,gRPC proxy会合并生成一个s-watcher 注册到etcd server。
lease API
lease API支持续约机制,客户端通过定时刷新(heartbean)来实现续约(v2和v3的实现机制也不一样)。用于集群监控以及服务注册发现。
跟上图类似,为了减少etcd server的交互次数,gRPC proxy同样提供了合并功能:如上图所示, 3个client注册到gRPC proxy中(c-stream),通过心跳(heartbeat)来定时续约,gRPC proxy会合并生成一个s-stream 注册到etcd server。
缓存请求
gRPC proxy会缓存来自客户端的请求,保证etcd server 频繁的被客户端请求滥用。
1. 源码实现
1.1 startGRPCProxy解读
func startGRPCProxy(cmd *cobra.Command, args []string) { //1. 校验参数的合法性 checkArgs() //2.判断是否校验 https tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey) if tlsinfo == nil && grpcProxyListenAutoTLS { host := []string{"https://" + grpcProxyListenAddr} dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy") autoTLS, err := transport.SelfCert(dir, host) if err != nil { plog.Fatal(err) } tlsinfo = &autoTLS } if tlsinfo != nil { plog.Infof("ServerTLS: %s", tlsinfo) } //3.生成 cmux路由 m := mustListenCMux(tlsinfo) //4. grpc cmux grpcl := m.Match(cmux.HTTP2()) defer func() { grpcl.Close() plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr) }() //5. 生成一个向etcd服务 注册的stream链接,前面提到的合并链接也就是它产生的。 client := mustNewClient() //6. 一些性能和资源监控的封装,如Prometheus,PProf等 srvhttp, httpl := mustHTTPListener(m, tlsinfo, client) errc := make(chan error) //7. grpc 服务 go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }() //8. http 服务 go func() { errc <- srvhttp.Serve(httpl) }() //9. cmux serve go func() { errc <- m.Serve() }() //10.下面这个代码,跟上面srvhttp有些重复,只是它把监控信息给通过grpcProxyMetricsListenAddr给独立了出来 if len(grpcProxyMetricsListenAddr) > 0 { mhttpl := mustMetricsListener(tlsinfo) go func() { mux := http.NewServeMux() etcdhttp.HandlePrometheus(mux) grpcproxy.HandleHealth(mux, client) plog.Fatal(http.Serve(mhttpl, mux)) }() } // grpc-proxy is initialized, ready to serve notifySystemd() fmt.Fprintln(os.Stderr, <-errc) os.Exit(1) }
1.2 服务监控
在上面的代码的第6步中和第10步都是在监控服务器的性能,首先在5步中生成了一直指向etcd服务端口的client,然后启动了两个服务:
Prometheus:监控服务的资源
health: 监控etcd服务的健康状态
以上两个服务在第10步同样能生成,只要指定参数 metrics-addr
。
大概的效果,我们分别启动两个服务 $ etcd
,这个服务会默认暴露一个2379的服务端口,然后启动:./etcd grpc-proxy start --metrics-addr=http://127.0.0.1:6061 --enable-pprof=true
,这个也会默认的链接2379这个端口。
metrics资源监控
然后我们通过web浏览器分别打开
http://127.0.0.1:23790/metrics
和http://127.0.0.1:6061/metrics
出现两个一样的网页,大概的内容是:# HELP etcd_debugging_disk_backend_commit_rebalance_duration_seconds The latency distributions of commit.rebalance called by bboltdb backend. # TYPE etcd_debugging_disk_backend_commit_rebalance_duration_seconds histogram etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.001"} 0 etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.002"} 0 ......... .....
health监控(etcd服务健康状况)
再打开地址
http://127.0.0.1:6061/health
和http://127.0.0.1:23790/health
,会显示以下内容:{"health":"true"}
这个是显示在
--endpoints
或--discovery-srv
中所指定的etcd server是否正常存活。当终止etcd
服务之后,再次调用地址会返回false
。pprof 调试
上面有个参数是
--enable-pprof=true
,当指定该参数的时候,可以打开地址http://127.0.0.1:23790/debug/pprof/
,来分析程序性能。/debug/pprof/ profiles: 0 block 26 goroutine 3 heap 0 mutex 12 threadcreate full goroutine stack dump
总结一下,gRPC proxy 默认会在 --listen-addr
监控etcd服务的状态是否正常,同时也可以指定一个 metrics-addr
端口来监控服务。强调一下, --enable-pprof
这个参数只有在 listen-addr
这个地址打开才有效。
1.3 gRPC proxy 服务
到这个地方,就到了这个章节最重要的地方,开始介绍整个gRPC proxy所提供的服务API.
通过第7步进入到具体的代码流程中,newGRPCProxyServer
,代码如下:
func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { if grpcProxyEnableOrdering { vf := ordering.NewOrderViolationSwitchEndpointClosure(*client) client.KV = ordering.NewKV(client.KV, vf) lg.Info("waiting for linearized read from cluster to recover ordering") for { _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly()) if err == nil { break } lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err)) time.Sleep(time.Second) } } if len(grpcProxyNamespace) > 0 { client.KV = namespace.NewKV(client.KV, grpcProxyNamespace) client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace) client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace) } if len(grpcProxyLeasing) > 0 { client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing) } kvp, _ := grpcproxy.NewKvProxy(client) watchp, _ := grpcproxy.NewWatchProxy(client) if grpcProxyResolverPrefix != "" { grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL) } clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix) leasep, _ := grpcproxy.NewLeaseProxy(client) mainp := grpcproxy.NewMaintenanceProxy(client) authp := grpcproxy.NewAuthProxy(client) electionp := grpcproxy.NewElectionProxy(client) lockp := grpcproxy.NewLockProxy(client) server := grpc.NewServer( grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), grpc.MaxConcurrentStreams(math.MaxUint32), ) pb.RegisterKVServer(server, kvp) pb.RegisterWatchServer(server, watchp) pb.RegisterClusterServer(server, clusterp) pb.RegisterLeaseServer(server, leasep) pb.RegisterMaintenanceServer(server, mainp) pb.RegisterAuthServer(server, authp) v3electionpb.RegisterElectionServer(server, electionp) v3lockpb.RegisterLockServer(server, lockp) // set zero values for metrics registered for this grpc server grpc_prometheus.Register(server) return server }
上面主要是通过封装 etcd的client 提供各种服务,当接受到来自用户的请求时,通过复用client连接到etcd服务,后面我们来看一下gRPC proxy所封装的各种服务。
这里提一下grpcProxyEnableOrdering和grpcProxyNamespace两个参数的意义:
grpcProxyEnableOrdering: experimental-serializable-ordering
保证grpc proxy的Revision(版本号)小于或等于etcd服务器之间的Revision(版本号),后面再解释这个版本号grpcProxyNamespace:
为所有的key请求加上前缀空间
1.4 KvProxy
这个是对etcd client的 kv的封装,通过对其结构体创建方法 Newc(c *clientv3.Client)
的,我们可以看出:
func Newc(c *clientv3.Client) (pb.KVServer, <-chan struct{}) { kv := &kvProxy{ kv: c.KV, cache: cache.NewCache(cache.DefaultMaxEntries), } donec := make(chan struct{}) close(donec) return kv, donec }
方法上面通过一个 cache
的封装来缓存客户端请求,这个正好印证前面所说的gRPC proxy可以缓存客户端请求。我们来具体的看一下这个cache是如何处理的。
type Cache interface { ////添加查询请求到缓存中 Add(req *pb.RangeRequest, resp *pb.RangeResponse) //从缓存中获取请求结果 Get(req *pb.RangeRequest) (*pb.RangeResponse, error) Compact(revision int64) //判断缓存是否失效 Invalidate(key []byte, endkey []byte) //缓存长度 Size() int Close() }
cache接口主要提供以上的方法来缓存来子客户端的请求信息,看一下具体的cache类:
// cache implements Cachetype cache struct { mu sync.RWMutex lru *lru.Cache // a reverse index for cache invalidation cachedRanges adt.IntervalTree compactedRev int64}
上面有个lru的缓存信息(算法为最近最少未使用),同时实现了一个IntervalTree(线段树),用来缓存范围查询,具体的算法可以查看对应的源码。
总的来说就是在client的kv上面封装了一层代码,加上了一层cache。
1.5 WatchProxy
同上,这个也是封装了client的watch API,如上面所示,gRPC proxy 支持watch的链接复用,
type watchProxy struct { cw clientv3.Watcher ctx context.Context leader *leader ranges *watchRanges // mu protects adding outstanding watch servers through wg. mu sync.Mutex // wg waits until all outstanding watch servers quit. wg sync.WaitGroup // kv is used for permission checking kv clientv3.KV }
上面这个结构体实际上很明显能够告诉我们,如果watchProxy能够复用连接,那一定是在watchRanges中实现的。
我们先看一下watchProxy.Watch方法 由于方法太长,所以我简略的说下步骤:
1. 检查watchProxy是否退出2. wp.wg.Add(1): watch servers +1 3.生成一个watchProxyStream结构体 4.再次判断leader是否丢失链接 5.循环判断watchProxyStream的recvLoop以及sendLoop 方法
首先我们要知道leader的作用是什么?
打开 leader.go
文件中的发现有一个叫 recvLoop()
的方法,**这个方法的作用实际上就是通过对一个key(__lostleader)的监视来定时的判断client是否失效。**
所以实际上我们的真正的业务逻辑在watchProxyStream这个结构体中中。
先来看一下 watchProxyStream.recvLoop()这个方法:
func (wps *watchProxyStream) recvLoop() error { for { req, err := wps.stream.Recv() if err != nil { return err } switch uv := req.RequestUnion.(type) { case *pb.WatchRequest_CreateRequest: cr := uv.CreateRequest if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied { // Return WatchResponse which is caused by permission checking if and only if // the error is permission denied. For other errors (e.g. timeout or connection closed), // the permission checking mechanism should do nothing for preserving error code. wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true} continue } w := &watcher{ wr: watchRange{string(cr.Key), string(cr.RangeEnd)}, id: wps.nextWatcherID, wps: wps, nextrev: cr.StartRevision, progress: cr.ProgressNotify, prevKV: cr.PrevKv, filters: v3rpc.FiltersFromRequest(cr), } if !w.wr.valid() { w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true}) continue } wps.nextWatcherID++ w.nextrev = cr.StartRevision wps.watchers[w.id] = w wps.ranges.add(w) case *pb.WatchRequest_CancelRequest: wps.delete(uv.CancelRequest.WatchId) default: panic("not implemented") } } }
首先我们来明确一下 wps.stream这个流是属于与gRPC proxy连接的客户端。所以当收到来自客户端 WatchRequest_CreateRequest
请求时,会创建一个 watcher
,同时会在 wps.watchers
以及 wps.ranges
中添加该watcher,并且在收到这个请求的时候用 checkPermissionForWatch
会向etcd server 同时发起一个服务,判断是否允许接入链接。在收到来自etcd server的正确回答之后,会在 ranges
中 add
这个方法在加载这个watcher之后同样会向etcd server 发起请求,并且得到应答之后会广播,这里面实现了一个broadcast的结构体用来做广播。
上面说的这个ranges实际上是来源于watchProxy这个结构体,而watchers来源于同一个stream中,这说明,对于客户端来说,它同样可以复用stream流来处理watch。
我们总结一下,对一来自于同一个客户端的的watch是它的stream可以复用,不同客户端的链接都会被同一链接复用,gRPC只有在收到来之客户端stream的 WatchRequest_CreateRequest
请求的时候才会向 etcd server 发起请求。
1.6 lease proxy
我门先来看一下lease proxy的源码:
type leaseProxy struct { // leaseClient handles req from LeaseGrant() that requires a lease ID. leaseClient pb.LeaseClient lessor clientv3.Lease ctx context.Context leader *leader // mu protects adding outstanding leaseProxyStream through wg. mu sync.RWMutex // wg waits until all outstanding leaseProxyStream quit. wg sync.WaitGroup }
实际上大致的内容大同小异,leader的作用跟上面类似,leaseClient 和 lessor继承于clientv3,后面介绍。在接收到lease请求的时候,会生成一个 leaseProxyStream
结构体,这个结构体有三个方法 recvLoop
和 sendLoop
和上面类似,同样有一个 keepAliveLoop
,该方法是 核心方法,它是通过的一个 TTL的时间定时去跟etcd server 续约。
1.6 其它proxy
gRPC proxy 基本上实现etcd client的那一套API,通过对clientv3.Client的封装。
2 使用场景
gRPC proxy 通过对 etcd client 的封装,实现了与etcd连接,不同客户端请求复用同一个client,其目的是为了加少etcd server的负载。这个目的也决定了proxy的使用场景,即降低 etcd
负载。