Golang - 在 N 单位时间内执行 X 任务

我正在尝试模拟一个程序,它需要在 N 秒内执行任务中的 X 任务并丢弃其他工作请求


当收到 on 的值时,我试图timer在无限循环中使用 with select <-timer.C,我正在继续模拟任务,其中包含切片中的所有数据,这些数据受到保护maxLimit并再次将其重置timer为初始持续时间


这是代码


     type Pool struct {

            // The maximum number of items allowed in pool

            maxSize int

            //the queue to hold the data

            queue []interface{}

            // time window for this pool

            time time.Duration

            //timer for the batch

            timer *time.Timer

            //channel

            ch chan interface{}

        }

        

        func NewPool(maxSize int, t int32) *Pool {

            p := &Pool{

                maxSize: maxSize,

                size:    0,

                queue:   make([]interface{}, maxSize),

                time:    time.Duration(t * int32(time.Second)),

                timer:   time.NewTimer(time.Duration(t) * time.Second),

                ch:      make(chan interface{}),

            }

            go p.Schedule()

            return p

        }

        

        func (p *Pool) Add(ele interface{}) {

            p.ch <- ele

        }

        

        func (p *Pool) Schedule() {

            for {

                select {

                case <-p.timer.C:

                    fmt.Println("Time is over")

                    p.queue = make([]interface{}, 0)

                    p.timer.Reset(p.time)

                    p.flush()

                case data := <-p.ch:

                    if len(p.queue) < p.maxSize {

                        fmt.Println("Addding")

                        p.queue = append(p.queue, data)

        

                    }

                    //p.flush()

                    if !p.timer.Stop() {

                        <-p.timer.C

                    }

                    p.queue = make([]interface{}, 0)

        

                }

        

            }

        

        }

        




但这没有按预期工作我在这里遗漏了一些东西,你能指导我使用并发模式来满足这个要求吗,谢谢


噜噜哒
浏览 114回答 1
1回答

MMMHUHU

添加一些额外的输出可以帮助您定位问题:case data := <-p.ch:    fmt.Println("Got Data", len(p.queue), p.maxSize)    if len(p.queue) < p.maxSize {        p.queue = append(p.queue, data)    }    if !p.timer.Stop() {        fmt.Println("Draining")        <-p.timer.C    }    p.queue = make([]interface{}, 0)}使用该更改运行输出为:Got Data 5 5Got Data 0 5AdddingDraining因此正在处理两条消息(第一条不会输出,Adding因为len(p.queue)它是 5;这是因为您将其初始化为大小为 5 - make([]interface{}, maxSize))。考虑收到消息时代码在做什么:处理queue停止计时器翻拍queue现在从文档中获取timer.Stop():Stop 阻止 Timer 触发。如果调用停止计时器,它返回 true ,如果计时器已经过期或停止,则返回 false 。在第一次迭代中,它起作用了(计时器停止,如有必要,通道被耗尽)。但是,您在停止计时器后不会重置计时器,因此在第二次迭代时,当您调用时计时器已经停止p.timer.Stop()。这意味着Stop()它将返回false(计时器已经停止!)并且您尝试耗尽通道(但由于计时器已经停止,这将永远阻塞)。如何解决这个问题取决于你的目标;我怀疑你是想重置定时器?如果没有,你可以这样做:if p.timer.C != nil && !p.timer.Stop() {    fmt.Println("Draining")    <-p.timer.C}p.timer.C = nil // Timer has been stopped (nil channel will block forever)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go