我正在尝试从谷歌发布订阅中的主题中获取所有可用消息。但是在前进中,我无法找到一个配置,一旦Pub-Sub中没有更多消息剩余,就可以取消接收回调。
我认为一种方法是使用Google云监控API从Pub-Sub获取消息总数,该答案中描述了Google PubSub - 计算主题中的消息数,然后保留已读消息数的计数,如果计数等于该数字,则调用取消,但我不太确定这是否是正确的方法。
var mu sync.Mutex
received := 0
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
received++
if received == TotalNumberOfMessages {
cancel()
}
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
我也尝试过使用超时的上下文,即在取消之后,直到不满足此上下文截止日期为止。
ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
}
但话又说回来,这并不能确定所有消息都已得到处理。
请建议一个可以确保该订阅的解决方案。当 Pub-Sub 中不再有剩余消息时,接收功能将停止。
慕桂英546537
相关分类