Goroutine-停止运行进程

我使用以下代码在大多数情况下都可以正常工作,以防我们使用一些长时间运行的进程,它不会在程序内部停止并不会结束(这里我限制为 60 秒的示例)


我希望每项工作在5 秒后终止(即使没有完成工作也终止进程),我怎样才能在 不 改变功能的情况下做到这一点myLongRunningFunc。


我知道这不是直接解决它,我可以使用任何技巧吗?


这是一些最小的可重现示例


https://play.golang.org/p/a0RWY4bYWMt


package main


import (

    "context"

    "errors"

    "fmt"

    "time"


    "github.com/gammazero/workerpool"

)


func main() {

    // here define a timeout for 5 sec,

    // the task should be terminate after 5 sec

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)

    defer cancel()


    runner := newRunner(ctx, 10)


    runner.do(job{

        Name: "a",

        Task: func() jobResult {

            select {

            case <-ctx.Done():

                return jobResult{Error: errors.New("Timedout, exiting")}

            default:

                myLongRunningFunc("A job")

            }

            return jobResult{Data: "from a"}

        },

    })


    runner.do(job{

        Name: "b",

        Task: func() jobResult {

            select {

            case <-ctx.Done():

                return jobResult{Error: errors.New("Timeouts, exiting")}

            default:

                myLongRunningFunc("B job")

            }


            return jobResult{Data: "from b"}

        },

    })


    results := runner.getjobResults()

    fmt.Println(results)

    time.Sleep(time.Second * 60)

}


func myLongRunningFunc(name string) {

    for i := 0; i < 100000; i++ {

        time.Sleep(time.Second * 1)

        msg := "job" + name + " running..\n"

        fmt.Println(msg)

    }

}


type runner struct {

    *workerpool.WorkerPool

    ctx     context.Context

    kill    chan struct{}

    result  chan jobResult

    results []jobResult

}


func (r *runner) processResults() {

    for {

        select {

        case res, ok := <-r.result:

            if !ok {

                goto Done

            }

            r.results = append(r.results, res)

        }

    }

Done:

    <-r.kill

}


当我使用圣坛频道时,编辑不相关


慕码人8056858
浏览 148回答 2
2回答

慕少森

我希望每个作业在 5 秒后终止(即使它没有完成工作也终止进程),我怎样才能在不更改函数 myLongRunningFunc 的情况下做到这一点。然后你只需添加一个 5 秒的服务员然后退出。package mainimport (&nbsp; &nbsp; "context"&nbsp; &nbsp; "errors"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "time"&nbsp; &nbsp; "github.com/gammazero/workerpool")func main() {&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; // here define a timeout for 5 sec,&nbsp; &nbsp; &nbsp; &nbsp; // the task should be terminate after 5 sec&nbsp; &nbsp; &nbsp; &nbsp; ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)&nbsp; &nbsp; &nbsp; &nbsp; defer cancel()&nbsp; &nbsp; &nbsp; &nbsp; runner := newRunner(ctx, 10)&nbsp; &nbsp; &nbsp; &nbsp; runner.do(job{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Name: "a",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Task: func() jobResult {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case <-ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return jobResult{Error: errors.New("Timedout, exiting")}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; myLongRunningFunc("A job")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return jobResult{Data: "from a"}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; })&nbsp; &nbsp; &nbsp; &nbsp; runner.do(job{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Name: "b",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Task: func() jobResult {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case <-ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return jobResult{Error: errors.New("Timeouts, exiting")}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; myLongRunningFunc("B job")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return jobResult{Data: "from b"}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; })&nbsp; &nbsp; &nbsp; &nbsp; results := runner.getjobResults()&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(results)&nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(time.Second * 60)&nbsp; &nbsp; }()&nbsp; &nbsp; <-time.After(time.Second * 5)}func myLongRunningFunc(name string) {&nbsp; &nbsp; for i := 0; i < 100000; i++ {&nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(time.Second * 1)&nbsp; &nbsp; &nbsp; &nbsp; msg := "job" + name + " running..\n"&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(msg)&nbsp; &nbsp; }}type runner struct {&nbsp; &nbsp; *workerpool.WorkerPool&nbsp; &nbsp; ctx&nbsp; &nbsp; &nbsp;context.Context&nbsp; &nbsp; kill&nbsp; &nbsp; chan struct{}&nbsp; &nbsp; result&nbsp; chan jobResult&nbsp; &nbsp; results []jobResult}func (r *runner) processResults() {&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case res, ok := <-r.result:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; goto Done&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; r.results = append(r.results, res)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }Done:&nbsp; &nbsp; <-r.kill}func newRunner(ctx context.Context, numRunners int) *runner {&nbsp; &nbsp; r := &runner{&nbsp; &nbsp; &nbsp; &nbsp; WorkerPool: workerpool.New(numRunners),&nbsp; &nbsp; &nbsp; &nbsp; ctx:&nbsp; &nbsp; &nbsp; &nbsp; ctx,&nbsp; &nbsp; &nbsp; &nbsp; kill:&nbsp; &nbsp; &nbsp; &nbsp;make(chan struct{}),&nbsp; &nbsp; &nbsp; &nbsp; result:&nbsp; &nbsp; &nbsp;make(chan jobResult),&nbsp; &nbsp; }&nbsp; &nbsp; go r.processResults()&nbsp; &nbsp; return r}func (r *runner) do(j job) {&nbsp; &nbsp; r.Submit(r.wrap(&j))}func (r *runner) getjobResults() []jobResult {&nbsp; &nbsp; r.StopWait()&nbsp; &nbsp; close(r.result)&nbsp; &nbsp; r.kill <- struct{}{}&nbsp; &nbsp; return r.results}func (r *runner) wrap(job *job) func() {&nbsp; &nbsp; return func() {&nbsp; &nbsp; &nbsp; &nbsp; job.result = make(chan jobResult)&nbsp; &nbsp; &nbsp; &nbsp; go job.Run()&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case res := <-job.result:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; r.result <- res&nbsp; &nbsp; &nbsp; &nbsp; case <-r.ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("Job '%s' should stop here\n", job.Name)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; r.result <- jobResult{name: job.Name, Error: r.ctx.Err()}&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}type job struct {&nbsp; &nbsp; Name&nbsp; &nbsp; string&nbsp; &nbsp; Task&nbsp; &nbsp; func() jobResult&nbsp; &nbsp; Context context.Context&nbsp; &nbsp; result&nbsp; chan jobResult&nbsp; &nbsp; stopped chan struct{}&nbsp; &nbsp; done&nbsp; &nbsp; context.CancelFunc}func (j *job) Run() {&nbsp; &nbsp; result := j.Task()&nbsp; &nbsp; result.name = j.Name&nbsp; &nbsp; j.result <- result}type jobResult struct {&nbsp; &nbsp; name&nbsp; string&nbsp; &nbsp; Error error&nbsp; &nbsp; Data&nbsp; interface{}}

海绵宝宝撒

我认为不可能从外部 goroutine 中停止 goroutine。您可以检查它是否超时,但是,您不能停止它。您可以做的是通过通道向 goroutine 发送消息,在这种情况下可以对其进行监视和停止。你可以在这里找到例子
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go