慕工程0101907
这种行为的原因在于 Go 的调度程序(这个问题的较短版本在 golang-nuts)。上面的 goroutines 都在同一时间点开始执行(如计时所示,加上检查 startTime 变量的内存位置证明时间对象并未被“回收”重用),但是一旦它们命中 http.Get() 就会被取消调度。计时是递增的,因为 http.Get() 造成了瓶颈,无法让所生成的全部 goroutine 并发执行。似乎这里使用了某种 FIFO 队列。推荐观看和阅读:《解释 Golang I/O 多路复用 netpoller 模型》以及《队列、公平性和 Go 调度程序》。在研究等待组的大小时,我发现一些值显示出更加一致的时间(而不是递增)。所以我想知道等待组大小对总时间和单次请求时间的影响是什么。我将上面的代码重构为一个程序,该程序在给定范围内对每个等待组大小进行多次实验,并将每次运行的总计时和单独计时保存到 sqlite 数据库中。生成的数据集可以很容易地用于 Jupyter Notebook 等。不幸的是,在当前设置下,我只能发出大约 40K 个请求,然后就会受到限制。如果您有兴趣但不想等待数据生成(完成一次运行需要很长时间),可以在我的 GitHub 上查看部分数据集。有趣的结果是,对于较小的等待组大小,并发/顺序比率急剧下降,并且您会在最后看到连接开始受到限制。该运行当时被手动中止。图:并发运行时间/顺序运行时间与等待组大小的关系。图:不同等待组大小下的各次请求时间。package mainimport ( "database/sql" "fmt" "log" "net/http" "os" "path/filepath" "runtime" "sync" "time" _ "github.com/mattn/go-sqlite3")///// global varsconst REQUESTS int = 100 // Single run size, performed two times (concurrent and sequential)const URL string = "SET_YOUR_OWN" // Some file on a CDN somewhere; used for the GET requestsconst DBNAME string = "netRand.db" // Name of the db file. 
Saved next to the executableconst WGMIN int = 1 // Start range for waitgroup size (inclusive)const WGMAX int = 101 // Stop range for waitgroup size (exclusive)const NREPEAT int = 10 // Number of times to repeat a run for a specific waitgroup size//// typestype timingResult struct { // Container for collecting results before persisting to DB WaitgroupSize int ConcurrentTimingsMs [REQUESTS]int64 ConcurrentTotalMs int64 SequentialTimingsMs [REQUESTS]int64 SequentialTotalMs int64}//// mainfunc main() { db := setupDb() defer db.Close() for i := WGMIN; i < WGMAX; i++ { // waitgroup size range for j := 0; j < NREPEAT; j++ { // repeat for more data points timings := requestTimes(i) persistTimings(timings, db) fmt.Printf("\n======== %v of %v ============\n", j+1, NREPEAT) fmt.Printf("current waitgroup size: %v\n", i) fmt.Printf("max waitgroup size: %v\n", WGMAX-1) } }}func requestTimes(waitgroupSize int) timingResult { // do NTIMES requests in go routines with waitgroupSize // do NTIMES requests sequentially timings_concurrent, total_concurrent := concurrentRequests(waitgroupSize) timings_sequential, total_sequential := sequentialRequests() return timingResult{ WaitgroupSize: waitgroupSize, ConcurrentTimingsMs: timings_concurrent, ConcurrentTotalMs: total_concurrent, SequentialTimingsMs: timings_sequential, SequentialTotalMs: total_sequential, }}func persistTimings(timings timingResult, db *sql.DB) { persistRun(timings, db) currentRunId := getCurrentRunId(db) persistConcurrentTimings(currentRunId, timings, db) persistSequentialTimings(currentRunId, timings, db)}func concurrentRequests(waitgroupSize int) ([REQUESTS]int64, int64) { start := time.Now() var wg sync.WaitGroup var timings [REQUESTS]int64 ch := make(chan int64, REQUESTS) for i := range timings { wg.Add(1) go func() { defer wg.Done() doGetChannel(URL, ch) }() // waitgroupsize is controlled using modulo // making sure experiment size is always NTIMES // independent of waitgroupsize if i%waitgroupSize == 0 { 
wg.Wait() } } wg.Wait() close(ch) count := 0 for ret := range ch { timings[count] = ret count++ } return timings, time.Since(start).Milliseconds()}func doGetChannel(address string, channel chan int64) { // time get request and send to channel startSub := time.Now().UnixMilli() _, err := http.Get(address) if err != nil { log.Fatalln(err) } stopSub := time.Now().UnixMilli() delta := stopSub - startSub channel <- delta}func sequentialRequests() ([REQUESTS]int64, int64) { startGo := time.Now() var timings_sequential [REQUESTS]int64 for i := range timings_sequential { timings_sequential[i] = doGetReturn(URL) } return timings_sequential, time.Since(startGo).Milliseconds()}func doGetReturn(address string) int64 { // time get request without a waitgroup/channel start := time.Now() _, err := http.Get(address) if err != nil { log.Fatalln(err) } duration := time.Since(start).Milliseconds() return duration}//// DBfunc setupDb() *sql.DB { // __________________________runs____________________ // | | // concurrent_timings(fk: run_id) sequential_timings(fk: run_id) // const createRuns string = ` CREATE TABLE IF NOT EXISTS runs ( run_id INTEGER NOT NULL PRIMARY KEY, time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, waitgroup_size INTEGER, concurrent_total_ms INTEGER, sequential_total_ms INTEGER, concurrent_sequential_ratio REAL );` const createSequentialTimings string = ` CREATE TABLE IF NOT EXISTS sequential_timings ( run INTEGER, call_number INTEGER, timing_ms INTEGER, FOREIGN KEY(run) REFERENCES runs(run_id) );` const createConcurrentTimings string = ` CREATE TABLE IF NOT EXISTS concurrent_timings ( run INTEGER, channel_position INTEGER, timing_ms INTEGER, FOREIGN KEY(run) REFERENCES runs(run_id) );` // retrieve platform appropriate connection string dbString := getConnectionString(DBNAME) db, err := sql.Open("sqlite3", dbString) if err != nil { log.Fatalln(err) } if _, err := db.Exec(createRuns); err != nil { log.Fatalln(err) } if _, err := db.Exec(createSequentialTimings); 
err != nil { log.Fatalln(err) } if _, err := db.Exec(createConcurrentTimings); err != nil { log.Fatalln(err) } return db}func getConnectionString(dbName string) string { // Generate platform appropriate connection string // the db is placed in the same directory as the current executable // retrieve the path to the currently executed executable ex, err := os.Executable() if err != nil { panic(err) } // retrieve path to containing dir dbDir := filepath.Dir(ex) // Append platform appropriate separator and dbName if runtime.GOOS == "windows" { dbDir = dbDir + "\\" + dbName } else { dbDir = dbDir + "/" + dbName } return dbDir}func persistRun(timings timingResult, db *sql.DB) { tx, err := db.Begin() if err != nil { log.Fatalln(err) } insertRun, err := db.Prepare(`INSERT INTO runs( waitgroup_size, sequential_total_ms, concurrent_total_ms, concurrent_sequential_ratio) VALUES(?, ?, ?, ?)`) if err != nil { log.Fatalln(err) } defer tx.Stmt(insertRun).Close() _, err = tx.Stmt(insertRun).Exec( timings.WaitgroupSize, timings.SequentialTotalMs, timings.ConcurrentTotalMs, float32(timings.ConcurrentTotalMs)/float32(timings.SequentialTotalMs), ) if err != nil { log.Fatalln(err) } err = tx.Commit() if err != nil { log.Fatalln(err) }}func getCurrentRunId(db *sql.DB) int { rows, err := db.Query("SELECT MAX(run_id) FROM runs") if err != nil { log.Fatal(err) } var run_id int for rows.Next() { err = rows.Scan(&run_id) if err != nil { log.Fatalln(err) } } rows.Close() return run_id}func persistConcurrentTimings(runId int, timings timingResult, db *sql.DB) { tx, err := db.Begin() if err != nil { log.Fatalln(err) } insertTiming, err := db.Prepare(`INSERT INTO concurrent_timings( run, channel_position, timing_ms) VALUES(?, ?, ?)`) if err != nil { log.Fatalln(err) } for i, timing := range timings.ConcurrentTimingsMs { _, err = tx.Stmt(insertTiming).Exec( runId, i, timing, ) if err != nil { log.Fatalln(err) } } err = tx.Commit() if err != nil { log.Fatalln(err) }}func 
persistSequentialTimings(runId int, timings timingResult, db *sql.DB) { tx, err := db.Begin() if err != nil { log.Fatalln(err) } insertTiming, err := db.Prepare(`INSERT INTO sequential_timings( run, call_number, timing_ms) VALUES(?, ?, ?)`) if err != nil { log.Fatalln(err) } for i, timing := range timings.SequentialTimingsMs { _, err = tx.Stmt(insertTiming).Exec( runId, i, timing, ) if err != nil { log.Fatalln(err) } } err = tx.Commit() if err != nil { log.Fatalln(err) }}