猿问

golang 中的事件驱动模式

我正在使用 golang 来实现一个简单的事件驱动工作者。就像这样:


  go func() {

        for {

            select {

            case data := <-ch:

                time.Sleep(1)

                someGlobalMap[data.key] = data.value 

            }

        }

    }()

main 函数会创建几个 goroutines,每个 goroutines 都会做这样的事情:


ch <- data

fmt.Println(someGlobalMap[data.key])

如您所见,由于我的工作人员需要一些时间来完成工作,我的主要功能将得到 nil 结果。如何正确控制此工作流程?


慕斯王
浏览 372回答 1
1回答

慕盖茨4494581

编辑:我可能误读了您的问题,我看到您提到 main 将启动许多生产者goroutine。我认为这是许多消费者goroutine 和一个生产者。在这里留下答案,以防它对寻找该模式的其他人有用,尽管项目符号仍然适用于您的案例。因此,如果我正确理解您的用例,您就不能指望在频道上发送并在之后立即阅读结果。您不知道工作人员何时会处理该发送,您需要在 goroutine 之间进行通信,这是通过通道完成的。假设仅调用具有返回值的函数在您的场景中不起作用,如果您确实需要发送给工作人员,然后阻塞直到获得结果,您可以发送通道作为数据结构的一部分,然后阻塞-发送后接收它,即:resCh := make(chan Result)ch <- Data{key, value, resCh}res := <- resCh但是您可能应该尝试将工作分解为独立步骤的管道,请参阅我在原始答案中链接到的博客文章。我认为它是单个生产者的原始答案- 多个消费者/工人模式:这是 Go 的 goroutine 和通道语义非常适合的常见模式。您需要记住以下几点:main 函数不会自动等待 goroutine 完成。如果在 main 中没有其他事情可做,那么程序将退出并且您没有结果。您使用的全局映射不是线程安全的。您需要通过互斥锁同步访问,但有一个更好的方法 - 使用输出通道获取结果,该通道已经同步。您可以在通道上使用 for..range,并且可以在多个 goroutine 之间安全地共享通道。正如我们将看到的,这使得这个模式写起来非常优雅。游乐场: https: //play.golang.org/p/WqyZfwldqp有关 Go 管道和并发模式的更多信息,介绍错误处理、提前取消等:https ://blog.golang.org/pipelines您提到的用例的注释代码:// could be a command-line flag, a config, etc.const numGoros = 10// Data is a similar data structure to the one mentioned in the question.type Data struct {&nbsp; &nbsp; key&nbsp; &nbsp;string&nbsp; &nbsp; value int}func main() {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; // create the input channel that sends work to the goroutines&nbsp; &nbsp; inch := make(chan Data)&nbsp; &nbsp; // create the output channel that sends results back to the main function&nbsp; &nbsp; outch := make(chan Data)&nbsp; &nbsp; // the WaitGroup keeps track of pending goroutines, you can add numGoros&nbsp; &nbsp; // right away if you know how many will be started, otherwise do .Add(1)&nbsp; &nbsp; // each time before starting a worker goroutine.&nbsp; &nbsp; wg.Add(numGoros)&nbsp; &nbsp; for i := 0; i < numGoros; i++ {&nbsp; &nbsp; &nbsp; &nbsp; // because it uses a closure, it could've used inch and outch automaticaly,&nbsp; &nbsp; &nbsp; &nbsp; // but if the func gets bigger you may want to extract it to a named function,&nbsp; &nbsp; &nbsp; &nbsp; // and I wanted to show the directed channel types: within that function, you&nbsp; &nbsp; &nbsp; &nbsp; // can only receive from inch, and only send (and close) to outch.&nbsp; &nbsp; &nbsp; &nbsp; //&nbsp; &nbsp; &nbsp; &nbsp; // It also receives the index i, just for fun so it can set the goroutines'&nbsp; &nbsp; &nbsp; &nbsp; // index as key in the results, to show that it was processed by different&nbsp; &nbsp; &nbsp; &nbsp; // goroutines. Also, big gotcha: do not capture a for-loop iteration variable&nbsp; &nbsp; &nbsp; &nbsp; // in a closure, pass it as argument, otherwise it very likely won't do what&nbsp; &nbsp; &nbsp; &nbsp; // you expect.&nbsp; &nbsp; &nbsp; &nbsp; go func(i int, inch <-chan Data, outch chan<- Data) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // make sure WaitGroup.Done is called on exit, so Wait unblocks&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // eventually.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // range over a channel gets the next value to process, safe to share&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // concurrently between all goroutines. It exits the for loop once&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the channel is closed and drained, so wg.Done will be called once&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // ch is closed.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for data := range inch {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // process the data...&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(10 * time.Millisecond)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; outch <- Data{strconv.Itoa(i), data.value}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }(i, inch, outch)&nbsp; &nbsp; }&nbsp; &nbsp; // start the goroutine that prints the results, use a separate WaitGroup to track&nbsp; &nbsp; // it (could also have used a "done" channel but the for-loop would be more complex, with a select).&nbsp; &nbsp; var wgResults sync.WaitGroup&nbsp; &nbsp; wgResults.Add(1)&nbsp; &nbsp; go func(ch <-chan Data) {&nbsp; &nbsp; &nbsp; &nbsp; defer wgResults.Done()&nbsp; &nbsp; &nbsp; &nbsp; // to prove it processed everything, keep a counter and print it on exit&nbsp; &nbsp; &nbsp; &nbsp; var n int&nbsp; &nbsp; &nbsp; &nbsp; for data := range ch {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(data.key, data.value)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; n++&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; // for fun, try commenting out the wgResults.Wait() call at the end, the output&nbsp; &nbsp; &nbsp; &nbsp; // will likely miss this line.&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(">>> Processed: ", n)&nbsp; &nbsp; }(outch)&nbsp; &nbsp; // send work, wherever that comes from...&nbsp; &nbsp; for i := 0; i < 1000; i++ {&nbsp; &nbsp; &nbsp; &nbsp; inch <- Data{"main", i}&nbsp; &nbsp; }&nbsp; &nbsp; // when there's no more work to send, close the inch, so the goroutines will begin&nbsp; &nbsp; // draining it and exit once all values have been processed.&nbsp; &nbsp; close(inch)&nbsp; &nbsp; // wait for all goroutines to exit&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; // at this point, no more results will be written to outch, close it to signal&nbsp; &nbsp; // to the results goroutine that it can terminate.&nbsp; &nbsp; close(outch)&nbsp; &nbsp; // and wait for the results goroutine to actually exit, otherwise the program would&nbsp; &nbsp; // possibly terminate without printing the last few values.&nbsp; &nbsp; wgResults.Wait()}在实际场景中,工作量无法提前知道,通道内的关闭可能来自例如 SIGINT 信号。只要确保在通道关闭后没有代码路径可以发送工作,因为这会导致恐慌。
随时随地看视频慕课网APP

相关分类

Go
我要回答