我的 grpc 服务器流功能出现了一个有趣的错误,我正在抓狂。通过阅读 grpc godoc 或在线其他地方无法找到任何可能的原因。希望更熟悉 Go 和 grpc 流的人能够为我指明正确的方向。
我的实现尝试遵循 grpc.io 网站上的基本 Server-Streaming 示例。
有问题的protobuf定义:
service ArrayBasedCache {
...
rpc GetRecord (GetRecordRequest) returns (stream MessageResponse) {}
}
message GetRecordRequest {
string key = 1;
}
message MessageResponse {
string message = 1;
}
grpc 服务器处理程序:
func (ctlr *cacheClientController) GetRecord(req *svcgrpc.GetRecordRequest, stream svcgrpc.ArrayBasedCache_GetRecordServer) error {
key := req.GetKey()
if ctlr.inputChannels[key] == nil {
return errors.New("Requested record has expired")
}
msgs, e1 := ctlr.client.ReadArrayRecord(key)
if e1 != nil {
panic(e1)
}
log.Printf("Messages: %v", msgs)
for i := 0; i < len(msgs); i++ {
log.Printf("trying to write message: %v", msgs[i])
if e2 := stream.Send(&svcgrpc.MessageResponse{Message: msgs[i]}); e2 != nil {
log.Printf("Writing message %d of %d to stream failed", i+1, len(msgs))
panic(e2)
}
}
return nil
}
我的 grpc 客户端实现:
func (s *GrpcService) GetRecord(key string) (svcgrpc.ArrayBasedCache_GetRecordClient, error) {
req := &svcgrpc.GetRecordRequest{Key: key}
ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)
defer cancelFunc()
resp, err := s.grpcClient.GetRecord(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
func (s *GrpcService) StreamToArray(stream svcgrpc.ArrayBasedCache_GetRecordClient) ([]string, error) {
out := []string{}
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, errors.New("Unexpected error reading stream")
}
out = append(out, msg.Message)
}
return out, nil
}
蝴蝶刀刀
RISEBY
相关分类