go-micro请求处理过程
上文分析了我们自己创建的service(handler)是如何注册的,那么go-micro又如何知道一个服务查询请求应该怎么处理呢?
上文提到我们添加的service存放在server.rpcServer.handlers
map(类型map[string]server.Handler
)中。下面看一下相关的数据结构。
// server.Handler// server.handler.gotype Handler interface { Name() string Handler() interface{} Endpoints() []*registry.Endpoint Options() HandlerOptions }// server.rpcHandler, implements server.Handler interface// server.rpc_handler.gotype rpcHandler struct { name string handler interface{} endpoints []*registry.Endpoint opts HandlerOptions }
接上文go-micro service启动流程
,启动后go-micro不断接受用户的远程调用请求,然后调用server.server#ServeRequest
来处理请求,接下来流程如下图所示。
go-micro response for request
注明: 水平方向表示内部调用,垂直方向表示顺序调用。
其中需要说明的是service, mtype, ... := server.server#readRequest()
调用,该方法调用中从请求头中解析出请求的服务service以及请求的方法method(var mtype, reflect类型)。
然后调用mtype.method.Func()
完成远程方法原型的调用。
那么问题又来了,远程调用客户端如何知道自己需要的服务在那台机器上呢?或者说客户端是如何实现远程过程调用的呢?
回顾一下hello_world client的远程调用过程:
type helloWorldClient struct { c client.Client // micro.client.Client interface, implemented by micro.client.rpcClient serviceName string } service := micro.NewService( micro.Name("hello_world"), micro.Version("latest"), micro.Metadata(map[string]string{ "type": "helloworld", }), ) service.Init() greeter := hello_world.NewHelloWorldClient("hello_world", service.Client()) rsp, err := greeter.Hello(context.TODO(), &hello_world.HelloWorldRequest{Name: "Alice"}) if err != nil { fmt.Println(err) return }
与server端不一样的地方就是,初始化后我们会获取一个服务的client对象(类型为helloWorldClient
, 该结构体中有一个c micro.client.Client
,用来执行真正的方法调用)。
greeter := hello_world.NewHelloWorldClient("hello_world", service.Client())
紧接着我们会使用该client
对象greeter
进行远程过程调用:
rsp, err := greeter.Hello(context.TODO(), &hello_world.HelloWorldRequest{Name: "Alice"})
该方法具体实现如下:
func (c *helloWorldClient) Hello(ctx context.Context, in *HelloWorldRequest, opts ...client.CallOption) (*HelloWorldResponse, error) { req := c.c.NewRequest(c.serviceName, "HelloWorld.Hello", in) out := new(HelloWorldResponse) err := c.c.Call(ctx, req, out, opts...) if err != nil { return nil, err } return out, nil}
在该方法中,我们会创建一个micro.client.Request
(类型micro.client.rpcRequest
,实现interface micro.client.Request
接口)对象,并且传入service_name
, 远程方法名"HelloWorld.Hello"
作为参数,然后通过micro.client.rpcClient#call()
执行远程方法调用。
该方法具体实现如下:
// micro/go-micro/client/rpc_client.gofunc (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { // make a copy of call opts callOpts := r.opts.CallOptions for _, opt := range opts { opt(&callOpts) } // get next nodes from the selector next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // check if we already have a deadline d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along opt := WithRequestTimeout(d.Sub(time.Now())) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // make copy of call method rcall := r.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { rcall = callOpts.CallWrappers[i-1](rcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, request, i) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // select next node node, err := next() if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // set the address address := node.Address if node.Port > 0 { address = fmt.Sprintf("%s:%d", address, node.Port) } // make the call err = rcall(ctx, address, request, response, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return err } ch := make(chan error, callOpts.Retries) var gerr error for i := 0; i < callOpts.Retries; i++ { go func() { ch <- call(i) }() select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, request, i, err) if rerr != nil { return rerr } if !retry { return err } gerr = err } } return gerr }
可知该方法的第一步即是向service registry
进行服务查询:
r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
然后执行到目标service所在address
的远程过程调用。
至此,分析结束。
题外话: Dubbo和go-micro的服务注册的区别。
Dubbo服务注册和发现
服务容器负责启动,加载,运行服务提供者。
服务提供者在启动时,向注册中心注册自己提供的服务。
服务消费者在启动时,向注册中心订阅自己所需的服务。
注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心
go-micro服务注册和发现
启动
service registry
服务(etcd, consul)服务提供者向
service registry
进行服务注册服务消费者向
service registry
进行服务查询service registry
返回服务消费者查询服务地址服务消费者想服务所在地址发起远程过程调用
作者:zouqilin
链接:https://www.jianshu.com/p/b6e68821fbd3