我有一个 XPUB/XSUB 设备和多个模拟发布者在一个进程中运行。在一个单独的过程中,我想连接订阅者并将收到的消息打印到终端。下面我将展示一个简单函数的两个变体来实现这一点。我将这些函数包装为命令行实用程序。
我的问题是 asyncio 变体永远不会接收消息。
另一方面,非异步变体工作得很好。我已经测试了 ipc 和 tcp 传输的所有情况。在我的测试中,发布过程从未改变,除非我重新启动它以更改传输。这些消息是短字符串,大约每秒发布一次,因此我们不考虑性能问题。
用户程序无限期地在线msg = await sock.receive_multipart()。在 XPUB/XSUB 设备中,我有显示消息转发的仪器sock.setsockopt(zmq.SUBSCRIBE, channel.encode()),与非异步变体连接时相同。
asyncio 变体(不起作用,如所述)
def subs(url, channel):
import asyncio
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
async def task():
while True:
msg = await sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
try:
asyncio.run(task())
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
常规阻塞变体(工作正常)
def subs(url, channel):
import zmq
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
def task():
while True:
msg = sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
try:
task()
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
对于这个特定的工具,不需要使用 asyncio。但是,我在代码的其他地方也遇到了这个问题,异步接收永远不会收到。因此,我希望通过在这个简单的案例中弄清楚它,我将了解一般情况下出了什么问题。
我的版本是
import zmq
zmq.zmq_version() # '4.3.2'
zmq.__version__ # '19.0.2'
我使用的是 MacOS 10.13.6。
我完全没有主意了。互联网,请帮忙!
catspeake
相关分类