如何在 for-select 循环中更改通道

我想做一个动态通道池,它可以监听数十万个通道,所有这些都在控制之下,正如我所排除的那样,我希望它能够自动升级,如果有太多的通道监听(goroutine =>反射=>选择)但是在selectN通道观察器编码期间,我被通道替换阻止了


我想在运行时替换chan,这是在选择循环中,我已经尝试了一段时间以使其可用,但事情进展不顺利。


func Test_Change(t *testing.T) {

type A struct {

    ch chan interface{}

}

a := &A{

    ch: make(chan interface{}),

}

go func() {

    for {

        select {

        case v := <-a.ch:

            fmt.Println(v)

        }

    }

}()


newCh := make(chan interface{})

go func() {

    for i := 0; i < 200; i++ {

        a.ch <- i

    }

    a.ch = newCh

}()

go func() {

    for i := 1000; i < 1010; i++ {

        newCh <- i

    }

}()

for {

    select {}

}}

它阻止了


func Test_Change(t *testing.T) {

type A struct {

    ch chan interface{}

    bh chan interface{}

}

a := &A{

    ch: make(chan interface{}),

    bh: make(chan interface{}),

}

notify := make(chan struct{})

go func() {

    for {

        select {

        case v := <-a.ch:

            fmt.Println(v)

        case <-notify:

            fmt.Println("notify")

        }

    }

}()

newCh := make(chan interface{})

go func() {

    for i := 0; i < 200; i++ {

        a.ch <- i

    }

    a.ch = newCh

    notify <- struct{}{}

}()

go func() {

    for i := 1000; i < 1010; i++ {

        newCh <- i

    }

}()

for {

    select {}

}}

它奏效了


慕沐林林
浏览 115回答 3
3回答

慕斯709654

您已经正确确定,使用常规语法(固定数量的事例)不可能在任意大的动态 chan 上设置一个块,并且可以使用反射包。select但是,我不确定这是实现目标的最佳方法。如果您确实有数千个频道需要观看(例如,同时连接数千个远程客户端),则可以使用“扇入”模式将所有内容写入非常少的固定数量的频道,并选择该频道。而不是&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-sigterm:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cleanup()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; os.Exit(1)&nbsp; &nbsp; &nbsp; &nbsp; case msg := <-client1:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // process msg...&nbsp; &nbsp; &nbsp; &nbsp; case msg := <-client2:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // process msg...&nbsp; &nbsp; &nbsp; &nbsp; // HOW CAN I DYNAMICALLY ADD AND REMOVE A CLIENT HERE?&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }想想像这样:&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-sigterm:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cleanup()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; os.Exit(1)&nbsp; &nbsp; &nbsp; &nbsp; case msg := <-clients:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // process msg...&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }func addClient(client chan Message) {&nbsp; &nbsp; // Fan-in: read all future messages from client, and write them&nbsp; &nbsp; // to clients.&nbsp; &nbsp; go func(){&nbsp; &nbsp; &nbsp; &nbsp; for msg := range client {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; clients <- msg&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()}替换通道变量的值不是线程安全的(可以是数据争用),但是让多个戈鲁廷同时写入和读取同一通道是完全可以的。clients

潇湘沐

数据竞赛是一个严重的错误和设计缺陷。您可以通过 使用 运行测试或使用 运行程序来检测数据争用。a.chgo test -racego run -race program.go可以替换在 for/select 循环中使用的 chan 的值,只要它是在事例的主体内正确完成的,而不是在另一个并发 goroutine 的代码中。&nbsp; &nbsp; replace := time.After(3 * time.Second)&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case v, ok := <-ch1:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // use v...&nbsp; &nbsp; &nbsp; &nbsp; case v, ok := <-ch2:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // use v...&nbsp; &nbsp; &nbsp; &nbsp; case <-replace:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch1 = anotherChannel&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }此示例可运行代码不雅(不要这样做)。您可以将其保存在工作站上,然后尝试使用数据竞速检测器。此固定的示例代码不具有不雅性。

炎炎设计

也许这可能有效;非常非常广泛地测试它,它是可行的,但很难把它弄对。我预计它不会是完美的。而且它缺乏关于“该做”和“不该做”的大量文档。它也不是合并输入通道的版本,它总是一次只消耗一个输入通道。这可能会对性能造成问题。我给出的唯一保证是它没有比赛。虽然我把写一个同人版的任务留给读者作为练习。package mainimport (&nbsp; &nbsp; "fmt")func main() {&nbsp; &nbsp; m := New()&nbsp; &nbsp; go m.Run()&nbsp; &nbsp; input := m.Resize(0)&nbsp; &nbsp; input <- 5&nbsp; &nbsp; input <- 4&nbsp; &nbsp; close(input)&nbsp; &nbsp; input = m.Resize(10)&nbsp; &nbsp; input <- 6&nbsp; &nbsp; input <- 7&nbsp; &nbsp; close(input)&nbsp; &nbsp; input = m.Resize(2)&nbsp; &nbsp; input <- 8&nbsp; &nbsp; input <- 9&nbsp; &nbsp; close(input)&nbsp; &nbsp; m.Close()&nbsp; &nbsp; fmt.Println()}type masterOfThings struct {&nbsp; &nbsp; notify&nbsp; &nbsp; chan notification&nbsp; &nbsp; wantClose chan chan bool}func New() masterOfThings {&nbsp; &nbsp; return masterOfThings{&nbsp; &nbsp; &nbsp; &nbsp; notify:&nbsp; &nbsp; make(chan notification, 1),&nbsp; &nbsp; &nbsp; &nbsp; wantClose: make(chan chan bool),&nbsp; &nbsp; }}type notification struct {&nbsp; &nbsp; N&nbsp; &nbsp;int&nbsp; &nbsp; out chan chan interface{}}func (m masterOfThings) Resize(n int) chan<- interface{} {&nbsp; &nbsp; N := notification{&nbsp; &nbsp; &nbsp; &nbsp; N:&nbsp; &nbsp;n,&nbsp; &nbsp; &nbsp; &nbsp; out: make(chan chan interface{}, 1),&nbsp; &nbsp; }&nbsp; &nbsp; m.notify <- N&nbsp; &nbsp; return <-N.out}func (m masterOfThings) Close() {&nbsp; &nbsp; closed := make(chan bool)&nbsp; &nbsp; m.wantClose <- closed&nbsp; &nbsp; <-closed}func (m masterOfThings) Run() {&nbsp; &nbsp; var input chan interface{}&nbsp; &nbsp; inputs := []chan interface{}{}&nbsp; &nbsp; closers := []chan bool{}&nbsp; &nbsp; defer func() {&nbsp; &nbsp; &nbsp; &nbsp; for _, c := range closers {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; close(c)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; var wantClose bool&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case m := <-m.wantClose:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; closers = append(closers, m)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wantClose = true&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if len(inputs) < 1 && input == nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; case n, ok := <-input:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; input = nil&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if len(inputs) > 0 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; input = inputs[0]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; copy(inputs, inputs[1:])&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; inputs = inputs[:len(inputs)-1]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else if wantClose {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; continue&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(n)&nbsp; &nbsp; &nbsp; &nbsp; case n := <-m.notify:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; nInput := make(chan interface{}, n.N)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if input == nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; input = nInput&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; inputs = append(inputs, nInput)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; n.out <- nInput&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go