如何跨 goroutines 共享地图

我正在尝试在 Go 中编写一个通知结构,它将保存一系列键及其各自的值,并且如果值低于阈值将触发通知。

当第一个样本低于阈值时,通知应该只触发一次,并且低于该值的其他样本不应再次触发,直到值升至阈值以上。

例如,假设我的阈值是 10,我发送了 15、14、11、10、... 9 的样本。发送 9 后,应触发通知。8、7、4 的进一步样本不应造成任何影响。以下样本如 5、6、7、9、10、11、14、30 不应执行任何操作。一旦样本再次低于 10:30、20、15、10、7... 必须发送另一个通知。

当多个 goroutines 操纵我的结构时,我遇到了问题。

我尝试使用 sync.Mutex 进行同步,还使用了 sync.Map,但没有成功。我觉得某处有参考副本或缓存,但我对 Go 太陌生了,无法找到问题所在。

为此,我创建了一个这样的结构:

type Notifier interface {

    Send(message string)

}


type NotificationBoard struct {

    mutex    sync.Mutex

    Last     sync.Map

    notifier Notifier

}


func (n *NotificationBoard) Init(notifier Notifier) {

    n.notifier = notifier

}


// NotifyLess ...

func (n *NotificationBoard) NotifyLess(key string, value, threshold float64) {

    n.mutex.Lock()

    defer n.mutex.Unlock()


    if value >= threshold {

        fmt.Printf("NotificationBoard.NotifyLess %v (value >= threshold): %v >= %v\n", key, value, threshold)

        n.Last.Store(key, value)

        return

    }


    // value < threshold

    if last, found := n.Last.Load(key); found == true {

        fmt.Printf("NotificationBoard.NotifyLess %v (value < threshold): %v < %v : found %v\n", key, value, threshold, last)

        if last.(float64) >= threshold { // first trigger

            n.notifier.Send(fmt.Sprintf("%s < %v (%v)", key, threshold, value))

        }

    } else {

        fmt.Printf("NotificationBoard.NotifyLess %v (value < threshold): %v < %v : not found\n", key, value, threshold)

        // not found, started board as less

        n.notifier.Send(fmt.Sprintf("%s < %v (%v)", key, threshold, value))

    }


    n.Last.Store(key, value)

    return

}

我知道使用 sync.Mutex 或 sync.Map 应该就足够了,但上面的代码两者都有,因为它是我当前的(损坏的)版本。


为了测试,我设置了以下代码:


type dummy struct{}


func (d *dummy) Send(message string) {

    fmt.Println("--------------> notifying", message)

}


func newBoard() *NotificationBoard {

    notificationBoard := &NotificationBoard{}

    notificationBoard.Init(&dummy{})

    return notificationBoard

}


犯罪嫌疑人X
浏览 149回答 2
2回答

噜噜哒

在这里简化逻辑的一种方法可能是运行一个修改地图的 goroutine。然后,它可以监听通道上的新值(因为如果按顺序处理值应该没问题)。您需要小心知道您的 goroutine 何时返回以确保它不会泄漏。一般来说,你不应该在 goroutines 之间共享数据,你应该使用通道在 goroutines 之间进行通信。这是一个示例,说明如何使用通道而不是共享内存(游乐场版本)来实现此类应用程序。package mainimport (    "fmt"    "sync")type value struct {    key       string    value     float64    threshold float64}func main() {    b := board{        last: map[string]float64{},    }    c := b.start()    wg := sync.WaitGroup{}    for i := 0; i < 30; i++ {        wg.Add(1)        go func(i int) {            for j := 15.0; j > 5; j-- {                c <- value{"k1", j + float64(i+1)/100, 10}            }            wg.Done()        }(i)    }    wg.Wait()    close(c)}type board struct {    last map[string]float64}func (b *board) start() chan<- value {    c := make(chan value)    go func() {        for v := range c {            b.notify(v)        }    }()    return c}func (b *board) notify(v value) {    if l, ok := b.last[v.key]; !ok || l >= v.threshold {        if v.value < v.threshold {            fmt.Printf("%s < %v (%v)\n", v.key, v.threshold, v.value)        }    }    b.last[v.key] = v.value}

当年话下

我认为您在设置此类跟踪器时需要设置标志,一个用于价值上升,另一个用于价值下降。我实施了一个package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync")const (&nbsp; &nbsp; threshold&nbsp; &nbsp; &nbsp; int = 10&nbsp; &nbsp; upperThreshold int = 20)var mu sync.Mutexvar downwatch boolvar upwatch boolfunc main() {&nbsp; &nbsp; var tracker int = 10&nbsp; &nbsp; var temp int = 1&nbsp; &nbsp; var sign int = 1&nbsp; &nbsp; for i := 1; i < 20; i++ {&nbsp; &nbsp; &nbsp; &nbsp; sign = sign * -1&nbsp; &nbsp; &nbsp; &nbsp; temp = temp + i&nbsp; &nbsp; &nbsp; &nbsp; go UpdateTracker(&tracker, temp*sign)&nbsp; &nbsp; }&nbsp; &nbsp; for {&nbsp; &nbsp; }&nbsp; &nbsp; return}func SetDownWatch() {&nbsp; &nbsp; downwatch = true}func SetUpWatch() {&nbsp; &nbsp; upwatch = true}func UnSetDownWatch() {&nbsp; &nbsp; downwatch = false}func UnSetUpWatch() {&nbsp; &nbsp; upwatch = false}func UpdateTracker(tracker *int, val int) {&nbsp; &nbsp; mu.Lock()&nbsp; &nbsp; defer mu.Unlock()&nbsp; &nbsp; if !(upwatch || downwatch) {&nbsp; &nbsp; &nbsp; &nbsp; if (*tracker)+val < threshold {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; NotifyOnDrop()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; SetDownWatch()&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; if (*tracker + val) > upperThreshold {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; NotifyOnRise()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; SetUpWatch()&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; if (*tracker)+val < threshold && upwatch {&nbsp; &nbsp; &nbsp; &nbsp; NotifyOnDrop()&nbsp; &nbsp; &nbsp; &nbsp; SetDownWatch()&nbsp; &nbsp; &nbsp; &nbsp; UnSetUpWatch()&nbsp; &nbsp; }&nbsp; &nbsp; if (*tracker+val) > upperThreshold && downwatch {&nbsp; &nbsp; &nbsp; &nbsp; NotifyOnRise()&nbsp; &nbsp; &nbsp; &nbsp; SetUpWatch()&nbsp; &nbsp; &nbsp; &nbsp; UnSetDownWatch()&nbsp; &nbsp; }&nbsp; &nbsp; *tracker = (*tracker) + val&nbsp; &nbsp; fmt.Println((*tracker))&nbsp; &nbsp; return}func NotifyOnDrop() {&nbsp; &nbsp; fmt.Println("dropped")&nbsp; &nbsp; return}func NotifyOnRise() {&nbsp; &nbsp; fmt.Println("rose")&nbsp; &nbsp; return}updateTracker当值超过设置的阈值时,作为 go 例程运行并打印到控制台。我认为这就是您正在寻找的功能,这里缺少的是Last.Store我认为是您的代码自定义的功能。我相信还有其他方法可以处理这个问题。这对我来说似乎很简单。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go