为什么我的 Go 无锁队列有时会卡住不动?

这是我的代码:


package main


import (

    "sync/atomic"

    "unsafe"

    "sync"

    "fmt"

    "time"

)


const (

    // MAX_DATA_SIZE is the loop bound used by each producer goroutine in
    // main: every such goroutine enqueues this many values.
    MAX_DATA_SIZE = 100

)


// Queue is a lock-free FIFO queue for concurrent producers and consumers,
// implemented as a singly linked list in the style of the Michael-Scott
// non-blocking queue.
//
// head always points at a sentinel node whose val is not part of the queue;
// the first real element lives in head.next. tail points at the last node,
// or at its predecessor while an enqueue is still in flight.
//
// The zero value is NOT usable: head and tail must both point at the same
// sentinel node before the first call. Use newQueue, or initialise the two
// fields by hand before any goroutine touches the queue.
type Queue struct {
	head unsafe.Pointer // *Node, the current sentinel
	tail unsafe.Pointer // *Node, the last node or the one just before it
}

// Node is one link of the list backing a Queue.
type Node struct {
	val  interface{}
	next unsafe.Pointer // *Node, nil while this is the last node
}

// newQueue returns an empty Queue whose head and tail share a fresh sentinel.
func newQueue() *Queue {
	sentinel := unsafe.Pointer(new(Node))
	return &Queue{head: sentinel, tail: sentinel}
}

// enQueue appends val to the end of the queue. It never blocks on a lock;
// under contention it retries until its node has been linked in.
func (q *Queue) enQueue(val interface{}) {
	node := unsafe.Pointer(&Node{val: val})
	for {
		// Shared pointers are modified with CAS by other goroutines, so they
		// must be read atomically as well; a plain read is a data race.
		tail := atomic.LoadPointer(&q.tail)
		last := (*Node)(tail)
		next := atomic.LoadPointer(&last.next)

		if next != nil {
			// tail is lagging behind the real last node: help move it
			// forward and start over.
			atomic.CompareAndSwapPointer(&q.tail, tail, next)
			continue
		}
		if atomic.CompareAndSwapPointer(&last.next, nil, node) {
			// The node is linked. Swinging tail is best effort: if this CAS
			// fails, another goroutine has already advanced it for us.
			atomic.CompareAndSwapPointer(&q.tail, tail, node)
			return
		}
	}
}

// deQueue removes and returns the value at the front of the queue.
// It returns (nil, false) without waiting when the queue is empty.
func (q *Queue) deQueue() (val interface{}, success bool) {
	for {
		head := atomic.LoadPointer(&q.head)
		tail := atomic.LoadPointer(&q.tail)
		next := atomic.LoadPointer(&(*Node)(head).next)

		if head == tail {
			if next == nil {
				// Only the sentinel is left.
				return nil, false
			}
			// An enqueue has linked a node but tail has not caught up yet:
			// help it along, then retry.
			atomic.CompareAndSwapPointer(&q.tail, tail, next)
			continue
		}

		// Read the value before the CAS; it is only handed out if this
		// goroutine is the one that wins the head swap. next then becomes
		// the new sentinel.
		v := (*Node)(next).val
		if atomic.CompareAndSwapPointer(&q.head, head, next) {
			return v, true
		}
	}
}


func main() {

    var wg sync.WaitGroup

    wg.Add(20)

    queue := new(Queue)

    queue.head = unsafe.Pointer(new(Node))

    queue.tail = queue.head


    for i := 0; i < 10; i++ {

        go func() {

            defer wg.Done()

            for j := 0; j < MAX_DATA_SIZE; j++ {

                t := time.Now()

                queue.enQueue(t)

                fmt.Println("enq = ", t)

            }

        }()

    }



问题是:这段代码有时可以正常运行,但有时会卡住,没有任何响应。


我的代码有什么问题吗?


慕侠2389804
浏览 455回答 2
2回答

陪伴而非守候

下面是按照 @mkb 的建议、用通道(channel)改写的上述代码(只是队列大小不再是无限的)。它不会卡死。我建议您使用通道,除非您有充分的理由不这样做,因为 Go 团队已花费大量精力使通道变得可靠、高性能且易于使用。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

const (
    MAX_DATA_SIZE = 100
)

func main() {
    runtime.GOMAXPROCS(4)
    var wg sync.WaitGroup
    wg.Add(20)
    queue := make(chan time.Time, 10)
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < MAX_DATA_SIZE; j++ {
                t := time.Now()
                queue <- t
                fmt.Println("enq = ", t)
            }
        }()
    }
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < MAX_DATA_SIZE; j++ {
                val := <-queue
                fmt.Println("deq = ", val)
            }
        }()
    }
    wg.Wait()
}

MM们

这段代码中有大量的忙等待(busy waiting),我强烈建议像 Nick 那段漂亮的代码一样,干净利落地使用通道(channel)。不过,针对原始问题“为什么会卡住?”,我的回答是:没有任何机制保证每个 goroutine 何时让出处理器以便其他 goroutine 执行,而处于无限循环中的 goroutine 很可能永远不会让出。您可以在每个可能无限执行的 for 循环中调用 runtime.Gosched() 来解决此问题:Gosched 会让出处理器,从而允许其他 goroutine 运行。它不会挂起当前的 goroutine,因此执行会自动恢复。下面这段改进后的代码运行速度几乎与原始代码一样快,但永远不会挂起:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
    "unsafe"
)

const (
    MAX_DATA_SIZE = 100
)

// lock free queue
type Queue struct {
    head unsafe.Pointer
    tail unsafe.Pointer
}

// one node in queue
type Node struct {
    val  interface{}
    next unsafe.Pointer
}

// queue functions
func (self *Queue) enQueue(val interface{}) {
    newValue := unsafe.Pointer(&Node{val: val, next: nil})
    var tail, next unsafe.Pointer
    for {
        tail = self.tail
        next = ((*Node)(tail)).next
        if next != nil {
            atomic.CompareAndSwapPointer(&(self.tail), tail, next)
        } else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue) {
            break
        }
        runtime.Gosched()
    }
}

func (self *Queue) deQueue() (val interface{}, success bool) {
    var head, tail, next unsafe.Pointer
    for {
        head = self.head
        tail = self.tail
        next = ((*Node)(head)).next
        if head == tail {
            if next == nil {
                return nil, false
            } else {
                atomic.CompareAndSwapPointer(&(self.tail), tail, next)
            }
        } else {
            val = ((*Node)(next)).val
            if atomic.CompareAndSwapPointer(&(self.head), head, next) {
                return val, true
            }
        }
        runtime.Gosched()
    }
    return
}

func main() {
    var wg sync.WaitGroup
    wg.Add(20)
    queue := new(Queue)
    queue.head = unsafe.Pointer(new(Node))
    queue.tail = queue.head
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < MAX_DATA_SIZE; j++ {
                t := time.Now()
                queue.enQueue(t)
                fmt.Println("enq = ", t)
            }
        }()
    }
    for i := 0; i < 10; i++ {
        go func() {
            ok := false
            var val interface{}
            defer wg.Done()
            for j := 0; j < MAX_DATA_SIZE; j++ {
                val, ok = queue.deQueue()
                for !ok {
                    val, ok = queue.deQueue()
                    runtime.Gosched()
                }
                fmt.Println("deq = ", val)
            }
        }()
    }
    wg.Wait()
}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go