将 Google PubSub 与 Golang 结合使用。轮询服务的最有效(成本)方式

我们正在从 AMQP 迁移到 Google 的 Pubsub。


文档表明pull 可能是我们的最佳选择,因为我们正在使用计算引擎并且无法打开我们的工作人员以通过推送服务接收。


它还表示拉动可能会根据使用情况产生额外费用:


如果使用轮询,如果您频繁打开连接并立即关闭它们,可能会导致高网络使用率。


我们在 go 中创建了一个测试订阅者,它在循环中运行,如下所示:


func main() {

    jsonKey, err := ioutil.ReadFile("pubsub-key.json")

    if err != nil {

        log.Fatal(err)

    }

    conf, err := google.JWTConfigFromJSON(

        jsonKey,

        pubsub.ScopeCloudPlatform,

        pubsub.ScopePubSub,

    )

    if err != nil {

        log.Fatal(err)

    }

    ctx := cloud.NewContext("xxx", conf.Client(oauth2.NoContext))


    msgIDs, err := pubsub.Publish(ctx, "topic1", &pubsub.Message{

        Data: []byte("hello world"),

    })


    if err != nil {

        log.Println(err)

    }


    log.Printf("Published a message with a message id: %s\n", msgIDs[0])


    for {

        msgs, err := pubsub.Pull(ctx, "subscription1", 1)

        if err != nil {

            log.Println(err)

        }


        if len(msgs) > 0 {

            log.Printf("New message arrived: %v, len: %d\n", msgs[0].ID, len(msgs))

            if err := pubsub.Ack(ctx, "subscription1", msgs[0].AckID); err != nil {

                log.Fatal(err)

            }

            log.Println("Acknowledged message")

            log.Printf("Message: %s", msgs[0].Data)

        }

    }

}

不过,我的问题实际上是这是否是提取消息的正确/推荐方式。


我们全天每秒收到大约 100 条消息。我不确定在无限循环中运行它是否会让我们破产并且找不到任何其他像样的 go 示例。


繁花如伊
浏览 191回答 1
1回答

慕姐8265434

一般而言,在 Cloud Pub/Sub 中拉取订阅者的关键是确保您始终至少有几个未完成的拉取请求,并且 max_messages 设置为适合以下情况的值:您发布消息的速度,这些消息的大小,以及您的订阅者可以处理消息的速率。一旦拉取请求返回,您应该发出另一个请求。这意味着异步处理和确认在拉取响应中返回给您的消息(或异步启动新的拉取请求)。如果您发现吞吐量或延迟不是您所期望的,那么首先要做的是添加更多并发拉取请求。如果您的发布率极低,则“如果使用轮询,如果您频繁打开连接并立即关闭它们可能会导致高网络使用率”这一声明适用。想象一下,您一天只发布两到三条消息,但您不断轮询拉取请求。这些拉取请求中的每一个都会产生发出请求的成本,但是除了少数实际收到消息的情况外,您不会得到任何消息进行处理,因此“每条消息的成本”相当高。如果您以相当稳定的速度发布并且您的拉取请求返回非零数量的消息,那么网络使用和成本将与消息速率一致。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go