我有一个脚本,它从数据库中选择一些数据并将其发送到一个通道以供多个 goroutine 处理,然后将结果发送回主线程以在数据库上更新。
但是,它在将数据发送到第一个通道时挂起(可能阻塞)。
频道是在全球范围内创建的:
var chin = make(chan in)
var chout = make(chan out)
in并且out都是structs
首先启动goroutines:
for i:=0; i<5; i++ {
go worker()
}
加载通道的代码是:
if verbose {
fmt.Println(`Getting nextbatch2 and sending to workers`)
}
rows, err = nextbatch2.Query()
if err != nil {
panic(err)
}
var numtodo int
for rows.Next() {
err = rows.Scan(&id, &data)
if err != nil {
rows.Close()
panic(err)
}
// Start
var vin in
vin.id = id
vin.data = data
chin <- vin
numtodo++
}
rows.Close()
然后紧接着:
if verbose {
fmt.Println(`Processing out channel from workers`)
}
for res := range chout {
update5.Exec(res.data, res.id)
if numtodo--; numtodo == 0 {
break
}
}
并且在后台worker()运行多个goroutine:
func worker() {
for res := range chin {
var v out
v.id = res.id
v.data = process(res.data)
chout <- v
}
}
此代码在打印后挂起Getting nextbatch2 and sending to workers。它永远不会到达Processing out channel from workers。所以它挂在rows.Next()循环内的某个地方,我无法弄清楚原因,因为chin通道应该是非阻塞的——即使worker()goroutine 没有处理它,它仍然至少应该完成该循环。
有任何想法吗?
编辑:
通过fmt.Println(" on", numtodo)在rows.Next()循环末尾添加,我可以看到它在 5 之后阻塞,我不明白它应该是非阻塞的,对吧?
编辑2:
通过将频道更改为make(chan in/out, 100)现在将在 105 之后阻塞。
相关分类