猿问

始终有 x 个 goroutine 随时运行

我看到很多关于如何让 Go 等待 x 个 goroutines 完成的教程和示例,但我想做的是确保总是有 x 个 goroutine 在运行,所以一旦一个 goroutine 结束就会启动一个新的 goroutine .


具体来说,我有几十万个“要做的事情”,它们正在处理一些来自 MySQL 的东西。所以它是这样工作的:


db, err := sql.Open("mysql", connection_string)

checkErr(err)

defer db.Close()


rows,err := db.Query(`SELECT id FROM table`)

checkErr(err)

defer rows.Close()


var id uint

for rows.Next() {

    err := rows.Scan(&id)

    checkErr(err)

    go processTheThing(id)

    }

checkErr(err)

rows.Close()

目前,将推出数十万线程processTheThing()。我需要的是最多启动 x 个(我们称之为 20 个)goroutines。因此,它首先为前 20 行启动 20 个,从那时起,它将在当前 goroutine 之一完成时为下一个 id 启动一个新的 goroutine。所以在任何时间点总是有 20 个在运行。


我敢肯定,这是很简单/标准,但我似乎无法找到任何教程或例子或如何做到这一点的一个很好的解释。


有只小跳蛙
浏览 206回答 3
3回答

郎朗坤

您可能会发现Go Concurrency Patterns文章很有趣,尤其是有界并行性部分,它解释了您需要的确切模式。您可以使用空结构的通道作为限制保护来控制并发工作程序 goroutine 的数量:package mainimport "fmt"func main() {&nbsp; &nbsp; maxGoroutines := 10&nbsp; &nbsp; guard := make(chan struct{}, maxGoroutines)&nbsp; &nbsp; for i := 0; i < 30; i++ {&nbsp; &nbsp; &nbsp; &nbsp; guard <- struct{}{} // would block if guard channel is already filled&nbsp; &nbsp; &nbsp; &nbsp; go func(n int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; worker(n)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <-guard&nbsp; &nbsp; &nbsp; &nbsp; }(i)&nbsp; &nbsp; }}func worker(i int) { fmt.Println("doing work on", i) }

ABOUTYOU

在这里,我认为像这样简单的事情会起作用:package mainimport "fmt"const MAX = 20func main() {&nbsp; &nbsp; sem := make(chan int, MAX)&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; sem <- 1 // will block if there is MAX ints in sem&nbsp; &nbsp; &nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("hello again, world")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <-sem // removes an int from sem, allowing another to proceed&nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; }}

慕姐8265434

感谢大家帮助我解决这个问题。但是,我认为没有人真正提供既有效又简单/易懂的东西,尽管你们都帮助我理解了该技术。我最后所做的是我认为作为对我的具体问题的答案更容易理解和实用,所以我会在这里发布,以防其他人有同样的问题。不知何故,这最终看起来很像 OneOfOne 发布的内容,这很棒,因为现在我明白了。但是 OneOfOne 的代码一开始我发现很难理解,因为将函数传递给函数使得理解什么是什么非常令人困惑。我认为这种方式更有意义:package mainimport ("fmt""sync")const xthreads = 5 // Total number of threads to use, excluding the main() threadfunc doSomething(a int) {&nbsp; &nbsp; fmt.Println("My job is",a)&nbsp; &nbsp; return}func main() {&nbsp; &nbsp; var ch = make(chan int, 50) // This number 50 can be anything as long as it's larger than xthreads&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; // This starts xthreads number of goroutines that wait for something to do&nbsp; &nbsp; wg.Add(xthreads)&nbsp; &nbsp; for i:=0; i<xthreads; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; a, ok := <-ch&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !ok { // if there is nothing to do and the channel has been closed then end the goroutine&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; doSomething(a) // do the thing&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; }&nbsp; &nbsp; // Now the jobs can be added to the channel, which is used as a queue&nbsp; &nbsp; for i:=0; i<50; i++ {&nbsp; &nbsp; &nbsp; &nbsp; ch <- i // add i to the queue&nbsp; &nbsp; }&nbsp; &nbsp; close(ch) // This tells the goroutines there's nothing else to do&nbsp; &nbsp; wg.Wait() // Wait for the threads to finish}
随时随地看视频慕课网APP

相关分类

Go
我要回答