如何在 Go 中执行并发下载

我们有一个流程,用户可以通过该流程请求我们需要从源头获取的文件。此来源不是最可靠的,因此我们使用 Amazon SQS 实施了一个队列。我们将下载 URL 放入队列中,然后使用我们用 Go 编写的一个小应用程序对其进行轮询。这个应用程序只是检索消息,下载文件,然后将其推送到我们存储它的 S3。一旦所有这些都完成,它会回调一个服务,该服务将通过电子邮件通知用户,让他们知道文件已准备就绪。


最初我写这个是为了创建n 个通道,然后将 1 个 go-routine 附加到每个通道,并使 go-routine 处于无限循环中。这样我就可以确保我一次只处理固定数量的下载。


我意识到这不是应该使用通道的方式,如果我现在理解正确的话,实际上应该有一个带有n 个go-routines 的通道在该通道上接收。每个 go-routine 都处于无限循环中,等待一条消息,当它收到消息时,它将处理数据,做它应该做的一切,当它完成时,它将等待下一条消息。这让我可以确保我一次只处理n 个文件。我认为这是正确的做法。我相信这是扇出,对吧?


我并不需要做的,是要合并这些进程重新走到一起。下载完成后,它会回调远程服务,以便处理剩余的过程。该应用程序无需执行任何其他操作。


好的,所以一些代码:


func main() {

    queue, err := ConnectToQueue() // This works fine...

    if err != nil {

        log.Fatalf("Could not connect to queue: %s\n", err)

    }


    msgChannel := make(chan sqs.Message, 10)


    for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {

        go processMessage(msgChannel, queue)

    }


    for {

        response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)


        for _, m := range response.Messages {

            msgChannel <- m

        }

    }

}


func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {

    for {

        m := <-ch

        // Do something with message m


        // Delete message from queue when we're done

        queue.DeleteMessage(&m)

    }

}

我在这附近的任何地方吗?我有n 个正在运行的 go-routines(其中MAX_CONCURRENT_ROUTINES= n)并且在循环中我们将继续将消息传递到单个通道。这是正确的方法吗?我需要关闭任何东西还是我可以无限期地运行它?


我注意到的一件事是 SQS 正在返回消息,但是一旦我将 10 条消息传入processMessage()(10 条是通道缓冲区的大小),实际上没有进一步处理消息。


慕哥6287543
浏览 161回答 1
1回答

汪汪一只猫

那看起来不错。一些注意事项:您可以通过限制您生成的工作程序例程数量以外的方式来限制工作并行度。例如,您可以为收到的每条消息创建一个 goroutine,然后让生成的 goroutine 等待限制并行度的信号量。当然有权衡取舍,但您不仅限于您所描述的方式。sem := make(chan struct{}, n)work := func(m sqs.Message) {&nbsp; &nbsp; sem <- struct{}{} // When there's room we can proceed&nbsp; &nbsp; // do the work&nbsp; &nbsp; <-sem // Free room in the channel}()for _, m := range queue.ReceiveMessage(MAX_SQS_MESSAGES) {&nbsp; &nbsp; for _, m0 := range m {&nbsp; &nbsp; &nbsp; &nbsp; go work(m0)&nbsp; &nbsp; }}仅处理 10 条消息的限制是由堆栈中的其他地方造成的。可能您正在看到前 10 个填充通道的竞赛,然后工作没有完成,或者您可能不小心从工作程序例程中返回。如果您的员工按照您所描述的模式坚持不懈,您将希望确定他们不会回来。不清楚您是否希望在处理了一定数量的消息后返回该进程。如果您确实希望此进程退出,则需要等待所有工作人员完成其当前任务,并可能在之后通知他们返回。看看sync.WaitGroup同步他们的完成,并有另一个通道来表示没有更多的工作,或者 close msgChannel,并在你的工作人员中处理。(看看二元组返回通道接收表达式。)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go