猿问

将来自多个 go routines 的响应获取到一个数组中

我需要从多个 go 例程中获取响应并将它们放入一个数组中。我知道通道可用于此目的,但我不确定如何确保所有 go 例程都已完成结果处理。因此我正在使用等待组。


代码


func main() {

  log.Info("Collecting ints")

  var results []int32

  for _, broker := range e.BrokersByBrokerID {

      wg.Add(1)

      go getInt32(&wg)

  }

  wg.Wait()

  log.info("Collected")

}


func getInt32(wg *sync.WaitGroup) (int32, error) {

  defer wg.Done()


  // Just to show that this method may just return an error and no int32

  err := broker.Open(config)

  if err != nil && err != sarama.ErrAlreadyConnected {

    return 0, fmt.Errorf("Cannot connect to broker '%v': %s", broker.ID(), err)

  }

  defer broker.Close()


  return 1003, nil

}

我的问题


如何将所有响应 int32(可能返回错误)放入我的 int32 数组,确保所有 go 例程都已完成处理工作并返回错误或 int?


眼眸繁星
浏览 209回答 4
4回答

繁花如伊

如果您不处理作为 goroutine 启动的函数的返回值,它们将被丢弃。您可以使用切片来收集结果,其中每个 goroutine 都可以接收将结果放入的索引,或者元素的地址。请注意,如果您使用它,则必须预先分配切片,并且只能写入属于 goroutine 的元素,您不能“触摸”其他元素,也不能附加到切片。或者您可以使用一个通道,goroutines 在该通道上发送包含它们处理的项目的索引或 ID 的值,以便收集 goroutine 可以识别或排序它们。请注意,这里不需要等待组,因为我们知道我们期望通道上的值与我们启动的 goroutine 一样多。type result struct {    task int32    data int32    err  error}func main() {    tasks := []int32{1, 2, 3, 4}    ch := make(chan result)    for _, task := range tasks {        go calcTask(task, ch)    }    // Collect results:    results := make([]result, len(tasks))    for i := range results {        results[i] = <-ch    }    fmt.Printf("Results: %+v\n", results)}func calcTask(task int32, ch chan<- result) {    if task > 2 {        // Simulate failure        ch <- result{task: task, err: fmt.Errorf("task %v failed", task)}        return    }    // Simulate success    ch <- result{task: task, data: task * 2, err: nil}}输出(在Go Playground上尝试):Results: [{task:4 data:0 err:0x40e130} {task:1 data:2 err:<nil>} {task:2 data:4 err:<nil>} {task:3 data:0 err:0x40e138}]

杨__羊羊

我也相信你必须使用频道,它必须是这样的:package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "log"&nbsp; &nbsp; "sync")var (&nbsp; &nbsp; BrokersByBrokerID = []int32{1, 2, 3})type result struct {&nbsp; &nbsp; data string&nbsp; &nbsp; err string // you must use error type here}func main()&nbsp; {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; var results []result&nbsp; &nbsp; ch := make(chan result)&nbsp; &nbsp; for _, broker := range BrokersByBrokerID {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go getInt32(ch, &wg, broker)&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for v := range ch {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; results = append(results, v)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; close(ch)&nbsp; &nbsp; log.Printf("collected %v", results)}func getInt32(ch chan result, wg *sync.WaitGroup, broker int32) {&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; if broker == 1 {&nbsp; &nbsp; &nbsp; &nbsp; ch <- result{err: fmt.Sprintf("error: gor broker 1")}&nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; }&nbsp; &nbsp; ch <- result{data: fmt.Sprintf("broker %d - ok", broker)}}结果将如下所示:2019/02/05 15:26:28 collected [{broker 3 - ok } {broker 2 - ok } { error: gor broker 1}]

拉莫斯之舞

package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "log"&nbsp; &nbsp; "sync")var (&nbsp; &nbsp; BrokersByBrokerID = []int{1, 2, 3, 4})type result struct {&nbsp; &nbsp; data string&nbsp; &nbsp; err&nbsp; string // you must use error type here}func main() {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; var results []int&nbsp; &nbsp; ch := make(chan int)&nbsp; &nbsp; done := make(chan bool)&nbsp; &nbsp; for _, broker := range BrokersByBrokerID {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(i int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- i&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if i == 4 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; done <- true&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }(broker)&nbsp; &nbsp; }L:&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case v := <-ch:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; results = append(results, v)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if len(results) == 4 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //<-done&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; close(ch)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break L&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; case _ = <-done:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Println("STOPPED")&nbsp; &nbsp; //<-done&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; log.Printf("collected %v", results)}

月关宝盒

package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "log"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")var (&nbsp; &nbsp; BrokersByBrokerID = []int{1, 2, 3, 4})type result struct {&nbsp; &nbsp; data string&nbsp; &nbsp; err&nbsp; string // you must use error type here}func main() {&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; var wg sync.WaitGroup.&nbsp; &nbsp;&nbsp; &nbsp; var results []int&nbsp;&nbsp;&nbsp; &nbsp; ch := make(chan int)&nbsp;&nbsp;&nbsp; &nbsp; done := make(chan bool)&nbsp;&nbsp; &nbsp; for _, broker := range BrokersByBrokerID {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp;wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(i int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- i&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if i == 4 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; done <- true&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; }(broker)&nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; for v := range ch {&nbsp; &nbsp; &nbsp; &nbsp; results = append(results, v)&nbsp; &nbsp; &nbsp; &nbsp; if len(results) == 4 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; close(ch)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Println("STOPPED")&nbsp; &nbsp; <-done&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; log.Printf("collected %v", results)}</pre>
随时随地看视频慕课网APP

相关分类

Go
我要回答