我之前用过 goroutine 的 sync.WaitGroup,但是我想控制 goroutine 的并发,
所以我用并发限制编写我的等待组,例如:
package wglimit
import (
"sync"
)
// WaitGroupLimit ...
type WaitGroupLimit struct {
ch chan int
wg *sync.WaitGroup
}
// New ...
func New(size int) *WaitGroupLimit {
if size <= 0 {
size = 1
}
return &WaitGroupLimit{
ch: make(chan int, size), // buffer chan to limit concurrency
wg: &sync.WaitGroup{},
}
}
// Add ...
func (wgl *WaitGroupLimit) Add(delta int) {
for i := 0; i < delta; i++ {
wgl.ch <- 1
wgl.wg.Add(1)
}
}
// Done ...
func (wgl *WaitGroupLimit) Done() {
wgl.wg.Done()
<-wgl.ch
}
// Wait ...
func (wgl *WaitGroupLimit) Wait() {
close(wgl.ch)
wgl.wg.Wait()
}
然后我用它来控制 goroutine 并发,例如:
jobs := ["1", "2", "3", "4"] // some jobs
// wg := sync.WaitGroup{} // have no concurrency limit
wg := wglimit.New(2) // limit 2 goroutine
for _, job := range jobs {
wg.Add(1)
go func(job string) {
// job worker
defer wg.Done()
}(job)
}
wg.Wait()
运行时看起来像工作。
但测试失败:
package wglimit
import (
"runtime"
"testing"
"time"
)
func TestGoLimit(t *testing.T) {
var limit int = 5
wglimit := New(limit)
for i := 0; i < 10000; i++ {
wglimit.Add(1)
go func() {
defer wglimit.Done()
time.Sleep(time.Millisecond)
if runtime.NumGoroutine() > limit+2 {
println(runtime.NumGoroutine()) // will print 9 , cocurrent limit fail ?
t.Errorf("FAIL")
}
}()
}
wglimit.Wait()
}
测试时,goroutine 数量大于我的限制,似乎并流限制失败。
我的 WaitGroupLimit 代码有什么问题,为什么?
心有法竹
相关分类