猿问

如何使用 goroutine 池

我想使用 Go 从雅虎财经下载股票价格电子表格。我将在自己的 goroutine 中为每只股票发出 http 请求。我有一个大约 2500 个符号的列表,但与其并行发出 2500 个请求,我更喜欢一次发出 250 个请求。在 Java 中,我会创建一个线程池并在线程空闲时重用它们。我试图找到类似的东西,一个 goroutine 池,如果你愿意的话,但找不到任何资源。如果有人能告诉我如何完成手头的任务或为我指出相同的资源,我将不胜感激。谢谢!


红糖糍粑
浏览 196回答 3
3回答

DIEA

我想,最简单的方法是创建 250 个 goroutine 并将它们传递给一个通道,您可以使用该通道将链接从主 goroutine 传递到子 goroutine,并监听该通道。当所有链接都传递给 goroutine 时,您关闭一个通道,所有 goroutine 就完成了它们的工作。为了在孩子处理数据之前完成主 goroutine 的安全,您可以使用sync.WaitGroup.下面是一些代码来说明我上面所说的(不是最终的工作版本,而是说明了这一点):func worker(linkChan chan string, wg *sync.WaitGroup) {&nbsp; &nbsp;// Decreasing internal counter for wait-group as soon as goroutine finishes&nbsp; &nbsp;defer wg.Done()&nbsp; &nbsp;for url := range linkChan {&nbsp; &nbsp; &nbsp;// Analyze value and do the job here&nbsp; &nbsp;}}func main() {&nbsp; &nbsp; lCh := make(chan string)&nbsp; &nbsp; wg := new(sync.WaitGroup)&nbsp; &nbsp; // Adding routines to workgroup and running then&nbsp; &nbsp; for i := 0; i < 250; i++ {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go worker(lCh, wg)&nbsp; &nbsp; }&nbsp; &nbsp; // Processing all links by spreading them to `free` goroutines&nbsp; &nbsp; for _, link := range yourLinksSlice {&nbsp; &nbsp; &nbsp; &nbsp; lCh <- link&nbsp; &nbsp; }&nbsp; &nbsp; // Closing channel (waiting in goroutines won't continue any more)&nbsp; &nbsp; close(lCh)&nbsp; &nbsp; // Waiting for all goroutines to finish (otherwise they die as main routine dies)&nbsp; &nbsp; wg.Wait()}

潇湘沐

你可以使用Go这个git repo 中的线程池实现库这是关于如何使用通道作为线程池的好博客来自博客的片段&nbsp; &nbsp; var (&nbsp;MaxWorker = os.Getenv("MAX_WORKERS")&nbsp;MaxQueue&nbsp; = os.Getenv("MAX_QUEUE"))//Job represents the job to be runtype Job struct {&nbsp; &nbsp; Payload Payload}// A buffered channel that we can send work requests on.var JobQueue chan Job// Worker represents the worker that executes the jobtype Worker struct {&nbsp; &nbsp; WorkerPool&nbsp; chan chan Job&nbsp; &nbsp; JobChannel&nbsp; chan Job&nbsp; &nbsp; quit&nbsp; &nbsp; &nbsp; &nbsp; chan bool}func NewWorker(workerPool chan chan Job) Worker {&nbsp; &nbsp; return Worker{&nbsp; &nbsp; &nbsp; &nbsp; WorkerPool: workerPool,&nbsp; &nbsp; &nbsp; &nbsp; JobChannel: make(chan Job),&nbsp; &nbsp; &nbsp; &nbsp; quit:&nbsp; &nbsp; &nbsp; &nbsp;make(chan bool)}}// Start method starts the run loop for the worker, listening for a quit channel in// case we need to stop itfunc (w Worker) Start() {&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // register the current worker into the worker queue.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; w.WorkerPool <- w.JobChannel&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case job := <-w.JobChannel:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // we have received a work request.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err := job.Payload.UploadToS3(); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Errorf("Error uploading to S3: %s", err.Error())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case <-w.quit:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // we have received a signal to stop&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()}// Stop signals the worker to stop listening for work requests.func (w Worker) Stop() {&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; w.quit <- true&nbsp; &nbsp; }()}&nbsp;
随时随地看视频慕课网APP

相关分类

Go
我要回答