慕无忌1623718
这段代码没有泄露。为了演示,让我们稍微更新一下它,以便该帖子可以重现。主程序package mainimport ( "bytes" "crypto/tls" _ "expvar" "fmt" "io" "io/ioutil" "log" "math/rand" "net" "net/http" _ "net/http/pprof" "os" "runtime" "strconv" "sync" "time" "github.com/gorilla/mux")var ( //http client Client *http.Client //http Transport Transport *http.Transport)func init() { go http.ListenAndServe("localhost:6060", nil) //Get Any command line argument passed args := os.Args[1:] numCPU := runtime.NumCPU() if len(args) > 1 { numCPU, _ = strconv.Atoi(args[0]) } Transport = &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, DialContext: (&net.Dialer{ //Timeout: time.Duration() * time.Millisecond, KeepAlive: 30 * time.Second, }).DialContext, //ForceAttemptHTTP2: true, DisableKeepAlives: false, //MaxIdleConns: 0, //IdleConnTimeout: 0, //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond, //ExpectContinueTimeout: 1 * time.Second, } Client = &http.Client{ // Timeout: time.Duration(300) * time.Millisecond, Transport: Transport, } runtime.GOMAXPROCS(numCPU) rand.Seed(time.Now().UTC().UnixNano())}func main() { router := mux.NewRouter().StrictSlash(true) router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { _, _ = fmt.Fprintf(w, "Hello!!!") }) router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) prepareRequest(w, r, vars["name"]) }).Methods("POST", "GET") // Register pprof handlers // router.HandleFunc("/debug/pprof/", pprof.Index) // router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) // router.HandleFunc("/debug/pprof/profile", pprof.Profile) // router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) // router.HandleFunc("/debug/pprof/trace", pprof.Trace) routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout") srv := &http.Server{ Addr: "localhost:8080", /*ReadTimeout: 500 * time.Millisecond, WriteTimeout: 500 * time.Millisecond, IdleTimeout: 10 * time.Second,*/ Handler: routerMiddleWare, } log.Fatal(srv.ListenAndServe())}func prepareRequest(w http.ResponseWriter, r *http.Request, name string) { // go func() { // make(chan []byte) <- make([]byte, 10024) // }() //other part of the code and call to goroutine var urls []string urls = append(urls, "http://localhost:7000/", "http://localhost:7000/", ) results, s, c := callUrls(urls) finalCall(w, results, s, c)}type Response struct { Status int Url string Body string}func callUrls(urls []string) ([]*Response, []string, []string) { var wg sync.WaitGroup wg.Add(len(urls)) ch := make(chan func() (*Response, string, string), len(urls)) for _, url := range urls { go func(url string) { //decide if request is valid for client to make http call using country/os isValid := true //assuming url to be called if isValid { //make post call //request body have many more paramter, just sample included. //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak. req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`))) if err != nil { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500" } return } req.Header.Set("Content-Type", "application/json") req.Header.Set("Connection", "Keep-Alive") //req.Close = true response, err := Client.Do(req) if err != nil { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500" } return } defer response.Body.Close() body, _ := ioutil.ReadAll(response.Body) io.Copy(ioutil.Discard, response.Body) //Close the body, forced this //Also tried without defer, and only wothout following line response.Body.Close() //do something with response body replace a few string etc. //and return wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200" } } else { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500" } } }(url) } wg.Wait() var ( results []*Response msg []string status []string ) for { r, x, y := (<-ch)() if r != nil { results = append(results, r) msg = append(msg, x) status = append(status, y) } if len(results) == len(urls) { return results, msg, status } }}func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) { fmt.Println("response", "response body", results, msg, status)}k/main.gopackage mainimport "net/http"func main() { y := make([]byte, 100) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write(y) }) http.ListenAndServe(":7000", nil)}安装额外的可视化工具,并用来ab模拟一些负载,它就可以完成直观的演示。go get -u github.com/divan/expvarmongo run main.go &go run k/main.go &ab -n 50000 -c 2500 http://localhost:8080/y# in a different window, for live previewexpvarmon -ports=6060 -i 500ms此时您会读取 的输出expvarmon,如果它是实时的,您会看到类似的内容你可以看到那些东西在挥舞,GC 正在积极工作。应用程序已加载,内存正在消耗,等待服务器释放其 conn 并等待 gc 清理它们您可以看到memstats.Alloc, memstats.HeapAlloc,memstats.HeapInuse现在减少了,正如 GC 完成其工作时所预期的那样,并且不存在泄漏。如果你要在运行go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap后立即检查ab它表明该应用程序正在使用177Mb内存。其中大部分102Mb正在被使用net/http.Transport.getConn。你的经纪人正在负责1Mb,剩下的就是需要做的各种事情。如果您在服务器和 gc 发布后截取屏幕截图,您会看到一个更小的图表。这里就不演示了。现在让我们生成一个泄漏并再次使用这两个工具查看它。在代码中取消注释,func prepareRequest(w http.ResponseWriter, r *http.Request, name string) { go func() { make(chan []byte) <- make([]byte, 10024) }()//...重新启动应用程序(按q,expvarmon但不是必需的)go get -u github.com/divan/expvarmongo run main.go &go run k/main.go &ab -n 50000 -c 2500 http://localhost:8080/y# in a different window, for live previewexpvarmon -ports=6060 -i 500ms表明您expvarmon可以看到相同的行为,只有数字发生了变化,并且在静止状态下,在被 gced 后,它仍然消耗大量内存,比作为比较点的 void golang http 服务器要多得多。再次,截取堆的屏幕截图,它显示您的处理程序现在正在消耗大部分内存 ~ 450Mb,注意箭头,它显示有 for452mb分配10kb和4.50Mbof 96b。它们分别对应于[]byte被推送到的切片chan []byte。最后,您可以检查堆栈跟踪以查找死 goroutine,从而泄漏内存,打开http://localhost:6060/debug/pprof/goroutine?debug=1goroutine profile: total 5001250000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281# 0x76b85c main.prepareRequest.func1+0x4c /home/mh-cbon/gow/src/test/oom/main.go:1014 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281# 0x42b685 internal/poll.runtime_pollWait+0x55 /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182# 0x4c3a3a internal/poll.(*pollDesc).wait+0x9a /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87// more...它告诉我们程序正在托管goroutine,然后它按文件位置分组列出它们,其中第一个数字是本示例第一组中50 012正在运行的实例的计数。50 000接下来是导致 goroutine 存在的堆栈跟踪。您可以看到有很多系统问题,就您的情况而言,您不必太担心。您必须寻找那些您认为如果您的程序按您认为应该的方式运行则不应运行的程序。然而,总体而言,您的代码并不令人满意,并且可以并且可能应该通过对其分配和总体设计概念进行彻底审查来改进。** 这是对原始源代码所做的更改的摘要。它添加了一个新程序k/main.go来充当后端服务器。它添加了_ "expvar"导入语句init它启动 pprof 在阶段期间注册的 std api HTTP 服务器实例go http.ListenAndServe("localhost:6060", nil)禁用客户端超时Timeout: time.Duration(300) * time.Millisecond,,否则负载测试不会返回200s服务器地址设置为Addr: "localhost:8080",urls其中创建的值设置prepareRequest为 len=2 的静态列表req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(它添加了对{"body":"param"}的错误检查)))它禁用错误检查io.Copy(ioutil.Discard, response.Body)
largeQ
我已经通过用 替换net/httppackage解决了这个问题fasthttp。早些时候我没有使用它,因为我无法在 fasthttp 客户端上找到超时方法,但我发现DoTimeoutfasthttp 客户端确实有一种方法,可以在指定的持续时间后使请求超时。这里是更新的代码:在vars.go中 ClientFastHttp *fasthttp.Client主程序package mainimport ( "./common" "crypto/tls" "fmt" "github.com/gorilla/mux" "github.com/valyala/fasthttp" "log" "math/rand" "net" "net/http" "net/http/pprof" "os" "runtime" "strconv" "sync" "time")func init() { //Get Any command line argument passed args := os.Args[1:] numCPU := runtime.NumCPU() if len(args) > 1 { numCPU, _ = strconv.Atoi(args[0]) } common.Transport = &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, DialContext: (&net.Dialer{ //Timeout: time.Duration() * time.Millisecond, KeepAlive: 30 * time.Second, }).DialContext, //ForceAttemptHTTP2: true, DisableKeepAlives: false, //MaxIdleConns: 0, //IdleConnTimeout: 0, //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond, //ExpectContinueTimeout: 1 * time.Second, } common.Client = &http.Client{ Timeout: time.Duration(300) * time.Millisecond, Transport: common.Transport, } runtime.GOMAXPROCS(numCPU) rand.Seed(time.Now().UTC().UnixNano())}func main() { router := mux.NewRouter().StrictSlash(true) router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { _, _ = fmt.Fprintf(w, "Hello!!!") }) router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) prepareRequest(w, r, vars["name"]) }).Methods("POST") // Register pprof handlers router.HandleFunc("/debug/pprof/", pprof.Index) router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) router.HandleFunc("/debug/pprof/profile", pprof.Profile) router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) router.HandleFunc("/debug/pprof/trace", pprof.Trace) routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout") srv := &http.Server{ Addr: "0.0.0.0:" + "80", /*ReadTimeout: 500 * time.Millisecond, WriteTimeout: 500 * time.Millisecond, IdleTimeout: 10 * time.Second,*/ Handler: routerMiddleWare, } log.Fatal(srv.ListenAndServe())}func prepareRequest(w http.ResponseWriter, r *http.Request, name string) { //other part of the code and call to goroutine var urls []string results, s, c := callUrls(urls) finalCall(w, results, s, c)}type Response struct { Status int Url string Body string}func callUrls(urls []string) ([]*Response, []string, []string) { var wg sync.WaitGroup wg.Add(len(urls)) ch := make(chan func() (*Response, string, string), len(urls)) for _, url := range urls { go func(url string) { //decide if request is valid for client to make http call using country/os isValid := true //assuming url to be called if isValid { //make post call //request body have many more paramter, just sample included. //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak. req := fasthttp.AcquireRequest() req.SetRequestURI(url) req.Header.Set("Content-Type", "application/json") req.Header.Set("Connection", "Keep-Alive") req.Header.SetMethod("POST") req.SetBody([]byte(`{"body":"param"}`)) resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseRequest(req) // <- do not forget to release defer fasthttp.ReleaseResponse(resp) // <- do not forget to release //err := clientFastHttp.Do(req, response) //endregion t := time.Duration(300) err := common.ClientFastHttp.DoTimeout(req, resp, t*time.Millisecond) body := resp.Body() if err != nil { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, "error", "500" } return } /*defer response.Body.Close() body, _ := ioutil.ReadAll(response.Body) _, err = io.Copy(ioutil.Discard, response.Body) //Close the body, forced this //Also tried without defer, and only wothout following line response.Body.Close()*/ //do something with response body replace a few string etc. //and return wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200" } } else { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500" } } }(url) } wg.Wait() var ( results []*Response msg []string status []string ) for { r, x, y := (<-ch)() if r != nil { results = append(results, r) msg = append(msg, x) status = append(status, y) } if len(results) == len(urls) { return results, msg, status } }}func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) { fmt.Println("response", "response body", results, msg, status)}