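Adding a callback instead of using the default implementation

I am using the following code, and it works as expected.

The user adds a new entry to the testers config (it is hard-coded for now, but it will come from a config file). The entry returns a list of TAPs that need to be checked, and the program runs them in parallel via HTTP calls.

There is another use case I need to support: the user can also provide a function/method/callback that implements the call itself (instead of the check() function) via http/curl/websocket/whatever they need, and that function returns the response, whether it is 200/400/500.

For example, say the user implements two functions/callbacks in addition to the configured tap list. The program should execute those functions the same way it executes the testers list. The functions call other sites, such as "http://www.yahoo.com" and https://www.bing.com, with curl or http (just to demonstrate the difference), or they may even implement the check method so that it returns the result of some child process execution.

How can I do this in a clean way?
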
package main

import (
    "fmt"
    "net/http"
    "time"
)

type HT interface {
    Name() string
    Check() (*testerResponse, error)
}

type testerResponse struct {
    err  error
    name string
    res  http.Response
    url  string
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:    url,
        name:   name,
        client: &http.Client{Timeout: timeout},
    }
}

func (p *Tap) Check() testerResponse {
    fmt.Printf("Fetching %s %s \n", p.name, p.url)
    // there's really no need for NewTap
    nt := NewTap(p.name, p.url, p.timeout)
    res, err := nt.client.Get(p.url)
    if err != nil {
        return testerResponse{err: err}
    }

    // need to close body
    res.Body.Close()
    return testerResponse{name: p.name, res: *res, url: p.url}
}

func (p *Tap) Name() string {
    return p.name
}

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for _, t := range taps {
        jobs <- t
    }
}

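Update: I tried the following: https://play.golang.org/p/cRPPzke27dZ

However, I am not sure how to call the custom handlers' check() method so that their data is collected in the same parallel calls as the testers from the config.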


红颜莎娜
147 views · 2 answers
2 Answers

婷婷同学_

Update 5 (accepted answer)

Since you are interested in this question, you may also be interested in this one. For more on how to run each job with an automatically cancelling timeout, see here.

To answer your question about how you would add arbitrary functions: I don't know what type you want to return, but you can do whatever you want. There are about a million different ways to do this; this is just one example:

package main

import (
    "encoding/json"
    "fmt"

    "github.com/gammazero/workerpool"
)

var (
    numWorkers = 10
)

type MyReturnType struct {
    Name string
    Data interface{}
}

// wrapJob turns a function that returns a value into the func() that
// workerpool expects, sending the returned value to rc.
func wrapJob(rc chan MyReturnType, f func() MyReturnType) func() {
    return func() {
        rc <- f()
    }
}

func main() {
    // create the jobs, the results chan and the worker pool
    // should prob make your results channel typed to what you need
    jobs := []func() MyReturnType{
        func() MyReturnType {
            // whatever you want to do here
            return MyReturnType{Name: "job1", Data: map[string]string{"Whatever": "You want"}}
        },
        func() MyReturnType {
            // whatever you want to do here
            // do a curl or a kubectl or whatever you want
            resultFromCurl := "i am a result"
            return MyReturnType{Name: "job2", Data: resultFromCurl}
        },
    }

    results := make(chan MyReturnType, len(jobs))
    pool := workerpool.New(numWorkers)

    for _, job := range jobs {
        j := job
        pool.Submit(wrapJob(results, j))
    }

    // Wait for all jobs to finish
    pool.StopWait()

    // Close results chan
    close(results)

    // Iterate over results, printing to console
    for res := range results {
        prettyPrint(res)
    }
}

func prettyPrint(i interface{}) {
    prettyJSON, err := json.MarshalIndent(i, "", "    ")
    if err != nil {
        fmt.Printf("Error : %s \n", err.Error())
        return
    }
    fmt.Printf("MyReturnType %s\n", string(prettyJSON))
}

Which returns:

// MyReturnType {
//     "Name": "job2",
//     "Data": "i am a result"
// }
// MyReturnType {
//     "Name": "job1",
//     "Data": {
//         "Whatever": "You want"
//     }
// }

Update 4

After researching this for a few hours, I would recommend using something like workerpool, which you can find here. Honestly, using workerpool seems to make the most sense here. It looks production-ready and is used by a few fairly big names (see the readme in their repo).

I wrote a small example that shows how to use workerpool:

package main

import (
    "fmt"
    "net/http"
    "time"

    "github.com/gammazero/workerpool"
)

var (
    numWorkers = 10
    urls       = []string{"yahoo.com", "example.com", "google.com"}
)

func main() {
    // create results chan and worker pool
    // should prob make your results channel typed to what you need
    results := make(chan interface{}, len(urls))
    pool := workerpool.New(numWorkers)

    // Create jobs by iterating over urls
    for i, u := range urls {
        url := u
        jobNum := i

        // Create job
        f := func() {
            start := time.Now()
            c := &http.Client{}
            r, e := c.Get("http://" + url)
            if e != nil {
                // r is nil on error, so report the failure as this job's result
                results <- fmt.Sprintf("job '%d' to '%s' failed: %s\n", jobNum, url, e.Error())
                return
            }
            r.Body.Close()
            took := time.Since(start).Milliseconds()
            o := fmt.Sprintf("completed job '%d' to '%s' in '%dms' with status code '%d'\n", jobNum, url, took, r.StatusCode)
            results <- o
        }

        // Add job to workerpool
        pool.Submit(f)
    }

    // Wait for all jobs to finish
    pool.StopWait()

    // Close results chan
    close(results)

    // Iterate over results, printing to console
    for res := range results {
        fmt.Print(res.(string))
    }
}

Which outputs:

// completed job '1' to 'example.com' in '81ms' with status code '200'
// completed job '2' to 'google.com' in '249ms' with status code '200'
// completed job '0' to 'yahoo.com' in '816ms' with status code '200'

Update 3

I went ahead and wrote a worker pool library (with the help of workerpool), as I also wanted to dig deeper into channels and concurrency design. You can find the repo here and the code below.

How to use it:

pool := New(3)

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://google.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To google.com %d\n", r.StatusCode)
})

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://yahoo.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To yahoo.com %d\n", r.StatusCode)
})

pool.Job(func() {
    c := &http.Client{}
    r, e := c.Get("http://example.com")
    if e != nil {
        panic(e.Error())
    }
    defer r.Body.Close()
    fmt.Printf("To example.com %d\n", r.StatusCode)
})

pool.Seal()

Worker pool code (puddle):

package puddle

import (
    "container/list"
    "sync"
    "time"
)

const (
    idleTimeout = time.Second * 2
)

// New creates a new puddle (aka worker pool)
func New(maxWorkers int) Puddle {
    // There must be at least one worker
    if maxWorkers < 1 {
        maxWorkers = 1
    }

    p := &puddle{
        maxWorkers: maxWorkers,
        jobs:       make(chan func(), 1),
        workers:    make(chan func()),
        killswitch: make(chan struct{}),
    }

    // Start accepting/working jobs as they come in
    go p.serve()

    return p
}

// Puddle knows how to interact with worker pools
type Puddle interface {
    Job(f func())
    Seal()
}

// puddle is a worker pool that holds workers, tasks, and misc metadata
type puddle struct {
    maxWorkers int
    jobs       chan func()
    workers    chan func()
    killswitch chan struct{}
    queue      List
    once       sync.Once
    stopped    int32
    waiting    int32
    wait       bool
}

// Job submits a new task to the worker pool
func (p *puddle) Job(f func()) {
    if f != nil {
        p.jobs <- f
    }
}

// Seal stops worker pool and waits for queued tasks to complete
func (p *puddle) Seal() {
    p.stop(true)
}

func (p *puddle) stop(wait bool) {
    p.once.Do(func() {
        p.wait = wait
        // Close task queue and wait for currently running tasks to finish
        close(p.jobs)
    })
    <-p.killswitch
}

func (p *puddle) killWorkerIfIdle() bool {
    select {
    case p.workers <- nil:
        // Kill worker
        return true
    default:
        // No ready workers
        return false
    }
}

// process puts new jobs onto the queue, and removes jobs from the queue as workers become available.
// Returns false if puddle is stopped.
func (p *puddle) process() bool {
    select {
    case task, ok := <-p.jobs:
        if !ok {
            return false
        }
        p.queue.PushBack(task)
    case p.workers <- p.queue.Front().Value.(func()):
        // Give task to ready worker
        p.queue.PopFront()
    }
    return true
}

func (p *puddle) serve() {
    defer close(p.killswitch)
    timeout := time.NewTimer(idleTimeout)
    var workerCount int
    var idle bool

Serving:
    for {
        if p.queue.Len() != 0 {
            if !p.process() {
                break Serving
            }
            continue
        }

        select {
        case job, ok := <-p.jobs:
            if !ok {
                break Serving
            }

            // Give a task to our workers
            select {
            case p.workers <- job:
            default:
                // If we are not maxed on workers, create a new one
                if workerCount < p.maxWorkers {
                    go startJob(job, p.workers)
                    workerCount++
                } else {
                    // Place a task on the back of the queue
                    p.queue.PushBack(job)
                }
            }
            idle = false
        case <-timeout.C:
            // Timed out waiting for work to arrive.  Kill a ready worker if
            // pool has been idle for a whole timeout.
            if idle && workerCount > 0 {
                if p.killWorkerIfIdle() {
                    workerCount--
                }
            }
            idle = true
            timeout.Reset(idleTimeout)
        }
    }

    // Allow queued jobs to complete
    if p.wait {
        p.work()
    }

    // Stop all workers before shutting down
    for workerCount > 0 {
        p.workers <- nil
        workerCount--
    }

    timeout.Stop()
}

// work removes each task from the waiting queue and gives it to
// workers until queue is empty.
func (p *puddle) work() {
    for p.queue.Len() != 0 {
        // A worker is ready, so give task to worker.
        p.workers <- p.queue.PopFront()
    }
}

// startJob runs initial task, then starts a worker waiting for more.
func startJob(job func(), workerQueue chan func()) {
    job()
    go worker(workerQueue)
}

// worker executes tasks and stops when it receives a nil task.
func worker(queue chan func()) {
    for job := range queue {
        if job == nil {
            return
        }
        job()
    }
}

// List wraps `container/list`
type List struct {
    list.List
}

// PopFront removes then returns first element in list as func()
func (l *List) PopFront() func() {
    f := l.Front()
    l.Remove(f)
    return f.Value.(func())
}

Update 2

Since you asked how to use the code, this is how you would do it. I turned worker into its own package and wrote another repo to show how to use that package.

- worker package
- how to use the worker package

worker package:

package worker

import "fmt"

// JobResponse carries the outcome of one job. The fields are unexported,
// so code outside this package can only return the zero value; export them
// if callbacks in other packages need to fill them in.
type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

// Do runs all jobs on maxWorkers goroutines and prints each result.
func Do(jobs []Job, maxWorkers int) {
    jobsPool := make(chan Job, len(jobs))
    resultsPool := make(chan JobResponse, len(jobs))

    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPool, resultsPool)
    }

    makeJobs(jobsPool, jobs)
    // No more jobs will be sent, so let the workers exit when the queue drains
    close(jobsPool)
    getResults(resultsPool, jobs)
}

func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        job := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", job.name, job.url, job.res)
        if job.err != nil {
            status = job.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

How to use the worker package:

package main

import (
    "github.com/oze4/worker"
)

func main() {
    jobs := []worker.Job{
        AddedByUser{name: "1"},
        AddedByUser{name: "2"},
        AddedByUser{name: "3"},
        AddedByUser{name: "4"},
        AddedByUser{name: "5"},
        AddedByUser{name: "6"},
    }

    worker.Do(jobs, 5)
}

type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() worker.JobResponse {
    // User added func/callback goes here
    return worker.JobResponse{}
}

Update

I renamed a few things, which will hopefully make it a bit clearer. These are the basics of what you need:

package main

import (
    "fmt"
)

func main() {
    fmt.Println("Hello, playground")
}

type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        j := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", j.name, j.url, j.res)
        if j.err != nil {
            status = j.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

As long as a type satisfies the Job interface, I can pass it to worker, makeJobs, and getResults:

type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}

Like this:

package main

import (
    "fmt"
)

func main() {
    jobsPool := make(chan Job, len(testers))
    resultsPool := make(chan JobResponse, len(testers))

    maxWorkers := 5
    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPool, resultsPool)
    }

    makeJobs(jobsPool, testers)
    getResults(resultsPool, testers)
}

var testers = []Job{
    AddedByUser{name: "abu"}, // Using different types in Job
    Tap{name: "tap"},         // Using different types in Job
}

type AddedByUser struct {
    name string
}

func (a AddedByUser) Name() string {
    return a.name
}

func (a AddedByUser) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}

type Tap struct {
    name string
}

func (t Tap) Name() string {
    return t.name
}

func (t Tap) Callback() JobResponse {
    // User added func/callback goes here
    return JobResponse{}
}

type JobResponse struct {
    err  error
    name string
    res  int
    url  string
}

type Job interface {
    Name() string
    Callback() JobResponse
}

func worker(jobs <-chan Job, response chan<- JobResponse) {
    for n := range jobs {
        response <- n.Callback()
    }
}

func makeJobs(jobs chan<- Job, queue []Job) {
    for _, t := range queue {
        jobs <- t
    }
}

func getResults(response <-chan JobResponse, queue []Job) {
    for range queue {
        job := <-response
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", job.name, job.url, job.res)
        if job.err != nil {
            status = job.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

Original answer

[Adding this answer because the OP and I have been talking outside of this thread]

There were a few errors in your code, but in the end all you had to do was take the advice people were giving you. You just needed to connect the dots. I suggest troubleshooting your code and trying to fully understand what the problem is. Honestly, that is the only way to learn.

The biggest issues I can remember were:

- Your HT interface needed to be modified so that the Check(...) signature matches each method. Otherwise, those structs (Tap, Tap1, Tap2) do not satisfy the HT interface and therefore do not implement HT.
- The parameter type in the funcs worker(...), makeJobs(...), and getResults(...) had to change from []Tap to []HT.
- You were not aggregating all of the Taps into a single slice. The only reason we can use all of the different Taps as HT is that they all implement HT.

Are you looking for something like this? https://play.golang.org/p/zLmKOKAnX4C

package main

import (
    "fmt"
    "net/http"
    // "os/exec"
    "time"
)

type HT interface {
    Name() string
    Check() testerResponse
}

type testerResponse struct {
    err  error
    name string
    //res  http.Response
    res int
    url string
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap) Check() testerResponse {
    fmt.Printf("[job][Tap1] Fetching %s %s \n", p.name, p.url)
    p.client = &http.Client{Timeout: p.timeout}
    res, err := p.client.Get(p.url)
    if err != nil {
        return testerResponse{err: err}
    }

    // need to close body
    res.Body.Close()
    return testerResponse{name: p.name, res: res.StatusCode, url: p.url}
}

func (p *Tap) Name() string {
    return p.name
}

// ---- CUSTOM CHECKS-------------
// ---- 1. NEW specific function -------------

type Tap2 struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap2) Check() testerResponse {
    // Do some request here.....
    fmt.Printf("[job][Tap2] Fetching %s %s \n", p.name, p.url)
    return testerResponse{res: 200, url: p.url, name: p.name}
}

func (p *Tap2) Name() string {
    return "yahoo custom check"
}

// ---- 2. NEW specific function which is not running http

type Tap3 struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func (p *Tap3) Check() testerResponse {
    // Do some request here....
    fmt.Printf("[job][Tap3] Fetching %s %s \n", p.name, p.url)
    return testerResponse{res: 200, url: p.url, name: p.name}
}

func (p *Tap3) Name() string {
    return "custom check2"
}

// makeJobs fills up our jobs channel
func makeJobs(jch chan<- HT, jobs []HT) {
    for _, t := range jobs {
        jch <- t
    }
}

// getResults reads one result per job from the results channel,
// prints it, and returns all of the results
func getResults(tr <-chan testerResponse, jobs []HT) []testerResponse {
    var rts []testerResponse
    var r testerResponse
    for range jobs {
        r = <-tr
        status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res)
        if r.err != nil {
            status = r.err.Error() + "\n"
        }
        fmt.Print(status)
        rts = append(rts, r)
    }
    return rts
}

// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

var (
    testers1 = []Tap{
        {
            name:    "First Tap1",
            url:     "http://google.com",
            timeout: time.Second * 20,
        },
        {
            name:    "Second Tap1",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
    }

    testers2 = []Tap2{
        {
            name: "First Tap2",
            url:  "http://1.tap2.com",
        },
        {
            name: "Second Tap2",
            url:  "http://2.tap2.com",
        },
    }

    testers3 = []Tap3{
        {
            name: "First Tap3",
            url:  "http://1.tap3.com",
        },
        {
            name: "Second Tap3",
            url:  "http://2.tap3.com",
        },
    }
)

func main() {
    // Aggregate all testers into one slice.
    // Index into each slice so every entry gets its own pointer; taking the
    // address of the range variable would, before Go 1.22, make every
    // pointer refer to the last element.
    var testers []HT
    for i := range testers1 {
        testers = append(testers, &testers1[i])
    }
    for i := range testers2 {
        testers = append(testers, &testers2[i])
    }
    for i := range testers3 {
        testers = append(testers, &testers3[i])
    }

    // Make buffered channels
    buffer := len(testers)
    jobsPipe := make(chan HT, buffer)                // Jobs will be of type `HT`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create worker pool
    // Max workers default is 5
    maxWorkers := 5
    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPipe, resultsPipe)
    }

    makeJobs(jobsPipe, testers)
    getResults(resultsPipe, testers)
    //fmt.Println("at the end",tr)
}

Which outputs the following. This run appears to predate the indexed loops in main: it appended the address of the range variable, which is why only the "Second" taps show up, each of them twice.

// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com
// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'
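The question also asks about accepting a plain function as the check. One possible way to sketch that, assuming an interface shaped like HT, is a small adapter that wraps a function so it can sit in the same job slice as the struct-based taps. Everything named here (Result, Checker, FromFunc, RunAll) is hypothetical and not part of the code in this thread, and the callbacks return canned values instead of making real HTTP or subprocess calls:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Result is a simplified stand-in for testerResponse (hypothetical).
type Result struct {
	Name   string
	Status int
	Err    error
}

// Checker plays the role of the HT interface (hypothetical name).
type Checker interface {
	Name() string
	Check() Result
}

// funcCheck adapts a plain callback into a Checker.
type funcCheck struct {
	name string
	fn   func() (int, error)
}

func (f funcCheck) Name() string { return f.name }

func (f funcCheck) Check() Result {
	status, err := f.fn()
	return Result{Name: f.name, Status: status, Err: err}
}

// FromFunc wraps a user-supplied callback so it can be queued like any other Checker.
func FromFunc(name string, fn func() (int, error)) Checker {
	return funcCheck{name: name, fn: fn}
}

// RunAll runs every checker on a fixed number of workers and
// returns the results sorted by name so the order is stable.
func RunAll(checks []Checker, workers int) []Result {
	if workers < 1 {
		workers = 1
	}
	jobs := make(chan Checker)
	results := make(chan Result, len(checks))

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for c := range jobs {
				results <- c.Check()
			}
		}()
	}

	for _, c := range checks {
		jobs <- c
	}
	close(jobs) // no more jobs: workers exit once the channel drains
	wg.Wait()
	close(results)

	var out []Result
	for r := range results {
		out = append(out, r)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func main() {
	checks := []Checker{
		// Stub callbacks; a real one might call curl, a websocket, or os/exec.
		FromFunc("curl-like", func() (int, error) { return 200, nil }),
		FromFunc("subprocess-like", func() (int, error) { return 500, fmt.Errorf("exit status 1") }),
	}
	for _, r := range RunAll(checks, 2) {
		fmt.Println(r.Name, r.Status, r.Err)
	}
}
```

The adapter keeps the worker code unaware of whether a job came from the config or from a user callback, which is the same idea as the Job interface used in the updates of this answer.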

qq_遁去的一_1

As I understand it, you want your workers to accept other kinds of testers.

After looking at your code, it seems you have all the pieces in the right place and only need a few small changes here:

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- HT, taps []HT) {
    for _, t := range taps {
        jobs <- t
    }
}

// getResults reads one result per job from the results channel and prints it
func getResults(tr <-chan testerResponse, taps []HT) {
    for range taps {
        r := <-tr
        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
        if r.err != nil {
            status = r.err.Error() + "\n"
        }
        fmt.Print(status)
    }
}

// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

As you can see, your job queue can now accept any type that implements the HT interface (with Check() declared in HT as returning testerResponse). So if you want a new kind of job, say Tap2, you only need:

type Tap2 struct{...}

func (p *Tap2) Check() testerResponse {...}

func (p *Tap2) Name() string {...}

Now you can push both Tap and Tap2 onto the same job queue, because the job queue accepts any type that implements HT. Since the methods have pointer receivers, the values you push must be *Tap and *Tap2.
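To make the "any type that implements HT" point concrete, here is a minimal sketch, assuming HT declares Check() as returning testerResponse. It uses compile-time assertions to confirm that the pointer types satisfy the interface, and a hypothetical helper, toJobs, to collect both kinds of tap into one slice. The stripped-down structs and the fixed status codes are stand-ins for illustration only:

```go
package main

import "fmt"

// Simplified stand-ins for the types in the question.
type testerResponse struct {
	name string
	res  int
}

type HT interface {
	Name() string
	Check() testerResponse
}

type Tap struct{ name string }

func (p *Tap) Name() string          { return p.name }
func (p *Tap) Check() testerResponse { return testerResponse{name: p.name, res: 200} }

type Tap2 struct{ name string }

func (p *Tap2) Name() string          { return p.name }
func (p *Tap2) Check() testerResponse { return testerResponse{name: p.name, res: 204} }

// Compile-time checks: the build fails if either pointer type stops satisfying HT.
var (
	_ HT = (*Tap)(nil)
	_ HT = (*Tap2)(nil)
)

// toJobs (hypothetical helper) gathers both kinds of tap into one []HT.
// It indexes into the slices so that each entry points at its own element.
func toJobs(taps []Tap, taps2 []Tap2) []HT {
	jobs := make([]HT, 0, len(taps)+len(taps2))
	for i := range taps {
		jobs = append(jobs, &taps[i])
	}
	for i := range taps2 {
		jobs = append(jobs, &taps2[i])
	}
	return jobs
}

func main() {
	jobs := toJobs([]Tap{{name: "tap"}}, []Tap2{{name: "tap2"}})
	for _, j := range jobs {
		r := j.Check()
		fmt.Println(j.Name(), r.res)
	}
}
```

Because the methods use pointer receivers, only *Tap and *Tap2 are in the method set that satisfies HT; a plain Tap value would be rejected by the compiler when sent on a chan HT.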