How to make concurrent GET requests from a pool of URLs

I completed the recommended Tour of Go and watched some tutorials and Gopher conference talks on YouTube. That's pretty much all the experience I have.


I have a project that requires me to send GET requests and store the results in files. The catch is that the number of URLs is around 80 million.


For now I am only testing with 1,000 URLs.


The problem: even though I followed some guidelines, I don't think I managed to make it concurrent, and I can't tell what is wrong. But maybe I'm mistaken and it is concurrent; it just doesn't seem fast to me, the speed feels like sequential requests.


Here is the code I wrote:


package main

import (
    "bufio"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup // synchronization to wait for all the goroutines

func crawler(urlChannel <-chan string) {
    defer wg.Done()
    client := &http.Client{Timeout: 10 * time.Second} // single client is sufficient for multiple requests

    for urlItem := range urlChannel {
        req1, _ := http.NewRequest("GET", "http://"+urlItem, nil)                                           // generating the request
        req1.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
        resp1, respErr1 := client.Do(req1)                                                                  // sending the prepared request and getting the response
        if respErr1 != nil {
            continue
        }

        defer resp1.Body.Close()

        if resp1.StatusCode/100 == 2 { // means server responded with 2xx code
            text1, readErr1 := ioutil.ReadAll(resp1.Body) // try to read the sourcecode of the website
            if readErr1 != nil {
                log.Fatal(readErr1)
            }

            f1, fileErr1 := os.Create("200/" + urlItem + ".txt") // creating the relative file
            if fileErr1 != nil {
                log.Fatal(fileErr1)
            }
            defer f1.Close()

            _, writeErr1 := f1.Write(text1) // writing the sourcecode into our file
            if writeErr1 != nil {
                log.Fatal(writeErr1)
            }
        }
    }
}

My questions are: why doesn't this code work concurrently? How can I solve the problem I described above? Am I doing something wrong when making concurrent GET requests?


茅侃侃
2 Answers

湖上湖

Here is some code to get you thinking. I put the URLs into the code itself so the example is self-contained, but in practice you would probably pipe them in on standard input. A few things I do here are, I think, improvements, or at least worth considering.

Before we start, I will point out that I put complete URLs in the input stream. For one thing, this lets me support both http and https; I do not really see the logic in hard-coding the scheme in the program rather than leaving it in the data.

First, it can handle response bodies of arbitrary size (your version reads the body into memory, so it is limited by some number of concurrent large requests filling up memory). I do that with io.Copy().

[edit] text1, readErr1 := ioutil.ReadAll(resp1.Body) reads the entire HTTP body. If the body is large, it takes up a lot of memory. io.Copy(f1, resp1.Body) instead copies the data from the HTTP response body directly into the file, without having to hold the whole thing in memory. It may happen in one Read/Write or in many.

http.Response.Body is an io.ReadCloser because the HTTP protocol expects the body to be read progressively; an http.Response does not have the complete body until it has been read, which is why it is not simply a []byte. Writing the data to the filesystem gradually as it streams in from the TCP socket means a bounded amount of system resources can download an unbounded amount of data.

But there is a further benefit. io.Copy will call ReadFrom() on the file. If you look at the Linux implementation, for example https://golang.org/src/os/readfrom_linux.go , and dig a bit, you will see it actually uses copy_file_range. That syscall is cool because "the copy_file_range() system call performs an in-kernel copy between two file descriptors without the additional cost of transferring data from the kernel to user space and then back into the kernel." In other words, *os.File knows how to ask the kernel to deliver data directly from the TCP socket into the file without your program ever having to touch it. See https://golang.org/pkg/io/#Copy .

Second, I make sure to use all of the URL components in the filename. URLs with different query strings go to different files. The fragment probably does not differentiate response bodies, so including it in the path is debatable. There is no great heuristic for turning URLs into valid file paths; if this were a serious task, I would probably store the data in files named after a shasum of the URL, or something along those lines, and build an index of results in a metadata file (a small sketch of that idea appears after the code below).

Third, I handle all errors. req1, _ := http.NewRequest(... may look like a convenient shortcut, but what it really means is that, at best, you will not know the real cause of any error. I usually add some descriptive text to errors as they percolate up, so I can easily tell which error I am returning.

Finally, I return the number of successfully processed URLs so I can see the final results. When scanning millions of URLs you will probably also want a list of the ones that failed, but a count of successes is a good start for sending final data back for a summary.

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "path/filepath"
    "time"
)

const urls_text = `http://danf.us/
https://farrellit.net/?3=2&#1
`

func crawler(urls <-chan *url.URL, done chan<- int) {
    var processed int = 0
    defer func() { done <- processed }()
    client := http.Client{Timeout: 10 * time.Second}
    for u := range urls {
        if req, err := http.NewRequest("GET", u.String(), nil); err != nil {
            log.Printf("Couldn't create new request for %s: %s", u.String(), err.Error())
        } else {
            req.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
            if res, err := client.Do(req); err != nil {
                log.Printf("Failed to get %s: %s", u.String(), err.Error())
            } else {
                filename := filepath.Base(u.EscapedPath())
                if filename == "/" || filename == "" {
                    filename = "response"
                } else {
                    log.Printf("URL Filename is '%s'", filename)
                }
                destpath := filepath.Join(
                    res.Status, u.Scheme, u.Hostname(), u.EscapedPath(),
                    fmt.Sprintf("?%s", u.RawQuery), fmt.Sprintf("#%s", u.Fragment), filename,
                )
                if err := os.MkdirAll(filepath.Dir(destpath), 0755); err != nil {
                    log.Printf("Couldn't create directory %s: %s", filepath.Dir(destpath), err.Error())
                } else if f, err := os.OpenFile(destpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
                    log.Printf("Couldn't open destination file %s: %s", destpath, err.Error())
                } else {
                    if b, err := io.Copy(f, res.Body); err != nil {
                        log.Printf("Could not copy %s body to %s: %s", u.String(), destpath, err.Error())
                    } else {
                        log.Printf("Copied %d bytes from body of %s to %s", b, u.String(), destpath)
                        processed++
                    }
                    f.Close()
                }
                res.Body.Close()
            }
        }
    }
}

const workers = 3

func main() {
    urls := make(chan *url.URL)
    done := make(chan int)
    var submitted int = 0
    var inputted int = 0
    var successful int = 0
    for i := 0; i < workers; i++ {
        go crawler(urls, done)
    }
    sc := bufio.NewScanner(bytes.NewBufferString(urls_text))
    for sc.Scan() {
        inputted++
        if u, err := url.Parse(sc.Text()); err != nil {
            log.Printf("Could not parse %s as url: %v", sc.Text(), err)
        } else {
            submitted++
            urls <- u
        }
    }
    close(urls)
    for i := 0; i < workers; i++ {
        successful += <-done
    }
    log.Printf("%d urls input, %d could not be parsed. %d/%d valid URLs successful (%.0f%%)",
        inputted, inputted-submitted,
        successful, submitted,
        float64(successful)/float64(submitted)*100.0,
    )
}

慕少森

A good guideline to follow when setting up a concurrent pipeline is to always set up and instantiate the listeners that will execute concurrently first (the crawlers, in your case), and only then start feeding them data through the pipeline (urlChannel, in your case). A short sketch of why that ordering matters with an unbuffered channel follows the code below.

In your example, the only thing preventing a deadlock is that you instantiated a buffered channel with the same capacity as the number of lines in your test file (1000 lines). What the code does is push URLs into urlChannel; since your file has 1000 lines, urlChannel can take all of them without blocking. If you put more URLs in the file, execution will block after urlChannel fills up.

Here is a version of the code that should work:

package main

import (
    "bufio"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "sync"
    "time"
)

func crawler(wg *sync.WaitGroup, urlChannel <-chan string) {
    defer wg.Done()
    client := &http.Client{Timeout: 10 * time.Second} // single client is sufficient for multiple requests

    for urlItem := range urlChannel {
        req1, _ := http.NewRequest("GET", "http://"+urlItem, nil)                                           // generating the request
        req1.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
        resp1, respErr1 := client.Do(req1)                                                                  // sending the prepared request and getting the response
        if respErr1 != nil {
            continue
        }

        if resp1.StatusCode/100 == 2 { // means server responded with 2xx code
            text1, readErr1 := ioutil.ReadAll(resp1.Body) // try to read the sourcecode of the website
            if readErr1 != nil {
                log.Fatal(readErr1)
            }
            resp1.Body.Close()

            f1, fileErr1 := os.Create("200/" + urlItem + ".txt") // creating the relative file
            if fileErr1 != nil {
                log.Fatal(fileErr1)
            }

            _, writeErr1 := f1.Write(text1) // writing the sourcecode into our file
            if writeErr1 != nil {
                log.Fatal(writeErr1)
            }
            f1.Close()
        }
    }
}

func main() {
    var wg sync.WaitGroup
    file, err := os.Open("urls.txt") // the file containing the url's
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close() // don't forget to close the file

    urlChannel := make(chan string)

    _ = os.Mkdir("200", 0755) // if it's there, it will create an error, and we will simply ignore it

    // first, initialize crawlers
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go crawler(&wg, urlChannel)
    }

    // after crawlers are initialized, start feeding them data through the channel
    scanner := bufio.NewScanner(file) // each line has another url
    for scanner.Scan() {
        urlChannel <- scanner.Text()
    }
    close(urlChannel)
    wg.Wait()
}
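As a concrete illustration of the ordering point above (not from the original answer): a send on an unbuffered channel blocks until some goroutine is ready to receive, so the workers must exist before the producer starts sending, otherwise the first send blocks forever. A minimal sketch, assuming three workers and a few placeholder items:

package main

import (
    "fmt"
    "sync"
)

func main() {
    ch := make(chan string) // unbuffered: every send waits for a matching receive

    var wg sync.WaitGroup

    // Start the receivers (workers) first; if this loop came after the sends
    // below, the first send would block forever and the program would deadlock.
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for item := range ch {
                fmt.Printf("worker %d got %s\n", id, item)
            }
        }(i)
    }

    // Now the producer can feed the channel; each send hands an item
    // directly to whichever worker is ready to receive it.
    for _, u := range []string{"a.example", "b.example", "c.example"} {
        ch <- u
    }
    close(ch) // tells the workers there is nothing more to receive
    wg.Wait() // wait until all workers have drained the channel
}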
