猿问

asyncio.start_unix_server 和 redis 的套接字错误

我正在尝试使用 asyncio 和 Unix 域套接字在 Python 中构建一个玩具内存 Redis 服务器。


baz我的最小示例只返回每个请求的值:


import asyncio



class RedisServer:

    def __init__(self):

        self.server_address = "/tmp/redis.sock"


    async def handle_req(self, reader, writer):

        await reader.readline()

        writer.write(b"$3\r\nbaz\r\n")

        await writer.drain()

        writer.close()

        await writer.wait_closed()


    async def main(self):

        server = await asyncio.start_unix_server(self.handle_req, self.server_address)

        async with server:

            await server.serve_forever()


    def run(self):

        asyncio.run(self.main())



RedisServer().run()

当我使用以下脚本使用客户端库测试两个连续的客户端请求时,它可以redis工作

import time

import redis



r = redis.Redis(unix_socket_path="/tmp/redis.sock")


r.get("foo")

time.sleep(1)

r.get("bar")

但是,如果我删除time.sleep(1),有时它会起作用,有时第二个请求会失败,并出现以下任一情况:


Traceback (most recent call last):

  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 706, in send_packed_command

    sendall(self._sock, item)

  File "/tmp/env/lib/python3.8/site-packages/redis/_compat.py", line 9, in sendall

    return sock.sendall(*args, **kwargs)

BrokenPipeError: [Errno 32] Broken pipe


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "test.py", line 9, in <module>

    r.get("bar")

  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get

    return self.execute_command('GET', name)

  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 900, in execute_command

    conn.send_command(*args)

  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 725, in send_command

    self.send_packed_command(self.pack_command(*args),

  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 717, in send_packed_command

    raise ConnectionError("Error %s while writing to socket. %s." %

redis.exceptions.ConnectionError: Error 32 while writing to socket. Broken pipe.



看来我的实现缺少客户端库期望的一些关键行为(可能是由于它是异步的)。我缺少什么?


潇潇雨雨
浏览 82回答 1
1回答

拉风的咖菲猫

write_eof()如果您想在每次请求后关闭套接字,则需要使用缓冲的写入数据刷新后,关闭流的写入端。您的代码稍加修改将如下所示:async def handle_req(self, reader, writer):    await reader.readline()    writer.write(b"$3\r\nbaz\r\n")    await writer.drain()    writer.write_eof()    writer.close()    await writer.wait_closed()通常,您不会在每次请求后关闭套接字。以下示例仅用于说明目的,旨在表明套接字不需要关闭。当然,您总是会读取一行,然后根据 Redis 协议解释数据。我们知道这里发送了两个 GET 命令(每行 5 行,包含 2 个元素的数组的指示符,字符串的指示符,字符串值“GET”,以及字符串指示符和相应的值,即键)async def handle_req(self, reader, writer):    print("start")    for i in range(0, 2):        for x in range(0, 5):            print(await reader.readline())        writer.write(b"$3\r\nbaz\r\n")        await writer.drain()    writer.write_eof()    writer.close()    await writer.wait_closed()在客户端发送是这样完成的:print(r.get("foo"))print(r.get("bar"))time.sleep(1)最后一次time.sleep是为了确保客户端不会立即退出。控制台上的输出是:startb'*2\r\n'b'$3\r\n'b'GET\r\n'b'$3\r\n'b'foo\r\n'b'*2\r\n'b'$3\r\n'b'GET\r\n'b'$3\r\n'b'bar\r\n'请注意,start仅输出一次,这表明我们可以处理多个请求,而不必立即关闭套接字。
随时随地看视频慕课网APP

相关分类

Python
我要回答