Go net/http 在高负载下泄漏内存

我正在开发一个使用 net/http 包调用客户端 URL 的 API。根据用户的国家/操作系统,每个请求(POST 调用)会在 goroutine 中并发调用 1 到 8 个 URL。该应用程序在大约 1000-1500 的较低 QPS 下运行正常,但当把负载扩展到 3k 请求时,内存会突然增加;即使只调用 1 个客户端 URL,应用程序也会在几分钟后停止响应(响应时间远高于 50 秒)。我正在使用 Go 原生的 net/http 包和 gorilla/mux 路由器。关于这个问题的其他提问都建议关闭响应正文,但我已经使用 response.Body.Close() 这样做了:


        // Build the outbound POST request; requestBody is wrapped in a bytes.Buffer.
        // NOTE(review): the err returned here is overwritten by Client.Do below
        // without being checked first.
        req, err := http.NewRequest("POST", "client_post_url", bytes.NewBuffer(requestBody))

        req.Header.Set("Content-Type", "application/json")

        req.Header.Set("Connection", "Keep-Alive")

        // Send the request through the shared package-level client.
        response, err := common.Client.Do(req)

        status := 0

        if err != nil {//handle and return}

        // Close the body when the enclosing function returns.
        defer response.Body.Close() //used with/without io.Copy

        status = response.StatusCode

        // Read the response body; the read error is deliberately discarded here.
        body, _ := ioutil.ReadAll(response.Body)

        // Drain anything still unread into ioutil.Discard.
        // NOTE(review): if ReadAll above reached EOF there is presumably nothing
        // left to copy, so this call looks redundant - confirm.
        _, err = io.Copy(ioutil.Discard, response.Body)

我需要重用连接,因此我在 init 方法中初始化了全局的 http 客户端(Client)和传输(Transport)变量,如下所示。


    // Transport assigned to the package-level common.Transport variable.
    // Only the fields set below differ from http.Transport's zero values; the
    // commented-out fields keep their zero values.
    // NOTE(review): MaxIdleConnsPerHost is not set; per the net/http docs a zero
    // value falls back to DefaultMaxIdleConnsPerHost (2) - verify this matches the
    // expected number of concurrent calls per host.
    common.Transport = &http.Transport{

        TLSClientConfig: &tls.Config{

            // Disables verification of the server's certificate chain and host name.
            InsecureSkipVerify: true,

        },

        DialContext: (&net.Dialer{

            // Dial timeout is commented out, so the Dialer itself imposes none.
            //Timeout: time.Duration(300) * time.Millisecond,

            // Interval between TCP keep-alive probes on established connections.
            KeepAlive: 30 * time.Second,

        }).DialContext,

        //ForceAttemptHTTP2:     true,

        // false is the zero value: HTTP keep-alives (connection reuse) stay enabled.
        DisableKeepAlives: false,

        //MaxIdleConns:      0,

        //IdleConnTimeout:   0,

        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,

        //ExpectContinueTimeout: 1 * time.Second,

    }


    // HTTP client assigned to the package-level common.Client variable; it sends
    // requests through common.Transport.
    common.Client = &http.Client{

        // Per-request time limit of 300ms; per the net/http docs this covers
        // connection time, any redirects, and reading the response body.
        Timeout:   time.Duration(300) * time.Millisecond,

        Transport: common.Transport,

    }

我读到过使用 keep-alive 会导致内存泄漏,于是尝试了几种组合:禁用 keep-alive,或者在请求上设置 close 标志,但似乎都不起作用。另外,如果我不进行任何 http 调用,而是在并发调用每个 url 的 goroutine 中改用 time.Sleep(300 * time.Millisecond),则应用程序可以正常工作,不会出现任何泄漏。所以我确信这与 client/http 包在高负载下连接未被释放或未被正确使用有关。


我应该采取什么方法来解决这个问题?像几篇介绍 C10K 方法的文章中提到的那样,创建自定义服务器和自定义处理程序类型来接收并路由请求,是否可行?如果需要,我可以分享包含所有详细信息的示例代码。上面只贴出了我认为问题所在的部分。


森栏
浏览 172回答 2
2回答

慕无忌1623718

这段代码没有泄露。为了演示,让我们稍微更新一下它,以便该帖子可以重现。主程序package mainimport (&nbsp; &nbsp; "bytes"&nbsp; &nbsp; "crypto/tls"&nbsp; &nbsp; _ "expvar"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "io"&nbsp; &nbsp; "io/ioutil"&nbsp; &nbsp; "log"&nbsp; &nbsp; "math/rand"&nbsp; &nbsp; "net"&nbsp; &nbsp; "net/http"&nbsp; &nbsp; _ "net/http/pprof"&nbsp; &nbsp; "os"&nbsp; &nbsp; "runtime"&nbsp; &nbsp; "strconv"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time"&nbsp; &nbsp; "github.com/gorilla/mux")var (&nbsp; &nbsp; //http client&nbsp; &nbsp; Client *http.Client&nbsp; &nbsp; //http Transport&nbsp; &nbsp; Transport *http.Transport)func init() {&nbsp; &nbsp; go http.ListenAndServe("localhost:6060", nil)&nbsp; &nbsp; //Get Any command line argument passed&nbsp; &nbsp; args := os.Args[1:]&nbsp; &nbsp; numCPU := runtime.NumCPU()&nbsp; &nbsp; if len(args) > 1 {&nbsp; &nbsp; &nbsp; &nbsp; numCPU, _ = strconv.Atoi(args[0])&nbsp; &nbsp; }&nbsp; &nbsp; Transport = &http.Transport{&nbsp; &nbsp; &nbsp; &nbsp; TLSClientConfig: &tls.Config{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; InsecureSkipVerify: true,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; DialContext: (&net.Dialer{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //Timeout: time.Duration() * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; KeepAlive: 30 * time.Second,&nbsp; &nbsp; &nbsp; &nbsp; }).DialContext,&nbsp; &nbsp; &nbsp; &nbsp; //ForceAttemptHTTP2:&nbsp; &nbsp; &nbsp;true,&nbsp; &nbsp; &nbsp; &nbsp; DisableKeepAlives: false,&nbsp; &nbsp; &nbsp; &nbsp; //MaxIdleConns:&nbsp; &nbsp; &nbsp; 0,&nbsp; &nbsp; &nbsp; &nbsp; //IdleConnTimeout:&nbsp; &nbsp;0,&nbsp; &nbsp; &nbsp; &nbsp; //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; //ExpectContinueTimeout: 1 * time.Second,&nbsp; &nbsp; }&nbsp; &nbsp; Client = &http.Client{&nbsp; &nbsp; &nbsp; &nbsp; // Timeout:&nbsp; &nbsp;time.Duration(300) * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; Transport: Transport,&nbsp; &nbsp; }&nbsp; &nbsp; 
runtime.GOMAXPROCS(numCPU)&nbsp; &nbsp; rand.Seed(time.Now().UTC().UnixNano())}func main() {&nbsp; &nbsp; router := mux.NewRouter().StrictSlash(true)&nbsp; &nbsp; router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {&nbsp; &nbsp; &nbsp; &nbsp; _, _ = fmt.Fprintf(w, "Hello!!!")&nbsp; &nbsp; })&nbsp; &nbsp; router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {&nbsp; &nbsp; &nbsp; &nbsp; vars := mux.Vars(r)&nbsp; &nbsp; &nbsp; &nbsp; prepareRequest(w, r, vars["name"])&nbsp; &nbsp; }).Methods("POST", "GET")&nbsp; &nbsp; // Register pprof handlers&nbsp; &nbsp; // router.HandleFunc("/debug/pprof/", pprof.Index)&nbsp; &nbsp; // router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)&nbsp; &nbsp; // router.HandleFunc("/debug/pprof/profile", pprof.Profile)&nbsp; &nbsp; // router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)&nbsp; &nbsp; // router.HandleFunc("/debug/pprof/trace", pprof.Trace)&nbsp; &nbsp; routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")&nbsp; &nbsp; srv := &http.Server{&nbsp; &nbsp; &nbsp; &nbsp; Addr: "localhost:8080",&nbsp; &nbsp; &nbsp; &nbsp; /*ReadTimeout:&nbsp; 500 * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; WriteTimeout: 500 * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; IdleTimeout:&nbsp; 10 * time.Second,*/&nbsp; &nbsp; &nbsp; &nbsp; Handler: routerMiddleWare,&nbsp; &nbsp; }&nbsp; &nbsp; log.Fatal(srv.ListenAndServe())}func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {&nbsp; &nbsp; // go func() {&nbsp; &nbsp; //&nbsp; make(chan []byte) <- make([]byte, 10024)&nbsp; &nbsp; // }()&nbsp; &nbsp; //other part of the code and call to goroutine&nbsp; &nbsp; var urls []string&nbsp; &nbsp; urls = append(urls,&nbsp; &nbsp; &nbsp; &nbsp; "http://localhost:7000/",&nbsp; &nbsp; &nbsp; &nbsp; "http://localhost:7000/",&nbsp; &nbsp; )&nbsp; &nbsp; results, s, c := callUrls(urls)&nbsp; &nbsp; finalCall(w, results, s, c)}type Response struct {&nbsp; &nbsp; 
Status int&nbsp; &nbsp; Url&nbsp; &nbsp; string&nbsp; &nbsp; Body&nbsp; &nbsp;string}func callUrls(urls []string) ([]*Response, []string, []string) {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; wg.Add(len(urls))&nbsp; &nbsp; ch := make(chan func() (*Response, string, string), len(urls))&nbsp; &nbsp; for _, url := range urls {&nbsp; &nbsp; &nbsp; &nbsp; go func(url string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //decide if request is valid for client to make http call using country/os&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; isValid := true //assuming url to be called&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if isValid {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //make post call&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //request body have many more paramter, just sample included.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- func() (*Response, string, string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.Header.Set("Content-Type", "application/json")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.Header.Set("Connection", "Keep-Alive")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //req.Close = 
true&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; response, err := Client.Do(req)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- func() (*Response, string, string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer response.Body.Close()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; body, _ := ioutil.ReadAll(response.Body)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; io.Copy(ioutil.Discard, response.Body)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //Close the body, forced this&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //Also tried without defer, and only wothout following line&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; response.Body.Close()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //do something with response body replace a few string etc.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //and return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- func() (*Response, string, string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- func() 
(*Response, string, string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }(url)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; var (&nbsp; &nbsp; &nbsp; &nbsp; results []*Response&nbsp; &nbsp; &nbsp; &nbsp; msg&nbsp; &nbsp; &nbsp;[]string&nbsp; &nbsp; &nbsp; &nbsp; status&nbsp; []string&nbsp; &nbsp; )&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; r, x, y := (<-ch)()&nbsp; &nbsp; &nbsp; &nbsp; if r != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; results = append(results, r)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; msg = append(msg, x)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; status = append(status, y)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; if len(results) == len(urls) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return results, msg, status&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {&nbsp; &nbsp; fmt.Println("response", "response body", results, msg, status)}k/main.gopackage mainimport "net/http"func main() {&nbsp; &nbsp; y := make([]byte, 100)&nbsp; &nbsp; http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {&nbsp; &nbsp; &nbsp; &nbsp; w.WriteHeader(http.StatusOK)&nbsp; &nbsp; &nbsp; &nbsp; w.Write(y)&nbsp; &nbsp; })&nbsp; &nbsp; http.ListenAndServe(":7000", nil)}安装额外的可视化工具,并用来ab模拟一些负载,它就可以完成直观的演示。go get -u github.com/divan/expvarmongo run main.go &go run k/main.go &ab -n 50000 -c 2500 http://localhost:8080/y# in a different window, for live previewexpvarmon -ports=6060 -i 500ms此时您会读取 的输出expvarmon,如果它是实时的,您会看到类似的内容你可以看到那些东西在挥舞,GC 正在积极工作。应用程序已加载,内存正在消耗,等待服务器释放其 conn 并等待 gc 清理它们您可以看到memstats.Alloc,&nbsp;memstats.HeapAlloc,memstats.HeapInuse现在减少了,正如 GC 完成其工作时所预期的那样,并且不存在泄漏。如果你要在运行go tool pprof -inuse_space -web 
http://localhost:6060/debug/pprof/heap后立即检查ab它表明该应用程序正在使用177Mb内存。其中大部分102Mb正在被使用net/http.Transport.getConn。你的经纪人正在负责1Mb,剩下的就是需要做的各种事情。如果您在服务器和 gc 发布后截取屏幕截图,您会看到一个更小的图表。这里就不演示了。现在让我们生成一个泄漏并再次使用这两个工具查看它。在代码中取消注释,func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; make(chan []byte) <- make([]byte, 10024)&nbsp; &nbsp; }()//...重新启动应用程序(按q,expvarmon但不是必需的)go get -u github.com/divan/expvarmongo run main.go &go run k/main.go &ab -n 50000 -c 2500 http://localhost:8080/y# in a different window, for live previewexpvarmon -ports=6060 -i 500ms表明您expvarmon可以看到相同的行为,只有数字发生了变化,并且在静止状态下,在被 gced 后,它仍然消耗大量内存,比作为比较点的 void golang http 服务器要多得多。再次,截取堆的屏幕截图,它显示您的处理程序现在正在消耗大部分内存 ~&nbsp;450Mb,注意箭头,它显示有 for452mb分配10kb和4.50Mbof&nbsp;96b。它们分别对应于[]byte被推送到的切片chan []byte。最后,您可以检查堆栈跟踪以查找死 goroutine,从而泄漏内存,打开http://localhost:6060/debug/pprof/goroutine?debug=1goroutine profile: total 5001250000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281#&nbsp; &nbsp;0x76b85c&nbsp; &nbsp; main.prepareRequest.func1+0x4c&nbsp; /home/mh-cbon/gow/src/test/oom/main.go:1014 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281#&nbsp; &nbsp;0x42b685&nbsp; &nbsp; internal/poll.runtime_pollWait+0x55&nbsp; &nbsp; &nbsp;/home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182#&nbsp; &nbsp;0x4c3a3a&nbsp; &nbsp; internal/poll.(*pollDesc).wait+0x9a&nbsp; &nbsp; &nbsp;/home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87// more...它告诉我们程序正在托管goroutine,然后它按文件位置分组列出它们,其中第一个数字是本示例第一组中50 012正在运行的实例的计数。50 000接下来是导致 goroutine 存在的堆栈跟踪。您可以看到有很多系统问题,就您的情况而言,您不必太担心。您必须寻找那些您认为如果您的程序按您认为应该的方式运行则不应运行的程序。然而,总体而言,您的代码并不令人满意,并且可以并且可能应该通过对其分配和总体设计概念进行彻底审查来改进。** 这是对原始源代码所做的更改的摘要。它添加了一个新程序k/main.go来充当后端服务器。它添加了_ "expvar"导入语句init它启动 pprof 在阶段期间注册的 std api HTTP 服务器实例go http.ListenAndServe("localhost:6060", nil)禁用客户端超时Timeout: &nbsp; 
time.Duration(300) * time.Millisecond,,否则负载测试不会返回200s服务器地址设置为Addr: "localhost:8080",urls其中创建的值设置prepareRequest为 len=2 的静态列表req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(它添加了对{"body":"param"}的错误检查)))它禁用错误检查io.Copy(ioutil.Discard, response.Body)

largeQ

我已经通过用 替换net/httppackage解决了这个问题fasthttp。早些时候我没有使用它,因为我无法在 fasthttp 客户端上找到超时方法,但我发现DoTimeoutfasthttp 客户端确实有一种方法,可以在指定的持续时间后使请求超时。这里是更新的代码:在vars.go中&nbsp;ClientFastHttp *fasthttp.Client主程序package mainimport (&nbsp; &nbsp; "./common"&nbsp; &nbsp; "crypto/tls"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "github.com/gorilla/mux"&nbsp; &nbsp; "github.com/valyala/fasthttp"&nbsp; &nbsp; "log"&nbsp; &nbsp; "math/rand"&nbsp; &nbsp; "net"&nbsp; &nbsp; "net/http"&nbsp; &nbsp; "net/http/pprof"&nbsp; &nbsp; "os"&nbsp; &nbsp; "runtime"&nbsp; &nbsp; "strconv"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")func init() {&nbsp; &nbsp; //Get Any command line argument passed&nbsp; &nbsp; args := os.Args[1:]&nbsp; &nbsp; numCPU := runtime.NumCPU()&nbsp; &nbsp; if len(args) > 1 {&nbsp; &nbsp; &nbsp; &nbsp; numCPU, _ = strconv.Atoi(args[0])&nbsp; &nbsp; }&nbsp; &nbsp; common.Transport = &http.Transport{&nbsp; &nbsp; &nbsp; &nbsp; TLSClientConfig: &tls.Config{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; InsecureSkipVerify: true,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; DialContext: (&net.Dialer{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //Timeout: time.Duration() * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; KeepAlive: 30 * time.Second,&nbsp; &nbsp; &nbsp; &nbsp; }).DialContext,&nbsp; &nbsp; &nbsp; &nbsp; //ForceAttemptHTTP2:&nbsp; &nbsp; &nbsp;true,&nbsp; &nbsp; &nbsp; &nbsp; DisableKeepAlives: false,&nbsp; &nbsp; &nbsp; &nbsp; //MaxIdleConns:&nbsp; &nbsp; &nbsp; 0,&nbsp; &nbsp; &nbsp; &nbsp; //IdleConnTimeout:&nbsp; &nbsp;0,&nbsp; &nbsp; &nbsp; &nbsp; //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; //ExpectContinueTimeout: 1 * time.Second,&nbsp; &nbsp; }&nbsp; &nbsp; common.Client = &http.Client{&nbsp; &nbsp; &nbsp; &nbsp; Timeout:&nbsp; &nbsp;time.Duration(300) * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; Transport: common.Transport,&nbsp; &nbsp; }&nbsp; &nbsp; runtime.GOMAXPROCS(numCPU)&nbsp; &nbsp; 
rand.Seed(time.Now().UTC().UnixNano())}func main() {&nbsp; &nbsp; router := mux.NewRouter().StrictSlash(true)&nbsp; &nbsp; router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {&nbsp; &nbsp; &nbsp; &nbsp; _, _ = fmt.Fprintf(w, "Hello!!!")&nbsp; &nbsp; })&nbsp; &nbsp; router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {&nbsp; &nbsp; &nbsp; &nbsp; vars := mux.Vars(r)&nbsp; &nbsp; &nbsp; &nbsp; prepareRequest(w, r, vars["name"])&nbsp; &nbsp; }).Methods("POST")&nbsp; &nbsp; // Register pprof handlers&nbsp; &nbsp; router.HandleFunc("/debug/pprof/", pprof.Index)&nbsp; &nbsp; router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)&nbsp; &nbsp; router.HandleFunc("/debug/pprof/profile", pprof.Profile)&nbsp; &nbsp; router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)&nbsp; &nbsp; router.HandleFunc("/debug/pprof/trace", pprof.Trace)&nbsp; &nbsp; routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")&nbsp; &nbsp; srv := &http.Server{&nbsp; &nbsp; &nbsp; &nbsp; Addr: "0.0.0.0:" + "80",&nbsp; &nbsp; &nbsp; &nbsp; /*ReadTimeout:&nbsp; 500 * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; WriteTimeout: 500 * time.Millisecond,&nbsp; &nbsp; &nbsp; &nbsp; IdleTimeout:&nbsp; 10 * time.Second,*/&nbsp; &nbsp; &nbsp; &nbsp; Handler: routerMiddleWare,&nbsp; &nbsp; }&nbsp; &nbsp; log.Fatal(srv.ListenAndServe())}func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {&nbsp; &nbsp; //other part of the code and call to goroutine&nbsp; &nbsp; var urls []string&nbsp; &nbsp; results, s, c := callUrls(urls)&nbsp; &nbsp; finalCall(w, results, s, c)}type Response struct {&nbsp; &nbsp; Status int&nbsp; &nbsp; Url&nbsp; &nbsp; string&nbsp; &nbsp; Body&nbsp; &nbsp;string}func callUrls(urls []string) ([]*Response, []string, []string) {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; wg.Add(len(urls))&nbsp; &nbsp; ch := make(chan func() (*Response, string, string), len(urls))&nbsp; &nbsp; for _, url := range urls {&nbsp; 
&nbsp; &nbsp; &nbsp; go func(url string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //decide if request is valid for client to make http call using country/os&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; isValid := true //assuming url to be called&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if isValid {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //make post call&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //request body have many more paramter, just sample included.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req := fasthttp.AcquireRequest()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.SetRequestURI(url)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.Header.Set("Content-Type", "application/json")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.Header.Set("Connection", "Keep-Alive")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.Header.SetMethod("POST")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.SetBody([]byte(`{"body":"param"}`))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; resp := fasthttp.AcquireResponse()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer fasthttp.ReleaseRequest(req)&nbsp; &nbsp;// <- do not forget to release&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer fasthttp.ReleaseResponse(resp) // <- do not forget to release&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //err := clientFastHttp.Do(req, response)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //endregion&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; t := time.Duration(300)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err := common.ClientFastHttp.DoTimeout(req, resp, t*time.Millisecond)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; body := resp.Body()&nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- func() (*Response, string, string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return &Response{Status: 500, Url: url, Body: ""}, "error", "500"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; /*defer response.Body.Close()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; body, _ := ioutil.ReadAll(response.Body)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _, err = io.Copy(ioutil.Discard, response.Body)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //Close the body, forced this&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //Also tried without defer, and only wothout following line&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; response.Body.Close()*/&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //do something with response body replace a few string etc.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //and return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- func() (*Response, string, string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- func() (*Response, string, string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return 
&Response{Status: 500, Url: url, Body: ""}, "invalid", "500"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }(url)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; var (&nbsp; &nbsp; &nbsp; &nbsp; results []*Response&nbsp; &nbsp; &nbsp; &nbsp; msg&nbsp; &nbsp; &nbsp;[]string&nbsp; &nbsp; &nbsp; &nbsp; status&nbsp; []string&nbsp; &nbsp; )&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; r, x, y := (<-ch)()&nbsp; &nbsp; &nbsp; &nbsp; if r != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; results = append(results, r)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; msg = append(msg, x)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; status = append(status, y)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; if len(results) == len(urls) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return results, msg, status&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {&nbsp; &nbsp; fmt.Println("response", "response body", results, msg, status)}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go