去lang关闭管道死锁

我正在使用Go语言进行数据导入工作,我想将每个步骤都写为闭包,并使用通道进行通信,也就是说,每个步骤都是并发的。可以通过以下结构定义问题。

  1. 从数据源 获取小部件

    1. 将源1的翻译添加到WidgetRevisions

    2. 将源2的翻译添加到WidgetRevisions

    1. 将翻译从源1添加到Widgets

    2. 将翻译从源2添加到Widgets

    3. 将来自源1的定价添加到Widget中

    4. WidgetRevisions添加到Widgets

出于这个问题的目的,我仅处理必须在新Widget上执行的前三个步骤。在此基础上,我假设第四步可以作为流水线步骤来实现,而流水线步骤本身可以通过控制* WidgetRevision * s的三步流水线来实现。

为此,我一直在编写一小段代码来为我提供以下API:

// A Pipeline is just a list of closures, and a smart 

// function to set them all off, keeping channels of

// communication between them.

p, e, d := NewPipeline()


// Add the three steps of the process

p.Add(whizWidgets)

p.Add(popWidgets)

p.Add(bangWidgets)


// Start putting things on the channel, kick off

// the pipeline, and drain the output channel

// (probably to disk, or a database somewhere)

go emit(e)

p.Execute()

drain(d)

我已经实现了它(在Gist或Go Playground上的代码),但是由于100%的成功失败率而陷入僵局


调用时会出现死锁p.Execute(),因为大概其中一个通道最终将无事可做,其中任何一个都没有发送,也无事可做...


添加一些调试输出线的emit()和drain(),我看到下面的输出,相信封调用之间的流水线是正确的,我看到了一些小工具被省略。


Emitting A Widget

Input Will Be Emitted On 0x420fdc80

Emitting A Widget

Emitting A Widget

Emitting A Widget

Output Will Drain From 0x420fdcd0

Pipeline reading from 0x420fdc80 writing to 0x420fdd20

Pipeline reading from 0x420fdd20 writing to 0x420fddc0

Pipeline reading from 0x420fddc0 writing to 0x42157000

我对这种方法了解以下几点:

  • 我相信这种设计“饿死”一个或多个协程的情况并不少见,这就是为什么这会导致僵局

  • 我更希望管道首先放入东西(API将实现 Pipeline.Process(*Widget)

    • 如果我可以完成这项工作,那么排空可能是一个“步骤”,只是没有将任何内容传递给下一个函数,这可能是一个更简洁的API

  • 我知道我还没有实现任何类型的梯级缓冲区,所以我完全有可能会过载机器的可用内存

  • 我真的不相信这是一种很好的Go风格...但是它似乎利用了很多Go功能,但这并没有真正的好处。

  • 由于WidgetRevisions也需要管道,因此我想使管道更通用,也许是一种interface{}类型的解决方案,我不知道做得足够好以确定这是否明智。

  • 建议我考虑实现互斥体以防止出现竞争情况,但是我相信我会保存下来,因为每个闭包都将在Widget结构的一个特定单元上运行,但是我很乐意接受有关该主题的教育。 。

简介:我如何解决此代码,我应该解决此代码,如果您是比我更有经验的Go程序员,您将如何解决此“顺序工作单元”问题?


POPMUISE
浏览 158回答 1
1回答
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go