猿问

程序因通道(channel)而挂起

我想使用 goroutine 来批量处理来自不同客户的不同日期的请求。

我的设想是:用 50 个消费者 goroutine 来处理数据库中的所有客户,并且针对每个客户再用 2 个日期消费者 goroutine 来处理日期切片。

主要代码如下,但它挂起并且没有按预期退出。

为什么它没有按预期退出?

func Run(){

    var syncWg sync.WaitGroup

    syncWg.Add(1)

    go SyncCustomerMetricsHistory(&syncWg)

    syncWg.Wait()

}


func SyncCustomerMetricsHistory(wg *sync.WaitGroup){

    defer wg.Done()

    odb := orm.NewOrm()

    start := time.Now()

    logs.Info("start sync  customer metrics, time:[%v]", start)


    qs := odb.QueryTable("gg_customer")

    var customers []*db.GgCustomer

    if num, err := qs.All(&customers); err != nil || num == 0 {

        logs.Error("Get customer error, rows:[%v], err:[%v]", num, err)

    }


    customersChan := make(chan *db.GgCustomer, 50)


    var wgC sync.WaitGroup

    wgC.Add(50)

    for i := 0; i < 50; i++ {

        go syncCustomerMetricsHistory(customersChan, &wgC)

    }


    go func() {

        for _, customer := range customers {

            customersChan <- customer

        }

        close(customersChan)

    }()


    wgC.Wait()

}


func  syncCustomerMetricsHistory(customerChan <- chan *db.GgCustomer, wg *sync.WaitGroup){

    defer wg.Done()

    for customer := range customerChan{

            dateChan := make(chan string, 2)

            var wgD sync.WaitGroup

            wgD.Add(2)

            for i := 1; i < 2; i++{

                go test(dateChan, customer, &wgD)

            }

            go func(){

                for _, date := range GetAllYearDate(){

                    dateChan <- date

                }

                close(dateChan)

            }()


            wgD.Wait()

        }

    }

}


func test(dateChan <- chan string, customer *db.GgCustomer, wg *sync.WaitGroup){

    defer wg.Done()

    for date := range dateChan{

        fmt.Println(date, customer)

    }

}



// GetAllYearDate returns the dates to process, formatted as YYYY-MM-DD.
// The list is currently fixed to two dates; a new slice is returned on
// every call.
func GetAllYearDate() []string {
	dates := make([]string, 0, 2)
	dates = append(dates, "2019-10-01", "2019-10-02")
	return dates
}


慕桂英4014372
浏览 139回答 1
1回答

四季花海

我没有尝试运行这段代码(因为它还需要额外的代码),但我相信问题出在这里:

    wgD.Add(2)
    for i := 1; i < 2; i++ {
        go test(dateChan, customer, &wgD)
    }

这个 for 循环只会迭代一次,但您调用了 wgD.Add(2),所以只会有一次 wgD.Done(),wgD.Wait() 将永远阻塞(我猜您的本意是让循环迭代两次;可以试试 i <= 2)。

另一点反馈:您使用 WaitGroup 的方式可以工作,但不容易看懂(这可能正是您没有发现问题的原因)。不妨改成这样:

    func Run() {
        SyncCustomerMetricsHistory() // No wait group needed as this will not return before done
    }

    func SyncCustomerMetricsHistory() {
        odb := orm.NewOrm()
        start := time.Now()
        logs.Info("start sync  customer metrics, time:[%v]", start)
        qs := odb.QueryTable("gg_customer")
        var customers []*db.GgCustomer
        if num, err := qs.All(&customers); err != nil || num == 0 {
            logs.Error("Get customer error, rows:[%v], err:[%v]", num, err)
        }
        customersChan := make(chan *db.GgCustomer, 50)
        var wgC sync.WaitGroup
        wgC.Add(50)
        for i := 0; i < 50; i++ {
            go func() {
                syncCustomerMetricsHistory(customersChan)
                wgC.Done()
            }()
        }
        go func() {
            for _, customer := range customers {
                customersChan <- customer
            }
            close(customersChan)
        }()
        wgC.Wait()
    }

    func syncCustomerMetricsHistory(customerChan <-chan *db.GgCustomer) {
        for customer := range customerChan {
            dateChan := make(chan string, 2)
            var wgD sync.WaitGroup
            wgD.Add(2)
            for i := 1; i < 2; i++ { // 注意:如上文所述,这里需要改为 i <= 2,否则仍然会挂起
                go func() {
                    test(dateChan, customer)
                    wgD.Done()
                }()
            }
            go func() {
                for _, date := range GetAllYearDate() {
                    dateChan <- date
                }
                close(dateChan)
            }()
            wgD.Wait()
        }
    }

我认为这样更容易理解,因为您可以直接看到 wg.Done() 是在哪里被调用的。在调用的前后加上几条 fmt.Println 语句也非常容易,这会让调试此类问题变得更简单。
随时随地看视频慕课网APP

相关分类

Go
我要回答