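How do I make sure goroutines launched inside other goroutines are synchronized with each other?

This is my first time using Go's concurrency features, and I am jumping in at the deep end.


I want to make concurrent calls to an API. The request is based on the tags of the posts I want to receive (there can be 1 <= N tags). The response body looks like this: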


{
    "posts": [
        {
            "id": 1,
            "author": "Name",
            "authorId": 1,
            "likes": num_likes,
            "popularity": popularity_decimal,
            "reads": num_reads,
            "tags": [ "tag1", "tag2" ]
        },
        ...
    ]
}

My plan is to daisy-chain a bunch of channels together and spawn several goroutines that read from and/or write to those channels:


- for each tag, add it to a tagsChannel inside a goroutine

- use that tagsChannel inside another goroutine to make concurrent GET requests to the endpoint

- for each response of that request, pass the underlying slice of posts into another goroutine

- for each individual post inside the slice of posts, add the post to a postChannel

- inside another goroutine, iterate over postChannel and insert each post into a data structure


Here is what I have so far:


func (srv *server) Get() {
    // Using red-black tree prevents any duplicates, fast insertion
    // and retrieval times, and is sorted already on ID.
    rbt := tree.NewWithIntComparator()

    // concurrent approach
    tagChan := make(chan string)                       // tags -> tagChan
    postChan := make(chan models.Post)                 // tagChan -> GET -> post -> postChan
    errChan := make(chan error)                        // for synchronizing errors across goroutines
    wg := &sync.WaitGroup{}                            // for synchronizing goroutines
    wg.Add(4)

    // create a go func to synchronize our wait groups
    // once all goroutines are finished, we can close our errChan
    go func() {
        wg.Wait()
        close(errChan)
    }()

    go insertTags(tags, tagChan, wg)
    go fetch(postChan, tagChan, errChan, wg)
    go addPostToTree(rbt, postChan, wg)

    for err := range errChan {
        if err != nil {
            srv.HandleError(err, http.StatusInternalServerError).ServeHTTP(w, r)
        }
    }
}


SMILET
1 Answer

米脂

First of all, the overall design is quite complex. I come back to that at the end.

There are two problems in your code.

Problem 1: the posts channel is never closed, so the loop in addPostToTree may never exit, and as a result one WaitGroup counter is never decremented (in your case, the program hangs). The program can end up waiting forever in a deadlock: each goroutine assumes another one will unblock it, but all of them are stuck.

Solution: close postChan. But how? The usual recommendation is that the producer closes the channel, but you have multiple producers. So the best option is to wait for all producers to finish and then close the channel. To wait for all producers, create another WaitGroup and use it to track the child goroutines.

Code:

// fetch completes a GET request to the endpoint for every tag it receives.
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) {
    // postsWG tracks only the goroutines started by fetch itself.
    postsWG := &sync.WaitGroup{}
    for tag := range tags {
        ep, err := formURL(tag)
        if err != nil {
            errs <- err
            continue // no valid URL, so skip this tag
        }
        postsWG.Add(1) // QUESTION should I use a separate wait group here?
        go func() {
            resp, err := http.Get(ep.String())
            if err != nil {
                errs <- err
                postsWG.Done() // insertPosts will not run, so release the counter here
                return
            }
            defer resp.Body.Close()
            container := models.PostContainer{}
            if err := json.NewDecoder(resp.Body).Decode(&container); err != nil {
                errs <- err
                postsWG.Done()
                return
            }
            // Assumption: insertPosts calls postsWG.Done() when it finishes.
            go insertPosts(posts, container.Posts, postsWG)
        }()
    }
    // Once every producer is done, close posts so the loop in addPostToTree
    // can end, then tell the parent WaitGroup that fetch is finished.
    defer func() {
        postsWG.Wait()
        close(posts)
        group.Done()
    }()
}

Problem 2: the main WaitGroup should be initialized with 3 instead of 4. The main routine starts only 3 goroutines, so it should call wg.Add(3) and track only those. The child goroutines are tracked by a different WaitGroup, so they are no longer the parent's concern.

Code:

    errChan := make(chan error)                        // for synchronizing errors across goroutines
    wg := &sync.WaitGroup{}                            // for synchronizing goroutines
    wg.Add(3)
    // create a go func to synchronize our wait groups
    // once all goroutines are finished, we can close our errChan

TL;DR: complex design. The main WaitGroup is created in one place, but every goroutine modifies it as it sees fit. It therefore has no single owner, which makes debugging and maintenance very hard (and you cannot be sure it is bug-free).

I recommend breaking this down and giving each level its own tracker. That way, a caller that starts more goroutines only has to track its own children. It then notifies its parent's WaitGroup only once it and its children are done, instead of letting the children notify the grandparent directly.

Also, in fetch, after making the HTTP call and getting the response, why start another goroutine to process the data? Either way, the outer goroutine cannot exit until the insertion has happened, and it does nothing else while the data is being processed. As far as I can tell, the second goroutine is redundant. The snippet in question:

            group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
            go insertPosts(posts, container.Posts, group)
            defer group.Done()