猿问

如何在 grpc 中正确设计发布-订阅模式?

我正在尝试使用 grpc 实现 pub sub 模式,但我对如何正确执行它有点困惑。


我的原型: rpc call (google.protobuf.Empty) returns (stream Data);


客户:


asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {

         @Override

         public void onNext(Data value) {

           // process a data


         @Override

         public void onError(Throwable t) {


         }


         @Override

         public void onCompleted() {


         }

       });


   } catch (StatusRuntimeException e) {

     LOG.warn("RPC failed: {}", e.getStatus());

   }


   Thread.currentThread().join();

服务器服务:


public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {

  private final BlockingQueue<Data> queue;

  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();


  public Sender(BlockingQueue<Data> queue) {

    this.queue = queue;

  }


  @Override

  public void data(Empty request, StreamObserver<Data> responseObserver) {

    observers.add(responseObserver);

  }


  @Override

  public void run() {

    while (!Thread.currentThread().isInterrupted()) {

      try {

        // waiting for first element

        Data data = queue.take();

        // send head element

        observers.forEach(o -> o.onNext(data));


      } catch (InterruptedException e) {

        LOG.error("error: ", e);

        Thread.currentThread().interrupt();

      }

    }

  }

}


如何正确地从全局观察者中删除客户?连接断开时如何接收某种信号?

如何管理客户端-服务器重新连接?连接断开时如何强制客户端重新连接?


喵喵时光机
浏览 213回答 1
1回答

慕虎7371278

在实施您的服务时:&nbsp; @Override&nbsp; public void data(Empty request, StreamObserver<Data> responseObserver) {&nbsp; &nbsp; observers.add(responseObserver);&nbsp; }您需要获取当前请求的上下文,并监听取消。对于单请求、多响应调用(又名服务器流),gRPC 生成的代码被简化为直接传递请求。这意味着您无法直接访问底层ServerCall.Listener,这就是您通常如何监听客户端断开和取消的方式。相反,每个 gRPC 调用都有一个Context与之关联的,它携带取消和其他请求范围的信号。对于您的情况,您只需要通过添加自己的侦听器来侦听取消,然后从链接的哈希集中安全地删除响应观察器。至于重新连接:如果连接断开,gRPC 客户端会自动重新连接,但通常不会重试 RPC,除非这样做是安全的。在服务器流式 RPC 的情况下,这样做通常不安全,因此您需要直接在客户端上重试 RPC。
随时随地看视频慕课网APP

相关分类

Python
我要回答