go-micro RPC框架源码分析
最近由于辞职,心想着要好好研究下RPC的实现,于是乎,就拿go-micro开刀了...
首先回顾一下go-micro RPC service的开发和启动流程,以hello world demo为例。
service := micro.NewService( micro.Name("hello_world"), micro.Version("latest"), micro.Metadata(map[string]string{ "type": "helloworld", }), ) # 调用micro.NewService(调用micro.newService)来创建实现micro.Service interface的micro.service service.Init() # 调用micro.service#Init方法 hello_world.RegisterHelloWorldHandler(service.Server(), new(HelloWorld)) service.Run() # 调用micro.service#Run方法
与service启动相关struct和interface
// micro.Service interface// go-micro.gotype Service interface { Init(...Option) Options() Options Client() client.Client Server() server.Server Run() error String() string}// micro.service struct, implements interface micro.Service// service.gotype service struct { opts Options once sync.Once }// micro.Options struct// options.gotype Options struct { Broker broker.Broker Cmd cmd.Cmd Client client.Client Server server.Server // server/server.go, interface, implemented by server/rpc_server.go, rpcServer struct Registry registry.Registry Transport transport.Transport // Register loop interval RegisterInterval time.Duration // Before and After funcs BeforeStart []func() error BeforeStop []func() error AfterStart []func() error AfterStop []func() error // Other options for implementations of the interface // can be stored in a context Context context.Context} // server.Server interface// server/server.gotype Server interface { Options() Options Init(...Option) error Handle(Handler) error NewHandler(interface{}, ...HandlerOption) Handler NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber Subscribe(Subscriber) error Register() error Deregister() error Start() error Stop() error String() string}// server.rpcServer struct, implements interface server.Server// server/rpc_server.gotype rpcServer struct { rpc *server // server.server, server.rpc_service.go, struct exit chan chan error sync.RWMutex opts Options handlers map[string]Handler subscribers map[*subscriber][]broker.Subscriber // used for first registration registered bool // graceful exit wg sync.WaitGroup }// server.server struct// server/rpc_service.go// server represents an RPC Server.type server struct { name string mu sync.Mutex // protects the serviceMap serviceMap map[string]*service reqLock sync.Mutex // protects freeReq freeReq *request respLock sync.Mutex // protects freeResp freeResp *response hdlrWrappers []HandlerWrapper }
整个方法调用过程如图所示:
go-micro service启动流程图
注明: 水平方向表示内部调用,垂直方向表示顺序调用。
根据上面所述流程,我们自己开发的Service是怎么让go-micro知道的呢?以及go-micro如何知道一个服务查询请求应该怎么处理呢?
服务注册
我们在hello_world service中仅仅调用如下代码即可完成我们自己的service的注册,看起来很简单:
hello_world.RegisterHelloWorldHandler(service.Server(), new(HelloWorld))
原来,在我们创建hello_world.proto文件时,micro已经帮我们生成注册Handler方法了。
func RegisterHelloWorldHandler(s server.Server, hdlr HelloWorldHandler, opts ...server.HandlerOption) { s.Handle(s.NewHandler(&HelloWorld{hdlr}, opts...)) }
该方法调用时,我们传入了micro.Service#Server()
(返回值为接口类型server.Server
, 由server.rpcServer
实现)和实现了HelloWorldHandler interface
的对象作为参数,实际上内部通过调用micro.Service#Server()#handle
方法(即server.rpcServer#Handle
):
// server.rpcServer// server/rpc_server.gofunc (s *rpcServer) Handle(h Handler) error { s.Lock() defer s.Unlock() if err := s.rpc.register(h.Handler()); err != nil { return err } s.handlers[h.Name()] = h return nil}
该方法会将我们的Service
(此处也即是h Handler
)添加到server.rpcServer.handlers
map中,Service.Name()
作为key,值为h Handler
对象, s.handlers[h.Name()] = h
。
在启动过程中,我们创建了t := time.NewTicker()
, 该定时器会不断的调用micro.service.opts.Server#Register
, 我们看一下该方法:
// server.rpcServer// server/rpc_server.gofunc (s *rpcServer) Register() error { // parse address for host, port config := s.Options() var advt, host string var port int // check the advertise address first // if it exists then use it, otherwise // use the address if len(config.Advertise) > 0 { advt = config.Advertise } else { advt = config.Address } parts := strings.Split(advt, ":") if len(parts) > 1 { host = strings.Join(parts[:len(parts)-1], ":") port, _ = strconv.Atoi(parts[len(parts)-1]) } else { host = parts[0] } addr, err := addr.Extract(host) if err != nil { return err } // register service node := ®istry.Node{ Id: config.Name + "-" + config.Id, Address: addr, Port: port, Metadata: config.Metadata, } node.Metadata["transport"] = config.Transport.String() node.Metadata["broker"] = config.Broker.String() node.Metadata["server"] = s.String() node.Metadata["registry"] = config.Registry.String() s.RLock() // Maps are ordered randomly, sort the keys for consistency var handlerList []string for n, e := range s.handlers { // Only advertise non internal handlers if !e.Options().Internal { handlerList = append(handlerList, n) } } sort.Strings(handlerList) //...省略部分代码 var endpoints []*registry.Endpoint for _, n := range handlerList { endpoints = append(endpoints, s.handlers[n].Endpoints()...) } //...省略部分代码 s.RUnlock() service := ®istry.Service{ Name: config.Name, Version: config.Version, Nodes: []*registry.Node{node}, Endpoints: endpoints, } s.Lock() registered := s.registered s.Unlock() if !registered { log.Logf("Registering node: %s", node.Id) } // create registry options rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} if err := config.Registry.Register(service, rOpts...); err != nil { return err } // already registered? don't need to register subscribers if registered { return nil } s.Lock() defer s.Unlock() s.registered = true //...省略部分代码 return nil}
调用该server.rpcServer#Register()
,内部会创建一个registry.Service
对象service
,并且设置server.rpcServer.handlers
(类型为server.Handler, 由server.rpcHandler实现)中的Endpoints
,然后通过config.Registry.Register()
向Service Registry
来注册该service
。
作者:zouqilin
链接:https://www.jianshu.com/p/4d71da02ac43