How to parallelize a recursive function

I am trying to parallelize a recursive problem in Go, and I am not sure what the best approach is.

I have a recursive function that works like this:


func recFunc(input string) (result []string) {
    // recurse into every sub-input and gather its results
    for _, subInput := range getSubInputs(input) {
        subOutput := recFunc(subInput)
        result = append(result, subOutput...)
    }
    // add this call's own output
    result = append(result, getOutput(input)...)
    return result
}

func main() {
    output := recFunc("some_input")
    ...
}

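So the function calls itself N times (where N is 0 at some level), produces its own output, and returns everything in one list.

Now I want to make this function run in parallel, but I am not sure what the cleanest way to do that is. My idea:

  • Have a "results" channel to which every function call sends its result.

  • Collect the results in the main function.

  • Have a wait group that determines when all results have been collected.

The problem: I need to wait on the wait group and collect all the results at the same time. I could start a separate goroutine for the collecting, but how do I get that goroutine to exit?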

func recFunc(input string, outputChannel chan<- []string, waitGroup *sync.WaitGroup) {
    defer waitGroup.Done()

    subInputs := getSubInputs(input)
    waitGroup.Add(len(subInputs))
    for _, subInput := range subInputs {
        go recFunc(subInput, outputChannel, waitGroup)
    }
    outputChannel <- getOutput(input)
}

func main() {
    outputChannel := make(chan []string)
    waitGroup := sync.WaitGroup{}

    waitGroup.Add(1)
    go recFunc("some_input", outputChannel, &waitGroup)

    result := []string{}
    go func() {
        // this is the part I am unsure about: it receives only once,
        // and nothing tells it when to stop
        nextResult := <-outputChannel
        result = append(result, nextResult...)
    }()
    waitGroup.Wait()
}

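Is there perhaps a better way to do this? Or how can I make sure the anonymous goroutine that collects the results is terminated once everything is done?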


慕妹3146593

小唯快跑啊

tl;dr

  • Recursive algorithms should put bounded limits on expensive resources (network connections, goroutines, stack space, etc.).

  • Cancellation should be supported, so that expensive operations can be cleaned up quickly when a result is no longer needed.

  • Branch traversal should support error reporting; this lets errors bubble up the stack and partial results be returned without failing the entire recursive traversal.

For asynchronous results, whether recursion is involved or not, channels are recommended. Also, for long-running jobs with many goroutines, provide a way to cancel (context.Context) to help with clean-up.

Since recursion can lead to exponential consumption of resources, it is important to put limits in place (see bounded parallelism).

Below is a design pattern I use a lot for asynchronous tasks:

  • always accept a context.Context for cancellation

  • accept the number of workers the task should use

  • return a chan of results and a chan error (which will deliver exactly one error or nil)

var (
    workers = 10
    ctx     = context.TODO() // use request context here - otherwise context.Background()
    input   = "abc"
)

resultC, errC := recJob(ctx, workers, input) // returns results & `error` channels

// asynchronous results - so read that channel first in the event of partial results ...
for r := range resultC {
    fmt.Println(r)
}

// ... then check for any errors
if err := <-errC; err != nil {
    log.Fatal(err)
}

Recursion

Since recursion quickly scales horizontally, one needs a consistent way to fill the finite list of workers with work, while also ensuring that workers which become free quickly pick up work from other (over-worked) workers.

Rather than create a manager layer, employ a cooperative peer system of workers:

  • all workers share a single input channel

  • before recursing on an input (the subInputs), a worker checks whether any other worker is idle

      • if so, it delegates to that worker

      • if not, the current worker continues recursing down that branch itself

With this algorithm, the finite set of workers quickly becomes saturated with work. Any worker that finishes its branch early will soon be handed a sub-branch by another worker. Eventually all workers run out of sub-branches, at which point every worker is idle (blocked) and the recursive task can finish up.

Some careful coordination is needed to achieve this. Allowing the workers to write to the input channel is what makes this peer coordination by delegation possible. A "recursion depth" WaitGroup tracks when all branches have been exhausted across all workers.

(To include context support and error chaining, I updated your getSubInputs function to take a ctx and return an optional error):

func recFunc(ctx context.Context, input string, in chan string, out chan<- string, rwg *sync.WaitGroup) error {

    defer rwg.Done() // decrement recursion count when a depth of recursion has completed

    subInputs, err := getSubInputs(ctx, input)
    if err != nil {
        return err
    }

    for _, subInput := range subInputs {
        rwg.Add(1) // about to recurse (or delegate recursion)

        select {
        case in <- subInput:
            // delegated - to another goroutine

        case <-ctx.Done():
            // context canceled...

            // but first we need to undo the earlier `rwg.Add(1)`
            // as this work item was never delegated or handled by this worker
            rwg.Done()
            return ctx.Err()

        default:
            // no one available to delegate - so this worker will need to recurse this item themselves
            err = recFunc(ctx, subInput, in, out, rwg)
            if err != nil {
                return err
            }
        }

        select {
        case <-ctx.Done():
            // always check context when doing anything potentially blocking (in this case writing to `out`)
            // context canceled
            return ctx.Err()

        case out <- subInput:
        }
    }

    return nil
}

Connecting the pieces

recJob creates:

  • the input and output channels, shared by all workers

  • the "recursion" WaitGroup, which detects when all workers are idle

      • the "output" channel can then safely be closed

  • an error channel for all workers

and it kicks off the recursive workload by writing the initial input to the input channel.

func recJob(ctx context.Context, workers int, input string) (resultsC <-chan string, errC <-chan error) {

    // RW channels
    out := make(chan string)
    eC := make(chan error, 1)

    // R-only channels returned to caller
    resultsC, errC = out, eC

    // create workers + waitgroup logic
    go func() {

        var err error // error that will be returned to caller via error channel

        defer func() {
            close(out)
            eC <- err
            close(eC)
        }()

        var wg sync.WaitGroup
        wg.Add(1)
        in := make(chan string) // input channel: shared by all workers (to read from and also to write to when they need to delegate)

        workerErrC := createWorkers(ctx, workers, in, out, &wg)

        // get the ball rolling, pass input job to one of the workers
        // Note: must be done *after* workers are created - otherwise deadlock
        in <- input

        errCount := 0

        // wait for all worker error codes to return
        for err2 := range workerErrC {
            if err2 != nil {
                log.Println("worker error:", err2)
                errCount++
            }
        }

        // all workers have completed
        if errCount > 0 {
            err = fmt.Errorf("PARTIAL RESULT: %d of %d workers encountered errors", errCount, workers)
            return
        }

        log.Printf("All %d workers have FINISHED\n", workers)
    }()

    return
}

Finally, create the workers:

func createWorkers(ctx context.Context, workers int, in chan string, out chan<- string, rwg *sync.WaitGroup) (errC <-chan error) {

    eC := make(chan error) // RW-version
    errC = eC              // RO-version (returned to caller)

    // track the completeness of the workers - so we know when to wrap up
    var wg sync.WaitGroup
    wg.Add(workers)

    for i := 0; i < workers; i++ {
        i := i
        go func() {
            defer wg.Done()

            var err error

            // ensure the current worker's return code gets returned
            // via the common workers' error-channel
            defer func() {
                if err != nil {
                    log.Printf("worker #%3d ERRORED: %s\n", i+1, err)
                } else {
                    log.Printf("worker #%3d FINISHED.\n", i+1)
                }
                eC <- err
            }()

            log.Printf("worker #%3d STARTED successfully\n", i+1)

            // worker scans for input
            for input := range in {
                err = recFunc(ctx, input, in, out, rwg)
                if err != nil {
                    log.Printf("worker #%3d recurseManagers ERROR: %s\n", i+1, err)
                    return
                }
            }
        }()
    }

    go func() {
        rwg.Wait() // wait for all recursion to finish
        close(in)  // safe to close input channel as all workers are blocked (i.e. no new inputs)
        wg.Wait()  // now wait for all workers to return
        close(eC)  // finally, signal to caller we're truly done by closing workers' error-channel
    }()

    return
}

元芳怎么了

"I could start a separate goroutine for the collecting, but how do I get that goroutine to exit?"

You can range over the output channel in the separate goroutine. In that case the goroutine exits safely once the channel is closed:

go func() {
    for nextResult := range outputChannel {
        result = append(result, nextResult...)
    }
}()

So what we now need to take care of is that the channel is closed only after all the goroutines spawned by the recursive calls have exited. For that, use a wait group shared by all those goroutines and wait on it in main, just as you are already doing. Once the wait is over, close the output channel so that the collecting goroutine exits as well. Before main reads result, it should also wait for the collector to finish (the done channel below); otherwise the last append may race with the read.

func recFunc(input string, outputChannel chan<- []string, wg *sync.WaitGroup) {
    defer wg.Done()
    for _, subInput := range getSubInputs(input) {
        wg.Add(1)
        go recFunc(subInput, outputChannel, wg)
    }
    outputChannel <- getOutput(input)
}

func main() {
    outputChannel := make(chan []string)
    waitGroup := sync.WaitGroup{}

    waitGroup.Add(1)
    go recFunc("some_input", outputChannel, &waitGroup)

    result := []string{}
    done := make(chan struct{}) // closed when the collector has drained the channel
    go func() {
        defer close(done)
        for nextResult := range outputChannel {
            result = append(result, nextResult...)
        }
    }()

    waitGroup.Wait()
    close(outputChannel)
    <-done // result is safe to read from here on
}

PS: If you want to limit the exponential growth, bounded parallelism is worth looking into.