控制并发有三种种经典的方式,一种是通过channel通知实现并发控制 一种是WaitGroup,另外一种就是Context。
1. 使用最基本通过channel通知实现并发控制
无缓冲通道
无缓冲的通道指的是通道的大小为0,也就是说,这种类型的通道在接收前没有能力保存任何值,它要求发送 goroutine
和接收 goroutine
同时准备好,才可以完成发送和接收操作。
从上面无缓冲的通道定义来看,发送 goroutine
和接收 gouroutine
必须是同步的,同时准备后,如果没有同时准备好的话,先执行的操作就会阻塞等待,直到另一个相对应的操作准备好为止。这种无缓冲的通道我们也称之为同步通道。
正式通过无缓冲通道来实现多 goroutine
并发控制
func main() { ch := make(chan struct{}) go func() { fmt.Println("do something..") time.Sleep(time.Second * 1) ch <- struct{}{} }() <-ch fmt.Println("I am finished") }
当主 goroutine
运行到 <-ch
接受 channel
的值的时候,如果该 channel
中没有数据,就会一直阻塞等待,直到有值。 这样就可以简单实现并发控制
2. 通过sync包中的WaitGroup实现并发控制
在 sync
包中,提供了 WaitGroup
,它会等待它收集的所有 goroutine
任务全部完成。在WaitGroup里主要有三个方法
Add, 可以添加或减少 goroutine的数量
Done, 相当于Add(-1)
Wait, 执行后会堵塞主线程,直到WaitGroup 里的值减至0
在主 goroutine
中 Add(delta int)
索要等待goroutine
的数量。在每一个 goroutine
完成后 Done()
表示这一个goroutine
已经完成,当所有的 goroutine
都完成后,在主 goroutine
中 WaitGroup
返回返回。
func main(){ var wg sync.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for _, url := range urls { wg.Add(1) go func(url string) { defer wg.Done() http.Get(url) }(url) } wg.Wait() }
但是在Golang官网中,有这么一句话
A WaitGroup must not be copied after first use.
翻译够来过来就是,在 WaitGroup
第一次使用后,不能被拷贝,因为会出现一下问题
func main() { wg := sync.WaitGroup{} for i := 0; i < 5; i++ { wg.Add(1) go func(wg sync.WaitGroup, i int) { log.Printf("i:%d", i) wg.Done() }(wg, i) } wg.Wait() log.Println("exit") }
运行结果如下
2009/11/10 23:00:00 i:4 2009/11/10 23:00:00 i:0 2009/11/10 23:00:00 i:1 2009/11/10 23:00:00 i:2 2009/11/10 23:00:00 i:3 fatal error: all goroutines are asleep - deadlock! goroutine 1 [semacquire]: sync.runtime_Semacquire(0x1040a13c, 0x44bc) /usr/local/go/src/runtime/sema.go:47 +0x40 sync.(*WaitGroup).Wait(0x1040a130, 0x121460) /usr/local/go/src/sync/waitgroup.go:131 +0x80 main.main() /tmp/sandbox894380819/main.go:19 +0x120
它提示我所有的 goroutine
都已经睡眠了,出现了死锁。这是因为 wg
给拷贝传递到了 goroutine
中,导致只有 Add
操作,其实 Done
操作是在 wg
的副本执行的。因此 Wait
就死锁了。
改正方法一:
将匿名函数中wg
的传入类型改为*sync.WaitGrou
,这样就能引用到正确的WaitGroup了。改正方法二:
将匿名函数中的wg
的传入参数去掉,因为Go支持闭包类型,在匿名函数中可以直接使用外面的wg
变量
go 中五种引用类型有 slice, channel, function, map, interface
interface是Go语言中最成功的设计之一,空的interface可以被当作“鸭子”类型使用,它使得Go这样的静态语言拥有了一定的动态性,但却又不损失静态语言在类型安全方面拥有的编译时检查的优势。依赖于接口而不是实现,优先使用组合而不是继承,这是程序抽象的基本原则。但是长久以来以C++为代表的“面向对象”语言曲解了这些原则,让人们走入了误区。为什么要将方法和数据绑死?为什么要有多重继承这么变态的设计?面向对象中最强调的应该是对象间的消息传递,却为什么被演绎成了封装继承和多态。面向对象是否实现程序程序抽象的合理途径,又或者是因为它存在我们就认为它合理了。历史原因,中间出现了太多的错误。不管怎么样,Go的interface给我们打开了一扇新的窗。
3. 在Go 1.7 以后引进的强大的Context上下文,实现并发控制
3.1 简介
在一些简单场景下使用 channel
和 WaitGroup
已经足够了,但是当面临一些复杂多变的网络并发场景下 channel
和 WaitGroup
显得有些力不从心了。比如一个网络请求 Request
,每个 Request
都需要开启一个 goroutine
做一些事情,这些 goroutine
又可能会开启其他的 goroutine
,比如数据库和RPC服务。所以我们需要一种可以跟踪 goroutine
的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的 Context
,称之为上下文非常贴切,它就是goroutine
的上下文。它是包括一个程序的运行环境、现场和快照等。每个程序要运行时,都需要知道当前程序的运行状态,通常Go 将这些封装在一个 Context
里,再将它传给要执行的 goroutine
。
context
包主要是用来处理多个 goroutine
之间共享数据,及多个 goroutine
的管理。
3.2 package context
context
包的核心是 struct Context
,接口声明如下:
// A Context carries a deadline, cancelation signal, and request-scoped values// across API boundaries. Its methods are safe for simultaneous use by multiple// goroutines.type Context interface { // Done returns a channel that is closed when this `Context` is canceled // or times out. Done() <-chan struct{} // Err indicates why this Context was canceled, after the Done channel // is closed. Err() error // Deadline returns the time when this Context will be canceled, if any. Deadline() (deadline time.Time, ok bool) // Value returns the value associated with key or nil if none. Value(key interface{}) interface{} }
Done()
返回一个只能接受数据的channel
类型,当该context关闭或者超时时间到了的时候,该channel就会有一个取消信号Err()
在Done()
之后,返回context
取消的原因。Deadline()
设置该context cancel
的时间点Value()
方法允许Context
对象携带request
作用域的数据,该数据必须是线程安全的。
Context
对象是线程安全的,你可以把一个 Context
对象传递给任意个数的 gorotuine
,对它执行 取消 操作时,所有 goroutine
都会接收到取消信号。
一个 Context
不能拥有 Cancel
方法,同时我们也只能 Done channel
接收数据。
背后的原因是一致的:接收取消信号的函数和发送信号的函数通常不是一个。
一个典型的场景是:父操作为子操作操作启动 goroutine
,子操作也就不能取消父操作。
3.3 继承 context
context 包提供了一些函数,协助用户从现有的 Context
对象创建新的 Context
对象。
这些 Context
对象形成一棵树:当一个 Context
对象被取消时,继承自它的所有 Context
都会被取消。
Background
是所有 Context
对象树的根,它不能被取消。它的声明如下:
// Background returns an empty Context. It is never canceled, has no deadline,// and has no values. Background is typically used in main, init, and tests,// and as the top-level `Context` for incoming requests.func Background() Context
WithCancel
和 WithTimeout
函数 会返回继承的 Context
对象, 这些对象可以比它们的父 Context 更早地取消。
当请求处理函数返回时,与该请求关联的 Context
会被取消。 当使用多个副本发送请求时,可以使用 WithCancel
取消多余的请求。 WithTimeout
在设置对后端服务器请求超时时间时非常有用。 下面是这三个函数的声明:
// WithCancel returns a copy of parent whose Done channel is closed as soon as// parent.Done is closed or cancel is called.func WithCancel(parent Context) (ctx Context, cancel CancelFunc)// A CancelFunc cancels a Context.type CancelFunc func()// WithTimeout returns a copy of parent whose Done channel is closed as soon as// parent.Done is closed, cancel is called, or timeout elapses. The new// Context's Deadline is the sooner of now+timeout and the parent's deadline, if// any. If the timer is still running, the cancel function releases its// resources.func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
WithValue
函数能够将请求作用域的数据与 Context
对象建立关系。声明如下:
// WithValue returns a copy of parent whose Value method returns val for key.func WithValue(parent Context, key interface{}, val interface{}) Context
3.4 context例子
当然,想要知道 Context 包是如何工作的,最好的方法是看一个例子。
package mainimport ( "context" "fmt" "sync" "time")type Message struct { netId int Data string}type ServerConn struct { sendCh chan Message handleCh chan Message wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc netId int}func main() { conn := &ServerConn{ sendCh: make(chan Message), handleCh: make(chan Message), wg: &sync.WaitGroup{}, netId: 100, } conn.ctx, conn.cancel = context.WithCancel(context.WithValue(context.Background(), "key", conn.netId)) loopers := []func(*ServerConn, *sync.WaitGroup){readLoop, writeLoop, handleLoop} for _, looper := range loopers { conn.wg.Add(1) go looper(conn, conn.wg) } go func() { time.Sleep(time.Second * 3) conn.cancel() }() conn.wg.Wait() }func readLoop(c *ServerConn, wg *sync.WaitGroup) { netId, _ := c.ctx.Value("key").(int) handlerCh := c.handleCh ctx, _ := context.WithCancel(c.ctx) cDone := ctx.Done() defer wg.Done() for { time.Sleep(time.Second * 1) select { case <-cDone: fmt.Println("readLoop close") return default: handlerCh <- Message{netId, "Hello world"} } } }func handleLoop(c *ServerConn, wg *sync.WaitGroup) { handlerCh := c.handleCh sendCh := c.sendCh ctx, _ := context.WithCancel(c.ctx) cDone := ctx.Done() defer wg.Done() for { select { case handleData, ok := <-handlerCh: if ok { handleData.netId++ handleData.Data = "I am whole world" sendCh <- handleData } case <-cDone: fmt.Println("handleLoop close") return } } }func writeLoop(c *ServerConn, wg *sync.WaitGroup) { sendCh := c.sendCh ctx, _ := context.WithCancel(c.ctx) cDone := ctx.Done() defer wg.Done() for { select { case sendData, ok := <-sendCh: if ok { fmt.Println(sendData) } case <-cDone: fmt.Println("writeLoop close") return } } }
在上面的例子中,�模仿了Golang后台程序主要业务流程, 当一个TCP连接到来时通过启动三个goroutine
来分别处理收发和处理数据。而这三个goroutine
的是并发运行的,通过channel
、sync.WaitGroup
和context
控制数据的处理。
在�每一个循环中产生一个goroutine
,每一个goroutine
中都传入context
,在每个goroutine
中通过传入ctx
创建一个子Context
,并且通过select
一直监控该Context
的运行情况,当在父Context
退出的时候,代码中并没有�明显调用子Context
的Cancel
函数,但是分析结果,子Context
还是被正确合理的关闭了,这是因为,所有基于这个Context
或者衍生的子Context
都会收到通知,这时就可以进行清理操作了,最终释放goroutine
,这就优雅的解决了goroutine
启动后不可控的问题。
下面是运行结果:
Screen Shot 2017-09-17 at 19.29.44.png
3.5 Context 使用原则
不要把
Context
放在结构体中,要以参数的方式传递以
Context
作为参数的函数方法,应该把Context
作为第一个参数,放在第一位。给一个函数方法传递
Context
的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
Context
的Value
相关方法应该传递必须的数据,不要什么数据都使用这个传递Context
是线程安全的,可以放心的在多个goroutine
中传递
作者:wiseAaron
链接:https://www.jianshu.com/p/6032f2db6be5