如何同步慢速计算并缓存它?

在golang后端,我想为多个客户端提供一个值,让我们称之为分数。分数随时间而变化,计算速度很慢。计算不依赖于先前的结果。当没有客户时,我根本不想计算它。因此,计算应该只在有要求的情况下进行。但还有另一个事实 - 分数不能在5秒内改变。所以我尝试了不同的建议,一切都有其缺点:


在客户缺席的情况下进行昂贵的计算:

var score interface{}


// run in a separate goroutine

func calculateScorePeriodically() {

    for{

        select{

        case <-time.After(5*time.Second):

            score = calculateScoreExpensiveAndSlow()

        }

    }

}


func serveScore(w http.ResponseWriter, r* http.Request) {

    b, _ := json.Marshal(score)

    w.Write(b)

}

在很长的计算周期内阻止所有客户端(但实际上可能只是向它们提供旧数据)。而且你不能移动到互斥体之外,因为这样多个客户端可以同时进入计算块,并且不会在5秒间隔内进行计算,而是按顺序进行计算:if


var (

    score interface{}

    mutex sync.Mutex

    updatedAt time.Time

)


func getCachedScore() float64 {

    mutex.Lock()

    defer mutex.Unlock()

    currentTime := time.Now()

    if currentTime.Sub(updatedAt) < 5*time.Second {

        return score

    }

    updatedAt = currentTime

    score = calculateScoreExpensiveAndSlow()

    return score

}


func serveScore(w http.ResponseWriter, r* http.Request) {

    b, _ := json.Marshal(getCachedScore())

    w.Write(b)

}

如何解决上述两个缺点?


PS.我认为这是一个通用问题,也是一种模式 - 它有一个特殊的名字吗?


小怪兽爱吃肉
浏览 98回答 2
2回答

largeQ

可能有多种解决方案。一个简单的解决方案是使用指定的 goroutine 进行计算,您可以通过在通道上发送值来表示需要重新计算。发送可能是非阻塞的,因此,如果计算正在进行中,则不会发生任何事情。下面是一个可重用的缓存实现:type cache struct {&nbsp; &nbsp; mu&nbsp; &nbsp; &nbsp; sync.RWMutex&nbsp; &nbsp; value&nbsp; &nbsp;interface{}&nbsp; &nbsp; updated time.Time&nbsp; &nbsp; calcCh&nbsp; &nbsp; &nbsp;chan struct{}&nbsp; &nbsp; expiration time.Duration}func NewCache(calc func() interface{}, expiration time.Duration) *cache {&nbsp; &nbsp; c := &cache{&nbsp; &nbsp; &nbsp; &nbsp; value:&nbsp; &nbsp;calc(),&nbsp; &nbsp; &nbsp; &nbsp; updated: time.Now(),&nbsp; &nbsp; &nbsp; &nbsp; calcCh:&nbsp; make(chan struct{}),&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for range c.calcCh {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; v := calc()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c.mu.Lock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c.value, c.updated = v, time.Now()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c.mu.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; return c}func (c *cache) Get() (value interface{}, updated time.Time) {&nbsp; &nbsp; c.mu.RLock()&nbsp; &nbsp; value, updated = c.value, c.updated&nbsp; &nbsp; c.mu.RUnlock()&nbsp; &nbsp; if time.Since(updated) > c.expiration {&nbsp; &nbsp; &nbsp; &nbsp; // Trigger a new calculation (will happen in another goroutine).&nbsp; &nbsp; &nbsp; &nbsp; // Do non-blocking send, if a calculation is in progress,&nbsp; &nbsp; &nbsp; &nbsp; // this will have no effect&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case c.calcCh <- struct{}{}:&nbsp; &nbsp; &nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; return}func (c *cache) Stop() {&nbsp; &nbsp; close(c.calcCh)}注意:是停止背景戈鲁廷。呼叫后,不得调用。Cache.Stop()Cache.Stop()Cache.Get()将其用于您的情况:func getCachedScore() interface{} {&nbsp; &nbsp; // ...}var scoreCache = NewCache(getCachedScore, 5*time.Second)func serveScore(w http.ResponseWriter, r* http.Request) {&nbsp; &nbsp; score, _ := scoreCache.Get()&nbsp; &nbsp; b, _ := json.Marshal(score)&nbsp; &nbsp; w.Write(b)}

汪汪一只猫

这是我实现的,与icza的答案相关,但具有更多功能:package commonimport (&nbsp; &nbsp; "context"&nbsp; &nbsp; "sync/atomic"&nbsp; &nbsp; "time")type (&nbsp; &nbsp; CachedUpdater func() interface{}&nbsp; &nbsp; ChanStruct&nbsp; &nbsp; chan struct{})type Cached struct {&nbsp; &nbsp; value&nbsp; &nbsp; &nbsp; &nbsp; atomic.Value&nbsp; // holds the cached value's interface{}&nbsp; &nbsp; updatedAt&nbsp; &nbsp; atomic.Value&nbsp; // holds time.Time, time when last update sequence was started at&nbsp; &nbsp; updatePeriod time.Duration // controls minimal anount of time between updates&nbsp; &nbsp; needUpdate&nbsp; &nbsp;ChanStruct}//cachedUpdater is a user-provided function with long expensive calculation, that gets current statefunc MakeCached(ctx context.Context, updatePeriod time.Duration, cachedUpdater CachedUpdater) *Cached {&nbsp; &nbsp; v := &Cached{&nbsp; &nbsp; &nbsp; &nbsp; updatePeriod: updatePeriod,&nbsp; &nbsp; &nbsp; &nbsp; needUpdate:&nbsp; &nbsp;make(ChanStruct),&nbsp; &nbsp; }&nbsp; &nbsp; //v.updatedAt.Store(time.Time{}) // "was never updated", but time should never be nil interface&nbsp; &nbsp; v.doUpdate(time.Now(), cachedUpdater)&nbsp; &nbsp; go v.updaterController(ctx, cachedUpdater)&nbsp; &nbsp; return v}//client will get cached value immediately, and optionally may trigger an update, if value is outdatedfunc (v *Cached) Get() interface{} {&nbsp; &nbsp; if v.IsExpired(time.Now()) {&nbsp; &nbsp; &nbsp; &nbsp; v.RequestUpdate()&nbsp; &nbsp; }&nbsp; &nbsp; return v.value.Load()}//updateController goroutine can be terminated both by cancelling context, provided to maker, or by closing chanfunc (v *Cached) Stop() {&nbsp; &nbsp; close(v.needUpdate)}//returns true if value is outdated and updater function was likely not called yetfunc (v *Cached) IsExpired(currentTime time.Time) bool {&nbsp; &nbsp; updatedAt := v.updatedAt.Load().(time.Time)&nbsp; &nbsp; return currentTime.Sub(updatedAt) > v.updatePeriod}//requests updaterController to perform update, using non-blocking send to unbuffered chan. controller can decide not to update in case if it has recently updated valuefunc (v *Cached) RequestUpdate() bool {&nbsp; &nbsp; select {&nbsp; &nbsp; case v.needUpdate <- struct{}{}:&nbsp; &nbsp; &nbsp; &nbsp; return true&nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; return false&nbsp; &nbsp; }}func (v *Cached) updaterController(ctx context.Context, cachedUpdater CachedUpdater) {&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; case _, ok := <-v.needUpdate:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; currentTime := time.Now()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !v.IsExpired(currentTime) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; continue&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; v.doUpdate(currentTime, cachedUpdater)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}func (v *Cached) doUpdate(currentTime time.Time, cachedUpdater CachedUpdater) {&nbsp; &nbsp; v.updatedAt.Store(currentTime)&nbsp; &nbsp; v.value.Store(cachedUpdater())}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go