我正在尝试学习如何使用通道在 Go 中为我的其他项目之一创建队列。我的另一个项目基本上是对数据库行进行排队,然后使用行中的详细信息对数据库进行数字运算。
我不希望工作人员同时处理同一行,因此它需要检查工作人员当前是否正在处理该特定行 ID,如果是,则等待它完成。如果不是同一个row ID,可以异步运行,但是我也想限制可以同时运行的异步worker的数量。在我下面的代码中,我目前试图将其限制为三名工人。
这是我所拥有的:
package main
import (
"log"
"strconv"
"time"
)
// RowInfo holds the job info
type RowInfo struct {
id int
}
// WorkerCount holds how many workers are currently running
var WorkerCount int
// WorkerLocked specifies whether a row ID is currently processing by a worker
var WorkerLocked map[string]bool
// Process the RowInfo
func worker(row RowInfo) {
rowID := strconv.Itoa(row.id)
WorkerCount++
WorkerLocked[rowID] = true
time.Sleep(1 * time.Second)
log.Printf("ID rcvd: %d", row.id)
WorkerLocked[rowID] = false
WorkerCount--
}
// waiter will check if the row is already processing in a worker
// Block until it finishes completion, then dispatch
func waiter(row RowInfo) {
rowID := strconv.Itoa(row.id)
for WorkerLocked[rowID] == true {
time.Sleep(1 * time.Second)
}
go worker(row)
}
func main() {
jobsQueue := make(chan RowInfo, 10)
WorkerLocked = make(map[string]bool)
// Dispatcher waits for jobs on the channel and dispatches to waiter
go func() {
// Wait for a job
for {
// Only have a max of 3 workers running asynch at a time
for WorkerCount > 3 {
time.Sleep(1 * time.Second)
}
job := <-jobsQueue
go waiter(job)
}
}()
// Test the queue, send some data
for i := 0; i < 12; i++ {
r := RowInfo{
id: i,
}
jobsQueue <- r
}
// Prevent exit!
for {
time.Sleep(1 * time.Second)
}
}
ABOUTYOU
幕布斯6054654
相关分类