如何从阻塞 Recv() 调用中解楔出 gRPC 双向流服务器?

在 golang 的 gRPC 中提供双向流时,规范流处理程序如下所示:


func (s *MyServer) MyBidiRPC(stream somepb.MyServer_MyBidiServer) error {

    for {

        data, err := stream.Recv()

        if err == io.EOF {

            return nil // clean close

        }

        if err != nil {

            return err // some other error

        }

        // do things with data here

    }

}

具体来说,当双向 RPC 的处理程序返回时,即认为服务器端关闭的信号。

这是一个同步编程模型 - 服务器保持阻塞在此 goroutine(由 grpc 库创建)中,同时等待来自客户端的消息。

现在,我想取消阻止这个 Recv() 调用(最终在底层 grpc 上调用 RecvMsg()。服务器流,) 并返回/关闭流,因为服务器进程已决定使用此客户端完成此操作。

不幸的是,我找不到明显的方法来做到这一点:

  • 在为我的服务生成的双向服务器接口上没有关闭()或关闭发送()或关闭()或类似关闭的功能

  • 流中的上下文,我可以通过流获得。上下文(),不公开用户可访问的取消函数

  • 我找不到一种方法来传递“起始侧”的上下文,以便 grpc 接受新连接。服务器,我可以注入自己的取消功能

我可以关闭整个加仑。服务器通过调用 Stop(),但这不是我想做的 -- 只有这个特定的客户端连接 (grpc.服务器流)应已完成。

我可以向客户端发送一条消息,使客户端依次关闭连接。但是,如果客户端从网络上掉下来,这将不起作用,这将通过超时来解决,超时必须很长才能通常健壮。我现在想要它,因为我不耐烦,更重要的是,在规模上,悬而未决的无响应客户可能是一个高昂的成本。

我可以(也许)挖掘出grpc。服务器流与反射,直到我找到传输流,然后从中挖出取消函数并调用它。或者挖遍溪流。Context() 与反射,并制作我自己的取消函数引用来调用。对于未来的维护者来说,这些似乎都不是很好的建议。

但这些肯定不是唯一的选择吗?决定一个特定的客户不再需要连接并不是神奇的太空外星人科学。如何关闭此流,以便 Recv() 从服务器进程端调用 un-block,而不涉及到客户端的往返?


噜噜哒
浏览 87回答 1
1回答

弑天下

不幸的是,我不认为有一个很好的方法来做你所要求的。根据您的目标,我认为您有两种选择:在戈鲁廷中运行 Recv,并在需要时从 bidi 处理程序返回。这将关闭上下文并取消阻止 Recv。这显然是次优的,因为它需要小心,因为您现在有代码在处理程序的执行范围之外执行。然而,这似乎是我能找到的最接近的答案。如果您试图通过设置超时来减轻行为异常的客户端的影响,则可以使用 Keepalive 执行策略和/或 KeepaliveParams 将此工作卸载到框架中。如果这符合您希望关闭连接的原因,则这可能是可取的,但除此之外没有多大用处。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java