我正在学习 Go,我正在尝试实现作业队列。
我想做的是:
让主 goroutine 馈送行通过一个通道供多个解析器工作线程(将一条线解析为 s 结构),并让每个分析器将结构发送到其他工作线程(goroutines)将处理的结构通道(发送到数据库等)。
代码如下所示:
lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)
fileName := "myfile.csv"
file, err := os.Open(fileName)
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := bufio.NewReader(file)
// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
go lineToStructWorker(i, lineParseQ, jobProcessQ)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
lineCount := 0
countSend := 0
for {
line, err := reader.ReadString('\n')
if err != nil && err != io.EOF {
log.Fatal(err)
}
if err == io.EOF {
break
}
lineCount++
if lineCount > 1 {
countSend++
lineParseQ <- line[:len(line)-1] // Avoid last char '\n'
}
}
for i := 0; i < countSend; i++ {
fmt.Printf("Received %+v.\n", <-doneQ)
}
close(doneQ)
close(jobProcessQ)
close(lineParseQ)
这是一个简化的游乐场:https://play.golang.org/p/yz84g6CJraa
工人看起来像这样:
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
}
我知道问题与“完成”通道有关,因为如果我不使用它,就不会有错误,但我不知道如何解决它。
MYYA
小唯快跑啊
相关分类