扬帆大鱼
这个有很多答案,这取决于你的数据结构和程序逻辑。请参阅: 如何在并发 goroutines 期间锁定/同步对 Go 中变量的访问?以及: 如何在 Golang 中使用 RWMutex?1- 使用 有状态的 Goroutine和通道2- 使用sync.Mutex3- 使用同步/原子4- 使用 WaitGroup5- 使用程序逻辑(信号量)...1:有状态的 Goroutines和通道:我模拟了非常相似的示例(假设您想从一个 SSD 读取并以不同的速度写入另一个 SSD):在这个示例代码中,一个 goroutine(名为 write)做了一些工作准备数据并填充大struct,另一个 goroutine(名为 read)从 big struct 读取数据然后做一些工作,而 manger goroutine 保证不会并发访问相同的数据。三个 goroutine 之间的通信是通过通道完成的。在您的情况下,您可以将指针用于通道数据,或像此示例这样的全局结构。输出将是这样的:mean= 36.6920166015625 stdev= 6.068973186592054我希望这可以帮助您了解这个想法。工作示例代码:package mainimport ( "fmt" "math" "math/rand" "runtime" "sync" "time")type BigStruct struct { big []uint16 rpos int wpos int full bool empty bool stopped bool}func main() { wg.Add(1) go write() go read() go manage() runtime.Gosched() stopCh <- <-time.After(5 * time.Second) wg.Wait() mean := Mean(hist) stdev := stdDev(hist, mean) fmt.Println("mean=", mean, "stdev=", stdev)}const N = 1024 * 1024 * 1024var wg sync.WaitGroupvar stopCh chan time.Time = make(chan time.Time)var hist []int = make([]int, 65536)var s *BigStruct = &BigStruct{empty: true, big: make([]uint16, N), //2GB}var rc chan uint16 = make(chan uint16)var wc chan uint16 = make(chan uint16)func next(pos int) int { pos++ if pos >= N { pos = 0 } return pos}func manage() { dataReady := false var data uint16 for { if !dataReady && !s.empty { dataReady = true data = s.big[s.rpos] s.rpos++ if s.rpos >= N { s.rpos = 0 } s.empty = s.rpos == s.wpos s.full = next(s.wpos) == s.rpos } if dataReady { select { case rc <- data: dataReady = false default: runtime.Gosched() } } if !s.full { select { case d := <-wc: s.big[s.wpos] = d s.wpos++ if s.wpos >= N { s.wpos = 0 } s.empty = s.rpos == s.wpos s.full = next(s.wpos) == s.rpos default: runtime.Gosched() } } if s.stopped { if s.empty { wg.Done() return } } }}func read() { for { d := <-rc hist[d]++ }}func write() { for { wc <- uint16(rand.Intn(65536)) select { case <-stopCh: s.stopped = true return default: runtime.Gosched() } }}func stdDev(data []int, mean float64) float64 { sum := 0.0 for _, d := range data { sum += math.Pow(float64(d)-mean, 2) } variance := sum / float64(len(data)-1) return math.Sqrt(variance)}func Mean(data []int) float64 { sum := 0.0 for _, d := range data { sum += float64(d) } return sum / float64(len(data))}5:某些用例的另一种方式(更快):这里使用共享数据结构的另一种方式来读取作业/写入作业/处理作业,它在第一篇文章中被分开,现在这里做同样的 3 个没有通道和没有互斥体的工作。工作样本:package mainimport ( "fmt" "math" "math/rand" "time")type BigStruct struct { big []uint16 rpos int wpos int full bool empty bool stopped bool}func manage() { for { if !s.empty { hist[s.big[s.rpos]]++ //sample read job with any time len nextPtr(&s.rpos) } if !s.full && !s.stopped { s.big[s.wpos] = uint16(rand.Intn(65536)) //sample wrire job with any time len nextPtr(&s.wpos) } if s.stopped { if s.empty { return } } else { s.stopped = time.Since(t0) >= 5*time.Second } }}func main() { t0 = time.Now() manage() mean := Mean(hist) stdev := StdDev(hist, mean) fmt.Println("mean=", mean, "stdev=", stdev) d0 := time.Since(t0) fmt.Println(d0) //5.8523347s}var t0 time.Timeconst N = 100 * 1024 * 1024var hist []int = make([]int, 65536)var s *BigStruct = &BigStruct{empty: true, big: make([]uint16, N), //2GB}func next(pos int) int { pos++ if pos >= N { pos = 0 } return pos}func nextPtr(pos *int) { *pos++ if *pos >= N { *pos = 0 } s.empty = s.rpos == s.wpos s.full = next(s.wpos) == s.rpos}func StdDev(data []int, mean float64) float64 { sum := 0.0 for _, d := range data { sum += math.Pow(float64(d)-mean, 2) } variance := sum / float64(len(data)-1) return math.Sqrt(variance)}func Mean(data []int) float64 { sum := 0.0 for _, d := range data { sum += float64(d) } return sum / float64(len(data))}
largeQ
为了防止在保留读取能力的同时对结构进行并发修改,您通常会嵌入一个sync.RWMutex。这不是豁免。您可以在传输过程中简单地锁定结构以进行写入,并在您方便的时间点将其解锁。package mainimport ( "fmt" "sync" "time")// Big simulates your big structtype Big struct { sync.RWMutex value string}// pump uses a groutine to take the slice of pointers to Big,// locks the underlying structs and sends the pointers to// the locked instances of Big downstreamfunc pump(bigs []*Big) chan *Big { // We make the channel buffered for this example // for illustration purposes c := make(chan *Big, 3) go func() { for _, big := range bigs { // We lock the struct before sending it to the channel // so it can not be changed via pointer while in transit big.Lock() c <- big } close(c) }() return c}// sink reads pointers to the locked instances of Big// reads them and unlocks themfunc sink(c chan *Big) { for big := range c { fmt.Println(big.value) time.Sleep(1 * time.Second) big.Unlock() }}// modify tries to achieve locks to the instances and modify themfunc modify(bigs []*Big) { for _, big := range bigs { big.Lock() big.value = "modified" big.Unlock() }}func main() { bigs := []*Big{&Big{value: "Foo"}, &Big{value: "Bar"}, &Big{value: "Baz"}} c := pump(bigs) // For the sake of this example, we wait until all entries are // send into the channel and hence are locked time.Sleep(1 * time.Second) // Now we try to modify concurrently before we even start to read // the struct of which the pointers were sent into the channel go modify(bigs) sink(c) // We use sleep here to keep waiting for modify() to finish simple. // Usually, you'd use a sync.waitGroup time.Sleep(1 * time.Second) for _, big := range bigs { fmt.Println(big.value) }}