慕标5832272
此代码没有泄漏。为了演示,让我们稍微更新一下**,这样帖子就可以重现了。main.gopackage mainimport ( "bytes" "crypto/tls" _ "expvar" "fmt" "io" "io/ioutil" "log" "math/rand" "net" "net/http" _ "net/http/pprof" "os" "runtime" "strconv" "sync" "time" "github.com/gorilla/mux")var ( //http client Client *http.Client //http Transport Transport *http.Transport)func init() { go http.ListenAndServe("localhost:6060", nil) //Get Any command line argument passed args := os.Args[1:] numCPU := runtime.NumCPU() if len(args) > 1 { numCPU, _ = strconv.Atoi(args[0]) } Transport = &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, DialContext: (&net.Dialer{ //Timeout: time.Duration() * time.Millisecond, KeepAlive: 30 * time.Second, }).DialContext, //ForceAttemptHTTP2: true, DisableKeepAlives: false, //MaxIdleConns: 0, //IdleConnTimeout: 0, //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond, //ExpectContinueTimeout: 1 * time.Second, } Client = &http.Client{ // Timeout: time.Duration(300) * time.Millisecond, Transport: Transport, } runtime.GOMAXPROCS(numCPU) rand.Seed(time.Now().UTC().UnixNano())}func main() { router := mux.NewRouter().StrictSlash(true) router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { _, _ = fmt.Fprintf(w, "Hello!!!") }) router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) prepareRequest(w, r, vars["name"]) }).Methods("POST", "GET") // Register pprof handlers // router.HandleFunc("/debug/pprof/", pprof.Index) // router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) // router.HandleFunc("/debug/pprof/profile", pprof.Profile) // router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) // router.HandleFunc("/debug/pprof/trace", pprof.Trace) routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout") srv := &http.Server{ Addr: "localhost:8080", /*ReadTimeout: 500 * time.Millisecond, WriteTimeout: 500 * time.Millisecond, IdleTimeout: 10 * time.Second,*/ Handler: routerMiddleWare, } log.Fatal(srv.ListenAndServe())}func prepareRequest(w http.ResponseWriter, r *http.Request, name string) { // go func() { // make(chan []byte) <- make([]byte, 10024) // }() //other part of the code and call to goroutine var urls []string urls = append(urls, "http://localhost:7000/", "http://localhost:7000/", ) results, s, c := callUrls(urls) finalCall(w, results, s, c)}type Response struct { Status int Url string Body string}func callUrls(urls []string) ([]*Response, []string, []string) { var wg sync.WaitGroup wg.Add(len(urls)) ch := make(chan func() (*Response, string, string), len(urls)) for _, url := range urls { go func(url string) { //decide if request is valid for client to make http call using country/os isValid := true //assuming url to be called if isValid { //make post call //request body have many more paramter, just sample included. //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak. req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`))) if err != nil { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500" } return } req.Header.Set("Content-Type", "application/json") req.Header.Set("Connection", "Keep-Alive") //req.Close = true response, err := Client.Do(req) if err != nil { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500" } return } defer response.Body.Close() body, _ := ioutil.ReadAll(response.Body) io.Copy(ioutil.Discard, response.Body) //Close the body, forced this //Also tried without defer, and only wothout following line response.Body.Close() //do something with response body replace a few string etc. //and return wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200" } } else { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500" } } }(url) } wg.Wait() var ( results []*Response msg []string status []string ) for { r, x, y := (<-ch)() if r != nil { results = append(results, r) msg = append(msg, x) status = append(status, y) } if len(results) == len(urls) { return results, msg, status } }}func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) { fmt.Println("response", "response body", results, msg, status)}k/main.gopackage mainimport "net/http"func main() { y := make([]byte, 100) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write(y) }) http.ListenAndServe(":7000", nil)}安装额外的可视化工具,并用于ab模拟一些负载,它将完成直观演示的工作。go get -u github.com/divan/expvarmongo run main.go &go run k/main.go &ab -n 50000 -c 2500 http://localhost:8080/y# in a different window, for live previewexpvarmon -ports=6060 -i 500ms那时你阅读了 的输出expvarmon,如果它是现场的,你有类似的东西你可以看到东西在挥动,gc 正在积极工作。应用程序已加载,正在消耗内存,等待服务器释放其 conn 和 gc 清理它们您可以看到memstats.Alloc, memstats.HeapAlloc,memstats.HeapInuse现在减少了,正如 gc 完成工作并且不存在泄漏时所预期的那样。如果你要检查go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap,就在ab跑之后它表明该应用程序正在使用177Mb内存。其中大部分102Mb被net/http.Transport.getConn.您的处理程序正在计算1Mb,其余的是需要的各种东西。如果您要在服务器发布和 gc 之后截取屏幕截图,您会看到一个更小的图表。这里不演示。现在让我们生成一个泄漏并再次使用这两个工具查看它。在代码中取消注释,func prepareRequest(w http.ResponseWriter, r *http.Request, name string) { go func() { make(chan []byte) <- make([]byte, 10024) }()//...重新启动应用程序(按q,expvarmon虽然不是必需的)go get -u github.com/divan/expvarmongo run main.go &go run k/main.go &ab -n 50000 -c 2500 http://localhost:8080/y# in a different window, for live previewexpvarmon -ports=6060 -i 500ms表明在expvarmon你可以看到相同的行为,只是数字发生了变化,而在静止状态,它被 gced 后,它仍然消耗了大量的内存,比一个 void golang http server 拿一个比较点要多得多。同样,对堆进行截图,它显示您的处理程序现在正在消耗大部分内存 ~ 450Mb,注意箭头,它显示有 for 452mbof10kb分配和4.50Mbof 96b。它们分别对应于[]byte被推送到chan []byte.最后,您可以检查堆栈跟踪以查找死的 goroutine,从而导致内存泄漏,打开http://localhost:6060/debug/pprof/goroutine?debug=1goroutine profile: total 5001250000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281# 0x76b85c main.prepareRequest.func1+0x4c /home/mh-cbon/gow/src/test/oom/main.go:1014 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281# 0x42b685 internal/poll.runtime_pollWait+0x55 /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182# 0x4c3a3a internal/poll.(*pollDesc).wait+0x9a /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87// more...它告诉我们程序正在托管50 012goroutine,然后它按文件位置分组列出它们,其中第一个数字是运行实例的计数,50 000在本示例的第一组中。紧随其后的是导致 goroutine 存在的堆栈跟踪。你可以看到有一堆系统的东西,在你的情况下,你不应该太担心它。如果你的程序按你认为的那样工作,你必须寻找那些你认为不应该存在的人。但是,总体而言,您的代码并不令人满意,并且可能并且可能应该通过对其分配和整体设计概念的彻底审查来改进。** 这是对原始源代码所做更改的摘要。它添加了一个新程序k/main.go来充当后端服务器。它添加了_ "expvar"导入语句它启动 pprof 在init阶段期间注册的 std api HTTP 服务器实例go http.ListenAndServe("localhost:6060", nil)禁用客户端超时Timeout: time.Duration(300) * time.Millisecond,,否则负载测试不返回 200s服务器地址设置为Addr: "localhost:8080",urls在其中创建的值prepareRequest设置为 len=2 的静态列表req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(它为{"body":"param"}添加了错误检查)))它禁用错误检查io.Copy(ioutil.Discard, response.Body)
精慕HU
我已经通过将net/http包替换为fasthttp. 早些时候我没有使用它,因为我无法在 fasthttp 客户端上找到超时方法,但我看到确实有一种DoTimeout用于 fasthttp 客户端的方法,它在指定的持续时间后使请求超时。这里更新的代码:在vars.go中 ClientFastHttp *fasthttp.Clientmain.gopackage mainimport ( "./common" "crypto/tls" "fmt" "github.com/gorilla/mux" "github.com/valyala/fasthttp" "log" "math/rand" "net" "net/http" "net/http/pprof" "os" "runtime" "strconv" "sync" "time")func init() { //Get Any command line argument passed args := os.Args[1:] numCPU := runtime.NumCPU() if len(args) > 1 { numCPU, _ = strconv.Atoi(args[0]) } common.Transport = &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, DialContext: (&net.Dialer{ //Timeout: time.Duration() * time.Millisecond, KeepAlive: 30 * time.Second, }).DialContext, //ForceAttemptHTTP2: true, DisableKeepAlives: false, //MaxIdleConns: 0, //IdleConnTimeout: 0, //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond, //ExpectContinueTimeout: 1 * time.Second, } common.Client = &http.Client{ Timeout: time.Duration(300) * time.Millisecond, Transport: common.Transport, } runtime.GOMAXPROCS(numCPU) rand.Seed(time.Now().UTC().UnixNano())}func main() { router := mux.NewRouter().StrictSlash(true) router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { _, _ = fmt.Fprintf(w, "Hello!!!") }) router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) prepareRequest(w, r, vars["name"]) }).Methods("POST") // Register pprof handlers router.HandleFunc("/debug/pprof/", pprof.Index) router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) router.HandleFunc("/debug/pprof/profile", pprof.Profile) router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) router.HandleFunc("/debug/pprof/trace", pprof.Trace) routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout") srv := &http.Server{ Addr: "0.0.0.0:" + "80", /*ReadTimeout: 500 * time.Millisecond, WriteTimeout: 500 * time.Millisecond, IdleTimeout: 10 * time.Second,*/ Handler: routerMiddleWare, } log.Fatal(srv.ListenAndServe())}func prepareRequest(w http.ResponseWriter, r *http.Request, name string) { //other part of the code and call to goroutine var urls []string results, s, c := callUrls(urls) finalCall(w, results, s, c)}type Response struct { Status int Url string Body string}func callUrls(urls []string) ([]*Response, []string, []string) { var wg sync.WaitGroup wg.Add(len(urls)) ch := make(chan func() (*Response, string, string), len(urls)) for _, url := range urls { go func(url string) { //decide if request is valid for client to make http call using country/os isValid := true //assuming url to be called if isValid { //make post call //request body have many more paramter, just sample included. //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak. req := fasthttp.AcquireRequest() req.SetRequestURI(url) req.Header.Set("Content-Type", "application/json") req.Header.Set("Connection", "Keep-Alive") req.Header.SetMethod("POST") req.SetBody([]byte(`{"body":"param"}`)) resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseRequest(req) // <- do not forget to release defer fasthttp.ReleaseResponse(resp) // <- do not forget to release //err := clientFastHttp.Do(req, response) //endregion t := time.Duration(300) err := common.ClientFastHttp.DoTimeout(req, resp, t*time.Millisecond) body := resp.Body() if err != nil { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, "error", "500" } return } /*defer response.Body.Close() body, _ := ioutil.ReadAll(response.Body) _, err = io.Copy(ioutil.Discard, response.Body) //Close the body, forced this //Also tried without defer, and only wothout following line response.Body.Close()*/ //do something with response body replace a few string etc. //and return wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200" } } else { wg.Done() ch <- func() (*Response, string, string) { return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500" } } }(url) } wg.Wait() var ( results []*Response msg []string status []string ) for { r, x, y := (<-ch)() if r != nil { results = append(results, r) msg = append(msg, x) status = append(status, y) } if len(results) == len(urls) { return results, msg, status } }}func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) { fmt.Println("response", "response body", results, msg, status)}