千万里不及你
A popular concurrency pattern in Go is the worker pool. A basic worker pool uses two channels: one to put work on and one to read results from. In this case our jobs channel will be of type `Tap` and our results channel will be of type `testerResponse`. Workers pick up a job from the jobs channel and put the result on the results channel.

```go
// worker defines our worker func. As long as there is a job in the
// "queue" we continue to pick up the "next" job.
func worker(jobs <-chan Tap, results chan<- testerResponse) {
	for n := range jobs {
		results <- n.Check()
	}
}
```

Jobs: to add work, we iterate over our testers and put them on the jobs channel.

```go
// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
	for _, t := range taps {
		jobs <- t
	}
}
```

Results: to read the results, we loop over them.

```go
// getResults takes one result from our worker pool for each job we queued
func getResults(tr <-chan testerResponse, taps []Tap) {
	for range taps {
		r := <-tr
		status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
		if r.err != nil {
			status = r.err.Error()
		}
		fmt.Println(status)
	}
}
```

And finally our main function.

```go
func main() {
	// Make buffered channels
	buffer := len(testers)
	jobsPipe := make(chan Tap, buffer)               // jobs will be of type `Tap`
	resultsPipe := make(chan testerResponse, buffer) // results will be of type `testerResponse`

	// Create worker pool
	// Max workers default is 5
	// maxWorkers := 5
	// for i := 0; i < maxWorkers; i++ {
	// 	go worker(jobsPipe, resultsPipe)
	// }
	// the loop above is the same as doing:
	go worker(jobsPipe, resultsPipe)
	go worker(jobsPipe, resultsPipe)
	go worker(jobsPipe, resultsPipe)
	go worker(jobsPipe, resultsPipe)
	go worker(jobsPipe, resultsPipe)
	// ^^ this creates 5 workers..

	makeJobs(jobsPipe, testers)
	getResults(resultsPipe, testers)
}
```

Putting it all together: I changed the timeout of the "second call" to one millisecond to show how the timeout works.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

type HT interface {
	Name() string
	Check() (*testerResponse, error)
}

type testerResponse struct {
	err  error
	name string
	res  http.Response
	url  string
}

type Tap struct {
	url     string
	name    string
	timeout time.Duration
	client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
	return &Tap{
		url:    url,
		name:   name,
		client: &http.Client{Timeout: timeout},
	}
}

func (p *Tap) Check() testerResponse {
	fmt.Printf("Fetching %s %s \n", p.name, p.url)
	// there's really no need for NewTap
	nt := NewTap(p.name, p.url, p.timeout)
	res, err := nt.client.Get(p.url)
	if err != nil {
		return testerResponse{err: err}
	}

	// need to close the body
	res.Body.Close()

	return testerResponse{name: p.name, res: *res, url: p.url}
}

func (p *Tap) Name() string {
	return p.name
}

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
	for _, t := range taps {
		jobs <- t
	}
}

// getResults takes one result from our worker pool for each job we queued
func getResults(tr <-chan testerResponse, taps []Tap) {
	for range taps {
		r := <-tr
		status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
		if r.err != nil {
			status = r.err.Error()
		}
		fmt.Print(status)
	}
}

// worker defines our worker func. As long as there is a job in the
// "queue" we continue to pick up the "next" job.
func worker(jobs <-chan Tap, results chan<- testerResponse) {
	for n := range jobs {
		results <- n.Check()
	}
}

var testers = []Tap{
	{name: "1", url: "http://google.com", timeout: time.Second * 20},
	{name: "2", url: "http://www.yahoo.com", timeout: time.Second * 10},
	{name: "3", url: "http://stackoverflow.com", timeout: time.Second * 20},
	{name: "4", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "5", url: "http://stackoverflow.com", timeout: time.Second * 20},
	{name: "6", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "7", url: "http://stackoverflow.com", timeout: time.Second * 20},
	{name: "8", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "9", url: "http://stackoverflow.com", timeout: time.Second * 20},
	{name: "10", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "11", url: "http://stackoverflow.com", timeout: time.Second * 20},
	{name: "12", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "13", url: "http://stackoverflow.com", timeout: time.Second * 20},
	{name: "14", url: "http://www.example.com", timeout: time.Second * 10},
}

func main() {
	// Make buffered channels
	buffer := len(testers)
	jobsPipe := make(chan Tap, buffer)               // jobs will be of type `Tap`
	resultsPipe := make(chan testerResponse, buffer) // results will be of type `testerResponse`

	// Create worker pool
	// Max workers default is 5
	// maxWorkers := 5
	// for i := 0; i < maxWorkers; i++ {
	// 	go worker(jobsPipe, resultsPipe)
	// }
	// the loop above is the same as doing:
	go worker(jobsPipe, resultsPipe)
	go worker(jobsPipe, resultsPipe)
	go worker(jobsPipe, resultsPipe)
	go worker(jobsPipe, resultsPipe)
	go worker(jobsPipe, resultsPipe)
	// ^^ this creates 5 workers..

	makeJobs(jobsPipe, testers)
	getResults(resultsPipe, testers)
}
```

Which outputs:

```
// Fetching http://stackoverflow.com 
// Fetching http://www.example.com 
// Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
// 'first call' to 'http://stackoverflow.com' was fetched with status '200'
```
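A common variation on the pool above (not part of the original answer) is to have the workers signal completion through a `sync.WaitGroup` and close the results channel once they are all done, so the reader can simply range over the results instead of counting `len(taps)` receives. Below is a minimal sketch, assuming the `Tap` and `testerResponse` types defined above; `runPool` and `maxWorkers` are names I introduced for illustration, and `sync` would need to be added to the imports.

```go
// runPool is a hypothetical helper (not in the original answer): it starts
// maxWorkers workers, closes the jobs channel once everything is queued,
// and closes the results channel after every worker has finished.
func runPool(taps []Tap, maxWorkers int) <-chan testerResponse {
	jobs := make(chan Tap, len(taps))
	results := make(chan testerResponse, len(taps))

	var wg sync.WaitGroup
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range jobs {
				results <- t.Check()
			}
		}()
	}

	// Queue all jobs, then close the channel so the workers' range loops exit.
	for _, t := range taps {
		jobs <- t
	}
	close(jobs)

	// Close results once all workers are done, so callers can range over it.
	go func() {
		wg.Wait()
		close(results)
	}()

	return results
}
```

With that shape, main can replace the `makeJobs`/`getResults` pair with a single loop such as `for r := range runPool(testers, 5) { ... }`.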
qq_笑_17
Parallelism can be achieved in Go in different ways. Here is a naive approach with a wait group, a mutex, and unbounded goroutines, which is not recommended. I think using channels is the preferred way to do this.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

type HT interface {
	Name() string
	Check() (*testerResponse, error)
}

type testerResponse struct {
	name string
	res  http.Response
}

type Tap struct {
	url     string
	name    string
	timeout time.Duration
	client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
	return &Tap{
		url:  url,
		name: name,
		client: &http.Client{
			Timeout: timeout,
		},
	}
}

func (p *Tap) Check() (*testerResponse, error) {
	response := &testerResponse{}
	req, err := http.NewRequest("GET", p.url, nil)
	if err != nil {
		return nil, err
	}

	res, e := p.client.Do(req)
	if e != nil {
		return response, e
	}
	// close the body so the underlying connection can be reused
	res.Body.Close()

	response.name = p.name
	response.res = *res

	return response, e
}

func (p *Tap) Name() string {
	return p.name
}

func main() {
	var checkers []HT
	wg := sync.WaitGroup{}
	locker := sync.Mutex{}

	testers := []Tap{
		{
			name:    "first call",
			url:     "http://google.com",
			timeout: time.Second * 20,
		},
		{
			name:    "second call",
			url:     "http://www.example.com",
			timeout: time.Millisecond * 100,
		},
	}

	for _, test := range testers {
		wg.Add(1)
		go func(tst Tap) {
			defer wg.Done()
			checker := NewTap(tst.name, tst.url, tst.timeout)
			res, err := checker.Check()
			if err != nil {
				fmt.Println(err)
				return // without this, a failed check would go on to print an empty response below
			}
			fmt.Println(res.name)
			fmt.Println(res.res.StatusCode)

			locker.Lock()
			defer locker.Unlock()
			checkers = append(checkers, checker)
		}(test)
	}
	wg.Wait()
}
```
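Since this answer itself calls unbounded goroutines naive, one way to keep the same wait-group structure while capping concurrency is a buffered channel used as a semaphore. The sketch below is not part of the original answer: it reuses `Tap`, `NewTap`, and `Check` from the code above, and `checkAll` and the `limit` parameter are names I introduced for illustration.

```go
// checkAll is a hypothetical variant of the loop in main above: it still
// uses a WaitGroup, but a buffered channel acting as a semaphore caps how
// many checks run at the same time.
func checkAll(testers []Tap, limit int) {
	var wg sync.WaitGroup
	sem := make(chan struct{}, limit) // at most `limit` checks in flight

	for _, test := range testers {
		wg.Add(1)
		go func(tst Tap) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it when this check finishes

			checker := NewTap(tst.name, tst.url, tst.timeout)
			res, err := checker.Check()
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println(res.name, res.res.StatusCode)
		}(test)
	}
	wg.Wait()
}
```

Calling `checkAll(testers, 5)` would run the same checks as the loop in main, but never more than five at once.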