猿问

Golang 在 goroutine 之间共享大量数据

我需要从另一个 goroutine 读取一个结构体的若干字段。据我所知(afaik),即使能确定不会有并发访问(写入在读取发生之前已经完成,并通过 chan struct{} 发出信号),直接这样读取仍可能读到过期的数据。

既然我可以保证没有并发访问,那么发送指向该结构体的指针(结构体在第一个 goroutine 中创建,在第二个 goroutine 中修改,由第三个 goroutine 读取)能否解决可能读到过期数据的问题?

我想避免复制,因为该结构体很大,并且包含一个在第二个 goroutine 中填充的巨大 bytes.Buffer,而我需要在第三个 goroutine 中读取它。

加锁也是一种选择,但既然我知道不会有并发访问,这似乎有点小题大做。


红颜莎娜
浏览 231回答 2
2回答

扬帆大鱼

这个问题有很多种解法,具体取决于你的数据结构和程序逻辑。请参阅:《如何在并发 goroutines 期间锁定/同步对 Go 中变量的访问?》以及《如何在 Golang 中使用 RWMutex?》

1- 使用有状态的 Goroutine 和通道
2- 使用 sync.Mutex
3- 使用 sync/atomic
4- 使用 WaitGroup
5- 使用程序逻辑(信号量)
...

1:有状态的 Goroutine 和通道:

我模拟了一个非常相似的示例(假设您想从一个 SSD 读取,并以不同的速度写入另一个 SSD):在这个示例代码中,一个 goroutine(名为 write)做一些工作来准备数据并填充大结构体,另一个 goroutine(名为 read)从大结构体读取数据然后做一些工作,而管理 goroutine(名为 manage)保证不会并发访问相同的数据。三个 goroutine 之间通过通道进行通信。在您的情况下,您可以通过通道传递指针,或者像此示例一样使用全局结构体。

输出将是这样的:

mean= 36.6920166015625 stdev= 6.068973186592054

我希望这可以帮助您理解这个思路。工作示例代码:

```go
package main

import (
    "fmt"
    "math"
    "math/rand"
    "runtime"
    "sync"
    "time"
)

type BigStruct struct {
    big     []uint16
    rpos    int
    wpos    int
    full    bool
    empty   bool
    stopped bool
}

func main() {
    wg.Add(1)
    go write()
    go read()
    go manage()
    runtime.Gosched()
    stopCh <- <-time.After(5 * time.Second)
    wg.Wait()
    mean := Mean(hist)
    stdev := stdDev(hist, mean)
    fmt.Println("mean=", mean, "stdev=", stdev)
}

const N = 1024 * 1024 * 1024

var wg sync.WaitGroup
var stopCh chan time.Time = make(chan time.Time)
var hist []int = make([]int, 65536)
var s *BigStruct = &BigStruct{empty: true,
    big: make([]uint16, N), //2GB
}
var rc chan uint16 = make(chan uint16)
var wc chan uint16 = make(chan uint16)

func next(pos int) int {
    pos++
    if pos >= N {
        pos = 0
    }
    return pos
}

func manage() {
    dataReady := false
    var data uint16
    for {
        if !dataReady && !s.empty {
            dataReady = true
            data = s.big[s.rpos]
            s.rpos++
            if s.rpos >= N {
                s.rpos = 0
            }
            s.empty = s.rpos == s.wpos
            s.full = next(s.wpos) == s.rpos
        }
        if dataReady {
            select {
            case rc <- data:
                dataReady = false
            default:
                runtime.Gosched()
            }
        }
        if !s.full {
            select {
            case d := <-wc:
                s.big[s.wpos] = d
                s.wpos++
                if s.wpos >= N {
                    s.wpos = 0
                }
                s.empty = s.rpos == s.wpos
                s.full = next(s.wpos) == s.rpos
            default:
                runtime.Gosched()
            }
        }
        if s.stopped {
            if s.empty {
                wg.Done()
                return
            }
        }
    }
}

func read() {
    for {
        d := <-rc
        hist[d]++
    }
}

func write() {
    for {
        wc <- uint16(rand.Intn(65536))
        select {
        case <-stopCh:
            s.stopped = true
            return
        default:
            runtime.Gosched()
        }
    }
}

func stdDev(data []int, mean float64) float64 {
    sum := 0.0
    for _, d := range data {
        sum += math.Pow(float64(d)-mean, 2)
    }
    variance := sum / float64(len(data)-1)
    return math.Sqrt(variance)
}

func Mean(data []int) float64 {
    sum := 0.0
    for _, d := range data {
        sum += float64(d)
    }
    return sum / float64(len(data))
}
```

5:适用于某些用例的另一种(更快的)方式:

这是使用共享数据结构完成读取作业/写入作业/处理作业的另一种方式。这三项工作在第一个示例中是分开的,这里不使用通道也不使用互斥锁来完成同样的三项工作。工作示例:

```go
package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

type BigStruct struct {
    big     []uint16
    rpos    int
    wpos    int
    full    bool
    empty   bool
    stopped bool
}

func manage() {
    for {
        if !s.empty {
            hist[s.big[s.rpos]]++ //sample read job with any time len
            nextPtr(&s.rpos)
        }
        if !s.full && !s.stopped {
            s.big[s.wpos] = uint16(rand.Intn(65536)) //sample wrire job with any time len
            nextPtr(&s.wpos)
        }
        if s.stopped {
            if s.empty {
                return
            }
        } else {
            s.stopped = time.Since(t0) >= 5*time.Second
        }
    }
}

func main() {
    t0 = time.Now()
    manage()
    mean := Mean(hist)
    stdev := StdDev(hist, mean)
    fmt.Println("mean=", mean, "stdev=", stdev)
    d0 := time.Since(t0)
    fmt.Println(d0) //5.8523347s
}

var t0 time.Time

const N = 100 * 1024 * 1024

var hist []int = make([]int, 65536)
var s *BigStruct = &BigStruct{empty: true,
    big: make([]uint16, N), //2GB
}

func next(pos int) int {
    pos++
    if pos >= N {
        pos = 0
    }
    return pos
}

func nextPtr(pos *int) {
    *pos++
    if *pos >= N {
        *pos = 0
    }
    s.empty = s.rpos == s.wpos
    s.full = next(s.wpos) == s.rpos
}

func StdDev(data []int, mean float64) float64 {
    sum := 0.0
    for _, d := range data {
        sum += math.Pow(float64(d)-mean, 2)
    }
    variance := sum / float64(len(data)-1)
    return math.Sqrt(variance)
}

func Mean(data []int) float64 {
    sum := 0.0
    for _, d := range data {
        sum += float64(d)
    }
    return sum / float64(len(data))
}
```

largeQ

为了在保留读取能力的同时防止对结构体的并发修改,通常会嵌入一个 sync.RWMutex。这里也不例外。您可以在结构体传输期间直接对其加写锁,并在您方便的时候将其解锁。

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// Big simulates your big struct
type Big struct {
    sync.RWMutex
    value string
}

// pump uses a groutine to take the slice of pointers to Big,
// locks the underlying structs and sends the pointers to
// the locked instances of Big downstream
func pump(bigs []*Big) chan *Big {
    // We make the channel buffered for this example
    // for illustration purposes
    c := make(chan *Big, 3)
    go func() {
        for _, big := range bigs {
            // We lock the struct before sending it to the channel
            // so it can not be changed via pointer while in transit
            big.Lock()
            c <- big
        }
        close(c)
    }()
    return c
}

// sink reads pointers to the locked instances of Big
// reads them and unlocks them
func sink(c chan *Big) {
    for big := range c {
        fmt.Println(big.value)
        time.Sleep(1 * time.Second)
        big.Unlock()
    }
}

// modify tries to achieve locks to the instances and modify them
func modify(bigs []*Big) {
    for _, big := range bigs {
        big.Lock()
        big.value = "modified"
        big.Unlock()
    }
}

func main() {
    bigs := []*Big{&Big{value: "Foo"}, &Big{value: "Bar"}, &Big{value: "Baz"}}
    c := pump(bigs)
    // For the sake of this example, we wait until all entries are
    // send into the channel and hence are locked
    time.Sleep(1 * time.Second)
    // Now we try to modify concurrently before we even start to read
    // the struct of which the pointers were sent into the channel
    go modify(bigs)
    sink(c)
    // We use sleep here to keep waiting for modify() to finish simple.
    // Usually, you'd use a sync.waitGroup
    time.Sleep(1 * time.Second)
    for _, big := range bigs {
        fmt.Println(big.value)
    }
}
```
随时随地看视频慕课网APP

相关分类

Go
我要回答