使用Go阅读来自谷歌酒吧订阅的所有可用消息

我正在尝试从谷歌发布订阅中的主题中获取所有可用消息。但是在前进中,我无法找到一个配置,一旦Pub-Sub中没有更多消息剩余,就可以取消接收回调。

我认为一种方法是使用Google云监控API从Pub-Sub获取消息总数,该答案中描述了Google PubSub - 计算主题中的消息数,然后保留已读消息数的计数,如果计数等于该数字,则调用取消,但我不太确定这是否是正确的方法。

var mu sync.Mutex

    received := 0

    sub := client.Subscription(subID)

    cctx, cancel := context.WithCancel(ctx)

    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {

            mu.Lock()

            defer mu.Unlock()

            fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))

            msg.Ack()

            received++

            if received == TotalNumberOfMessages {

                    cancel()

            }

    })

    if err != nil {

            return fmt.Errorf("Receive: %v", err)

    }

我也尝试过使用超时的上下文,即在取消之后,直到不满足此上下文截止日期为止。


ctx, cancel := context.WithTimeout(ctx, 100*time.Second)

defer cancel()

err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {

}

但话又说回来,这并不能确定所有消息都已得到处理。


请建议一个可以确保该订阅的解决方案。当 Pub-Sub 中不再有剩余消息时,接收功能将停止。


翻阅古今
浏览 97回答 1
1回答

慕桂英546537

我已经在我以前的公司中实现了它(可悲的是,我不再有代码,它是在我以前的公司git中...)。然而,它的工作原理。原则如下msg := make(chan *pubsub.Message, 1)sub := client.Subscription(subID)cctx, cancel := context.WithCancel(ctx)go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {&nbsp; &nbsp; msg <- m&nbsp; &nbsp; })for {&nbsp; select {&nbsp; &nbsp; case res := <-msg:&nbsp; &nbsp; &nbsp; fmt.Fprintf(w, "Got message: %q\n", string(res.Data))&nbsp; &nbsp; &nbsp; res.Ack()&nbsp;&nbsp;&nbsp; &nbsp; case <-time.After(3 * time.Second):&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("timeout")&nbsp; &nbsp; &nbsp; &nbsp; cancel()&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go