从输入通道正确批处理项目

用例


我想把通过通道接收到的大量数据保存到 MySQL 数据库中。出于性能考虑,我以 10 个为一批进行处理。我每 3 小时才会收到一次输入项目。


问题


假设我收到 10004 个项目,最后会剩下 4 个项目没有被保存,因为我的 goroutine 要凑满 10 个项目才会“刷新”(flush)这一批。我希望在通道中不再有项目时(此时生产者也已关闭通道),它也能把不足 10 个项目的最后一批提交出去。


代码:


// ProcessAudits sends the given audits in batches to SQL

func ProcessAudits(done <-chan bq.Audit) {

    var audits []bq.Audit

    for auditRow := range done {

        user := auditRow.UserID.StringVal

        log.Infof("Received audit %s", user)

        audits = append(audits, auditRow)


        if len(audits) == 10 {

            upsertBigQueryAudits(audits)

            audits = []bigquery.Audit{}

        }

    }

}

我是 Go 新手,不确定该如何正确地实现这一点。


繁星coding
浏览 121回答 2
2回答

红颜莎娜

这是一个可运行的示例。当通道关闭时,range 循环会退出,因此您可以在循环结束后处理所有剩余的项目。

package main

import (
    "fmt"
    "sync"
)

type Audit struct {
    ID int
}

func upsertBigQueryAudits(audits []Audit) {
    fmt.Printf("Processing batch of %d\n", len(audits))
    for _, a := range audits {
        fmt.Printf("%d ", a.ID)
    }
    fmt.Println()
}

func processAudits(audits <-chan Audit, batchSize int) {
    var batch []Audit
    for audit := range audits {
        batch = append(batch, audit)
        if len(batch) == batchSize {
            upsertBigQueryAudits(batch)
            batch = []Audit{}
        }
    }
    if len(batch) > 0 {
        upsertBigQueryAudits(batch)
    }
}

func produceAudits(x int, to chan Audit) {
    for i := 0; i < x; i++ {
        to <- Audit{
            ID: i,
        }
    }
}

const batchSize = 10

func main() {
    var wg sync.WaitGroup
    audits := make(chan Audit)
    wg.Add(1)
    go func() {
        defer wg.Done()
        processAudits(audits, batchSize)
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        produceAudits(25, audits)
        close(audits)
    }()
    wg.Wait()
    fmt.Println("Complete")
}

输出:

Processing batch of 10
0 1 2 3 4 5 6 7 8 9
Processing batch of 10
10 11 12 13 14 15 16 17 18 19
Processing batch of 5
20 21 22 23 24
Complete

拉莫斯之舞

您也可以使用定时器。可以在这里运行示例:https://play.golang.org/p/0atlGVCL-px

func printItems(items []int) {
    fmt.Println(items)
    return
}

func main() {

    items := []int {1,2,3,4,5,6,7,8}
    ch := make(chan int, 5)
    go func(ch <-chan int) {
        timer := time.NewTimer(1 * time.Second)
        temp := make([]int, 0, 5)
        for {
            select {
            case i := <- ch:
                timer.Reset(1 * time.Second)
                temp = append(temp, i)
                if len(temp) == 5 {
                    printItems(temp)
                    temp = []int {}
                }
            case <- timer.C:
                printItems(temp)
                temp = []int {}
            }
        }
    }(ch)

    for k, i := range items {
        fmt.Println("Send ", i)
        ch <- i
        if k == 7 {
            time.Sleep(5 * time.Second)
        }
    }
}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go