我正在实施 ZMQ 的浓缩咖啡模式。
我想连接很多订阅者 <> 代理 <> 许多发布者
但是,代理中的侦听器仅接收来自一个发布者的消息。因此,订阅者只能从那个特定的发布者那里接收。我不知道我的代码有什么问题。
package playground
import (
zmq "github.com/pebbe/zmq4"
"fmt"
"math/rand"
"time"
"testing"
)
func subscriber_thread(id int) {
subscriber, _ := zmq.NewSocket(zmq.SUB)
subscriber.Connect("tcp://localhost:6001")
subscriber.SetSubscribe("")
defer subscriber.Close()
for {
msg, err := subscriber.RecvMessage(0)
if err != nil {
panic(err)
}
fmt.Println("subscriber id:", id,"received:", msg)
}
}
func publisher_thread(n int) {
publisher, _ := zmq.NewSocket(zmq.PUB)
publisher.Bind("tcp://*:6000")
for {
s := fmt.Sprintf("%c-%05d", n +'A', rand.Intn(100000))
_, err := publisher.SendMessage(s)
if err != nil {
panic(err)
}
fmt.Println("publisher sent:", s)
time.Sleep(100 * time.Millisecond) // Wait for 1/10th second
}
}
// The listener receives all messages flowing through the proxy, on its
// pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
// attached child threads. In other languages your mileage may vary:
func listener_thread() {
pipe, _ := zmq.NewSocket(zmq.PAIR)
pipe.Bind("inproc://pipe")
// Print everything that arrives on pipe
for {
msg, err := pipe.RecvMessage(0)
if err != nil {
break // Interrupted
}
fmt.Printf("%q\n", msg)
}
}
func TestZmqEspresso(t *testing.T) {
go publisher_thread(0)
go publisher_thread(1)
go publisher_thread(2)
go subscriber_thread(1)
go subscriber_thread(2)
go listener_thread()
time.Sleep(100 * time.Millisecond)
subscriber, _ := zmq.NewSocket(zmq.XSUB)
subscriber.Connect("tcp://localhost:6000")
publisher, _ := zmq.NewSocket(zmq.XPUB)
publisher.Bind("tcp://*:6001")
listener, _ := zmq.NewSocket(zmq.PAIR)
listener.Connect("inproc://pipe")
zmq.Proxy(subscriber, publisher, listener)
fmt.Println("interrupted")
}
慕桂英3389331
相关分类