缓冲区为空后关闭“工人”程序

我希望我的 go 例程工作人员(ProcessToDo()在下面的代码中)等到所有“排队”的工作都处理完毕后再关闭。


工作程序有一个“待办事项”通道(缓冲),工作通过该通道发送给它。它有一个“完成”通道告诉它开始关闭。文档说,如果满足多个选择,通道上的选择将选择一个“伪随机值”……这意味着在所有缓冲工作完成之前触发关闭(返回)。


在下面的代码示例中,我希望打印所有 20 条消息...


package main


import (

    "time"

    "fmt"

)



func ProcessToDo(done chan struct{}, todo chan string) {

    for {

        select {

        case work, ok := <-todo:

            if !ok {

                fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")

                return

            }

            fmt.Printf("todo: %q\n", work)

            time.Sleep(100 * time.Millisecond)

        case _, ok := <-done:

            if ok {

                fmt.Printf("Shutting down ProcessToDo - done message received!\n")

            } else {

                fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")

            }

            close(todo)

            return

        }

    }

}


func main() {


    done := make(chan struct{})

    todo := make(chan string, 100)


    go ProcessToDo(done, todo)


    for i := 0; i < 20; i++ {

        todo <- fmt.Sprintf("Message %02d", i)

    }


    fmt.Println("*** all messages queued ***")

    time.Sleep(1 * time.Second)

    close(done)

    time.Sleep(4 * time.Second)

}


30秒到达战场
浏览 167回答 3
3回答

Smart猫小萌

done在您的情况下,频道是完全没有必要的,因为您可以通过关闭todo频道本身来发出关闭信号。并for range在通道上使用,它将迭代直到通道关闭且其缓冲区为空。你应该有一个done通道,但只是为了让 goroutine 本身可以发出它完成工作的信号,以便主 goroutine 可以继续或退出。这个变体与你的等价,更简单,不需要time.Sleep()调用等待其他 goroutines(这将太错误和不确定)。在Go Playground上试试:func ProcessToDo(done chan struct{}, todo chan string) {&nbsp; &nbsp; for work := range todo {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("todo: %q\n", work)&nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(100 * time.Millisecond)&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")&nbsp; &nbsp; done <- struct{}{} // Signal that we processed all jobs}func main() {&nbsp; &nbsp; done := make(chan struct{})&nbsp; &nbsp; todo := make(chan string, 100)&nbsp; &nbsp; go ProcessToDo(done, todo)&nbsp; &nbsp; for i := 0; i < 20; i++ {&nbsp; &nbsp; &nbsp; &nbsp; todo <- fmt.Sprintf("Message %02d", i)&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Println("*** all messages queued ***")&nbsp; &nbsp; close(todo)&nbsp; &nbsp; <-done // Wait until the other goroutine finishes all jobs}还要注意,worker goroutines 应该使用信号完成,defer这样如果主 goroutine 以某种意外的方式返回或恐慌,它就不会卡在等待 worker 中。所以它应该像这样开始:defer func() {&nbsp; &nbsp; done <- struct{}{} // Signal that we processed all jobs}()您还可以使用sync.WaitGroup将主 goroutine 同步到工作程序(以等待它)。事实上,如果你打算使用多个工作 goroutines,那比从done通道读取多个值更干净。此外,发出完成信号更简单,WaitGroup因为它有一个Done()方法(这是一个函数调用),因此您不需要匿名函数:defer wg.Done()有关完整示例,请参阅JimB 的 anwserWaitGroup。使用for range信道同步,因此你不需要任何额外的代码,将同步访问:如果你想使用多工作够程也是地道的todo通道或在收到该职位。如果您关闭 中的todo通道main(),这将正确地向所有工作程序 goroutine 发出信号。但当然,所有排队的作业都将被接收和处理一次。现在采用WaitGroup 用于使主 goroutine 等待 worker的变体(JimB 的回答):如果您想要 1 个以上的 worker goroutine 怎么办;并发(并且很可能是并行)处理您的工作?您唯一需要在代码中添加/更改的是:真正启动其中的多个:for i := 0; i < 10; i++ {&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; go ProcessToDo(todo)}无需更改任何其他内容,您现在拥有一个正确的并发应用程序,它使用 10 个并发 goroutines 接收和处理您的作业。并且我们没有使用任何“丑陋” time.Sleep()(我们使用了一个但只是为了模拟慢速处理,而不是等待其他 goroutine),并且您不需要任何额外的同步。

慕莱坞森

让通道的使用者关闭它通常是一个坏主意,因为在关闭的通道上发送是一种恐慌。在这种情况下,如果您不想在所有消息发送之前中断消费者,只需使用for...range循环并在完成后关闭通道。您还需要一个类似 a 的信号WaitGroup来等待 goroutine 完成(而不是使用 time.Sleep)http://play.golang.org/p/r97vRPsxEbvar wg sync.WaitGroupfunc ProcessToDo(todo chan string) {&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; for work := range todo {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("todo: %q\n", work)&nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(100 * time.Millisecond)&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")}func main() {&nbsp; &nbsp; todo := make(chan string, 100)&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; go ProcessToDo(todo)&nbsp; &nbsp; for i := 0; i < 20; i++ {&nbsp; &nbsp; &nbsp; &nbsp; todo <- fmt.Sprintf("Message %02d", i)&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Println("*** all messages queued ***")&nbsp; &nbsp; close(todo)&nbsp; &nbsp; wg.Wait()}

拉莫斯之舞

我认为接受的答案对于这个特定的例子非常有效。然而,要回答“缓冲区为空后关闭“工人”程序”的问题 - 一个更优雅的解决方案是可能的。worker 可以在缓冲区为空时返回,而无需通过关闭通道来发出信号。如果工作人员需要处理的任务数量未知,这将特别有用。在这里查看:https : //play.golang.org/p/LZ1y0eIRMeSpackage mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "time"&nbsp; &nbsp; "math/rand")func main() {&nbsp; &nbsp; rand.Seed(time.Now().UnixNano())&nbsp; &nbsp; ch := make(chan interface{}, 10)&nbsp; &nbsp; go worker(ch)&nbsp; &nbsp; for i := 1; i <= rand.Intn(9) + 1; i++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- i&nbsp; &nbsp; }&nbsp; &nbsp; blocker := make(chan interface{})&nbsp; &nbsp; <-blocker}func worker(ch chan interface{}){&nbsp; &nbsp;&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case msg := <- ch:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("msg: ", msg)&nbsp; &nbsp; &nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("exiting worker")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp;}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go