多个戈鲁丁在一个通道上有选择地监听

我看过这个这个这个这个,但在这种情况下没有一个真正帮助我。我有多个戈鲁丁,如果通道中的值是针对该特定戈鲁廷的,则需要执行一些任务。

var uuidChan chan string


func handleEntity(entityUuid string) {

    go func() {

        for {

            select {

            case uuid := <-uuidChan:

                if uuid == entityUuid {

                    // logic

                    println(uuid)

                    return

                }

            case <-time.After(time.Second * 5):

                println("Timeout")

                return

            }

        }

    }()

}


func main() {

    uuidChan = make(chan (string))

    for i := 0; i < 5; i++ {

        handleEntity(fmt.Sprintf("%d", i))

    }

    for i := 0; i < 4; i++ {

        uuidChan <- fmt.Sprintf("%d", i)

    }

}

https://play.golang.org/p/Pu5MhSP9Qtj


在上面的逻辑中,uuid 由其中一个通道接收,但没有任何反应。为了解决这个问题,我尝试更改逻辑,以便在某个 uuid 的逻辑不在该例程中时将 uuid 重新插入到通道中。我知道这是一种不好的做法,这也行不通。


func handleEntity(entityUuid string) {

    go func() {

        var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.

        for {

            select {

            case uuid := <-uuidChan:

                if uuid == entityUuid {

                    // logic

                    println(uuid)

                    return

                } else {

                    notMe = append(notMe, uuid)

                }

            case <-time.After(time.Second * 5):

                println("Timeout")

                defer func() {

                    for _, uuid := range notMe {

                        uuidChan <- uuid

                    }

                }()

                return

            }

        }

    }()

}

https://play.golang.org/p/5On-Vd7UzqP


正确的方法是什么?


芜湖不芜
浏览 74回答 2
2回答

泛舟湖上清波郎朗

你有一个盒子里面有一个标签,所以收件人应该先阅读标签,然后决定如何处理它。如果将标签放在盒子内 -&nbsp;您将迫使制造商打开盒子(请参阅解决方案1)。我鼓励您提供更好的邮政服务,并将标签至少放在盒子外面(请参阅解决方案3) - 或者更好地立即将盒子转移到正确的地址(请参阅解决方案2):有很多解决方案可以解决这个问题,你只能受到想象力的限制:1.由于你只有一个通道,里面有一个带有ID的数据,对于一个ID的消费者来说,你只能从通道中读取一次数据(假设通道内的数据很重要) - 你有一个简单的sulotion: 使用读取 goroutine 从通道读取数据,然后应用逻辑来决定如何处理此数据 - 例如,将其发送到另一个 goroutine 或运行任务。试试这个:package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")func main() {&nbsp; &nbsp; uuidChan := make(chan string)&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; t := time.NewTimer(5 * time.Second)&nbsp; &nbsp; &nbsp; &nbsp; defer t.Stop()&nbsp; &nbsp; &nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case uuid, ok := <-uuidChan:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Channel closed.")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }// logic:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Multiple goroutines listening selectively on one channel&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go consume(uuid, &wg)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // switch uuid {case 1: go func1(); case 2: go func2()}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case <-t.C:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Timeout")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; for i := 0; i < 4; i++ {&nbsp; &nbsp; &nbsp; &nbsp; uuidChan <- fmt.Sprintf("%d", i)&nbsp; &nbsp; }&nbsp; &nbsp; close(uuidChan) // free up the goroutine&nbsp; &nbsp; wg.Wait() // wait until all consumers are done&nbsp; &nbsp; fmt.Println("All done.")}// Multiple goroutines listening selectively on one channelfunc consume(uuid string, wg *sync.WaitGroup) {&nbsp; &nbsp; defer wg.Done()// logic: or decide here based on uuid&nbsp; &nbsp; fmt.Println("job #:", uuid) // job}输出:job #: 0job #: 2job #: 1Channel closed.job #: 3All done.使用每个戈鲁丁的通道,试试这个:package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; // for {&nbsp; &nbsp; select {&nbsp; &nbsp; case uuid, ok := <-uuidChan:&nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("closed")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return // free up goroutine on chan closed&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(uuid)&nbsp; &nbsp; &nbsp; &nbsp; return // job done&nbsp; &nbsp; case <-time.After(1 * time.Second):&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Timeout")&nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; }&nbsp; &nbsp; // }}func main() {&nbsp; &nbsp; const max = 5&nbsp; &nbsp; slice := make([]chan string, max)&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; for i := 0; i < max; i++ {&nbsp; &nbsp; &nbsp; &nbsp; slice[i] = make(chan string, 1)&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go handleEntity(slice[i], &wg)&nbsp; &nbsp; }&nbsp; &nbsp; for i := 0; i < 4; i++ {&nbsp; &nbsp; &nbsp; &nbsp; slice[i] <- fmt.Sprintf("%d", i) // send to the numbered channel&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; fmt.Println("All done.")}输出:3012TimeoutAll done.使用和信号广播:所以我们有一个盒子,并使用名为共享var的名称,我们在盒子的顶部添加接收者的地址。这里使用名为“首先将框设置为所需的ID”,然后使用信号广播通知所有正在收听的goroutines唤醒并检查和时间,以查看一个是否已寻址并且是否过期,然后全部返回等待状态,并且已寻址或已过期的通道继续读取无缓冲的信道或退出。然后使用 来表示剩余戈鲁廷的过期,最后让它们全部加入。请注意,第一个应该在之后调用 - 这意味着 goroutines 应该在第一个调用之前运行,所以一种方法是简单地使用另一个命名的 short for 。labelsync.Condlabellabellabellabeltime.AfterFuncwg.Wait()c.Broadcast()c.Wait()c.Broadcast()sync.WaitGroupw4wwait for waitpackage mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")func handleEntity(entityUuid string) {&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; t0 := time.Now()&nbsp; &nbsp; var expired, addressed bool&nbsp; &nbsp; w4w.Done()&nbsp; &nbsp; m.Lock()&nbsp; &nbsp; for !expired && !addressed {&nbsp; &nbsp; &nbsp; &nbsp; c.Wait()&nbsp; &nbsp; &nbsp; &nbsp; addressed = label == entityUuid&nbsp; &nbsp; &nbsp; &nbsp; expired = time.Since(t0) > d&nbsp; &nbsp; }&nbsp; &nbsp; m.Unlock()&nbsp; &nbsp; fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)&nbsp; &nbsp; if !expired && addressed {&nbsp; &nbsp; &nbsp; &nbsp; uuid := <-uuidChan&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("matched =", entityUuid, uuid)&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Println("done", entityUuid)}func main() {&nbsp; &nbsp; for i := 0; i < 5; i++ {&nbsp; &nbsp; &nbsp; &nbsp; w4w.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go handleEntity(fmt.Sprintf("%d", i))&nbsp; &nbsp; }&nbsp; &nbsp; w4w.Wait()&nbsp; &nbsp; time.AfterFunc(d, func() {&nbsp; &nbsp; &nbsp; &nbsp; // m.Lock()&nbsp; &nbsp; &nbsp; &nbsp; // label = "none"&nbsp; &nbsp; &nbsp; &nbsp; // m.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("expired")&nbsp; &nbsp; &nbsp; &nbsp; c.Broadcast() // expired&nbsp; &nbsp; })&nbsp; &nbsp; for i := 0; i < 4; i++ {&nbsp; &nbsp; &nbsp; &nbsp; m.Lock()&nbsp; &nbsp; &nbsp; &nbsp; label = fmt.Sprintf("%d", i)&nbsp; &nbsp; &nbsp; &nbsp; m.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; c.Broadcast() // notify all&nbsp; &nbsp; &nbsp; &nbsp; uuidChan <- label&nbsp; &nbsp; }&nbsp; &nbsp; fmt.Println("...")&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; fmt.Println("all done")}var (&nbsp; &nbsp; label&nbsp; &nbsp; string&nbsp; &nbsp; uuidChan = make(chan string)&nbsp; &nbsp; m&nbsp; &nbsp; &nbsp; &nbsp; sync.Mutex&nbsp; &nbsp; c&nbsp; &nbsp; &nbsp; &nbsp; = sync.NewCond(&m)&nbsp; &nbsp; w4w, wg&nbsp; sync.WaitGroup&nbsp; &nbsp; d&nbsp; &nbsp; &nbsp; &nbsp; = 1 * time.Second)输出:id = 0 addressed = true expired = falsematched = 0 0done 0id = 1 addressed = true expired = falsematched = 1 1done 1id = 2 addressed = true expired = falsematched = 2 2done 2id = 3 addressed = true expired = falsematched = 3 3done 3...expiredid = 4 addressed = false expired = truedone 4all done

杨魅力

也许你想映射你的频道,以发送消息到正确的戈鲁丁马上:package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "time")func worker(u string, c chan string) {&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("got %s in %s\n", <-c, u)&nbsp; &nbsp; }}func main() {&nbsp; &nbsp; workers := make(map[string]chan string)&nbsp; &nbsp; for _, u := range []string{"foo", "bar", "baz"} {&nbsp; &nbsp; &nbsp; &nbsp; workers[u] = make(chan string)&nbsp; &nbsp; &nbsp; &nbsp; go worker(u, workers[u])&nbsp; &nbsp; }&nbsp; &nbsp; workers["foo"] <- "hello"&nbsp; &nbsp; workers["bar"] <- "world"&nbsp; &nbsp; workers["baz"] <- "!"&nbsp; &nbsp; fmt.Println()&nbsp; &nbsp; time.Sleep(time.Second)}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go