如何将多个 goroutine 同步到选定 goroutine 的终止

我在上一个问题中问过这个问题,但有些人觉得我最初的问题不够详细(“为什么你想要一个定时条件等待??”)所以这里有一个更具体的问题。


我有一个 goroutine 正在运行,称之为服务器。它已经启动,将执行一段时间,并做它的事情。然后,它会退出,因为它完成了。


在它的执行过程中,一些其他的 goroutine 会启动。如果您愿意,可以称它们为“客户端”线程。他们运行步骤 A 和步骤 B。然后,他们必须等待“服务器”goroutine 完成指定的时间,如果“服务器未完成,则退出状态,如果完成则说运行步骤 C。”


(请不要告诉我如何重组此工作流程。它是假设的和给定的。无法更改。)


一个正常的、合理的方法是让服务器线程用 selectAll 或 Broadcast 函数通知一个条件变量,并让其他线程处于定时等待状态监视条件变量。


func (s *Server) Join(timeMillis int) error {

  s.mux.Lock()

  defer s.mux.Unlock()

  while !s.isFinished {

     err = s.cond.Wait(timeMillis)

     if err != nil {

        stepC()

     }

  }

  return err

}

服务器将进入一个状态,其中 isFinished 变为 true 并广播关于互斥锁的条件变量。除非这是不可能的,因为 Go 不支持定时条件等待。(但有一个 Broadcast())


那么,什么是“以 Go 为中心”的方式来做到这一点?我已经了解了所有的 Go 博客和文档,这种模式或其等价物,尽管很明显,但从未出现,也没有对基本问题进行任何等效的“重构”——即 IPC 风格的通道介于一个例程和一个例程之间其他例行公事。是的,有扇入/扇出,但请记住这些线程不断出现和消失。这应该很简单 - 并且至关重要的是/不要让成千上万的“等待状态”goroutine 在复用通道的另一个“分支”(计时器)发出信号时等待服务器死亡/。


请注意,上面的一些“客户端”可能在服务器 goroutine 启动之前启动(这是通常创建通道的时间),一些可能出现在期间,一些可能出现在之后......在所有情况下,如果和,他们应该运行 stepC仅当服务器在进入 Join() 函数后 timeMillis 毫秒后运行并退出时...


一般来说,当有多个消费者时,渠道设施似乎非常缺乏。“首先构建一个将侦听器映射到的通道注册表”和“这是一个非常漂亮的递归数据结构,它通过它作为字段保存的通道发送自己”是这样的: 等待(forSomeTime)


慕哥9229398
浏览 159回答 1
1回答

慕田峪9158850

我认为可以通过在单个共享通道上进行选择,然后在完成后让服务器关闭它来完成您想要的操作。假设我们创建了一个全局“退出通道”,它在所有 goroutine 之间共享。它可以在创建“服务器”goroutine 之前创建。重要的部分是服务器 goroutine 从不向通道发送任何内容,而只是将其关闭。现在客户端 goroutines,只需执行以下操作:select {&nbsp; &nbsp; case <- ch:&nbsp; &nbsp; fmt.Println("Channel closed, server is done!")&nbsp; &nbsp; case <-time.After(time.Second):&nbsp; &nbsp; fmt.Println("Timed out. do recovery stuff")}服务器 goroutine 只做:close(ch)更完整的例子:package mainimport(&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "time")func waiter(ch chan struct{}) {&nbsp; &nbsp; fmt.Println("Doing stuff")&nbsp; &nbsp; fmt.Println("Waiting...")&nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <- ch:&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Channel closed")&nbsp; &nbsp; &nbsp; &nbsp; case <-time.After(time.Second):&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Timed out. do recovery stuff")&nbsp; &nbsp; }}func main(){&nbsp; &nbsp; ch := make(chan struct{})&nbsp; &nbsp; go waiter(ch)&nbsp; &nbsp; go waiter(ch)&nbsp; &nbsp; time.Sleep(100*time.Millisecond)&nbsp; &nbsp; fmt.Println("Closing channel")&nbsp; &nbsp; close(ch)&nbsp; &nbsp; time.Sleep(time.Second)}这可以抽象为以下实用程序 API:type TimedCondition struct {&nbsp; &nbsp; ch chan struct{}}func NewTimedCondition()*TimedCondition {&nbsp; &nbsp; return &TimedCondition {&nbsp; &nbsp; &nbsp; &nbsp; ch: make(chan struct{}),&nbsp; &nbsp; }}func (c *TimedCondition)Broadcast() {&nbsp; &nbsp; close(c.ch)}func (c *TimedCondition)Wait(t time.Duration) error {&nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; // channel closed, meaning broadcast was called&nbsp; &nbsp; &nbsp; &nbsp; case <- c.ch:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return nil&nbsp; &nbsp; &nbsp; &nbsp; case <-time.After(t):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return errors.New("Time out")&nbsp; &nbsp;&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go