Go: learning channels and queueing, fatal error

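I'm trying to learn how to use channels in Go to build a queue for one of my other projects. That project basically queues up database rows and then does number crunching against the database using the details in each row.

I don't want workers processing the same row at the same time, so it needs to check whether a worker is currently processing that particular row ID and, if so, wait for it to finish. If it's not the same row ID, the jobs can run asynchronously, but I also want to limit the number of asynchronous workers that can run at once. In my code below I'm currently trying to limit it to three workers.

Here's what I have: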


package main

import (
    "log"
    "strconv"
    "time"
)

// RowInfo holds the job info
type RowInfo struct {
    id int
}

// WorkerCount holds how many workers are currently running
var WorkerCount int

// WorkerLocked specifies whether a row ID is currently processing by a worker
var WorkerLocked map[string]bool

// Process the RowInfo
func worker(row RowInfo) {
    rowID := strconv.Itoa(row.id)

    WorkerCount++
    WorkerLocked[rowID] = true

    time.Sleep(1 * time.Second)
    log.Printf("ID rcvd: %d", row.id)

    WorkerLocked[rowID] = false
    WorkerCount--
}

// waiter will check if the row is already processing in a worker
// Block until it finishes completion, then dispatch
func waiter(row RowInfo) {
    rowID := strconv.Itoa(row.id)
    for WorkerLocked[rowID] == true {
        time.Sleep(1 * time.Second)
    }

    go worker(row)
}

func main() {
    jobsQueue := make(chan RowInfo, 10)
    WorkerLocked = make(map[string]bool)

    // Dispatcher waits for jobs on the channel and dispatches to waiter
    go func() {
        // Wait for a job
        for {
            // Only have a max of 3 workers running asynch at a time
            for WorkerCount > 3 {
                time.Sleep(1 * time.Second)
            }

            job := <-jobsQueue
            go waiter(job)
        }
    }()

    // Test the queue, send some data
    for i := 0; i < 12; i++ {
        r := RowInfo{
            id: i,
        }
        jobsQueue <- r
    }

    // Prevent exit!
    for {
        time.Sleep(1 * time.Second)
    }
}


aluckdog
2 answers

ABOUTYOU

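If you're going to use the WorkerLocked map, you need to protect access to it using the sync package. You also need to protect WorkerCount the same way (or use atomic operations). Doing this also makes the sleeps unnecessary (use a condition variable). Better still, have 3 (or however many) workers wait for rows to process on channels. You then distribute the rows across the workers so that a particular worker always handles a particular row (for example, use row.id % 3 to decide which worker/channel to send the row to).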
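To make the last suggestion concrete, here is a minimal sketch of routing rows by row.id % workers. The names shardFor and process are hypothetical helpers made up for this illustration, the 10 ms sleep stands in for the real work, and it assumes row IDs are non-negative. Because every row with a given ID goes to the same worker, and each worker handles its rows one at a time, two workers can never process the same row ID concurrently, and no shared map is needed.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// RowInfo holds the job info.
type RowInfo struct {
	id int
}

// shardFor maps a row ID to a worker index (hypothetical helper).
// Assumes id is non-negative.
func shardFor(id, workers int) int {
	return id % workers
}

// process sends each row to the worker chosen by shardFor and returns,
// per worker, the row IDs it handled in the order it handled them.
func process(rows []RowInfo, workers int) [][]int {
	handled := make([][]int, workers)
	chans := make([]chan RowInfo, workers)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		chans[w] = make(chan RowInfo, 10)
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			// Only this goroutine writes handled[w], so no lock is needed.
			for row := range chans[w] {
				time.Sleep(10 * time.Millisecond) // stand-in for real work
				handled[w] = append(handled[w], row.id)
			}
		}(w)
	}

	// Dispatch: the same row ID always lands on the same channel.
	for _, row := range rows {
		chans[shardFor(row.id, workers)] <- row
	}
	// Closing the channels lets each worker's range loop finish.
	for _, c := range chans {
		close(c)
	}
	wg.Wait()
	return handled
}

func main() {
	rows := []RowInfo{{1}, {2}, {1}, {3}, {4}, {1}}
	for w, ids := range process(rows, 3) {
		fmt.Printf("worker %d handled %v\n", w, ids)
	}
}
```

The trade-off is that the load is only as even as the distribution of IDs: if most rows share the same remainder, one worker does most of the work while the others sit idle.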

幕布斯6054654

I strongly advise against using any locking in a case like this, where you have workers processing rows read from a database. Locks and semaphores tend to cause a lot of problems and can eventually leave you with a pile of corrupted data. Trust me, I've been there and done that. Be careful and avoid them in this situation. Locking is fine if, for example, you want to keep data around and maintain a map that is not used for the actual processing. By locking goroutines you slow your Go program down unnecessarily. Go is designed to process things as fast as possible. Don't hold it back.

Here is some of my own reasoning to help you understand what I'm trying to say:

To limit the workers to 3, simply spawn 3 separate goroutines that receive from the queue. A value sent on a channel is received by exactly one goroutine, so two workers never pick up the same job from the channel and you are safe here.

make() already implements the channel limit internally, and it fits this case well. The limit is the second argument. So if you write

queue := make(chan RowInfo, 10)

it means the queue can hold at most 10 RowInfo values. Once the loop feeding the queue has filled it with 10, the next send blocks until a worker takes one item off the channel. So when the queue is full, the database aggregator can write the next row only after a worker has taken one out. That way you get Go's natural workflow :) This is also known as spawning pre-workers.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// RowInfo holds the job info
type RowInfo struct {
    ID int
}

func worker(queue chan RowInfo, done chan bool) {
    fmt.Println("Starting worker...")
    for {
        select {
        case row := <-queue:
            fmt.Printf("Got row info: %v \n", row)
            // Keep it for a second so we can see the queue blocking at work
            time.Sleep(1 * time.Second)
        case <-time.After(10 * time.Second):
            // Fires when no row arrives for 10 seconds (the worker is idle);
            // it does not interrupt a job that is already running.
            fmt.Println("No row received for 10 seconds. A real service could use a timeout to record a failed job in the database so it can be restarted when the time is right.")
        case <-done:
            fmt.Println("Got quit signal... Killing'em all")
            // return, not break: break would only leave the select, not the loop
            return
        }
    }
}

// handleSigterm closes done when a signal arrives, which wakes every worker
func handleSigterm(kill chan os.Signal, done chan bool) {
    <-kill
    close(done)
}

func main() {
    // Do not allow more than 10 records to be in the channel.
    queue := make(chan RowInfo, 10)
    done := make(chan bool)
    kill := make(chan os.Signal, 1)

    signal.Notify(kill, os.Interrupt, syscall.SIGTERM)
    go handleSigterm(kill, done)

    for i := 0; i < 3; i++ {
        go worker(queue, done)
    }

    // Should be infinite loop in the end...
    go func() {
        for i := 0; i < 100; i++ {
            fmt.Printf("Queueing: %v \n", i)
            queue <- RowInfo{ID: i}
        }
    }()

    <-done

    // Give it some time to process things before shutting down. This is a bad way of doing things
    // but is sufficient for this example
    time.Sleep(5 * time.Second)
}

As for managing database state, you can keep an "in progress" status in the database, so that every time you select a row you also update it to show that it is in progress. That is one way of doing it, of course. By keeping some kind of map in Go, I'd say you will torture your service more than you need to.
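The answer itself calls the final time.Sleep a bad way to wait for the workers. One common alternative, sketched here under assumptions rather than taken from the answer, is to close the queue once the producer is finished and wait on a sync.WaitGroup. The function runPool and its processed counter are hypothetical names for this illustration, and the counter uses sync/atomic only so the sketch can report how many rows were handled.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// RowInfo holds the job info.
type RowInfo struct {
	ID int
}

// runPool starts a fixed number of workers, queues the given number of
// rows, and returns how many rows were processed once every worker has
// exited. (Hypothetical helper for illustration.)
func runPool(workers, jobs int) int64 {
	queue := make(chan RowInfo, 10) // at most 10 rows buffered
	var wg sync.WaitGroup
	var processed int64

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range ends once the queue is closed and fully drained.
			for row := range queue {
				_ = row // stand-in for the real work on this row
				atomic.AddInt64(&processed, 1)
			}
		}()
	}

	// Producer: each send blocks while the buffer is full.
	for i := 0; i < jobs; i++ {
		queue <- RowInfo{ID: i}
	}
	close(queue) // no more rows: lets the workers finish

	wg.Wait() // returns only after every queued row has been handled
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println("rows processed:", runPool(3, 100))
}
```

With this shape no row is dropped at shutdown, because wg.Wait returns only after the workers have drained the channel. In a long-running service the close(queue) call would typically be triggered by the signal handler instead of by the end of a fixed loop.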