猿问

Golang 工作线程池 - 从作业内部排队新作业

我正在尝试同时从一组存储桶中构建一棵树,并且鉴于worker模式在go中似乎非常流行,我尝试将其应用于我的问题。基本上,我启动了一定数量的工人,让他们收听共享的工作渠道。然后,第一个工作线程接收树根节点作为第一个作业,并在分支和创建另外 2 个作业之前用相关信息填充它。然后,这些作业应该分布在其他工作者之间,然后这些工作线程将递归地生成更多的工作,直到整个树被构造出来。我的幼稚方法的简化表示类似于以下内容:


func workers(count int) {


    wg := sync.WaitGroup{}

    wg.Add(count)


    jobs := make(chan job)

    for i := 0; i < count; i++ {

        go func() {

            // worker waits for job and then executes it

            for j := range jobs {

                processJob(j, jobs)

            }

            wg.Done()

        }()

    }


    // start with some initial job

    jobs <- job{}


    wg.Wait()


}


func processJob(j job, jobs chan job) {


    // jobs channel is closed when tree is finished

    if done {

        close(jobs)

    }

    // Do some more irrelevant stuff


    // sometimes 2 new jobs result from this one

    jobs <- job{}

    jobs <- job{}

    // but that doesn't work, if all workers try to send and no one receives


}

问题是,我无法从 1 个作业中添加 2 个新作业,因为在某些时候,每个工作线程都会忙于尝试将作业发送到通道,并且没有工作线程位于接收端。


任何人都可以为我指出一个优雅的解决方案的方向,还是我对这个问题的整个方法都是错误的?


繁花不似锦
浏览 68回答 1
1回答

莫回无

如果没有其他工作人员准备好处理作业,请使用当前工作人员:func doJob(j job, jobs chan job) {&nbsp; &nbsp; select {&nbsp; &nbsp; case jobs <- j:&nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; // Send to jobs was not ready, do the job&nbsp; &nbsp; &nbsp; &nbsp; // in the current worker.&nbsp; &nbsp; &nbsp; &nbsp; processJob(j, jobs)&nbsp; &nbsp; }}将 send 语句替换为调用 。jobs <- job{}doJob(job{}, jobs)使用缓冲通道让工作线程保持忙碌:jobs := make(chan job, N)调整,直到找到一个工作人员大多忙的值。的一个好的起始值是 。此调整不是防止死锁所必需的。当 N 等于零时,程序不会死锁。NNcount
随时随地看视频慕课网APP

相关分类

Go
我要回答