您将如何调用带有参数的函数(在我的代码中称为 func)作为 event_loop 的一部分?考虑到这些函数应该在服务器传入请求时多次调用。
async def my_func_1(json_message: dict, writer: StreamWriter) -> bool:
return True
async def my_func_2(json_message: dict, writer: StreamWriter) -> bool:
return True
async def my_func_3(json_message: dict, writer: StreamWriter) -> bool:
return True
switcher = {
"Forward": my_func_1,
"Backward": my_func_2,
"Up": my_func_3
}
async def dispatcher(reader: StreamReader, writer: StreamWriter):
try:
msg = await reader.readline()
message = ujson.decode(msg.decode())
except Exception:
print("unable to parse json from read stream:" + str(msg.decode()))
if "method" in message:
func = switcher.get(message['method'], UnknownMethod)
# how would you invoke func with arguments as part of the event_loop?
# considering these functions should be invoked multiple times.
def init():
loop = asyncio.get_event_loop()
coro = asyncio.start_server(dispatcher, '127.0.0.1', 666, loop=loop)
server = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
if __name__ == '__main__':
init()
感谢所有的帮助
慕工程0101907
相关分类