猿问

与超时并行运行循环

我需要运行请求,parallel而不是一个接一个,而是超时。现在我可以去吗?


这是我需要并行运行的特定代码,这里的技巧也是使用超时,即根据超时等待所有请求并在所有完成后获得响应。


    for _, test := range testers {

        checker := NewTap(test.name, test.url, test.timeout)

        res, err := checker.Check()

        if err != nil {

            fmt.Println(err)

        }

        fmt.Println(res.name)

        fmt.Println(res.res.StatusCode)


    }

这是所有代码(工作代码) https://play.golang.org/p/cXnJJ6PW_CF


package main


import (

    `fmt`

    `net/http`

    `time`

)


type HT interface {

    Name() string

    Check() (*testerResponse, error)

}


type testerResponse struct {

    name string

    res  http.Response

}


type Tap struct {

    url     string

    name    string

    timeout time.Duration

    client  *http.Client

}


func NewTap(name, url string, timeout time.Duration) *Tap {

    return &Tap{

        url:    url,

        name:   name,

        client: &http.Client{Timeout: timeout},

    }

}


func (p *Tap) Check() (*testerResponse, error) {

    response := &testerResponse{}

    req, err := http.NewRequest("GET", p.url, nil)

    if err != nil {

        return nil, err

    }

    res, e := p.client.Do(req)

    response.name = p.name

    response.res = *res

    if err != nil {

        return response, e

    }

    return response, e

}


func (p *Tap) Name() string {

    return p.name

}


func main() {


    var checkers []HT


    testers := []Tap{

        {

            name:    "first call",

            url:     "http://stackoverflow.com",

            timeout: time.Second * 20,

        },

        {

            name:    "second call",

            url:     "http://www.example.com",

            timeout: time.Second * 10,

        },

    }


    for _, test := range testers {

        checker := NewTap(test.name, test.url, test.timeout)

        res, err := checker.Check()

        if err != nil {

            fmt.Println(err)

        }

        fmt.Println(res.name)

        fmt.Println(res.res.StatusCode)


        checkers = append(checkers, checker)


    }

}


翻过高山走不出你
浏览 119回答 2
2回答

千万里不及你

Go 中一种流行的并发模式是使用工作池。一个基本的工作池使用两个通道;一个用于放置工作,另一个用于读取结果。在这种情况下,我们的工作频道将是 type Tap,我们的结果频道将是 type testerResponse。工作人员从作业通道中获取工作并将结果放在结果通道上。// worker defines our worker func. as long as there is a job in the// "queue" we continue to pick up&nbsp; the "next" jobfunc worker(jobs <-chan Tap, results chan<- testerResponse) {&nbsp; &nbsp; for n := range jobs {&nbsp; &nbsp; &nbsp; &nbsp; results <- n.Check()&nbsp; &nbsp; }}工作要添加工作,我们需要迭代我们的testers并将它们放在我们的工作频道上。// makeJobs fills up our jobs channelfunc makeJobs(jobs chan<- Tap, taps []Tap) {&nbsp; &nbsp; for _, t := range taps {&nbsp; &nbsp; &nbsp; &nbsp; jobs <- t&nbsp; &nbsp; }}结果为了读取结果,我们需要遍历它们。// getResults takes a job from our worker pool and gets the resultfunc getResults(tr <-chan testerResponse, taps []Tap) {&nbsp; &nbsp; for range taps {&nbsp; &nbsp; &nbsp; &nbsp; r := <- tr&nbsp; &nbsp; &nbsp; &nbsp; status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)&nbsp; &nbsp; &nbsp; &nbsp; if r.err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; status = fmt.Sprintf(r.err.Error())&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(status)&nbsp; &nbsp; }}最后是我们的主要功能。func main() {&nbsp; &nbsp; // Make buffered channels&nbsp; &nbsp; buffer := len(testers)&nbsp; &nbsp; jobsPipe := make(chan Tap, buffer)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// Jobs will be of type `Tap`&nbsp; &nbsp; resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`&nbsp; &nbsp; // Create worker pool&nbsp; &nbsp; // Max workers default is 5&nbsp; &nbsp; // maxWorkers := 5&nbsp; &nbsp; // for i := 0; i < maxWorkers; i++ {&nbsp; &nbsp; //&nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; // }&nbsp; &nbsp; // the loop above is the same as doing:&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; // ^^ this creates 5 workers..&nbsp; &nbsp; makeJobs(jobsPipe, testers)&nbsp; &nbsp; getResults(resultsPipe, testers)}把它们放在一起我将“第二次调用”的超时更改为一毫秒,以显示超时的工作原理。package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "net/http"&nbsp; &nbsp; "time")type HT interface {&nbsp; &nbsp; Name() string&nbsp; &nbsp; Check() (*testerResponse, error)}type testerResponse struct {&nbsp; &nbsp; err&nbsp; error&nbsp; &nbsp; name string&nbsp; &nbsp; res&nbsp; http.Response&nbsp; &nbsp; url&nbsp; string}type Tap struct {&nbsp; &nbsp; url&nbsp; &nbsp; &nbsp;string&nbsp; &nbsp; name&nbsp; &nbsp; string&nbsp; &nbsp; timeout time.Duration&nbsp; &nbsp; client&nbsp; *http.Client}func NewTap(name, url string, timeout time.Duration) *Tap {&nbsp; &nbsp; return &Tap{&nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; url,&nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp;name,&nbsp; &nbsp; &nbsp; &nbsp; client: &http.Client{Timeout: timeout},&nbsp; &nbsp; }}func (p *Tap) Check() testerResponse {&nbsp; &nbsp; fmt.Printf("Fetching %s %s \n", p.name, p.url)&nbsp; &nbsp; // theres really no need for NewTap&nbsp; &nbsp; nt := NewTap(p.name, p.url, p.timeout)&nbsp; &nbsp; res, err := nt.client.Get(p.url)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return testerResponse{err: err}&nbsp; &nbsp; }&nbsp; &nbsp; // need to close body&nbsp; &nbsp; res.Body.Close()&nbsp; &nbsp; return testerResponse{name: p.name, res: *res, url: p.url}}func (p *Tap) Name() string {&nbsp; &nbsp; return p.name}// makeJobs fills up our jobs channelfunc makeJobs(jobs chan<- Tap, taps []Tap) {&nbsp; &nbsp; for _, t := range taps {&nbsp; &nbsp; &nbsp; &nbsp; jobs <- t&nbsp; &nbsp; }}// getResults takes a job from our jobs channel, gets the result, and// places it on the results channelfunc getResults(tr <-chan testerResponse, taps []Tap) {&nbsp; &nbsp; for range taps {&nbsp; &nbsp; &nbsp; &nbsp; r := <-tr&nbsp; &nbsp; &nbsp; &nbsp; status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)&nbsp; &nbsp; &nbsp; &nbsp; if r.err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; status = fmt.Sprintf(r.err.Error())&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf(status)&nbsp; &nbsp; }}// worker defines our worker func. as long as there is a job in the// "queue" we continue to pick up&nbsp; the "next" jobfunc worker(jobs <-chan Tap, results chan<- testerResponse) {&nbsp; &nbsp; for n := range jobs {&nbsp; &nbsp; &nbsp; &nbsp; results <- n.Check()&nbsp; &nbsp; }}var (&nbsp; &nbsp; testers = []Tap{&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "1",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://google.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 20,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "2",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://www.yahoo.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 10,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "3",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://stackoverflow.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 20,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "4",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://www.example.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 10,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "5",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://stackoverflow.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 20,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "6",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://www.example.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 10,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "7",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://stackoverflow.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 20,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "8",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://www.example.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 10,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "9",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://stackoverflow.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 20,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "10",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://www.example.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 10,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "11",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://stackoverflow.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 20,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "12",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://www.example.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 10,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "13",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://stackoverflow.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 20,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "14",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://www.example.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 10,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; })func main() {&nbsp; &nbsp; // Make buffered channels&nbsp; &nbsp; buffer := len(testers)&nbsp; &nbsp; jobsPipe := make(chan Tap, buffer)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// Jobs will be of type `Tap`&nbsp; &nbsp; resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`&nbsp; &nbsp; // Create worker pool&nbsp; &nbsp; // Max workers default is 5&nbsp; &nbsp; // maxWorkers := 5&nbsp; &nbsp; // for i := 0; i < maxWorkers; i++ {&nbsp; &nbsp; //&nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; // }&nbsp; &nbsp; // the loop above is the same as doing:&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; go worker(jobsPipe, resultsPipe)&nbsp; &nbsp; // ^^ this creates 5 workers..&nbsp; &nbsp; makeJobs(jobsPipe, testers)&nbsp; &nbsp; getResults(resultsPipe, testers)}哪个输出:// Fetching http://stackoverflow.com&nbsp;// Fetching http://www.example.com&nbsp;// Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)// 'first call' to 'http://stackoverflow.com' was fetched with status '200'

qq_笑_17

在 Golang 中可以通过不同的方式实现并行。这是一种带有等待组、互斥锁和无限 goroutine 的幼稚方法,不推荐使用。我认为使用通道是进行并行的首选方式。package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "net/http"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")type HT interface {&nbsp; &nbsp; Name() string&nbsp; &nbsp; Check() (*testerResponse, error)}type testerResponse struct {&nbsp; &nbsp; name string&nbsp; &nbsp; res&nbsp; http.Response}type Tap struct {&nbsp; &nbsp; url&nbsp; &nbsp; &nbsp;string&nbsp; &nbsp; name&nbsp; &nbsp; string&nbsp; &nbsp; timeout time.Duration&nbsp; &nbsp; client&nbsp; *http.Client}func NewTap(name, url string, timeout time.Duration) *Tap {&nbsp; &nbsp; return &Tap{&nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; url,&nbsp; &nbsp; &nbsp; &nbsp; name: name,&nbsp; &nbsp; &nbsp; &nbsp; client: &http.Client{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Timeout: timeout,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; }}func (p *Tap) Check() (*testerResponse, error) {&nbsp; &nbsp; response := &testerResponse{}&nbsp; &nbsp; req, err := http.NewRequest("GET", p.url, nil)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return nil, err&nbsp; &nbsp; }&nbsp; &nbsp; res, e := p.client.Do(req)&nbsp; &nbsp; if e != nil {&nbsp; &nbsp; &nbsp; &nbsp; return response, e&nbsp; &nbsp; }&nbsp; &nbsp; response.name = p.name&nbsp; &nbsp; response.res = *res&nbsp; &nbsp; return response, e}func (p *Tap) Name() string {&nbsp; &nbsp; return p.name}func main() {&nbsp; &nbsp; var checkers []HT&nbsp; &nbsp; wg := sync.WaitGroup{}&nbsp; &nbsp; locker := sync.Mutex{}&nbsp; &nbsp; testers := []Tap{&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "first call",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://google.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Second * 20,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; "second call",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; url:&nbsp; &nbsp; &nbsp;"http://www.example.com",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout: time.Millisecond * 100,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; }&nbsp; &nbsp; for _, test := range testers {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(tst Tap) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; checker := NewTap(tst.name, tst.url, tst.timeout)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; res, err := checker.Check()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(res.name)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(res.res.StatusCode)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; locker.Lock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer locker.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; checkers = append(checkers, checker)&nbsp; &nbsp; &nbsp; &nbsp; }(test)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()}
随时随地看视频慕课网APP

相关分类

Go
我要回答