手记

Python中FastAPI项目的日志记录详解

开发一个服务时,日志记录起着至关重要的作用,因为它允许我们追踪事件的发生,识别错误,并确定是谁调用了API。虽然我理解它的重要性,但我从未完全理解其机制,所以我这次想更深入地研究一下。

我的项目使用Python FastAPI,我会用这个logging包来练习。

首先,让我给大家介绍一下日志记录组件。

  • Loggers:日志模块中的一个主要组件,负责所有日志的设置,例如可以记录什么或日志的级别等。值得注意的是,“日志记录器 绝对不应该 直接实例化,而应始终通过模块级函数logging.getLogger(name)来创建”。
  • Handlers:将日志记录发送到目标。各种处理器可供不同类型的日志使用。以下是三种常见的处理器,请参阅 这里 了解详细信息。
    • StreamHandler:将日志输出发送到流,如 sys.stdout, sys.stderr 或任何文件对象。换句话说,你可以将任何希望在控制台上看到的内容添加到日志中。
    • FileHandler:将日志输出发送到磁盘文件。你可以通过参数来指定输出文件的名字。
    • RotatingFileHandler:用于处理日志文件过大的情况,它会定期将日志文件轮转。
  • Formatters:指定最终输出中日志记录的布局。
  • Filters:用来决定需要记录哪些内容到输出。

当然,我们也可以不使用客户的日志工具来记录日志。直接通过 logging 模块设置不同的日志级别,比如。

来自logging包的日志级别数值如下:

    import logging  
    logging.warning("这里有个警告!")  
    logging.info("这是信息:')"")  
    logging.error("这里有个错误QQ")

接下来,我们可以把这些设置都整合起来。

    import logging  

    logger = logging.getLogger(__name__)  

    #### handler ####  
    console_handler = logging.StreamHandler()  
    # 我们需要使用 addHandler 将处理器和记录器结合  
    logger.addHandler(console_handler)  

    #### formatter ####  
    formatter = logging.Formatter(  
        "%(asctime)s [%(levelname)s] %(message)s"  
     )  
    # 我们需要使用 setFormatter 将处理器和格式化器结合  
    console_handler.setFormatter(formatter)  

    #### 记录警告信息 ####  
    logger.warning("保持冷静!")

随着处理程序数量的增加,单独管理它们可能会变得复杂起来。例如,我添加了另一个处理程序 fileHandler 来输出名为 _mylog 的日志内容。因此,我们可以用 basicConfig 来统一管理它们。

    导入 logging 模块 

    #### 处理器 ####  
    fileHandler = logging.FileHandler("my_log.log")  
    consoleHandler = logging.StreamHandler()  

    logging.basicConfig 设置了日志的基本配置(  
        级别=logging.WARNING,  
        格式='%(asctime)s [%(levelname)s] %(message)s',  
        处理器=[  
            fileHandler,  
            consoleHandler  
        ]  
    )

如果我们要在特定的日志记录器里使用这个配置,我们就可以设置那个日志记录器。

    import logging  

    logger = logging.getLogger("my_customer_logger")  

    # 处理程序  
    fileHandler = logging.FileHandler("my_log.log")  
    consoleHandler = logging.StreamHandler()  

    logging.basicConfig(  
        level=logging.WARNING,  
        format="%(asctime)s [%(levelname)s] %(message)s",  
        handlers=[  
            fileHandler,  
            consoleHandler  
        ]  
    )  

    logger.error('哎呀,出错了!!!')

如果你不想指定日志器名称,这里有一个简单的默认选项。

  • 使用 logging.getLogger() 创建日志记录器,默认为根日志器。
  • 使用 logging.getLogger(__name__),将当前模块名作为日志名称。

随着我们的服务变得越来越复杂化,日志记录系统也变得更加专门化。这时,我们可以用配置文件来设置日志记录。

    // log_config.py

    日志配置 = {  
        "version": 1,  
        "disable_existing_loggers": False,  
        "格式化器": {  
            "默认": {  
                "()": "uvicorn.logging.DefaultFormatter",  
                "fmt": "[%(levelname)s] - %(asctime)s - %(name)s - %(message)s",  
            },  
        },  
        "过滤器": {  
            "敏感数据过滤器": {  
                "()": SensitiveDataFilter,  
            }  
        },  
        "处理器": {  
            "控制台": {  
                "class": "logging.StreamHandler",  
                "formatter": "默认",  
                "level": "DEBUG",  
                "stream": "ext://sys.stdout",  
                "filters": ["敏感数据过滤器"],  
            },  
            "文件": {  
                "formatter": "默认",  
                "class": "logging.handlers.RotatingFileHandler",  
                "level": "DEBUG",  
                "filename": "my_log.log",  
                "mode": "a",  
            },  
        },  
        "日志器": {  
            "my_customer_logger": {  
                "处理器": ["文件", "控制台"],  
                "level": "DEBUG",  
                "propagate": False,  
            }  
        },  
    }

我们可以通过 logging.config.dictConfig(LOGGING_CONFIG) 来应用这个配置。

你可能已经注意到filters被加到了上面代码中的日志记录器。

如果我们想处理敏感信息并且不想让这些敏感数据出现在日志中,我们可以写一个过滤器。以下示例演示如何将特定键和模式标记为“*****”。(参考这份文档:这份文档

    class SensitiveDataFilter(logging.Filter):  
        # 定义一个包含敏感数据键的列表  
        SENSITIVE_KEYS = (  
            "credentials",  
            "authorization",  
            "token",  
            "password",  
            "access_token",  
        )  
        TOKEN_PATTERN = rf"token=([^;]+)"  

        def filter(self, record):  
            try:  
                record.args = self.mask_sensitive_args(record.args)  
                record.msg = self.mask_sensitive_msg(record.msg)  
                return True  
            except Exception as e:  
                return True  

        def mask_sensitive_args(self, args):  
            if isinstance(args, dict):  
                new_args = args.copy()  
                for key in args.keys():  
                    if key.lower() in self.SENSITIVE_KEYS:  
                        new_args[key] = "******"  
                    else:  
                        # 屏蔽字典值中的敏感数据  
                        new_args[key] = self.mask_sensitive_msg(args[key])  
                return new_args  
            # 当record.args包含多个参数时  
            return tuple([self.mask_sensitive_msg(arg) for arg in args])  

        def mask_sensitive_msg(self, message):  
            # 屏蔽多个record.args中的敏感数据  
            if isinstance(message, dict):  
                return self.mask_sensitive_args(message)  
            if isinstance(message, str):  
                # 将token替换为f"token=******"  
                replace = f"token=******"  
                message = re.sub(self.TOKEN_PATTERN, replace, message)  
            return message
在FastAPI中添加日志

我们可以使用Middleware来记录每个请求和响应的信息。

FastAPI 是基于 Starlette 构建的,因此我们可以查看 Starlette 的文档。我们可以将自定义参数如 request_id 存储在 request.state 中。要访问请求体,可以使用 await request.body()。需要注意的是,GET 请求可能没有请求体,因此我们需要处理这种情况,将其赋值为空 {}。另一方面,由于 FastAPI 中的流式响应,读取响应体更加复杂,我们可以通过使用 iterate_in_threadpool 以更高效的方式处理迭代器。

    async def 日志中间处理程序(request: Request, call_next):  
        req_id = str(uuid.uuid4())  
        try:  
            #### 请求日志 ####  
            request.state.req_id = req_id  
            request.state.body = json.loads(await request.body() or "{}")  
            记录请求日志(request)  

            #### 响应处理 ####  
            response = await 调用下一个(request)  
            response_body = ""  
            if response.headers.get("content-type") == "application/json":  
                response_body = [chunk async for chunk in response.body_iterator]  
                response.body_iterator = iterate_in_threadpool(iter(response_body))  
            return response  
        except Exception as e:  
            # Unexpected error handling  
            记录错误(req_id, {'错误消息': 'ERR_UNEXPECTED'})  
            return 抛出 HTTPException(status_code=500, detail='ERR_UNEXPECTED')

我们可以用一个类来定义日志参数,包括请求中的各种参数。请求信息将从RequestInfo中提取,包括来自请求的各种参数。

    class RequestInfo:  
        def __init__(self, request) -> None:  
            self.request = request  # 初始化请求对象

        @property  
        def method(self) -> str:  
            return str(self.request.method)  # 获取请求方法

        @property  
        def route(self) -> str:  
            return self.request["path"]  # 获取请求路径

        @property  
        def ip(self) -> str:  
            return str(self.request.client.host)  # 返回客户端的IP地址

        @property  
        def url(self) -> str:  
            return str(self.request.url)  # 获取请求URL

        @property  
        def host(self) -> str:  
            return str(self.request.url.hostname)  # 获取URL的主机名

        @property  
        def headers(self) -> dict:  
            return {key: value for key, value in self.request.headers.items()}  # 获取请求头

        @property  
        def body(self) -> dict:  
            return self.request.state.body  # 状态体,包含请求的正文部分

    class RequestLog(BaseModel):  
        req_id: str  # 请求ID
        method: str  # 请求方法
        route: str  # 请求路径
        ip: str  # 客户端IP地址
        url: str  # 请求URL
        host: str  # URL的主机名
        body: dict  # 请求正文
        headers: dict  # 请求头

        # 请求日志类,记录请求的相关信息

    class ErrorLog(BaseModel):  
        req_id: str  # 请求ID
        error_message: str  # 错误信息

        # 错误日志类,记录请求ID和错误信息

接下来,创建 log_requestlog_error 模块来使用我们之前定义的记录器。根据响应是标准请求还是错误,记录相应的信息或错误详情。

logger = logging.getLogger("my_customer_logger")  

def log_request(request: Request):  
    # 记录请求信息
    request_info = RequestInfo(request)
    # 创建请求日志
    request_log = RequestLog(
        req_id=request.state.req_id,
        method=request_info.method,
        route=request_info.route,
        ip=request_info.ip,
        url=request_info.url,
        host=request_info.host,
        body=request_info.body,
        headers=request_info.headers,
    )
    # 通过logger记录请求日志
    logger.info(request_log.dict())

def log_error(uuid: str, response_body: dict):  
    # 记录错误信息
    error_log = ErrorLog(
        req_id=uuid,
        error_message=response_body["error_message"],
    )
    # 通过logger记录错误日志
    logger.error(error_log.dict())
    # 记录异常堆栈信息
    logger.error(traceback.format_exc())

最后,我想分享一个我遇到的bug。无论我怎么设置日志配置,却发现只有根日志器在工作,我的自定义日志器却没有被使用。

问题是由于 alembic 引起的,它是一个关系型数据库管理工具。

在 alembic 的教程(https://alembic.sqlalchemy.org/en/latest/tutorial.html)中

[loggers]、[handlers]、[formatters]、[logger*]、[handler]、[formatter_] —
这些部分都是 Python 标准日志配置文件的一部分,其机制在配置文件格式文档中有详细说明。
这些指令和数据库连接指令一样,在 env.py 脚本中的 logging.config.fileConfig() 调用中直接使用。
你可以自由地修改这个脚本。
注意:例如 logger_xxx, handler_xxx, formatter_xxx。

在 env.py 文件中定义了日志配置。

    # 解释Python日志配置文件并配置日志记录器。
    if config.config_file_name is not None:  
        fileConfig(config.config_file_name)

删掉这一行,读一下之前提到的配置文件。

    // main.py  
    // 加载日志配置
    logging.config.dictConfig(LOGGING_CONFIG)

读完文章啦,感谢您阅读,欢迎任何评论或建议哦 (^_^)

0人推荐
随时随地看视频
慕课网APP