有没有办法并行化时间。睡眠但保持围棋的有效执行时间?

我正在开发一个慢速查询日志解析器包,该包与golang中的慢速查询日志重放器相关联。对于重修器,请提供以下代码段(为了便于阅读,我在其中添加了注释):


for {

        // method from my package that returns a Query object, containing headers values

        // and the query itself

        q := p.GetNext()

        if q == (query.Query{}) {

            break

        }

        db.logger.Tracef("query: %s", q.Query)


        // we send the SQL query to a chan that is read by workers.

        // workers just execute the query on the database, that's all.

        // results from the db query are handled in another piece of the code, it doesn't really

        // matter here

        queries <- q.Query


        // We need a reference time

        if firstPass {

            firstPass = false

            previousDate = q.Time

            continue

        }

        

        // Time field contains the Time: field value in the query header

        now := q.Time

        sleeping := now.Sub(previousDate)

        db.logger.Tracef("next sleeping time: %s", sleeping)

        time.Sleep(sleeping) // Here is my issue. For MariaDB the value is < 0 so no sleep is done


        // For MariaDB, when there is multiple queries in a short amount of

        // time, the Time field is not repeated, so we do not have to update

        // the previous date.

        if now != (time.Time{}) {

            previousDate = now

        }

    }

我遇到了一个有趣的问题:在MariaDB慢速查询日志中,如果2个(或更多)查询彼此接近,则标头中没有Time:字段,这减少了上一个代码片段中的数量。但是,对于MySQL风格的慢速查询日志,查询标头中始终存在Time:字段,这意味着每个查询的睡眠都已完成(即使对于μs睡眠持续时间)。time.Sleep(sleeping)


我注意到MariaDB和MySQL日志之间存在巨大的重播时间差;MariaDB重放持续时间与实时时间非常相似(日志文件的第一个和最后一个查询之间的时间差),但另一方面,MySQL重放时间比IRL高得多。玩后,我注意到问题来自,特别是从中耗费了CPU时间。pproftime.Sleepruntime.Futex


我做了一些基准测试,持续时间结果与完成的数量相关(MySQL比MariaDB高)。time.Sleep


因此,我不是在单个线程中完成所有操作,而是寻找一种不同的方法来并行执行它们而不改变有效时间,但我无法找到一种方法来做到这一点。time.Sleep


www说
浏览 109回答 1
1回答

守候你守候我

我提出的解决方案如下:type job struct {&nbsp; &nbsp; query string&nbsp; &nbsp; idle&nbsp; time.Time}...&nbsp; &nbsp; var reference time.Time&nbsp; &nbsp; start := time.Now()&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; q := p.GetNext()&nbsp; &nbsp; &nbsp; &nbsp; if q == (query.Query{}) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; s.Stop()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; db.logger.Tracef("query: %s", q.Query)&nbsp; &nbsp; &nbsp; &nbsp; r.queries++&nbsp; &nbsp; &nbsp; &nbsp; s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)&nbsp; &nbsp; &nbsp; &nbsp; // We need a reference time&nbsp; &nbsp; &nbsp; &nbsp; if firstPass {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; firstPass = false&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; reference = q.Time&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; var j job&nbsp; &nbsp; &nbsp; &nbsp; delta := q.Time.Sub(reference)&nbsp; &nbsp; &nbsp; &nbsp; j.idle = start.Add(delta)&nbsp; &nbsp; &nbsp; &nbsp; j.query = q.Query&nbsp; &nbsp; &nbsp; &nbsp; db.logger.Tracef("next sleeping time: %s", j.idle)&nbsp; &nbsp; &nbsp; &nbsp; jobs <- j&nbsp; &nbsp; }...func (db database) worker(jobs chan job, errors chan error, wg *sync.WaitGroup) {&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; j, ok := <-jobs&nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; db.logger.Trace("channel closed, worker exiting")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; sleep := time.Until(j.idle)&nbsp; &nbsp; &nbsp; &nbsp; if sleep > 0 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(sleep)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; rows, err := db.drv.Query(j.query)&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; errors <- err&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; db.logger.Debugf("failed to execute query:\n%s\nerror: %s", j.query, err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; if rows != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; rows.Close()&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}解释:我们将程序的开头保存在一个变量中(此处 )。接下来,我们设置一个引用时间(in ),这是慢速查询日志文件的第一个时间戳。它永远不会改变。startreference然后,在每个新查询中,我们计算 与当前查询时间戳 之间的持续时间。让我们将其存储在 .referenceq.Timedelta我们添加到时间线中,并且有一个时间戳(而不是像过去那样在慢速查询日志文件中)。我们将此时间戳发送到我们创建的新结构中查询旁边的工作线程,该结构名为 。deltastartjob当工作人员通过通道收到作业时,他会计算等待的时间,直到他可以执行查询。如果它<= 0,它将立即执行查询,否则他将等待。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go