Wait for N items from a channel before executing on them sequentially

So I'm very new to Go, but I have an idea for something I want to try.

I want a goroutine that accepts strings from a channel, but it should only act on them once it has received N of them.

I looked around for similar questions or examples, but all I found were approaches that run several goroutines in parallel and wait to aggregate their results.

I thought about building up an array and passing it to the goroutine once it is long enough, but I would like to keep some separation of concerns and control this on the receiving side.

My questions are:

  1. Is this bad practice for some reason?

  2. Is there a better way to do this, and if so, what is it?

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)

    go func() {
        tasks := []string{}
        for {
            // Block until the next string arrives, then queue it.
            tasks = append(tasks, <-ch)

            if len(tasks) < 3 {
                fmt.Println("Queue still too small")
            }

            if len(tasks) > 3 {
                for i := 0; i < len(tasks); i++ {
                    fmt.Println(tasks[i])
                }
            }
        }
    }()

    ch <- "Msg 1"
    time.Sleep(time.Second)
    ch <- "Msg 2"
    time.Sleep(time.Second)
    ch <- "Msg 3"
    time.Sleep(time.Second)
    ch <- "Msg 4"
    time.Sleep(time.Second)
}

Edit: simplified the example to make it more accurate.


慕哥9229398
2 Answers

慕娘9325324

Based on some of the comments, it seems that what you are looking for is some form of batching. There are a couple of situations in which you want to take a batch and send it along together: the batch size is big enough, or enough time has passed and a partial batch should be flushed. The example you gave does not consider the second case, and that can lead to some awkward behavior if load simply stops and you never flush. So I would suggest either looking at a library (e.g. cloudfoundry/go-batching) or simply using channels, a Timer, and a select statement.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)

    go func() {
        tasks := []string{}
        timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
        for {
            select {
            case <-timer.C:
                fmt.Println("Flush partial batch due to time")
                flush(tasks)
                tasks = nil
                timer.Reset(time.Second)
            case data := <-ch:
                tasks = append(tasks, data)

                // Reset the timer for each data point so that we only flush
                // partial batches when we stop receiving data.
                if !timer.Stop() {
                    <-timer.C
                }
                timer.Reset(time.Second)

                // Guard clause for batch size
                if len(tasks) < 3 {
                    fmt.Println("Queue still too small")
                    continue
                }

                flush(tasks)
                tasks = nil // reset tasks
            }
        }
    }()

    ch <- "Msg 1"
    time.Sleep(time.Second)
    ch <- "Msg 2"
    time.Sleep(time.Second)
    ch <- "Msg 3"
    time.Sleep(time.Second)
    ch <- "Msg 4"
    time.Sleep(time.Second)
}

func flush(tasks []string) {
    // Guard against empty flushes
    if len(tasks) == 0 {
        return
    }

    fmt.Println("Flush")
    for _, t := range tasks {
        fmt.Println(t)
    }
}
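One thing the loop above never does is stop: it has no shutdown path. A minimal variation on the same idea, assuming the sender closes ch when it is finished (the flushBatch helper and the done channel are just illustrative names, not part of the original example), could flush whatever is left and exit once the channel is drained:

package main

import (
    "fmt"
    "time"
)

// flushBatch prints a non-empty batch; it plays the same role as flush above.
func flushBatch(tasks []string) {
    if len(tasks) == 0 {
        return
    }
    fmt.Println("Flush")
    for _, t := range tasks {
        fmt.Println(t)
    }
}

func main() {
    ch := make(chan string)
    done := make(chan struct{})

    go func() {
        defer close(done)
        tasks := []string{}
        timer := time.NewTimer(time.Second)
        for {
            select {
            case <-timer.C:
                // Time-based flush of a partial batch.
                flushBatch(tasks)
                tasks = nil
                timer.Reset(time.Second)
            case data, ok := <-ch:
                if !ok {
                    // Channel closed: flush whatever is left and stop.
                    flushBatch(tasks)
                    return
                }
                tasks = append(tasks, data)
                if len(tasks) < 3 {
                    continue
                }
                // Size-based flush of a full batch.
                flushBatch(tasks)
                tasks = nil
            }
        }
    }()

    for _, msg := range []string{"Msg 1", "Msg 2", "Msg 3", "Msg 4"} {
        ch <- msg
    }
    close(ch) // no more messages are coming
    <-done    // wait for the final flush before exiting
}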

萧十郎

I can see how something that batches up results could be useful, but it does require a custom solution. There are a lot of ways to solve this problem. I tried using a sync.WaitGroup but it got messy. Using a sync.Mutex to lock the batching function seems to be the best way, although imo, when a mutex is the best answer, that should trigger a re-examination of the design, since imo it should be the last option.

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    ctx, canc := context.WithCancel(context.Background())
    acc := NewAccumulator(4, ctx)
    go func() {
        for i := 0; i < 10; i++ {
            acc.Write("hi")
        }
        canc()
    }()

    read := acc.ReadChan()
    for batch := range read {
        fmt.Println(batch)
    }
    fmt.Println("done")
}

type Accumulator struct {
    count    int64
    size     int
    in       chan string
    out      chan []string
    ctx      context.Context
    doneFlag int64
    mu       sync.Mutex
}

func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
    a := &Accumulator{
        size: size,
        in:   make(chan string, size),
        out:  make(chan []string, 1),
        ctx:  parentCtx,
    }

    // Flush any remaining items and close the output when the context is cancelled.
    go func() {
        <-a.ctx.Done()
        atomic.AddInt64(&a.doneFlag, 1)
        close(a.in)
        a.mu.Lock()
        a.batch()
        a.mu.Unlock()
        close(a.out)
    }()
    return a
}

func (a *Accumulator) Write(s string) {
    if atomic.LoadInt64(&a.doneFlag) > 0 {
        panic("write to closed accumulator")
    }
    a.in <- s
    atomic.AddInt64(&a.count, 1)
    // Once a full batch has accumulated, drain it under the lock.
    a.mu.Lock()
    if atomic.LoadInt64(&a.count) == int64(a.size) {
        a.batch()
    }
    a.mu.Unlock()
}

// batch drains up to size items from the buffered input channel into one batch;
// zero values read from a closed channel are skipped.
func (a *Accumulator) batch() {
    batch := make([]string, 0)
    for i := 0; i < a.size; i++ {
        msg := <-a.in
        if msg != "" {
            batch = append(batch, msg)
        }
    }
    fmt.Println("batching", batch)
    a.out <- batch
    atomic.StoreInt64(&a.count, 0)
}

func (a *Accumulator) ReadChan() <-chan []string {
    return a.out
}

It would be better to just have a slice that accumulates strings and, once that slice reaches a certain size, kick off some processing.
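As a rough sketch of that last suggestion (the collect helper and its names are just for illustration, not taken from either answer), the slice can live entirely inside one goroutine that hands off a full batch on an output channel and flushes the leftover partial batch when the input channel is closed:

package main

import "fmt"

// collect reads strings from in and sends them on in groups of n.
// Any remaining partial group is flushed when in is closed.
func collect(in <-chan string, n int) <-chan []string {
    out := make(chan []string)
    go func() {
        defer close(out)
        batch := make([]string, 0, n)
        for s := range in {
            batch = append(batch, s)
            if len(batch) == n {
                out <- batch
                batch = make([]string, 0, n)
            }
        }
        if len(batch) > 0 {
            out <- batch // partial batch left over after in was closed
        }
    }()
    return out
}

func main() {
    in := make(chan string)
    go func() {
        for i := 1; i <= 10; i++ {
            in <- fmt.Sprintf("Msg %d", i)
        }
        close(in)
    }()

    for batch := range collect(in, 4) {
        fmt.Println(batch)
    }
}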