所以我们有一个单线程的 Flask 服务器运行,我们从 python 应用程序客户端接收请求。在这个flask 服务器中,我们使用rabbitMQ 和pika 库将消息分发给其他客户端。发生的事情是在 get 函数中程序因错误而崩溃:
pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - 60 类的预期内容标题,而是得到非内容标题框架')
我在堆栈溢出和其他方面搜索了很多关于此的主题,但它们都解决了多线程问题,但事实并非如此。除非在 app.run(threaded=yes) 中被调用,否则 Flask 应该只为一个线程服务。
当在很短的时间间隔内(例如每秒 5 条)发送多条消息时,程序通常会崩溃,同样重要的是要注意每秒都会收到一条消息并请求此函数:
@app.route('/api/users/getMessages', methods=['POST'])
def get_Messages():
data = json.loads(request.data)
token = data['token']
payload = jwt.decode(token, 'SECRET', algorithms=['HS256'])
istid = payload['istid']
print('istid: '+istid)
messages = []
queue = channel.queue_declare(queue=istid)
for i in range(queue.method.message_count):
method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True)
if method_frame:
#print(method_frame, header_frame, body)
messages.append(body)
else:
print('No message returned')
res = {'messages':messages, 'error':0}
return jsonify(res)
在此代码中,它在以下行中正常崩溃:
queue = channel.queue_declare(queue=istid)
但是我们也尝试更改代码以使用 while 而不是 a 来表示当 body 为 None 并且它在行
method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True) 中崩溃时的结束位置:在这种情况下。同样重要的是,崩溃是随机的,它可以工作几次,然后在发送消息时在 get 请求之后随机崩溃。如果有人知道与此相关的任何事情,我们将不胜感激。
另一个注意事项,我们考虑过使用带有回调的 basic_consume 而不是 basic_get ,但我们没有找到一种可行的方法,因为我们必须将消息发回并且有多个用户向同一函数发出请求。
编辑 #1: 在 rabbitMQ 文档rabbitmq 中,如果您搜索函数“def basic_get”,您会注意到有一些 TODO 注释以及对此的引用
由于实现细节,在执行回调之前不能再次调用它。
所以我怀疑这可能是正在发生的事情,但即使是这样,我也不知道如何解决。
相关分类