婷婷同学_
更新 5(接受的答案)*既然您对这个问题感兴趣,那么您可能也对这个问题感兴趣。有关如何使用自动取消超时运行每个作业的更多信息,请参见此处。*要回答您的问题,您将如何添加随机函数..我不知道你想要返回什么类型,但你可以做任何你想做的事情。有大约一百万种不同的方法可以做到这一点,这只是一个例子:package mainimport ( "encoding/json" "fmt" "github.com/gammazero/workerpool")var ( numWorkers = 10)type MyReturnType struct { Name string Data interface{}}func wrapJob(rc chan MyReturnType, f func() MyReturnType) func() { return func() { rc <- f() }}func main() { // create results chan and worker pool // should prob make your results channel typed to what you need jobs := []func() MyReturnType { func() MyReturnType { // whatever you want to do here return MyReturnType{Name: "job1", Data: map[string]string{"Whatever": "You want"}} }, func() MyReturnType { // whatever you want to do here // do a curl or a kubectl or whatever you want resultFromCurl := "i am a result" return MyReturnType{Name: "job2", Data: resultFromCurl} }, } results := make(chan MyReturnType, len(jobs)) pool := workerpool.New(numWorkers) for _, job := range jobs { j := job pool.Submit(wrapJob(results, j)) } // Wait for all jobs to finish pool.StopWait() // Close results chan close(results) // Iterate over results, printing to console for res := range results { prettyPrint(res) }}func prettyPrint(i interface{}) { prettyJSON, err := json.MarshalIndent(i, "", " ") if err != nil { fmt.Printf("Error : %s \n", err.Error()) } fmt.Printf("MyReturnType %s\n", string(prettyJSON))}返回:// MyReturnType {// "Name": "job2",// "Data": "i am a result"// }// MyReturnType {// "Name": "job1",// "Data": {// "Whatever": "You want"// }// }更新 4在研究了几个小时之后,我建议使用类似的东西workerpool,你可以在这里找到。老实说,使用workerpool似乎在这里最有意义。它看起来已经准备好生产并且被少数相当大的名字使用(请参阅他们的 repo 中的自述文件)。我写了一个小例子,展示了如何使用workerpool:package mainimport ( "fmt" "net/http" "time" "github.com/gammazero/workerpool")var ( numWorkers = 10 urls = []string{"yahoo.com", "example.com", "google.com"})func main() { // create results chan and worker pool // should prob make your results channel typed to what you need results := make(chan interface{}, len(urls)) pool := workerpool.New(numWorkers) // Create jobs by iterating over urls for i, u := range urls { url := u jobNum := i // Create job f := func() { start := time.Now() c := &http.Client{} r, e := c.Get("http://" + url) if e != nil { fmt.Println(e.Error()) } took := time.Since(start).Milliseconds() o := fmt.Sprintf("completed job '%d' to '%s' in '%dms' with status code '%d'\n", jobNum, url, took, r.StatusCode) results <- o } // Add job to workerpool pool.Submit(f) } // Wait for all jobs to finish pool.StopWait() // Close results chan close(results) // Iterate over results, printing to console for res := range results { fmt.Printf(res.(string)) }}哪个输出:// completed job '1' to 'example.com' in '81ms' with status code '200'// completed job '2' to 'google.com' in '249ms' with status code '200'// completed job '0' to 'yahoo.com' in '816ms' with status code '200'更新 3我继续编写了一个工作池库(在 的帮助下workerpool),因为我还想更深入地研究通道和并发设计。你可以在这里找到 repo和下面的代码。如何使用:pool := New(3)pool.Job(func() { c := &http.Client{} r, e := c.Get("http://google.com") if e != nil { panic(e.Error()) } fmt.Printf("To google.com %d\n", r.StatusCode)})pool.Job(func() { c := &http.Client{} r, e := c.Get("http://yahoo.com") if e != nil { panic(e.Error()) } fmt.Printf("To yahoo.com %d\n", r.StatusCode)})pool.Job(func() { c := &http.Client{} r, e := c.Get("http://example.com") if e != nil { panic(e.Error()) } fmt.Printf("To example.com %d\n", r.StatusCode)})pool.Seal()工作池代码(水坑)package puddleimport ( "container/list" "fmt" "net/http" "sync" "time")const ( idleTimeout = time.Second * 2)// New creates a new puddle (aka worker pool)func New(maxWorkers int) Puddle { // There must be at least one worker if maxWorkers < 1 { maxWorkers = 1 } p := &puddle{ maxWorkers: maxWorkers, jobs: make(chan func(), 1), workers: make(chan func()), killswitch: make(chan struct{}), } // Start accepting/working jobs as they come in go p.serve() return p}// Puddle knows how to interact with worker poolstype Puddle interface { Job(f func()) Seal()}// puddle is a worker pool that holds workers, tasks, and misc metadatatype puddle struct { maxWorkers int jobs chan func() workers chan func() killswitch chan struct{} queue List once sync.Once stopped int32 waiting int32 wait bool}// Job submits a new task to the worker poolfunc (p *puddle) Job(f func()) { if f != nil { p.jobs <- f }}// Seal stops worker pool and waits for queued tasks to completefunc (p *puddle) Seal() { p.stop(true)}func (p *puddle) stop(wait bool) { p.once.Do(func() { p.wait = wait // Close task queue and wait for currently running tasks to finish close(p.jobs) }) <-p.killswitch}func (p *puddle) killWorkerIfIdle() bool { select { case p.workers <- nil: // Kill worker return true default: // No ready workers return false }}// process puts new jobs onto the queue, and removes jobs from the queue as workers become available.// Returns false if puddle is stopped.func (p *puddle) process() bool { select { case task, ok := <-p.jobs: if !ok { return false } p.queue.PushBack(task) case p.workers <- p.queue.Front().Value.(func()): // Give task to ready worker p.queue.PopFront() } return true}func (p *puddle) serve() { defer close(p.killswitch) timeout := time.NewTimer(idleTimeout) var workerCount int var idle boolServing: for { if p.queue.Len() != 0 { if !p.process() { break Serving } continue } select { case job, ok := <-p.jobs: if !ok { break Serving } // Give a task to our workers select { case p.workers <- job: default: // If we are not maxed on workers, create a new one if workerCount < p.maxWorkers { go startJob(job, p.workers) workerCount++ } else { // Place a task on the back of the queue p.queue.PushBack(job) } } idle = false case <-timeout.C: // Timed out waiting for work to arrive. Kill a ready worker if // pool has been idle for a whole timeout. if idle && workerCount > 0 { if p.killWorkerIfIdle() { workerCount-- } } idle = true timeout.Reset(idleTimeout) } } // Allow queued jobs to complete if p.wait { p.work() } // Stop all workers before shutting down for workerCount > 0 { p.workers <- nil workerCount-- } timeout.Stop()}// work removes each task from the waiting queue and gives it to// workers until queue is empty.func (p *puddle) work() { for p.queue.Len() != 0 { // A worker is ready, so give task to worker. p.workers <- p.queue.PopFront() }}// startJob runs initial task, then starts a worker waiting for more.func startJob(job func(), workerQueue chan func()) { job() go worker(workerQueue)}// worker executes tasks and stops when it receives a nil task.func worker(queue chan func()) { for job := range queue { if job == nil { return } job() }}// List wraps `container/list`type List struct { list.List}// PopFront removes then returns first element in list as func()func (l *List) PopFront() func() { f := l.Front() l.Remove(f) return f.Value.(func())}更新 2由于您询问如何使用代码,这就是您要这样做的方式。我变成worker了它自己的包,并编写了另一个 repo 来展示如何使用该包。工人包如何使用工人包worker包裹package workerimport "fmt"type JobResponse struct { err error name string res int url string}type Job interface { Name() string Callback() JobResponse}func Do(jobs []Job, maxWorkers int) { jobsPool := make(chan Job, len(jobs)) resultsPool := make(chan JobResponse, len(jobs)) for i := 0; i < maxWorkers; i++ { go worker(jobsPool, resultsPool) } makeJobs(jobsPool, jobs) getResults(resultsPool, jobs)}func worker(jobs <-chan Job, response chan<- JobResponse) { for n := range jobs { response <- n.Callback() }}func makeJobs(jobs chan<- Job, queue []Job) { for _, t := range queue { jobs <- t }}func getResults(response <-chan JobResponse, queue []Job) { for range queue { job := <-response status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", job.name, job.url, job.res) if job.err != nil { status = fmt.Sprintf(job.err.Error()) } fmt.Printf(status) }}如何使用工人包package mainimport ( "github.com/oze4/worker")func main() { jobs := []worker.Job{ AddedByUser{name: "1"}, AddedByUser{name: "2"}, AddedByUser{name: "3"}, AddedByUser{name: "4"}, AddedByUser{name: "5"}, AddedByUser{name: "6"}, } worker.Do(jobs, 5)}type AddedByUser struct { name string}func (a AddedByUser) Name() string { return a.name}func (a AddedByUser) Callback() worker.JobResponse { // User added func/callback goes here return worker.JobResponse{}}更新我重命名了一些东西,希望能帮助它更清楚一点。这是您需要的基础知识:package mainimport ( "fmt")func main() { fmt.Println("Hello, playground")}type JobResponse struct { err error name string res int url string}type Job interface { Name() string Callback() JobResponse}func worker(jobs <-chan Job, response chan<- JobResponse) { for n := range jobs { response <- n.Callback() }}func makeJobs(jobs chan<- Job, queue []Job) { for _, t := range queue { jobs <- t }}func getResults(response <-chan JobResponse, queue []Job) { for range queue { j := <-response status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", j.name, j.url, j.res) if j.err != nil { status = fmt.Sprintf(j.err.Error()) } fmt.Printf(status) }}只要我满足Job接口,我就可以将它传递给 worker、makeJobs 和 getResults:type AddedByUser struct { name string}func (a AddedByUser) Name() string { return a.name}func (a AddedByUser) Callback() JobResponse { // User added func/callback goes here return JobResponse{}}像这样:package mainimport ( "fmt")func main() { jobsPool := make(chan Job, len(testers)) resultsPool := make(chan JobResponse, len(testers)) maxWorkers := 5 for i := 0; i < maxWorkers; i++ { go worker(jobsPool, resultsPool) } makeJobs(jobsPool, testers) getResults(resultsPool, testers)}var testers = []Job{ AddedByUser{name: "abu"}, // Using different types in Job Tap{name: "tap"}, // Using different types in Job}type AddedByUser struct { name string}func (a AddedByUser) Name() string { return a.name}func (a AddedByUser) Callback() JobResponse { // User added func/callback goes here return JobResponse{}}type Tap struct { name string}func (t Tap) Name() string { return t.name}func (t Tap) Callback() JobResponse { // User added func/callback goes here return JobResponse{}}type JobResponse struct { err error name string res int url string}type Job interface { Name() string Callback() JobResponse}func worker(jobs <-chan Job, response chan<- JobResponse) { for n := range jobs { response <- n.Callback() }}func makeJobs(jobs chan<- Job, queue []Job) { for _, t := range queue { jobs <- t }}func getResults(response <-chan JobResponse, queue []Job) { for range queue { job := <-response status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", job.name, job.url, job.res) if job.err != nil { status = fmt.Sprintf(job.err.Error()) } fmt.Printf(status) }}原始答案[添加此答案是因为 OP 和我一直在此线程之外交谈]您的代码中有几个错误,但最终您所要做的就是接受人们给您的建议。你只需要连接点。我建议对您的代码进行故障排除并尝试完全了解问题所在。老实说,这是唯一的学习方法。我能记住的最大问题是:需要修改您的HT界面,以便Check(...)签名匹配每个方法否则,这些结构 ( Tap, Tap1, Tap2) 不满足HT接口,因此不实现 HTfuncs worker(...)、makeJobs(...)和getResults(...)中的参数类型从更改[]Tap为[]HT 您没有将所有 Taps 聚合到一个切片中我们可以将所有不同的 Taps 用作 HT 的唯一原因是因为它们都实现了 HT你在找这样的东西吗?https://play.golang.org/p/zLmKOKAnX4Cpackage mainimport ( "fmt" "net/http" // "os/exec" "time")type HT interface { Name() string Check() testerResponse}type testerResponse struct { err error name string //res http.Response res int url string}type Tap struct { url string name string timeout time.Duration client *http.Client}func (p *Tap) Check() testerResponse { fmt.Printf("[job][Tap1] Fetching %s %s \n", p.name, p.url) p.client = &http.Client{Timeout: p.timeout} res, err := p.client.Get(p.url) if err != nil { return testerResponse{err: err} } // need to close body res.Body.Close() return testerResponse{name: p.name, res: res.StatusCode, url: p.url}}func (p *Tap) Name() string { return p.name}// ---- CUSTOM CHECKS-------------// ---- 1. NEW specific function -------------type Tap2 struct { url string name string timeout time.Duration client *http.Client}func (p *Tap2) Check() testerResponse { // Do some request here..... fmt.Printf("[job][Tap2] Fetching %s %s \n", p.name, p.url) return testerResponse{res: 200, url: p.url, name: p.name}}func (p *Tap2) Name() string { return "yahoo custom check"}// ---- 2. NEW specific function which is not running httptype Tap3 struct { url string name string timeout time.Duration client *http.Client}func (p *Tap3) Check() testerResponse { // Do some request here.... fmt.Printf("[job][Tap3] Fetching %s %s \n", p.name, p.url) return testerResponse{res: 200, url: p.url, name: p.name}}func (p *Tap3) Name() string { return "custom check2"}// makeJobs fills up our jobs channelfunc makeJobs(jch chan<- HT, jobs []HT) { for _, t := range jobs { jch <- t }}// getResults takes a job from our jobs channel, gets the result, and// places it on the results channelfunc getResults(tr <-chan testerResponse, jobs []HT) []testerResponse { var rts []testerResponse var r testerResponse for range jobs { r = <-tr status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res) if r.err != nil { status = fmt.Sprintf(r.err.Error()) } fmt.Printf(status) rts = append(rts, r) } return rts}// worker defines our worker func. as long as there is a job in the// "queue" we continue to pick up the "next" jobfunc worker(jobs <-chan HT, results chan<- testerResponse) { for n := range jobs { results <- n.Check() }}var ( testers1 = []Tap{ { name: "First Tap1", url: "http://google.com", timeout: time.Second * 20, }, { name: "Second Tap1", url: "http://stackoverflow.com", timeout: time.Second * 20, }, } testers2 = []Tap2{ { name: "First Tap2", url: "http://1.tap2.com", }, { name: "Second Tap2", url: "http://2.tap2.com", }, } testers3 = []Tap3{ { name: "First Tap3", url: "http://1.tap3.com", }, { name: "Second Tap3", url: "http://2.tap3.com", }, })func main() { // Aggregate all testers into one slice var testers []HT for _, t1 := range testers1 { testers = append(testers, &t1) } for _, t2 := range testers2 { testers = append(testers, &t2) } for _, t3 := range testers3 { testers = append(testers, &t3) } // Make buffered channels buffer := len(testers) jobsPipe := make(chan HT, buffer) // Jobs will be of type `HT` resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse` // Create worker pool // Max workers default is 5 maxWorkers := 5 for i := 0; i < maxWorkers; i++ { go worker(jobsPipe, resultsPipe) } makeJobs(jobsPipe, testers) getResults(resultsPipe, testers) //fmt.Println("at the end",tr)}哪个输出:// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com // [job][Tap2] Fetching Second Tap2 http://2.tap2.com // [job][Tap3] Fetching Second Tap3 http://2.tap3.com // [job][Tap3] Fetching Second Tap3 http://2.tap3.com // [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'// [job][Tap2] Fetching Second Tap2 http://2.tap2.com // [job][Tap1] Fetching Second Tap1 http://stackoverflow.com // [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'