Golang grpc ServerStream 抛出错误:传输:流已完成或已调用

我的 grpc 服务器流功能出现了一个有趣的错误,我正在抓狂。通过阅读 grpc godoc 或在线其他地方无法找到任何可能的原因。希望更熟悉 Go 和 grpc 流的人能够为我指明正确的方向。


我的实现尝试遵循 grpc.io 网站上的基本 Server-Streaming 示例。


有问题的protobuf定义:


service ArrayBasedCache {

  ...

  rpc GetRecord (GetRecordRequest) returns (stream MessageResponse) {}

}


message GetRecordRequest {

  string key = 1;

}


message MessageResponse {

  string message = 1;

}


grpc 服务器处理程序:


func (ctlr *cacheClientController) GetRecord(req *svcgrpc.GetRecordRequest, stream svcgrpc.ArrayBasedCache_GetRecordServer) error {

    key := req.GetKey()

    if ctlr.inputChannels[key] == nil {

        return errors.New("Requested record has expired")

    }

    msgs, e1 := ctlr.client.ReadArrayRecord(key)

    if e1 != nil {

        panic(e1)

    }

    log.Printf("Messages: %v", msgs)

    for i := 0; i < len(msgs); i++ {

        log.Printf("trying to write message: %v", msgs[i])

        if e2 := stream.Send(&svcgrpc.MessageResponse{Message: msgs[i]}); e2 != nil {

            log.Printf("Writing message %d of %d to stream failed", i+1, len(msgs))

            panic(e2)

        }

    }

    return nil

}


我的 grpc 客户端实现:


func (s *GrpcService) GetRecord(key string) (svcgrpc.ArrayBasedCache_GetRecordClient, error) {

    req := &svcgrpc.GetRecordRequest{Key: key}

    ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)

    defer cancelFunc()

    resp, err := s.grpcClient.GetRecord(ctx, req)

    if err != nil {

        return nil, err

    }

    return resp, nil

}


func (s *GrpcService) StreamToArray(stream svcgrpc.ArrayBasedCache_GetRecordClient) ([]string, error) {

    out := []string{}

    for {

        msg, err := stream.Recv()

        if err == io.EOF {

            break

        }

        if err != nil {

            return nil, errors.New("Unexpected error reading stream")

        }

        out = append(out, msg.Message)

    }

    return out, nil

}


撒科打诨
浏览 115回答 2
2回答

蝴蝶刀刀

我相信问题出在GetRecordgrpc 客户端实现中,特别是您正在使用的上下文实现。通过在同一方法中使用context.WithTimeout和调用,您基本上是在从方法defer cancelFunc()返回之前关闭流。GetRecord如果您仍想使用context.WithTimeout实现,请不要使用方法cancelFunc内部GetRecord,而是cancelFunc从它返回或传递ctxtoGetRecord方法。

RISEBY

由于流意味着长时间运行 - 实时返回结果 - 通常您不希望对流结果设置截止日期。如果您想对Dial建立流连接的操作设置超时,您可以通过DialOption WithTimeout执行此操作:grpc.Dial(addr, grpc.WithTimeout(defaultTimeout))或使用DialContext:ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)defer cancelFunc()grpc.DialContext(ctx, addr)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go