因此,我想为每个 github 存储库运行一个单独的 goroutine 以获取其所有 PR 并将它们发送到各自的通道并合并所有这些通道以使用来自单个通道的不同存储库的所有 PR。
这是代码片段。
func (c *Client) FetchAllPRs() {
c.GetRepos()
c.GetGitData()
c.ghCollector.SetSHAMap(c.shaMap)
start := time.Now()
logging.Debug("TOTAL REPOS ", len(c.repos))
var channels []<-chan *sources.PRDetails
for _, repo := range c.repos {
channels = append(channels, c.ghCollector.GetPRNumbers(c.context, repo.Details))
}
sink := mergePRChannels(channels)
count := 0
for _ = range sink {
count += 1
}
elapsed := time.Since(start)
logging.Info(count, " took time: ", elapsed)
}
func mergePRChannels(outputsChan []<-chan *sources.PRDetails) <-chan *sources.PRDetails {
var wg sync.WaitGroup
// make return channel
merged := make(chan *sources.PRDetails)
wg.Add(len(outputsChan))
output := func(sc <-chan *sources.PRDetails) {
for sqr := range sc {
merged <- sqr
}
// once channel (square numbers sender) closes,
// call `Done` on `WaitGroup` to decrement counter
wg.Done()
}
for _, optChan := range outputsChan {
go output(optChan)
}
// run goroutine to close merged channel once done
go func() {
// wait until WaitGroup finishes
wg.Wait()
close(merged)
}()
return merged
}
现在来自 Github 的响应是分页的,PR fetching goroutine 在其中运行一个循环。
type PRDetails struct {
RepoName string
PR githubv4.Int
}
所以发生的情况是,它对于第一个请求运行良好,即它将为所有 repos 获取前 100 个 PR 编号,但它永远不会进入 for 循环以使用 Github API 返回的 EndCursor 获取下一批。
它不应该那样做,现在我认为我传递给我的 goroutine 闭包的所有参数都是它们的副本或引用,其他一些 goroutine 可能会将 true 更改为 false,这没有任何意义。由于c.ghCollector.GetPRNumbers 它在一个循环中运行,所以我假设每次调用这个函数时,在闭包之外声明的所有变量都将为每个函数单独实例化,并且我不需要任何互斥体来读取或写入。我很困惑,没有任何意义。
我在这里做错了什么?
aluckdog
相关分类