我们有一个流程,用户可以通过该流程请求我们需要从源头获取的文件。此来源不是最可靠的,因此我们使用 Amazon SQS 实施了一个队列。我们将下载 URL 放入队列中,然后使用我们用 Go 编写的一个小应用程序对其进行轮询。这个应用程序只是检索消息,下载文件,然后将其推送到我们存储它的 S3。一旦所有这些都完成,它会回调一个服务,该服务将通过电子邮件通知用户,让他们知道文件已准备就绪。
最初我写这个是为了创建n 个通道,然后将 1 个 go-routine 附加到每个通道,并使 go-routine 处于无限循环中。这样我就可以确保我一次只处理固定数量的下载。
我意识到这不是应该使用通道的方式,如果我现在理解正确的话,实际上应该有一个带有n 个go-routines 的通道在该通道上接收。每个 go-routine 都处于无限循环中,等待一条消息,当它收到消息时,它将处理数据,做它应该做的一切,当它完成时,它将等待下一条消息。这让我可以确保我一次只处理n 个文件。我认为这是正确的做法。我相信这是扇出,对吧?
我并不需要做的,是要合并这些进程重新走到一起。下载完成后,它会回调远程服务,以便处理剩余的过程。该应用程序无需执行任何其他操作。
好的,所以一些代码:
func main() {
queue, err := ConnectToQueue() // This works fine...
if err != nil {
log.Fatalf("Could not connect to queue: %s\n", err)
}
msgChannel := make(chan sqs.Message, 10)
for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
go processMessage(msgChannel, queue)
}
for {
response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
for _, m := range response.Messages {
msgChannel <- m
}
}
}
func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
for {
m := <-ch
// Do something with message m
// Delete message from queue when we're done
queue.DeleteMessage(&m)
}
}
我在这附近的任何地方吗?我有n 个正在运行的 go-routines(其中MAX_CONCURRENT_ROUTINES= n)并且在循环中我们将继续将消息传递到单个通道。这是正确的方法吗?我需要关闭任何东西还是我可以无限期地运行它?
我注意到的一件事是 SQS 正在返回消息,但是一旦我将 10 条消息传入processMessage()(10 条是通道缓冲区的大小),实际上没有进一步处理消息。
汪汪一只猫
相关分类