我正在使用Go语言进行数据导入工作,我想将每个步骤都写为闭包,并使用通道进行通信,也就是说,每个步骤都是并发的。可以通过以下结构定义问题。
从数据源 获取小部件
将源1的翻译添加到WidgetRevisions
将源2的翻译添加到WidgetRevisions
将翻译从源1添加到Widgets。
将翻译从源2添加到Widgets。
将来自源1的定价添加到Widget中。
将WidgetRevisions添加到Widgets。
出于这个问题的目的,我仅处理必须在新Widget上执行的前三个步骤。在此基础上,我假设第四步可以作为流水线步骤来实现,而流水线步骤本身可以通过控制* WidgetRevision * s的三步流水线来实现。
为此,我一直在编写一小段代码来为我提供以下API:
// A Pipeline is just a list of closures, and a smart
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()
// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)
// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)
我已经实现了它(在Gist或Go Playground上的代码),但是由于100%的成功失败率而陷入僵局
调用时会出现死锁p.Execute(),因为大概其中一个通道最终将无事可做,其中任何一个都没有发送,也无事可做...
添加一些调试输出线的emit()和drain(),我看到下面的输出,相信封调用之间的流水线是正确的,我看到了一些小工具被省略。
Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000
我对这种方法了解以下几点:
我相信这种设计“饿死”一个或多个协程的情况并不少见,这就是为什么这会导致僵局
我更希望管道首先放入东西(API将实现 Pipeline.Process(*Widget)
如果我可以完成这项工作,那么排空可能是一个“步骤”,只是没有将任何内容传递给下一个函数,这可能是一个更简洁的API
我知道我还没有实现任何类型的梯级缓冲区,所以我完全有可能会过载机器的可用内存
我真的不相信这是一种很好的Go风格...但是它似乎利用了很多Go功能,但这并没有真正的好处。
由于WidgetRevisions也需要管道,因此我想使管道更通用,也许是一种interface{}
类型的解决方案,我不知道做得足够好以确定这是否明智。
建议我考虑实现互斥体以防止出现竞争情况,但是我相信我会保存下来,因为每个闭包都将在Widget结构的一个特定单元上运行,但是我很乐意接受有关该主题的教育。 。
简介:我如何解决此代码,我应该解决此代码,如果您是比我更有经验的Go程序员,您将如何解决此“顺序工作单元”问题?
相关分类