
Deadlock when using channels as queues

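I'm learning Go and I'm trying to implement a job queue.

What I want to do is:

Have the main goroutine feed lines through a channel to several parser workers (each one parses a line into a struct), and have each parser send the resulting struct on to a channel of structs that other workers (goroutines) process (sending them to a database, etc.).

The code looks like this: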


lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)

fileName := "myfile.csv"

file, err := os.Open(fileName)
if err != nil {
    log.Fatal(err)
}
defer file.Close()

reader := bufio.NewReader(file)

// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
    go lineToStructWorker(i, lineParseQ, jobProcessQ)
}

// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
    go WorkerProcessStruct(i, jobProcessQ, doneQ)
}

lineCount := 0
countSend := 0

for {
    line, err := reader.ReadString('\n')
    if err != nil && err != io.EOF {
        log.Fatal(err)
    }
    if err == io.EOF {
        break
    }
    lineCount++
    if lineCount > 1 {
        countSend++
        lineParseQ <- line[:len(line)-1] // Avoid last char '\n'
    }
}

for i := 0; i < countSend; i++ {
    fmt.Printf("Received %+v.\n", <-doneQ)
}

close(doneQ)
close(jobProcessQ)
close(lineParseQ)

Here is a simplified playground: https://play.golang.org/p/yz84g6CJraa

The workers look like this:


func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct) {
    for j := range lineQ {
        strQ <- lineToStruct(j) // just parses the csv to a struct...
    }
}

func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
    for a := range strQ {
        time.Sleep(time.Millisecond * 500) // fake long operation...
        done <- a
    }
}

I know the problem is related to the "done" channel, because if I don't use it there is no error, but I don't know how to fix it.


哆啦的时光机
2 Answers

MYYA

You don't start reading from doneQ until you have finished sending all the lines to lineParseQ, and there are more lines than there is buffer space. Once the doneQ buffer is full, the processors' sends to it block; the backlog then fills jobProcessQ and finally lineParseQ, and once that is full, the main goroutine's send blocks too and everything deadlocks. Move the loop sending to lineParseQ, the loop reading from doneQ, or both, into separate goroutines, for example (lines being the input lines):

go func() {
    for _, line := range lines {
        countSend++
        lineParseQ <- line
    }
    close(lineParseQ)
}()

This will still deadlock at the end, because you have a range over a channel and that channel's close after it in the same goroutine: range keeps going until the channel is closed, and the close only runs once the range has finished. You need to put each close in the right place: either in the sending goroutine or, if a channel has several senders, in a goroutine that blocks on a WaitGroup tracking those senders:

// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
    wg.Add(1)
    go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}

// Process myStruct from jobProcessQ
procWG := new(sync.WaitGroup)
for i := 1; i <= 5; i++ {
    procWG.Add(1)
    go WorkerProcessStruct(i, jobProcessQ, doneQ, procWG)
}

countSend := 0
go func() {
    for _, line := range lines {
        countSend++
        lineParseQ <- line
    }
    close(lineParseQ) // the only sender, so it closes lineParseQ
}()

go func() {
    wg.Wait() // all parsers are done: nothing sends to jobProcessQ any more
    close(jobProcessQ)
}()

go func() {
    procWG.Wait() // all processors are done: nothing sends to doneQ any more
    close(doneQ)
}()

for a := range doneQ {
    fmt.Printf("Received %v.\n", a)
}

// ...

func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range lineQ {
        strQ <- lineToStruct(j) // just parses the csv to a struct...
    }
}

func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct, wg *sync.WaitGroup) {
    defer wg.Done()
    for a := range strQ {
        time.Sleep(time.Millisecond * 500) // fake long operation...
        done <- a
    }
    // Don't close(done) here: with five workers, the second close would panic.
}

A full working example is here: https://play.golang.org/p/XsnewSZeb2X

小唯快跑啊

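Coordinate your pipeline with sync.WaitGroup, breaking it into stages. When you know one part of the pipeline has finished (and nobody will write to a particular channel any more), close that channel to tell all the "workers" to exit, e.g.:

var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
    i := i
    wg.Add(1)
    go func() {
        Worker(i)
        wg.Done()
    }()
}
// wg.Wait() signals the above have completed

Buffered channels are handy for bursty workloads, but sometimes they are used to paper over deadlocks in a poor design. If you want to avoid running certain parts of your pipeline in their own goroutines, you can buffer some channels (typically to match the number of workers) so the main goroutine doesn't block.

If you have dependent parts that read and write and you want to avoid deadlock, make sure they run in separate goroutines. Giving every part of the pipeline its own goroutine even removes the need for buffered channels:

// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)

It's a tradeoff, of course: a goroutine costs about 2 KB (its initial stack), whereas a buffered channel costs much less. As with most designs, it depends on how it is used.

Also, don't get caught by the notorious Go for-loop gotcha; use a closure assignment to avoid it:

for i := 1; i <= 5; i++ {
    i := i // new i (not the i above)
    go func() {
        myfunc(i) // otherwise the goroutines will likely see 6, the value i holds once the loop exits
    }()
}

(Since Go 1.22, each iteration gets its own loop variable when the module's go.mod declares go 1.22 or later, so the i := i copy is no longer needed there.)

Finally, make sure you wait for all results to be processed before exiting. It's a common mistake to return from a channel-based function believing all results have been processed. In a long-running service that will eventually be true, but in a standalone executable the processing loop may still be working on results when main returns.

go func() {
    wgW.Wait()   // waiting on worker goroutines to finish
    close(doneQ) // safe to close results channel now
}()

// ensure we don't return until all results have been processed
for a := range doneQ {
    fmt.Printf("Received %v.\n", a)
}

By processing the results in the main goroutine, we make sure we don't return before everything has been processed.

Putting it all together: https://play.golang.org/p/MjLpQ5xglP3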
