我在 Go 中使用通道来处理各种数据管道。代码如下所示:
type Channels struct {
inputs chan string
errc chan error
quit chan struct{}
}
func (c *Channels) doSomethingWithInput() {
defer close(c.quit)
defer close(c.errc)
for input := range p.inputs {
_, err := doSomethingThatSometimesErrors(input)
if err != nil {
c.errc <- err
return
}
}
doOneFinalThingThatCannotError()
return
}
func (c *Channels) inputData(s string) {
// This function implementation is my question
}
func StartProcessing(c *Channels, data ...string) error {
go c.doSomethingWithInput()
go func() {
defer close(c.inputs)
for _, i := range data {
select {
case <-c.quit:
break
default:
}
inputData(i)
}
}()
// Block until the quit channel is closed.
<-c.quit
if err := <-c.errc; err != nil {
return err
}
return nil
}
这似乎是在通道处理器之间传达退出信号的合理方式,并且基于这篇关于 Go 并发模式的博客文章。
我在使用这种模式时遇到的困难是inputData函数。将字符串添加到input通道需要等待doSomethingWithInput()读取通道,但也可能会出错。inputData需要尝试并提供inputs频道,但如果被告知退出则放弃。我能做的最好的是:
func (c *Channels) inputData(s string) {
for {
select {
case <-c.quit:
return
case c.inputs <- s:
return
}
}
}
从本质上讲,“在您的选择之间摇摆不定,直到其中一个坚持下去”。需要明确的是,我不认为这是一个糟糕的设计。只是感觉……很浪费。就像我错过了一些聪明的东西。当频道使用者出错时,如何告诉频道发送者在 Go 中退出?
鸿蒙传说
相关分类