猿问

一段时间后停止 gouroutine

像这样在一段时间后停止 gouroutine 是否并发安全?


代码:(注意:由于另一个 goroutine 的变化导致数据竞争):ok

package main


import (

    "fmt"

    "time"

)


func main() {

    var ok byte

    time.AfterFunc(1000*time.Millisecond, func() {

        ok = 1

    })


    var i uint64

    for ok == 0 {

        i++ // CPU intensive task

    }

    fmt.Println(i) // 2_776_813_033

}

终端:


go run -race .


==================

WARNING: DATA RACE

Write at 0x00c000132010 by goroutine 8:

  main.main.func1()

      ./main.go:11 +0x46


Previous read at 0x00c000132010 by main goroutine:

  main.main()

      ./main.go:15 +0xf4


Goroutine 8 (running) created at:

  time.goFunc()

      go/src/time/sleep.go:180 +0x51

==================

80849692

Found 1 data race(s)

代码(无数据竞争):

package main


import (

    "fmt"

    "sync/atomic"

    "time"

)


func main() {

    var ok int32

    time.AfterFunc(1000*time.Millisecond, func() {

        atomic.StoreInt32(&ok, 1)

    })


    var i uint64

    for atomic.LoadInt32(&ok) == 0 {

        i++ // CPU intensive task

    }

    fmt.Println(i) // 2_835_935_488

}


终端:


go run -race .


31934042


有只小跳蛙
浏览 159回答 2
2回答

墨色风雨

即使被另一个 goroutineok设置为,也不能保证忙等待循环将终止。false在设置和读取 的过程中没有显式同步ok,因此不能保证主 goroutine 看到对其所做的更改。换句话说,没有办法在两个 goroutine 之间建立发生前的关系。https://golang.org/ref/mem代码的第二个版本是安全的,即使在 Go 内存模型中ok没有关于 . 原子读/写具有发生之前关系所必需的内存屏障。您应该使用同步原语之一(互斥体、通道)来保证这一点。

月关宝盒

Go 内存模型:建议修改多个 goroutine 同时访问的数据的程序必须序列化这种访问。要序列化访问,请使用通道操作或其他同步原语(例如同步和同步/原子包中的同步原语)保护数据。对于第一个代码,您应该使用适当的同步,例如:“上下文”、“同步/原子” sync.Mutex、或通道。去1.14Goroutines 现在是异步可抢占的。因此,没有函数调用的循环不再可能使调度程序死锁或显着延迟垃圾收集。这在除 windows/arm、darwin/arm、js/wasm 和 plan9/* 之外的所有平台上都受支持。一段时间后停止 gouroutineBenchmarkAfterFunc-8&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 1000000000 0.4468 ns/op&nbsp; 0 B/op&nbsp; 0 allocs/opBenchmarkDoneChannel-8&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 121966824&nbsp; &nbsp;9.855 ns/op&nbsp; 0 B/op&nbsp; 0 allocs/opBenchmarkTimeSince-8&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 89790115&nbsp; &nbsp; 12.95 ns/op&nbsp; 0 B/op&nbsp; 0 allocs/opBenchmarkContextErr-8&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;58508900&nbsp; &nbsp; 19.78 ns/op&nbsp; 0 B/op&nbsp; 0 allocs/opBenchmarkAfterFuncMutex-8&nbsp; &nbsp; &nbsp; &nbsp;58323207&nbsp; &nbsp; 20.00 ns/op&nbsp; 0 B/op&nbsp; 0 allocs/opBenchmarkContext-8&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 48947625&nbsp; &nbsp; 27.43 ns/op&nbsp; 0 B/op&nbsp; 0 allocs/op测试:package mainimport (&nbsp; &nbsp; "context"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "sync/atomic"&nbsp; &nbsp; "testing"&nbsp; &nbsp; "time")const d = 200 * time.Millisecond //&nbsp; To stop a task after a period of timefunc BenchmarkTimeSince(b *testing.B) {&nbsp; &nbsp; t0 := time.Now()&nbsp; &nbsp; var count = 0&nbsp; &nbsp; for i := 0; i < b.N; i++ {&nbsp; &nbsp; &nbsp; &nbsp; if time.Since(t0) < d {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; _ = count}func BenchmarkContext(b *testing.B) {&nbsp; &nbsp; var ctx, cancel = context.WithTimeout(context.Background(), d)&nbsp; &nbsp; defer cancel()&nbsp; &nbsp; var count = 0&nbsp; &nbsp; for i := 0; i < b.N; i++ {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // break&nbsp; &nbsp; &nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; _ = count}func BenchmarkContextErr(b *testing.B) {&nbsp; &nbsp; var ctx, cancel = context.WithTimeout(context.Background(), d)&nbsp; &nbsp; defer cancel()&nbsp; &nbsp; var count = 0&nbsp; &nbsp; for i := 0; i < b.N; i++ {&nbsp; &nbsp; &nbsp; &nbsp; if ctx.Err() == nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; _ = count}func BenchmarkAfterFunc(b *testing.B) {&nbsp; &nbsp; var done uint32&nbsp; &nbsp; time.AfterFunc(d, func() { atomic.StoreUint32(&done, 1) })&nbsp; &nbsp; var count = 0&nbsp; &nbsp; for i := 0; i < b.N; i++ {&nbsp; &nbsp; &nbsp; &nbsp; if atomic.LoadUint32(&done) == 0 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; _ = count}func BenchmarkDoneChannel(b *testing.B) {&nbsp; &nbsp; var done = make(chan struct{})&nbsp; &nbsp; time.AfterFunc(d, func() { close(done) })&nbsp; &nbsp; var count = 0&nbsp; &nbsp; for i := 0; i < b.N; i++ {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-done:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // break&nbsp; &nbsp; &nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; _ = count}type foo struct {&nbsp; &nbsp; sync.Mutex&nbsp; &nbsp; state bool}func (p *foo) end() {&nbsp; &nbsp; p.Lock()&nbsp; &nbsp; p.state = true&nbsp; &nbsp; p.Unlock()}func (p *foo) isDone() bool {&nbsp; &nbsp; var b bool&nbsp; &nbsp; p.Lock()&nbsp; &nbsp; b = p.state&nbsp; &nbsp; p.Unlock()&nbsp; &nbsp; return b}func BenchmarkAfterFuncMutex(b *testing.B) {&nbsp; &nbsp; var it = foo{}&nbsp; &nbsp; time.AfterFunc(d, func() { it.end() })&nbsp; &nbsp; var count = 0&nbsp; &nbsp; for i := 0; i < b.N; i++ {&nbsp; &nbsp; &nbsp; &nbsp; if it.isDone() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; _ = count}https://medium.com/a-journey-with-go/go-asynchronous-preemption-b5194227371c抢占是调度器的重要组成部分,它可以在 goroutine 之间分配运行时间。事实上,如果没有抢占,一个长时间运行的占用 CPU 的 goroutine 会阻止其他 goroutine 被调度。1.14 版本引入了一种异步抢占的新技术,为调度程序提供了更多的权力和控制权。
随时随地看视频慕课网APP

相关分类

Go
我要回答