猿问

来自函数的递归调用以 goroutine 和惯用方式开始

我正在使用 goroutines 实现一个(某种)组合回溯算法。我的问题可以表示为具有一定程度/分布的树,我想访问每片叶子并根据所采用的路径计算结果。在给定的级别上,我想生成 goroutines 来同时处理子问题,即如果我有一个度数为 3 的树并且我想在级别 2 之后开始并发,我会生成 3*3=9 goroutines 继续处理子问题并发。


func main() {

    cRes := make(chan string, 100)

    res := []string{}

    numLevels := 5

    spread := 3

    startConcurrencyAtLevel := 2

    nTree("", numLevels, spread, startConcurrencyAtLevel, cRes)

    for {

        select {

        case r := <-cRes:

            res = append(res, r)

        case <-time.After(10 * time.Second):

            ft.Println("Caculation timed out")

            fmt.Println(len(res), math.Pow(float64(spread), float64(numLevels)))

            return

        }

    }

}


func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string) {

    if len(path) == maxLevels {

        // some longer running task here associated with the found path, also using a lookup table

        // real problem actually returns not the path but the result if it satisfies some condition

        cRes <- path

        return

    }

    for i := 1; i <= spread; i++ {

        nextPath := path + fmt.Sprint(i)

        if len(path) == startConcurrencyAtLevel {

            go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)

        } else {

            nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)

        }

    }

}


上面的代码有效,但是我依赖于 for select 语句超时。我正在寻找一种在所有 goroutine 完成后立即继续 main() 的方法,即所有子问题都已处理。


我已经想出了两种可能的(不受欢迎的/不雅的)解决方案:


使用互斥锁保护的结果映射 + 等待组而不是基于通道的方法应该可以解决问题,但我很好奇是否有一个简洁的通道解决方案。


使用退出通道(int 类型)。每次生成一个 goroutine 时,退出通道都会得到一个 +1 int,每次在叶子中完成计算时,它都会得到一个 -1 int 并且调用者对这些值求和。请参阅以下代码片段,但这不是一个好的解决方案,因为它(相当明显地)遇到了我不想处理的时间问题。例如,如果第一个 goroutine 在另一个 goroutine 生成之前完成,它会过早退出。




以下问题:


从作为 goroutine 开始的函数对其自身进行递归调用是一种有效的方法吗?

cRes在所有派生的 goroutine 完成之前读取结果的惯用方式是什么?我在某处读到,计算完成后通道应该关闭,但我只是想不通在这种情况下如何集成它。

对任何想法都很满意,谢谢!


繁华开满天机
浏览 77回答 2
2回答

呼唤远方

阅读描述和片段我无法准确理解你想要实现的目标,但我有一些我每天使用的频道的提示和模式并且认为有帮助。该context包对于以安全的方式管理 goroutines 的状态非常有帮助。在您的示例中,time.After用于结束主程序,但在非主函数中它可能会泄漏 goroutines:如果您改为使用context.Context它并将其传递给 gorotuines(它通常传递函数的第一个参数),您将能够控制取消下游呼叫。这简单解释一下。通常的做法是在生成消息并在通道中发送消息的函数中创建通道(并返回它们)。同样的功能应该负责关闭通道,例如,defer close(channel)当它完成写入时。这很方便,因为当通道被缓冲时,即使它仍然有数据也可以关闭它:Go 运行时实际上会等到所有消息都被轮询后再关闭。对于无缓冲通道,该函数将无法通过通道发送消息,直到通道的读者准备好轮询它,因此无法退出。&nbsp;这是一个例子(没有递归)。在此示例中,我们可以close在缓冲或未缓冲时使用通道,因为发送将阻塞直到for := range在主 goroutine 的通道上从中读取。&nbsp;这是相同原理的变体,通道作为参数传递。我们可以与通道sync.WaitGroup&nbsp;一起使用,为单个 goroutine 发出完成信号,并让“编排”goroutine 知道通道可以关闭,因为所有消息生产者都已完成向通道发送数据。与第 1 点相同的注意事项适用于close操作。&nbsp;这是一个示例,显示了 waitGroup 的使用和通道的外部关闭器。渠道可以有方向!请注意,在示例中,我在将箭头传入/外部函数时添加/删除了通道旁边的箭头(例如<-chan string, 或)。chan<- string这告诉编译器,一个通道在该函数的范围内分别只读或写。这在两个方面有所帮助:编译器将生成更高效的代码,因为有方向的通道将有一个锁而不是 2 个。函数的签名描述了它是否只会使用通道写入(可能close())或读取:请记住,range当通道关闭时,从带有 a 的通道读取会自动停止迭代。您可以构建通道的通道:make(chan chan string)是构建处理管道的有效(且有用)构造。它的一个常见用法是扇入 goroutine,它收集一系列 channel-producing goroutines 的多个输出。&nbsp;这是如何使用它们的示例。本质上,回答您最初的问题:从作为 goroutine 开始的函数对其自身进行递归调用是一种有效的方法吗?如果你真的需要递归,最好将它与并发代码分开处理:创建一个专用函数,递归地将数据发送到通道,并在调用者中协调通道的关闭。在所有派生的 goroutine 完成之前,从 cRes 读取结果的惯用方式是什么?我在某处读到,计算完成后通道应该关闭,但我只是想不通在这种情况下如何集成它。一个很好的参考是Go Concurrency Patterns: Pipelines and cancellation:这是一个相当古老的帖子(在contextstd lib 中存在包之前),我认为Parallel digestion你正在寻找解决原始问题的方法。

紫衣仙女

正如 torek 所提到的,我在 waitgroup 完成等待后剥离了一个关闭通道的匿名函数。还需要一些逻辑,仅在goroutine 生成级别的递归返回后才wg.Done()调用生成的 goroutine 。一般来说,我认为这是一个有用的成语(如果我错了请纠正我:))游乐场:https ://go.dev/play/p/bQjHENsZL25func main() {&nbsp; &nbsp; cRes := make(chan string, 100)&nbsp; &nbsp; numLevels := 3&nbsp; &nbsp; spread := 3&nbsp; &nbsp; startConcurrencyAtLevel := 2&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; nTree("", numLevels, spread, startConcurrencyAtLevel, cRes, &wg)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; // time.Sleep(1 * time.Second) // edit: code should work without this initial sleep&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(cRes)&nbsp; &nbsp; }()&nbsp; &nbsp; for r := range cRes {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(r)&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Println("Done!")}func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string, wg *sync.WaitGroup) {&nbsp; &nbsp; if len(path) == maxLevels {&nbsp; &nbsp; &nbsp; &nbsp; // some longer running task here associated with the found path&nbsp; &nbsp; &nbsp; &nbsp; cRes <- path&nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; }&nbsp; &nbsp; for i := 1; i <= spread; i++ {&nbsp; &nbsp; &nbsp; &nbsp; nextPath := path + fmt.Sprint(i)&nbsp; &nbsp; &nbsp; &nbsp; if len(path) == startConcurrencyAtLevel {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)&nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}
随时随地看视频慕课网APP

相关分类

Go
我要回答