猿问

是否有一些优雅的方式来暂停和恢复任何其他 goroutine?

就我而言,我有数千个 goroutine 同时作为work(). 我也有一个sync()goroutine。当sync启动时,我需要任何其他的goroutine同步作业完成后暂停了一段时间。这是我的代码:


var channels []chan int

var channels_mutex sync.Mutex


func work() {

  channel := make(chan int, 1)

  channels_mutex.Lock()  

  channels = append(channels, channel)

  channels_mutex.Unlock()

  for {

    for {

      sync_stat := <- channel // blocked here

      if sync_stat == 0 { // if sync complete

        break  

      }

    }

    // Do some jobs

    if (some condition) {

      return

    }

  }

}


func sync() {

  channels_mutex.Lock()

  // do some sync


  for int i := 0; i != len(channels); i++ {

    channels[i] <- 0

  }

  channels_mutex.Unlock()

}

现在的问题是,由于<-总是在读取时阻塞,所以每次都sync_stat := <- channel阻塞。我知道如果通道关闭它不会被阻塞,但是因为我必须使用这个通道直到work()退出,而且我没有找到任何方法来重新打开一个关闭的通道。


我怀疑自己走错了路,因此感谢您的帮助。是否有一些“优雅”的方式来暂停和恢复任何其他 goroutine?


一只斗牛犬
浏览 205回答 1
1回答

不负相思意

如果我理解正确,您需要 N 个工人和一个控制器,可以随意暂停、恢复和停止工人。下面的代码将做到这一点。package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "runtime"&nbsp; &nbsp; "sync")// Possible worker states.const (&nbsp; &nbsp; Stopped = 0&nbsp; &nbsp; Paused&nbsp; = 1&nbsp; &nbsp; Running = 2)// Maximum number of workers.const WorkerCount = 1000func main() {&nbsp; &nbsp; // Launch workers.&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; wg.Add(WorkerCount + 1)&nbsp; &nbsp; workers := make([]chan int, WorkerCount)&nbsp; &nbsp; for i := range workers {&nbsp; &nbsp; &nbsp; &nbsp; workers[i] = make(chan int, 1)&nbsp; &nbsp; &nbsp; &nbsp; go func(i int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; worker(i, workers[i])&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }(i)&nbsp; &nbsp; }&nbsp; &nbsp; // Launch controller routine.&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; controller(workers)&nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; }()&nbsp; &nbsp; // Wait for all goroutines to finish.&nbsp; &nbsp; wg.Wait()}func worker(id int, ws <-chan int) {&nbsp; &nbsp; state := Paused // Begin in the paused state.&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case state = <-ws:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; switch state {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case Stopped:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("Worker %d: Stopped\n", id)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case Running:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("Worker %d: Running\n", id)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case Paused:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("Worker %d: Paused\n", id)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // We use runtime.Gosched() to prevent a deadlock in this case.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // It will not be needed of work is performed here which yields&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // to the scheduler.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; runtime.Gosched()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if state == Paused {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Do actual work here.&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}// controller handles the current state of all workers. They can be// instructed to be either running, paused or stopped entirely.func controller(workers []chan int) {&nbsp; &nbsp; // Start workers&nbsp; &nbsp; setState(workers, Running)&nbsp; &nbsp; // Pause workers.&nbsp; &nbsp; setState(workers, Paused)&nbsp; &nbsp; // Unpause workers.&nbsp; &nbsp; setState(workers, Running)&nbsp; &nbsp; // Shutdown workers.&nbsp; &nbsp; setState(workers, Stopped)}// setState changes the state of all given workers.func setState(workers []chan int, state int) {&nbsp; &nbsp; for _, w := range workers {&nbsp; &nbsp; &nbsp; &nbsp; w <- state&nbsp; &nbsp; }}
随时随地看视频慕课网APP

相关分类

Go
我要回答