永远连续运行最多两个 goroutine

我正在尝试同时运行一个函数。它调用我的数据库可能需要 2-10 秒。我希望它完成后继续执行下一个例程,即使另一个例程仍在处理中,但只希望它一次最多处理 2 个。我希望这种情况无限期地发生。我觉得我快到了,但是 waitGroup 强制这两个例程等到完成,然后再继续另一个迭代。


const ROUTINES = 2;

for {

            var wg sync.WaitGroup

            _, err:= db.Exec(`Random DB Call`)

            if err != nil {

                panic(err)

            }

            ch := createRoutines(db, &wg)

            wg.Add(ROUTINES)

            for i := 1; i <= ROUTINES; i++ {

                ch <- i

                time.Sleep(2 * time.Second)

            }


            close(ch)

            wg.Wait() 

        }



func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {

    var ch = make(chan int, 5)

    for i := 0; i < ROUTINES ; i++ {

        go func(db *sqlx.DB) {

            defer wg.Done()

            for {

                _, ok := <-ch

                if !ok { 

                    return

                }

                doStuff(db) 


            }

        }(db)


    }

    return ch

}


森栏
浏览 232回答 2
2回答

有只小跳蛙

如果您只需要同时运行 n 个 goroutine,您可以拥有一个大小为 n 的缓冲通道,并在没有剩余空间时使用它来阻止创建新的 goroutine,就像这样package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "math/rand"&nbsp; &nbsp; "time")func main() {&nbsp; &nbsp; const ROUTINES = 2&nbsp; &nbsp; rand.Seed(time.Now().UnixNano())&nbsp; &nbsp; stopper := make(chan struct{}, ROUTINES)&nbsp; &nbsp; var counter int&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; counter++&nbsp; &nbsp; &nbsp; &nbsp; stopper <- struct{}{}&nbsp; &nbsp; &nbsp; &nbsp; go func(c int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("+ Starting goroutine", c)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(time.Duration(rand.Intn(3)) * time.Second)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("- Stopping goroutine", c)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <-stopper&nbsp; &nbsp; &nbsp; &nbsp; }(counter)&nbsp; &nbsp; }}在这个例子中,你会看到你只能拥有 ROUTINES 个存活时间为 0、1 或 2 秒的 goroutine。在输出中,您还可以看到每次一个 goroutine 结束时另一个 goroutine 是如何开始的。

慕姐8265434

这增加了一个外部依赖,但考虑这个实现:package mainimport (&nbsp; &nbsp; "context"&nbsp; &nbsp; "database/sql"&nbsp; &nbsp; "log"&nbsp; &nbsp; "github.com/MicahParks/ctxerrpool")func main() {&nbsp; &nbsp; // Create a pool of 2 workers for database queries. Log any errors.&nbsp; &nbsp; databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {&nbsp; &nbsp; &nbsp; &nbsp; log.Printf("Failed to execute database query.\nError: %s", err.Error())&nbsp; &nbsp; })&nbsp; &nbsp; // Get a list of queries to execute.&nbsp; &nbsp; queries := []string{&nbsp; &nbsp; &nbsp; &nbsp; "SELECT first_name, last_name FROM customers",&nbsp; &nbsp; &nbsp; &nbsp; "SELECT price FROM inventory WHERE sku='1234'",&nbsp; &nbsp; &nbsp; &nbsp; "other queries...",&nbsp; &nbsp; }&nbsp; &nbsp; // TODO Make a database connection.&nbsp; &nbsp; var db *sql.DB&nbsp; &nbsp; for _, query := range queries {&nbsp; &nbsp; &nbsp; &nbsp; // Intentionally shadow the looped variable for scope.&nbsp; &nbsp; &nbsp; &nbsp; query := query&nbsp; &nbsp; &nbsp; &nbsp; // Perform the query on a worker. If no worker is ready, it will block until one is.&nbsp; &nbsp; &nbsp; &nbsp; databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _, err = db.ExecContext(workCtx, query)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; &nbsp; &nbsp; })&nbsp; &nbsp; }&nbsp; &nbsp; // Wait for all workers to finish.&nbsp; &nbsp; databasePool.Wait()}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go