使用 goroutines 和 context 创建可取消的 worker

我试图了解如何正确使用 goroutines 以及通道和上下文,以创建可取消的后台工作者。


我熟悉使用在显式调用时可以取消的上下文,将它附加到 worker goroutine 应该可以让我停止 worker。


但我无法弄清楚如何使用它来实现这一目标。


下面的示例说明了一个从通道“urls”获取数据的 worker goroutine,它还带有一个可取消的上下文。


//worker.go

func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {

    fmt.Printf("Worker %d is starting\n", id)

    select {

    // placeholder for a channel writing the data from the URL

    case url := <-urls:

        fmt.Printf("Worker :%d received url :%s\n", id, url)

    // checking if the process is cancelled

    case <-ctx.Done():

        fmt.Printf("Worker :%d exitting..\n", id)

    }

    fmt.Printf("Worker :%d done..\n", id)

    wg.Done()

}

这对我不起作用有两个原因,

  1. 对于无缓冲的通道,在没有 goroutines 读取的情况下写入它会阻塞它,所以一旦有更多数据添加到 urls 通道,发送方就会阻塞。

  2. 一旦两个通道中的任何一个返回,它就会立即返回。

我还尝试将选择包装在一个无限循环中,但在上下文引发错误后添加一个中断。

func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {

    fmt.Printf("Worker %d is starting\n", id)

    for {

        select {

        // placeholder for a channel writing the data from the URL

        case url := <-urls:

            fmt.Printf("Worker :%d received url :%s\n", id, url)

        // checking if the process is cancelled

        case <-ctx.Done():

            fmt.Printf("Worker :%d exitting..\n", id)

            break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck

        }

    }

    fmt.Printf("Worker :%d done..\n", id) // code is unreachable

    wg.Done()

}

实施这样的事情的正确方法是什么?


PS:有关设计此类工作进程的任何资源/参考资料也将有所帮助。


海绵宝宝撒
浏览 194回答 2
2回答

人到中年有点甜

您可以用 return 代替 break,代码将起作用。但是,更好的方法可能是:Worker 在 for / range 循环中消费通道生产者应负责检测取消并关闭通道。for 循环将停止级联

不负相思意

我专门为此做了一个 Go 包。你可以在这里找到它:https ://github.com/MicahParks/ctxerrpool这是项目的示例README.md:package mainimport (&nbsp; &nbsp; "bytes"&nbsp; &nbsp; "context"&nbsp; &nbsp; "log"&nbsp; &nbsp; "net/http"&nbsp; &nbsp; "os"&nbsp; &nbsp; "time"&nbsp; &nbsp; "github.com/MicahParks/ctxerrpool")func main() {&nbsp; &nbsp; // Create an error handler that logs all errors.&nbsp; &nbsp; var errorHandler ctxerrpool.ErrorHandler&nbsp; &nbsp; errorHandler = func(pool ctxerrpool.Pool, err error) {&nbsp; &nbsp; &nbsp; &nbsp; log.Printf("An error occurred. Error: \"%s\".\n", err.Error())&nbsp; &nbsp; }&nbsp; &nbsp; // Create a worker pool with 4 workers.&nbsp; &nbsp; pool := ctxerrpool.New(4, errorHandler)&nbsp; &nbsp; // Create some variables to inherit through a closure.&nbsp; &nbsp; httpClient := &http.Client{}&nbsp; &nbsp; u := "https://golang.org"&nbsp; &nbsp; logger := log.New(os.Stdout, "status codes: ", 0)&nbsp; &nbsp; // Create the worker function.&nbsp; &nbsp; var work ctxerrpool.Work&nbsp; &nbsp; work = func(ctx context.Context) (err error) {&nbsp; &nbsp; &nbsp; &nbsp; // Create the HTTP request.&nbsp; &nbsp; &nbsp; &nbsp; var req *http.Request&nbsp; &nbsp; &nbsp; &nbsp; if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; // Do the HTTP request.&nbsp; &nbsp; &nbsp; &nbsp; var resp *http.Response&nbsp; &nbsp; &nbsp; &nbsp; if resp, err = httpClient.Do(req); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; // Log the status code.&nbsp; &nbsp; &nbsp; &nbsp; logger.Println(resp.StatusCode)&nbsp; &nbsp; &nbsp; &nbsp; return nil&nbsp; &nbsp; }&nbsp; &nbsp; // Do the work 16 times.&nbsp; &nbsp; for i := 0; i < 16; i++ {&nbsp; &nbsp; &nbsp; &nbsp; // Create a context for the work.&nbsp; &nbsp; &nbsp; &nbsp; ctx, cancel := context.WithTimeout(context.Background(), time.Second)&nbsp; &nbsp; &nbsp; &nbsp; defer cancel()&nbsp; &nbsp; &nbsp; &nbsp; // Send the work to the pool.&nbsp; &nbsp; &nbsp; &nbsp; pool.AddWorkItem(ctx, work)&nbsp; &nbsp; }&nbsp; &nbsp; // Wait for the pool to finish.&nbsp; &nbsp; pool.Wait()}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go