
Worker pool for potentially recursive tasks (i.e., each job can queue other jobs)

I'm writing an application in which the user can start with a number of "jobs" (really: URLs). At the beginning (in the main routine), I add these URLs to a queue and then start x goroutines that work on those URLs.


In special cases, the resource a URL points to may contain even more URLs, which have to be added to the queue. The three workers wait for new jobs to come in and process them. The problem: once every worker is waiting for a job (and none is producing any), the workers should stop altogether. So either all of them work or none of them works.


My current implementation looks like this, and I don't think it's elegant. Unfortunately, I can't come up with a better way that wouldn't include race conditions, and I'm not entirely sure whether this implementation actually works as intended:


var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}

func work(working chan int) {
    absent := make(chan struct{}, 1)
    // if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.
    // This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
    one := false
    for {
        select {
        case u, ok := <-queue.Pop():
            if !ok {
                close(absent)
                return
            }
            if !one {
                // I have started working (delta + 1)
                working <- 1
                absent <- struct{}{}
                one = true
            }
            // do work with u (which may lead to queue.Push(urls...))
        case <-absent: // no jobs at the moment. consume absent => wait
            one = false
            working <- -1
        }
    }
}

func Start() {
    working := make(chan int)
    for i := 0; i < WORKER_COUNT; i++ {
        go work(working)
    }
    // the amount of actually working workers...
    sum := 0
    for {
        delta := <-working
        sum += delta
        if sum == 0 {
            queue.Close() // close channel -> kill workers.
            done <- struct{}{}
            return
        }
    }
}

Is there a better way to approach this?


慕尼黑5688855
1 Answer

德玛西亚99

You can use a sync.WaitGroup (see the docs) to control the lifetime of the workers, and use a non-blocking send so the workers can't deadlock when they try to queue up more jobs:

package main

import "sync"

const workers = 4

type job struct{}

func (j *job) do(enqueue func(job)) {
    // do the job, calling enqueue() for subtasks as needed
}

func main() {
    jobs, wg := make(chan job), new(sync.WaitGroup)
    var enqueue func(job)

    // workers
    for i := 0; i < workers; i++ {
        go func() {
            for j := range jobs {
                j.do(enqueue)
                wg.Done()
            }
        }()
    }

    // how to queue a job
    enqueue = func(j job) {
        wg.Add(1)
        select {
        case jobs <- j: // another worker took it
        default: // no free worker; do the job now
            j.do(enqueue)
            wg.Done()
        }
    }

    todo := make([]job, 1000)
    for _, j := range todo {
        enqueue(j)
    }
    wg.Wait()
    close(jobs)
}

The difficulty with trying to avoid deadlocks via a buffered channel is that you would have to allocate a channel big enough up front to hold all pending tasks without ever blocking, which is a problem unless you have a small, known number of URLs to crawl. By falling back to ordinary recursion on the current goroutine instead, you avoid that static buffer-size limit. Of course there are still limits: you would probably run out of RAM if too much work were pending, and in theory you could exhaust the stack with very deep recursion (but that's hard!). So if you were crawling the Web at large, you would need to track pending tasks in some more elaborate way. Finally, as a more complete example, I'm not hugely proud of this code, but I happened to write a function that kicks off a parallel sort, which is recursive in the same way fetching URLs is.
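To tie this back to the question's URL case: below is a minimal sketch of the same pattern where a job is simply a URL string and processing it may discover more URLs. The fetchLinks helper is a hypothetical stand-in, not part of the answer's code:

package main

import (
    "fmt"
    "sync"
)

const workers = 3

// fetchLinks is a hypothetical placeholder: fetch the URL and return
// any further URLs discovered in the response.
func fetchLinks(url string) []string {
    fmt.Println("fetching", url)
    return nil
}

func main() {
    jobs, wg := make(chan string), new(sync.WaitGroup)
    var enqueue func(string)

    // three workers, as in the question
    for i := 0; i < workers; i++ {
        go func() {
            for u := range jobs {
                for _, link := range fetchLinks(u) {
                    enqueue(link)
                }
                wg.Done()
            }
        }()
    }

    enqueue = func(u string) {
        wg.Add(1)
        select {
        case jobs <- u: // an idle worker picked it up
        default: // all workers busy: process on the current goroutine
            for _, link := range fetchLinks(u) {
                enqueue(link)
            }
            wg.Done()
        }
    }

    for _, u := range []string{"http://example.com/a", "http://example.com/b"} {
        enqueue(u)
    }
    wg.Wait()   // returns once nothing is running or queued
    close(jobs) // now the workers can exit
}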
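The parallel-sort code the answer alludes to isn't reproduced on this page; purely as an illustration of the same enqueue pattern applied to a divide-and-conquer task (a sketch, not the answerer's original function), a recursive quicksort step could be queued the same way:

package main

import (
    "fmt"
    "sync"
)

const workers = 4

// sortJob sorts data[lo:hi); partitioning produces two smaller
// sortJobs, which are enqueued recursively like discovered URLs.
type sortJob struct {
    data   []int
    lo, hi int
}

func (s sortJob) do(enqueue func(sortJob)) {
    if s.hi-s.lo < 2 {
        return
    }
    // Lomuto partition around the last element.
    p := s.data[s.hi-1]
    i := s.lo
    for j := s.lo; j < s.hi-1; j++ {
        if s.data[j] < p {
            s.data[i], s.data[j] = s.data[j], s.data[i]
            i++
        }
    }
    s.data[i], s.data[s.hi-1] = s.data[s.hi-1], s.data[i]
    // The two halves are disjoint ranges, so they can be sorted concurrently.
    enqueue(sortJob{s.data, s.lo, i})
    enqueue(sortJob{s.data, i + 1, s.hi})
}

func main() {
    jobs, wg := make(chan sortJob), new(sync.WaitGroup)
    var enqueue func(sortJob)

    for i := 0; i < workers; i++ {
        go func() {
            for j := range jobs {
                j.do(enqueue)
                wg.Done()
            }
        }()
    }

    enqueue = func(j sortJob) {
        wg.Add(1)
        select {
        case jobs <- j:
        default: // no idle worker: sort this range on the current goroutine
            j.do(enqueue)
            wg.Done()
        }
    }

    data := []int{5, 2, 9, 1, 7, 3, 8, 4, 6}
    enqueue(sortJob{data, 0, len(data)})
    wg.Wait()
    close(jobs)
    fmt.Println(data)
}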
