日志记录食谱

作者:

Vinay Sajip <vinay_sajip at red-dove dot com>

本页包含许多与日志记录相关的配方,这些配方在过去被发现很有用。有关教程和参考信息的链接,请参阅其他资源

在多个模块中使用日志记录

多次调用 logging.getLogger('someLogger') 返回对同一记录器对象的引用。这不仅在同一模块中成立,而且在同一 Python 解释器进程中的不同模块之间也成立。对于对同一对象的引用来说是这样;此外,应用程序代码可以在一个模块中定义和配置父记录器,并在单独的模块中创建(但不配置)子记录器,并且所有对子记录器的记录器调用都会传递给父记录器。这是一个主模块

import logging
import auxiliary_module

# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')

这是辅助模块

import logging

# create logger
module_logger = logging.getLogger('spam_application.auxiliary')

class Auxiliary:
    def __init__(self):
        self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
        self.logger.info('creating an instance of Auxiliary')

    def do_something(self):
        self.logger.info('doing something')
        a = 1 + 1
        self.logger.info('done doing something')

def some_function():
    module_logger.info('received a call to "some_function"')

输出如下所示

2005-03-23 23:47:11,663 - spam_application - INFO -
   creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
   creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
   created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
   calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
   doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
   done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
   finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
   calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
   received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
   done with auxiliary_module.some_function()

从多个线程记录日志

从多个线程记录日志不需要特殊的操作。以下示例显示了从主(初始)线程和另一个线程记录日志

import logging
import threading
import time

def worker(arg):
    while not arg['stop']:
        logging.debug('Hi from myfunc')
        time.sleep(0.5)

def main():
    logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
    info = {'stop': False}
    thread = threading.Thread(target=worker, args=(info,))
    thread.start()
    while True:
        try:
            logging.debug('Hello from main')
            time.sleep(0.75)
        except KeyboardInterrupt:
            info['stop'] = True
            break
    thread.join()

if __name__ == '__main__':
    main()

运行时,该脚本应打印如下内容

   0 Thread-1 Hi from myfunc
   3 MainThread Hello from main
 505 Thread-1 Hi from myfunc
 755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc

这显示了日志输出像人们预期的那样交错出现。当然,这种方法适用于比此处显示的更多线程。

多个处理器和格式器

记录器是普通的 Python 对象。addHandler() 方法对于可以添加的处理器数量没有最小或最大配额。有时,应用程序将所有严重性级别的消息记录到文本文件,同时将错误或更高级别的消息记录到控制台,这将很有益处。要设置此项,只需配置相应的处理器。应用程序代码中的日志记录调用将保持不变。以下是对先前基于简单模块的配置示例的略微修改

import logging

logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)

# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')

请注意,“应用程序”代码不关心多个处理器。所有更改只是添加和配置了一个名为 fh 的新处理器。

在编写和测试应用程序时,创建具有更高或更低严重性过滤器的新处理器非常有帮助。与其使用许多 print 语句进行调试,不如使用 logger.debug:与您稍后必须删除或注释掉的 print 语句不同,logger.debug 语句可以保留在源代码中,并在您再次需要它们之前保持休眠状态。那时,唯一需要发生的更改是修改记录器和/或处理器的严重性级别为调试。

记录到多个目标

假设您想要以不同的消息格式并在不同的情况下记录到控制台和文件。假设您想要将级别为 DEBUG 和更高级别的消息记录到文件,并将级别为 INFO 和更高级别的消息记录到控制台。我们还假设该文件应包含时间戳,但控制台消息不应包含。以下是如何实现此目的

import logging

# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename='/tmp/myapp.log',
                    filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

当您运行它时,在控制台上您将看到

root        : INFO     Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO     How quickly daft jumping zebras vex.
myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR    The five boxing wizards jump quickly.

并且在文件中您将看到类似如下的内容

10-22 22:19 root         INFO     Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1  INFO     How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2  ERROR    The five boxing wizards jump quickly.

如您所见,DEBUG 消息仅显示在文件中。其他消息将发送到两个目标。

此示例使用控制台和文件处理器,但您可以根据需要使用任意数量和组合的处理器。

请注意,上面选择的日志文件名 /tmp/myapp.log 意味着在 POSIX 系统上使用临时文件的标准位置。在 Windows 上,您可能需要为日志选择不同的目录名称 - 只需确保该目录存在并且您具有在其中创建和更新文件的权限。

自定义级别的处理

有时,您可能希望执行与处理器中级别的标准处理略有不同的操作,其中阈值以上的所有级别都由处理器处理。为此,您需要使用过滤器。让我们看一个您希望按如下方式安排事情的场景

  • 将严重性为 INFOWARNING 的消息发送到 sys.stdout

  • 将严重性为 ERROR 及以上的消息发送到 sys.stderr

  • 将严重性为 DEBUG 及以上的消息发送到文件 app.log

假设您使用以下 JSON 配置日志记录

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "simple": {
            "format": "%(levelname)-8s - %(message)s"
        }
    },
    "handlers": {
        "stdout": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "simple",
            "stream": "ext://sys.stdout"
        },
        "stderr": {
            "class": "logging.StreamHandler",
            "level": "ERROR",
            "formatter": "simple",
            "stream": "ext://sys.stderr"
        },
        "file": {
            "class": "logging.FileHandler",
            "formatter": "simple",
            "filename": "app.log",
            "mode": "w"
        }
    },
    "root": {
        "level": "DEBUG",
        "handlers": [
            "stderr",
            "stdout",
            "file"
        ]
    }
}

此配置几乎达到了我们的要求,只是 sys.stdout 将显示严重性为 ERROR 的消息,并且只有此严重性及更高的事件以及 INFOWARNING 消息才会被跟踪。为防止这种情况,我们可以设置一个过滤器来排除这些消息并将其添加到相关的处理器。可以通过添加一个与 formattershandlers 并行的 filters 部分来配置

{
    "filters": {
        "warnings_and_below": {
            "()" : "__main__.filter_maker",
            "level": "WARNING"
        }
    }
}

并将 stdout 处理器上的部分更改为添加它

{
    "stdout": {
        "class": "logging.StreamHandler",
        "level": "INFO",
        "formatter": "simple",
        "stream": "ext://sys.stdout",
        "filters": ["warnings_and_below"]
    }
}

过滤器只是一个函数,因此我们可以将 filter_maker(一个工厂函数)定义如下

def filter_maker(level):
    level = getattr(logging, level)

    def filter(record):
        return record.levelno <= level

    return filter

这会将传入的字符串参数转换为数值级别,并返回一个函数,该函数仅在传入记录的级别等于或低于指定的级别时才返回 True。请注意,在此示例中,我已在从命令行运行的测试脚本 main.py 中定义了 filter_maker,因此其模块将为 __main__ - 因此在过滤器配置中为 __main__.filter_maker。如果您在其他模块中定义它,则需要更改它。

添加过滤器后,我们可以运行 main.py,其完整内容如下

import json
import logging
import logging.config

CONFIG = '''
{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "simple": {
            "format": "%(levelname)-8s - %(message)s"
        }
    },
    "filters": {
        "warnings_and_below": {
            "()" : "__main__.filter_maker",
            "level": "WARNING"
        }
    },
    "handlers": {
        "stdout": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
            "filters": ["warnings_and_below"]
        },
        "stderr": {
            "class": "logging.StreamHandler",
            "level": "ERROR",
            "formatter": "simple",
            "stream": "ext://sys.stderr"
        },
        "file": {
            "class": "logging.FileHandler",
            "formatter": "simple",
            "filename": "app.log",
            "mode": "w"
        }
    },
    "root": {
        "level": "DEBUG",
        "handlers": [
            "stderr",
            "stdout",
            "file"
        ]
    }
}
'''

def filter_maker(level):
    level = getattr(logging, level)

    def filter(record):
        return record.levelno <= level

    return filter

logging.config.dictConfig(json.loads(CONFIG))
logging.debug('A DEBUG message')
logging.info('An INFO message')
logging.warning('A WARNING message')
logging.error('An ERROR message')
logging.critical('A CRITICAL message')

像这样运行之后

python main.py 2>stderr.log >stdout.log

我们可以看到结果符合预期

$ more *.log
::::::::::::::
app.log
::::::::::::::
DEBUG    - A DEBUG message
INFO     - An INFO message
WARNING  - A WARNING message
ERROR    - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stderr.log
::::::::::::::
ERROR    - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stdout.log
::::::::::::::
INFO     - An INFO message
WARNING  - A WARNING message

配置服务器示例

这是一个使用日志配置服务器的模块示例

import logging
import logging.config
import time
import os

# read initial config file
logging.config.fileConfig('logging.conf')

# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()

logger = logging.getLogger('simpleExample')

try:
    # loop through logging calls to see the difference
    # new configurations make, until Ctrl+C is pressed
    while True:
        logger.debug('debug message')
        logger.info('info message')
        logger.warning('warn message')
        logger.error('error message')
        logger.critical('critical message')
        time.sleep(5)
except KeyboardInterrupt:
    # cleanup
    logging.config.stopListening()
    t.join()

这是一个脚本,它接受一个文件名,并将该文件发送到服务器,并在前面正确加上二进制编码的长度,作为新的日志配置

#!/usr/bin/env python
import socket, sys, struct

with open(sys.argv[1], 'rb') as f:
    data_to_send = f.read()

HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')

处理阻塞的处理器

有时,您必须让日志处理器在不阻塞您正在记录的线程的情况下完成它们的工作。这在 Web 应用程序中很常见,当然在其他场景中也会发生。

一个常见的导致行为迟缓的罪魁祸首是 SMTPHandler:发送电子邮件可能需要很长时间,这有很多原因,且不受开发人员控制(例如,性能不佳的邮件或网络基础设施)。但是几乎任何基于网络的处理器都可能阻塞:即使是 SocketHandler 操作也可能在后台执行 DNS 查询,速度太慢(并且此查询可能位于套接字库代码的深处,低于 Python 层,且不受您的控制)。

一种解决方案是使用两部分方法。对于第一部分,仅将 QueueHandler 附加到从性能关键线程访问的那些记录器。它们只是写入它们的队列,该队列可以调整为足够大的容量,或者初始化为对其大小没有上限。写入队列通常会被快速接受,尽管您可能需要捕获 queue.Full 异常,以在您的代码中采取预防措施。如果您是库开发人员,并且在您的代码中具有性能关键线程,请务必记录这一点(以及仅将 QueueHandlers 附加到您的记录器的建议),以方便使用您的代码的其他开发人员。

解决方案的第二部分是 QueueListener,它被设计为 QueueHandler 的对应项。QueueListener 非常简单:它会传递一个队列和一些处理器,并且它会启动一个内部线程,该线程会监听其队列中从 QueueHandlers(或任何其他 LogRecords 的来源)发送的 LogRecords。 LogRecords 从队列中删除并传递给处理器进行处理。

拥有单独的 QueueListener 类的优点是,您可以使用相同的实例来服务多个 QueueHandlers。这比拥有现有处理器类的线程版本更节省资源,因为每个处理器会占用一个线程,而没有任何特殊好处。

以下是使用这两个类的示例(省略了导入)

que = queue.Queue(-1)  # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()

运行时,将产生

MainThread: Look out!

注意

尽管之前的讨论并非专门针对异步代码,而是针对慢速日志处理器,但应注意,当从异步代码记录时,网络甚至文件处理器都可能导致问题(阻塞事件循环),因为某些日志记录是从 asyncio 内部完成的。如果应用程序中使用了任何异步代码,最好使用上述方法进行日志记录,以便任何阻塞代码仅在 QueueListener 线程中运行。

在 3.5 版本中更改:在 Python 3.5 之前,QueueListener 始终将从队列收到的每条消息传递给它初始化的每个处理器。(这是因为假设级别过滤都在另一侧完成,即填充队列的位置。)从 3.5 开始,可以通过将关键字参数 respect_handler_level=True 传递给侦听器的构造函数来更改此行为。完成此操作后,侦听器会将每条消息的级别与处理器的级别进行比较,并且仅在适合的情况下才将消息传递给处理器。

跨网络发送和接收日志事件

假设您要跨网络发送日志事件,并在接收端处理它们。一种简单的方法是在发送端将 SocketHandler 实例附加到根记录器

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

在接收端,您可以使用 socketserver 模块设置接收器。这是一个基本的工作示例

import pickle
import logging
import logging.handlers
import socketserver
import struct


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

首先运行服务器,然后运行客户端。在客户端,控制台上不打印任何内容;在服务器端,您应该看到类似以下内容

About to start TCP server...
   59 root            INFO     Jackdaws love my big sphinx of quartz.
   59 myapp.area1     DEBUG    Quick zephyrs blow, vexing daft Jim.
   69 myapp.area1     INFO     How quickly daft jumping zebras vex.
   69 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
   69 myapp.area2     ERROR    The five boxing wizards jump quickly.

请注意,在某些情况下,pickle 存在一些安全问题。如果这些问题影响到您,您可以通过覆盖 makePickle() 方法并在其中实现您的替代方案,以及调整上述脚本以使用您的替代序列化方案,来使用替代的序列化方案。

在生产环境中运行日志套接字侦听器

要在生产环境中运行日志侦听器,您可能需要使用进程管理工具,例如 Supervisor这是一个 Gist,它提供了使用 Supervisor 运行上述功能的基本文件。它由以下文件组成

文件

目的

prepare.sh

用于准备测试环境的 Bash 脚本

supervisor.conf

Supervisor 配置文件,其中包含侦听器和多进程 Web 应用程序的条目

ensure_app.sh

一个 Bash 脚本,用于确保 Supervisor 以上述配置运行

log_listener.py

接收日志事件并将其记录到文件的套接字侦听器程序

main.py

通过连接到侦听器的套接字执行日志记录的简单 Web 应用程序

webapp.json

Web 应用程序的 JSON 配置文件

client.py

用于练习 Web 应用程序的 Python 脚本

Web 应用程序使用 Gunicorn,这是一个流行的 Web 应用程序服务器,它启动多个工作进程来处理请求。此示例设置显示了工作进程如何写入同一日志文件而不会相互冲突——它们都通过套接字侦听器进行处理。

要测试这些文件,请在 POSIX 环境中执行以下操作

  1. 使用 下载 ZIP 按钮将 Gist 下载为 ZIP 存档。

  2. 将上述文件从存档解压到一个临时目录。

  3. 在临时目录中,运行 bash prepare.sh 以准备就绪。这将创建一个 run 子目录以包含 Supervisor 相关的文件和日志文件,以及一个 venv 子目录以包含一个虚拟环境,其中安装了 bottlegunicornsupervisor

  4. 运行 bash ensure_app.sh 以确保 Supervisor 以上述配置运行。

  5. 运行 venv/bin/python client.py 以练习 Web 应用程序,这将导致记录写入日志。

  6. 检查 run 子目录中的日志文件。您应该在与模式 app.log* 匹配的文件中看到最新的日志行。它们不会按任何特定顺序排列,因为它们是由不同的工作进程以不确定的方式同时处理的。

  7. 您可以通过运行 venv/bin/supervisorctl -c supervisor.conf shutdown 来关闭侦听器和 Web 应用程序。

在不太可能的情况下,如果配置的端口与测试环境中的其他端口冲突,您可能需要调整配置文件。

向日志输出添加上下文信息

有时,您希望日志输出除了传递给日志调用的参数之外,还包含上下文信息。例如,在网络应用程序中,可能需要在日志中记录特定于客户端的信息(例如,远程客户端的用户名或 IP 地址)。虽然您可以使用 extra 参数来实现这一点,但以这种方式传递信息并不总是很方便。虽然在每个连接的基础上创建 Logger 实例可能很诱人,但这并不是一个好主意,因为这些实例不会被垃圾回收。虽然这在实践中不是问题,但当 Logger 实例的数量取决于您希望在应用程序日志中使用的粒度级别时,如果 Logger 实例的数量变得实际上无界,则可能难以管理。

使用 LoggerAdapters 传递上下文信息

一种可以轻松地传递上下文信息以与日志事件信息一起输出的方法是使用 LoggerAdapter 类。这个类被设计得看起来像一个 Logger,因此您可以调用 debug()info()warning()error()exception()critical()log()。这些方法的签名与其在 Logger 中的对应方法相同,因此您可以互换使用这两种类型的实例。

当您创建 LoggerAdapter 的实例时,您将向其传递一个 Logger 实例和一个包含您的上下文信息的类似字典的对象。当您在 LoggerAdapter 的实例上调用其中一个日志记录方法时,它会将调用委托给传递给其构造函数的底层 Logger 实例,并安排在委托调用中传递上下文信息。以下是 LoggerAdapter 代码中的片段

def debug(self, msg, /, *args, **kwargs):
    """
    Delegate a debug call to the underlying logger, after adding
    contextual information from this adapter instance.
    """
    msg, kwargs = self.process(msg, kwargs)
    self.logger.debug(msg, *args, **kwargs)

process() 方法是 LoggerAdapter 将上下文信息添加到日志输出的地方。它会传递日志调用的消息和关键字参数,并返回(可能)修改后的版本,以用于调用底层日志记录器。此方法的默认实现会保持消息不变,但在关键字参数中插入一个“extra”键,其值是传递给构造函数的类字典对象。当然,如果您在调用适配器时传递了“extra”关键字参数,它将被静默覆盖。

使用“extra”的优点是,类字典对象中的值会合并到 LogRecord 实例的 __dict__ 中,允许您将自定义字符串与了解类字典对象键的 Formatter 实例一起使用。如果您需要其他方法,例如,如果您想将上下文信息添加到消息字符串之前或之后,您只需要子类化 LoggerAdapter 并覆盖 process() 以执行您需要的操作。这是一个简单的例子

class CustomAdapter(logging.LoggerAdapter):
    """
    This example adapter expects the passed in dict-like object to have a
    'connid' key, whose value in brackets is prepended to the log message.
    """
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

您可以像这样使用它

logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})

然后,您记录到适配器的任何事件都将在日志消息前面加上 some_conn_id 的值。

使用字典以外的对象传递上下文信息

您不需要将实际的字典传递给 LoggerAdapter - 您可以传递一个实现了 __getitem____iter__ 的类的实例,以便它看起来像一个日志记录的字典。如果您想动态生成值(而字典中的值是常量),这将非常有用。

使用过滤器传递上下文信息

您还可以使用用户定义的 Filter 将上下文信息添加到日志输出中。Filter 实例允许修改传递给它们的 LogRecords,包括添加额外的属性,然后可以使用合适的格式字符串输出这些属性,或者如果需要,可以使用自定义的 Formatter

例如,在 Web 应用程序中,正在处理的请求(或至少是它的有趣部分)可以存储在线程本地 (threading.local) 变量中,然后从 Filter 中访问,以将请求中的信息(例如,远程 IP 地址和远程用户的用户名)添加到 LogRecord,使用属性名称“ip”和“user”,就像上面的 LoggerAdapter 示例一样。在这种情况下,可以使用相同的格式字符串来获得与上面显示的类似的输出。这是一个示例脚本

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    This is a filter which injects contextual information into the log.

    Rather than use actual contextual information, we just use random
    data in this demo.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')
    for x in range(10):
        lvl = choice(levels)
        lvlname = logging.getLevelName(lvl)
        a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')

运行时,会产生如下内容

2010-09-06 22:38:15,292 a.b.c DEBUG    IP: 123.231.231.123 User: fred     A debug message
2010-09-06 22:38:15,300 a.b.c INFO     IP: 192.168.0.1     User: sheila   An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 127.0.0.1       User: jim      A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 127.0.0.1       User: sheila   A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 123.231.231.123 User: fred     A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1     User: jim      A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 192.168.0.1     User: jim      A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR    IP: 127.0.0.1       User: sheila   A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG    IP: 123.231.231.123 User: fred     A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO     IP: 123.231.231.123 User: fred     A message at INFO level with 2 parameters

使用 contextvars

自 Python 3.7 起,contextvars 模块提供了上下文本地存储,它适用于 threadingasyncio 处理需求。因此,这种类型的存储通常比线程本地更可取。以下示例展示了如何在多线程环境中,使用上下文信息(例如,Web 应用程序处理的请求属性)填充日志。

为了便于说明,假设您有不同的 Web 应用程序,每个应用程序都是独立的,但在同一个 Python 进程中运行,并使用它们通用的库。如何让这些应用程序中的每一个都有自己的日志,其中来自库(和其他请求处理代码)的所有日志消息都被定向到相应应用程序的日志文件,同时在日志中包含其他上下文信息,例如客户端 IP、HTTP 请求方法和客户端用户名?

让我们假设该库可以通过以下代码模拟

# webapplib.py
import logging
import time

logger = logging.getLogger(__name__)

def useful():
    # Just a representative event logged from the library
    logger.debug('Hello from webapplib!')
    # Just sleep for a bit so other threads get to run
    time.sleep(0.01)

我们可以通过两个简单的类 RequestWebApp 来模拟多个 Web 应用程序。这些模拟了实际的线程 Web 应用程序的工作方式 - 每个请求都由一个线程处理

# main.py
import argparse
from contextvars import ContextVar
import logging
import os
from random import choice
import threading
import webapplib

logger = logging.getLogger(__name__)
root = logging.getLogger()
root.setLevel(logging.DEBUG)

class Request:
    """
    A simple dummy request class which just holds dummy HTTP request method,
    client IP address and client username
    """
    def __init__(self, method, ip, user):
        self.method = method
        self.ip = ip
        self.user = user

# A dummy set of requests which will be used in the simulation - we'll just pick
# from this list randomly. Note that all GET requests are from 192.168.2.XXX
# addresses, whereas POST requests are from 192.16.3.XXX addresses. Three users
# are represented in the sample requests.

REQUESTS = [
    Request('GET', '192.168.2.20', 'jim'),
    Request('POST', '192.168.3.20', 'fred'),
    Request('GET', '192.168.2.21', 'sheila'),
    Request('POST', '192.168.3.21', 'jim'),
    Request('GET', '192.168.2.22', 'fred'),
    Request('POST', '192.168.3.22', 'sheila'),
]

# Note that the format string includes references to request context information
# such as HTTP method, client IP and username

formatter = logging.Formatter('%(threadName)-11s %(appName)s %(name)-9s %(user)-6s %(ip)s %(method)-4s %(message)s')

# Create our context variables. These will be filled at the start of request
# processing, and used in the logging that happens during that processing

ctx_request = ContextVar('request')
ctx_appname = ContextVar('appname')

class InjectingFilter(logging.Filter):
    """
    A filter which injects context-specific information into logs and ensures
    that only information for a specific webapp is included in its log
    """
    def __init__(self, app):
        self.app = app

    def filter(self, record):
        request = ctx_request.get()
        record.method = request.method
        record.ip = request.ip
        record.user = request.user
        record.appName = appName = ctx_appname.get()
        return appName == self.app.name

class WebApp:
    """
    A dummy web application class which has its own handler and filter for a
    webapp-specific log.
    """
    def __init__(self, name):
        self.name = name
        handler = logging.FileHandler(name + '.log', 'w')
        f = InjectingFilter(self)
        handler.setFormatter(formatter)
        handler.addFilter(f)
        root.addHandler(handler)
        self.num_requests = 0

    def process_request(self, request):
        """
        This is the dummy method for processing a request. It's called on a
        different thread for every request. We store the context information into
        the context vars before doing anything else.
        """
        ctx_request.set(request)
        ctx_appname.set(self.name)
        self.num_requests += 1
        logger.debug('Request processing started')
        webapplib.useful()
        logger.debug('Request processing finished')

def main():
    fn = os.path.splitext(os.path.basename(__file__))[0]
    adhf = argparse.ArgumentDefaultsHelpFormatter
    ap = argparse.ArgumentParser(formatter_class=adhf, prog=fn,
                                 description='Simulate a couple of web '
                                             'applications handling some '
                                             'requests, showing how request '
                                             'context can be used to '
                                             'populate logs')
    aa = ap.add_argument
    aa('--count', '-c', type=int, default=100, help='How many requests to simulate')
    options = ap.parse_args()

    # Create the dummy webapps and put them in a list which we can use to select
    # from randomly
    app1 = WebApp('app1')
    app2 = WebApp('app2')
    apps = [app1, app2]
    threads = []
    # Add a common handler which will capture all events
    handler = logging.FileHandler('app.log', 'w')
    handler.setFormatter(formatter)
    root.addHandler(handler)

    # Generate calls to process requests
    for i in range(options.count):
        try:
            # Pick an app at random and a request for it to process
            app = choice(apps)
            request = choice(REQUESTS)
            # Process the request in its own thread
            t = threading.Thread(target=app.process_request, args=(request,))
            threads.append(t)
            t.start()
        except KeyboardInterrupt:
            break

    # Wait for the threads to terminate
    for t in threads:
        t.join()

    for app in apps:
        print('%s processed %s requests' % (app.name, app.num_requests))

if __name__ == '__main__':
    main()

如果您运行以上代码,您应该会发现大约一半的请求进入 app1.log,其余的请求进入 app2.log,并且所有请求都记录到 app.log。每个 Web 应用程序特定的日志将仅包含该 Web 应用程序的日志条目,并且请求信息将一致地显示在日志中(即,每个虚拟请求中的信息将始终一起显示在日志行中)。以下 shell 输出说明了这一点

~/logging-contextual-webapp$ python main.py
app1 processed 51 requests
app2 processed 49 requests
~/logging-contextual-webapp$ wc -l *.log
  153 app1.log
  147 app2.log
  300 app.log
  600 total
~/logging-contextual-webapp$ head -3 app1.log
Thread-3 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-3 (process_request) app1 webapplib jim    192.168.3.21 POST Hello from webapplib!
Thread-5 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ head -3 app2.log
Thread-1 (process_request) app2 __main__  sheila 192.168.2.21 GET  Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET  Hello from webapplib!
Thread-2 (process_request) app2 __main__  jim    192.168.2.20 GET  Request processing started
~/logging-contextual-webapp$ head app.log
Thread-1 (process_request) app2 __main__  sheila 192.168.2.21 GET  Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET  Hello from webapplib!
Thread-2 (process_request) app2 __main__  jim    192.168.2.20 GET  Request processing started
Thread-3 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-2 (process_request) app2 webapplib jim    192.168.2.20 GET  Hello from webapplib!
Thread-3 (process_request) app1 webapplib jim    192.168.3.21 POST Hello from webapplib!
Thread-4 (process_request) app2 __main__  fred   192.168.2.22 GET  Request processing started
Thread-5 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-4 (process_request) app2 webapplib fred   192.168.2.22 GET  Hello from webapplib!
Thread-6 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ grep app1 app1.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app2.log | wc -l
147
~/logging-contextual-webapp$ grep app1 app.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app.log | wc -l
147

在处理程序中传递上下文信息

每个 Handler 都有自己的过滤器链。如果你想向 LogRecord 添加上下文信息,而不泄露给其他处理器,你可以使用一个过滤器,它返回一个新的 LogRecord,而不是像以下脚本中显示的那样就地修改它。

import copy
import logging

def filter(record: logging.LogRecord):
    record = copy.copy(record)
    record.user = 'jim'
    return record

if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s from %(user)-8s')
    handler.setFormatter(formatter)
    handler.addFilter(filter)
    logger.addHandler(handler)

    logger.info('A log message')

从多个进程记录到单个文件

尽管日志记录是线程安全的,并且支持从单个进程中的多个线程记录到单个文件,但支持从多个进程记录到单个文件,因为在 Python 中没有标准方法可以序列化对多个进程中单个文件的访问。如果你需要从多个进程记录到单个文件,一种方法是让所有进程记录到 SocketHandler,并让一个单独的进程实现一个套接字服务器,该服务器从套接字读取并记录到文件。(如果愿意,你可以将现有进程中的一个线程专用于执行此功能。) 本节 更详细地记录了此方法,并包含一个可用的套接字接收器,你可以将其用作在自己的应用程序中进行适配的起点。

你也可以编写自己的处理器,它使用 Lock 类,该类来自 multiprocessing 模块,以序列化你的进程对文件的访问。stdlib 的 FileHandler 及其子类不使用 multiprocessing

或者,你可以使用 QueueQueueHandler 将所有日志事件发送到你的多进程应用程序中的一个进程。以下示例脚本演示了如何执行此操作;在该示例中,一个单独的侦听器进程侦听其他进程发送的事件,并根据其自身的日志配置记录它们。虽然该示例仅演示了一种执行方式(例如,你可能希望使用侦听器线程而不是单独的侦听器进程——实现方式是类似的),但它允许为侦听器和应用程序中的其他进程使用完全不同的日志配置,并且可以用作满足你自身特定要求的代码的基础。

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

上述脚本的一个变体将日志记录保留在主进程中,在一个单独的线程中。

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

此变体显示了如何为特定记录器应用配置,例如 foo 记录器具有一个特殊的处理器,该处理器将所有事件存储在 foo 子系统中的文件 mplog-foo.log 中。这将由主进程中的日志记录机制使用(即使日志事件是在工作进程中生成的),以将消息定向到适当的目标位置。

使用 concurrent.futures.ProcessPoolExecutor

如果你想使用 concurrent.futures.ProcessPoolExecutor 来启动你的工作进程,你需要以稍微不同的方式创建队列。而不是

queue = multiprocessing.Queue(-1)

你应该使用

queue = multiprocessing.Manager().Queue(-1)  # also works with the examples above

然后你可以将工作进程的创建从这里替换

workers = []
for i in range(10):
    worker = multiprocessing.Process(target=worker_process,
                                     args=(queue, worker_configurer))
    workers.append(worker)
    worker.start()
for w in workers:
    w.join()

到这里(请记住首先导入 concurrent.futures

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for i in range(10):
        executor.submit(worker_process, queue, worker_configurer)

使用 Gunicorn 和 uWSGI 部署 Web 应用程序

当使用 GunicornuWSGI(或类似)部署 Web 应用程序时,会创建多个工作进程来处理客户端请求。在此类环境中,请避免直接在你的 Web 应用程序中创建基于文件的处理器。相反,请使用 SocketHandler 从 Web 应用程序记录到单独进程中的侦听器。可以使用诸如 Supervisor 之类的进程管理工具进行设置,有关更多详细信息,请参阅在生产环境中运行日志记录套接字侦听器

使用文件轮换

有时你希望让日志文件增长到一定大小,然后打开一个新文件并记录到该文件。你可能希望保留一定数量的这些文件,并且当创建了那么多文件后,轮换这些文件,以便文件的数量和文件的大小都保持有界。对于此使用模式,日志记录包提供了 RotatingFileHandler

import glob
import logging
import logging.handlers

LOG_FILENAME = 'logging_rotatingfile_example.out'

# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)

# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
              LOG_FILENAME, maxBytes=20, backupCount=5)

my_logger.addHandler(handler)

# Log some messages
for i in range(20):
    my_logger.debug('i = %d' % i)

# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)

for filename in logfiles:
    print(filename)

结果应该是 6 个单独的文件,每个文件都包含应用程序的部分日志历史记录。

logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5

当前文件始终是 logging_rotatingfile_example.out,并且每次达到大小限制时,都会使用后缀 .1 重命名它。每个现有的备份文件都会被重命名以递增后缀(.1 变为 .2 等),并且 .6 文件会被删除。

显然,此示例将日志长度设置得太小,作为一个极端的示例。你可能希望将 maxBytes 设置为适当的值。

使用替代格式化样式

当将日志记录添加到 Python 标准库时,格式化带有可变内容的消息的唯一方法是使用 %-格式化方法。此后,Python 获得了两种新的格式化方法:string.Template(在 Python 2.4 中添加)和 str.format()(在 Python 2.6 中添加)。

日志记录(从 3.2 开始)为这两种额外的格式化样式提供了改进的支持。Formatter 类已得到增强,可以使用一个名为 style 的附加可选关键字参数。此参数默认为 '%',但其他可能的值为 '{''$',它们对应于另外两种格式化样式。默认情况下(正如你所期望的那样)保持向后兼容性,但通过显式指定 style 参数,你可以指定使用 str.format()string.Template 的格式字符串。这是一个示例控制台会话,显示了各种可能性

>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
...                        style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG    This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
...                        style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>

请注意,最终输出到日志的日志消息的格式完全独立于单个日志消息的构造方式。它仍然可以使用 %-格式化,如此处所示

>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>

日志记录调用(logger.debug()logger.info() 等)仅将位置参数用于实际的日志消息本身,而关键字参数仅用于确定如何处理实际日志记录调用的选项(例如,exc_info 关键字参数用于指示应记录回溯信息,或者 extra 关键字参数用于指示要添加到日志的其他上下文信息)。因此,你不能直接使用 str.format()string.Template 语法进行日志记录调用,因为在内部,日志记录包使用 %-格式化来合并格式字符串和可变参数。在保留向后兼容性的同时,不可能更改此行为,因为现有代码中存在的所有日志记录调用都将使用 %-格式字符串。

但是,你可以使用 {}- 和 $- 格式化来构造你的单个日志消息。回想一下,对于消息,你可以使用任意对象作为消息格式字符串,并且日志记录包将调用该对象的 str() 来获取实际的格式字符串。请考虑以下两个类

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

这两个类中的任何一个都可以用来代替格式字符串,以允许使用 {}- 或 $-格式化来构建实际的“消息”部分,该部分将出现在格式化的日志输出中,而不是“%(message)s”或“{message}”或“$message”。每次你想记录某些内容时都使用类名有点笨拙,但如果你使用诸如 __ (双下划线——不要与 _ 混淆,_ 是用作 gettext.gettext() 或其同类的同义词/别名的单下划线)的别名,则会非常容易接受。

以上类不包含在 Python 中,尽管它们很容易复制并粘贴到你自己的代码中。它们可以按如下方式使用(假设它们是在一个名为 wherever 的模块中声明的)

>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
...       point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

虽然以上示例使用 print() 来显示格式化的工作方式,但你当然会使用 logger.debug() 或类似的方法来实际记录使用此方法。

需要注意的一点是,使用这种方法您不会付出明显的性能代价:实际的格式化不是在您进行日志调用时发生的,而是在(如果)已记录的消息实际即将被处理程序输出到日志时发生的。因此,唯一稍微不寻常且可能让您困惑的是,括号围绕的是格式化字符串和参数,而不仅仅是格式化字符串。这是因为 `__` 符号只是对 `XXXMessage` 类之一的构造函数调用的语法糖。

如果您愿意,可以使用 LoggerAdapter 来达到与上述类似的效果,如下面的示例所示

import logging

class Message:
    def __init__(self, fmt, args):
        self.fmt = fmt
        self.args = args

    def __str__(self):
        return self.fmt.format(*self.args)

class StyleAdapter(logging.LoggerAdapter):
    def log(self, level, msg, /, *args, stacklevel=1, **kwargs):
        if self.isEnabledFor(level):
            msg, kwargs = self.process(msg, kwargs)
            self.logger.log(level, Message(msg, args), **kwargs,
                            stacklevel=stacklevel+1)

logger = StyleAdapter(logging.getLogger(__name__))

def main():
    logger.debug('Hello, {}', 'world!')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()

上面的脚本在 Python 3.8 或更高版本中运行时,应记录消息 Hello, world!

自定义 LogRecord

每个日志事件都由一个 LogRecord 实例表示。当一个事件被记录且未被记录器的级别过滤掉时,会创建一个 LogRecord,其中填充有关该事件的信息,然后将其传递给该记录器的处理程序(及其祖先,直到包括禁用进一步向层次结构传播的记录器)。在 Python 3.2 之前,只有两个地方会进行此创建

  • Logger.makeRecord(),在记录事件的正常过程中调用。这将直接调用 LogRecord 来创建一个实例。

  • makeLogRecord(),使用包含要添加到 LogRecord 的属性的字典调用。这通常在通过网络收到合适的字典时调用(例如,通过 SocketHandler 以 pickle 形式,或通过 HTTPHandler 以 JSON 形式)。

这通常意味着如果您需要对 LogRecord 进行任何特殊处理,则必须执行以下操作之一。

  • 创建您自己的 Logger 子类,它覆盖 Logger.makeRecord(),并在实例化任何您关心的记录器之前使用 setLoggerClass() 设置它。

  • 向记录器或处理程序添加一个 Filter,当其 filter() 方法被调用时,它会进行必要的特殊操作。

第一种方法在(例如)多个不同的库想要执行不同的操作的情况下会有些笨拙。每个库都会尝试设置自己的 Logger 子类,最后执行此操作的库将获胜。

第二种方法在许多情况下效果相当好,但它不允许您例如使用 LogRecord 的专门子类。库开发人员可以在其记录器上设置合适的过滤器,但他们必须记住每次引入新记录器时都这样做(他们只需添加新软件包或模块并执行

logger = logging.getLogger(__name__)

在模块级别)。这可能需要考虑的事情太多了。开发人员还可以将过滤器添加到附加到其顶级记录器的 NullHandler,但如果应用程序开发人员将处理程序附加到较低级别的库记录器,则不会调用此过滤器 — 因此该处理程序的输出将无法反映库开发人员的意图。

在 Python 3.2 及更高版本中,LogRecord 的创建是通过您可以指定的工厂完成的。该工厂只是一个可以使用 setLogRecordFactory() 设置和使用 getLogRecordFactory() 查询的可调用对象。该工厂的调用签名与 LogRecord 的构造函数相同,因为 LogRecord 是工厂的默认设置。

此方法允许自定义工厂控制 LogRecord 创建的所有方面。例如,您可以返回一个子类,或者仅在创建后使用类似于此的模式向记录添加一些额外的属性

old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
    record = old_factory(*args, **kwargs)
    record.custom_attribute = 0xdecafbad
    return record

logging.setLogRecordFactory(record_factory)

此模式允许不同的库将工厂链接在一起,只要它们不覆盖彼此的属性或无意中覆盖作为标准提供的属性,就不应该有任何意外。但是,应该记住,链中的每个链接都会向所有日志记录操作增加运行时开销,并且只有在 Filter 的使用未提供所需结果时才应使用该技术。

QueueHandler 和 QueueListener 的子类化 - 一个 ZeroMQ 示例

子类化 QueueHandler

您可以使用 QueueHandler 子类将消息发送到其他类型的队列,例如 ZeroMQ “发布”套接字。在下面的示例中,套接字是单独创建的并传递给处理程序(作为其“队列”)

import zmq   # using pyzmq, the Python binding for ZeroMQ
import json  # for serializing records portably

ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB)  # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556')        # or wherever

class ZeroMQSocketHandler(QueueHandler):
    def enqueue(self, record):
        self.queue.send_json(record.__dict__)


handler = ZeroMQSocketHandler(sock)

当然,还有其他组织方法,例如传递处理程序创建套接字所需的数据

class ZeroMQSocketHandler(QueueHandler):
    def __init__(self, uri, socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = zmq.Socket(self.ctx, socktype)
        socket.bind(uri)
        super().__init__(socket)

    def enqueue(self, record):
        self.queue.send_json(record.__dict__)

    def close(self):
        self.queue.close()

子类化 QueueListener

您还可以子类化 QueueListener 从其他类型的队列中获取消息,例如 ZeroMQ “订阅”套接字。这是一个例子

class ZeroMQSocketListener(QueueListener):
    def __init__(self, uri, /, *handlers, **kwargs):
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = zmq.Socket(self.ctx, zmq.SUB)
        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # subscribe to everything
        socket.connect(uri)
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self):
        msg = self.queue.recv_json()
        return logging.makeLogRecord(msg)

QueueHandler 和 QueueListener 的子类化 - 一个 pynng 示例

与上一节类似,我们可以使用 pynng 来实现侦听器和处理程序,pynngNNG 的 Python 绑定,被誉为 ZeroMQ 的精神继承者。以下代码片段说明了这一点 – 您可以在安装了 pynng 的环境中测试它们。为了多样化,我们首先介绍侦听器。

子类化 QueueListener

# listener.py
import json
import logging
import logging.handlers

import pynng

DEFAULT_ADDR = "tcp://127.0.0.1:13232"

interrupted = False

class NNGSocketListener(logging.handlers.QueueListener):

    def __init__(self, uri, /, *handlers, **kwargs):
        # Have a timeout for interruptability, and open a
        # subscriber socket
        socket = pynng.Sub0(listen=uri, recv_timeout=500)
        # The b'' subscription matches all topics
        topics = kwargs.pop('topics', None) or b''
        socket.subscribe(topics)
        # We treat the socket as a queue
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self, block):
        data = None
        # Keep looping while not interrupted and no data received over the
        # socket
        while not interrupted:
            try:
                data = self.queue.recv(block=block)
                break
            except pynng.Timeout:
                pass
            except pynng.Closed:  # sometimes happens when you hit Ctrl-C
                break
        if data is None:
            return None
        # Get the logging event sent from a publisher
        event = json.loads(data.decode('utf-8'))
        return logging.makeLogRecord(event)

    def enqueue_sentinel(self):
        # Not used in this implementation, as the socket isn't really a
        # queue
        pass

logging.getLogger('pynng').propagate = False
listener = NNGSocketListener(DEFAULT_ADDR, logging.StreamHandler(), topics=b'')
listener.start()
print('Press Ctrl-C to stop.')
try:
    while True:
        pass
except KeyboardInterrupt:
    interrupted = True
finally:
    listener.stop()

子类化 QueueHandler

# sender.py
import json
import logging
import logging.handlers
import time
import random

import pynng

DEFAULT_ADDR = "tcp://127.0.0.1:13232"

class NNGSocketHandler(logging.handlers.QueueHandler):

    def __init__(self, uri):
        socket = pynng.Pub0(dial=uri, send_timeout=500)
        super().__init__(socket)

    def enqueue(self, record):
        # Send the record as UTF-8 encoded JSON
        d = dict(record.__dict__)
        data = json.dumps(d)
        self.queue.send(data.encode('utf-8'))

    def close(self):
        self.queue.close()

logging.getLogger('pynng').propagate = False
handler = NNGSocketHandler(DEFAULT_ADDR)
# Make sure the process ID is in the output
logging.basicConfig(level=logging.DEBUG,
                    handlers=[logging.StreamHandler(), handler],
                    format='%(levelname)-8s %(name)10s %(process)6s %(message)s')
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)
logger_names = ('myapp', 'myapp.lib1', 'myapp.lib2')
msgno = 1
while True:
    # Just randomly select some loggers and levels and log away
    level = random.choice(levels)
    logger = logging.getLogger(random.choice(logger_names))
    logger.log(level, 'Message no. %5d' % msgno)
    msgno += 1
    delay = random.random() * 2 + 0.5
    time.sleep(delay)

您可以在单独的命令 shell 中运行以上两个代码片段。如果我们在一个 shell 中运行侦听器,并在两个单独的 shell 中运行发送者,我们应该会看到类似以下的内容。在第一个发送者 shell 中

$ python sender.py
DEBUG         myapp    613 Message no.     1
WARNING  myapp.lib2    613 Message no.     2
CRITICAL myapp.lib2    613 Message no.     3
WARNING  myapp.lib2    613 Message no.     4
CRITICAL myapp.lib1    613 Message no.     5
DEBUG         myapp    613 Message no.     6
CRITICAL myapp.lib1    613 Message no.     7
INFO     myapp.lib1    613 Message no.     8
(and so on)

在第二个发送者 shell 中

$ python sender.py
INFO     myapp.lib2    657 Message no.     1
CRITICAL myapp.lib2    657 Message no.     2
CRITICAL      myapp    657 Message no.     3
CRITICAL myapp.lib1    657 Message no.     4
INFO     myapp.lib1    657 Message no.     5
WARNING  myapp.lib2    657 Message no.     6
CRITICAL      myapp    657 Message no.     7
DEBUG    myapp.lib1    657 Message no.     8
(and so on)

在侦听器 shell 中

$ python listener.py
Press Ctrl-C to stop.
DEBUG         myapp    613 Message no.     1
WARNING  myapp.lib2    613 Message no.     2
INFO     myapp.lib2    657 Message no.     1
CRITICAL myapp.lib2    613 Message no.     3
CRITICAL myapp.lib2    657 Message no.     2
CRITICAL      myapp    657 Message no.     3
WARNING  myapp.lib2    613 Message no.     4
CRITICAL myapp.lib1    613 Message no.     5
CRITICAL myapp.lib1    657 Message no.     4
INFO     myapp.lib1    657 Message no.     5
DEBUG         myapp    613 Message no.     6
WARNING  myapp.lib2    657 Message no.     6
CRITICAL      myapp    657 Message no.     7
CRITICAL myapp.lib1    613 Message no.     7
INFO     myapp.lib1    613 Message no.     8
DEBUG    myapp.lib1    657 Message no.     8
(and so on)

如您所见,来自两个发送者进程的日志记录在侦听器的输出中交错显示。

基于字典的配置示例

下面是一个日志配置字典的示例 - 它取自 Django 项目的文档。此字典传递给 dictConfig() 以使配置生效

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'filters': {
        'special': {
            '()': 'project.logging.SpecialFilter',
            'foo': 'bar',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'filters': ['special']
        }
    },
    'loggers': {
        'django': {
            'handlers': ['console'],
            'propagate': True,
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special']
        }
    }
}

有关此配置的更多信息,您可以查看 Django 文档的相关部分

使用旋转器和命名器来自定义日志旋转处理

以下可运行脚本给出了如何定义命名器和旋转器的示例,该脚本显示了日志文件的 gzip 压缩

import gzip
import logging
import logging.handlers
import os
import shutil

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    with open(source, 'rb') as f_in:
        with gzip.open(dest, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    os.remove(source)


rh = logging.handlers.RotatingFileHandler('rotated.log', maxBytes=128, backupCount=5)
rh.rotator = rotator
rh.namer = namer

root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(rh)
f = logging.Formatter('%(asctime)s %(message)s')
rh.setFormatter(f)
for i in range(1000):
    root.info(f'Message no. {i + 1}')

运行此操作后,您将看到六个新文件,其中五个已压缩

$ ls rotated.log*
rotated.log       rotated.log.2.gz  rotated.log.4.gz
rotated.log.1.gz  rotated.log.3.gz  rotated.log.5.gz
$ zcat rotated.log.1.gz
2023-01-20 02:28:17,767 Message no. 996
2023-01-20 02:28:17,767 Message no. 997
2023-01-20 02:28:17,767 Message no. 998

一个更详细的多进程示例

以下工作示例显示了如何使用配置文件将日志记录与多进程结合使用。配置相当简单,但可以说明如何在实际的多进程场景中实现更复杂的配置。

在示例中,主进程会生成一个监听器进程和一些工作进程。主进程、监听器和工作进程各自都有三个独立的配置(所有工作进程共享同一个配置)。我们可以看到主进程中的日志记录,工作进程如何将日志记录到 QueueHandler,以及监听器如何实现 QueueListener 和更复杂的日志配置,并安排将通过队列接收到的事件分派到配置中指定的处理程序。请注意,这些配置纯粹是说明性的,但您应该能够根据自己的场景调整此示例。

这是脚本 - 文档字符串和注释有望解释其工作原理

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time

class MyHandler:
    """
    A simple handler for logging events. It runs in the listener process and
    dispatches events to loggers based on the name in the received record,
    which then get dispatched, by the logging system, to the handlers
    configured for those loggers.
    """

    def handle(self, record):
        if record.name == "root":
            logger = logging.getLogger()
        else:
            logger = logging.getLogger(record.name)

        if logger.isEnabledFor(record.levelno):
            # The process name is transformed just to show that it's the listener
            # doing the logging to files and console
            record.processName = '%s (for %s)' % (current_process().name, record.processName)
            logger.handle(record)

def listener_process(q, stop_event, config):
    """
    This could be done in the main process, but is just done in a separate
    process for illustrative purposes.

    This initialises logging according to the specified configuration,
    starts the listener and waits for the main process to signal completion
    via the event. The listener is then stopped, and the process exits.
    """
    logging.config.dictConfig(config)
    listener = logging.handlers.QueueListener(q, MyHandler())
    listener.start()
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    stop_event.wait()
    listener.stop()

def worker_process(config):
    """
    A number of these are spawned for the purpose of illustration. In
    practice, they could be a heterogeneous bunch of processes rather than
    ones which are identical to each other.

    This initialises logging according to the specified configuration,
    and logs a hundred messages with random levels to randomly selected
    loggers.

    A small sleep is added to allow other processes a chance to run. This
    is not strictly needed, but it mixes the output from the different
    processes a bit more than if it's left out.
    """
    logging.config.dictConfig(config)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)
        time.sleep(0.01)

def main():
    q = Queue()
    # The main process gets a simple configuration which prints to the console.
    config_initial = {
        'version': 1,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO'
            }
        },
        'root': {
            'handlers': ['console'],
            'level': 'DEBUG'
        }
    }
    # The worker process configuration is just a QueueHandler attached to the
    # root logger, which allows all messages to be sent to the queue.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_worker = {
        'version': 1,
        'disable_existing_loggers': True,
        'handlers': {
            'queue': {
                'class': 'logging.handlers.QueueHandler',
                'queue': q
            }
        },
        'root': {
            'handlers': ['queue'],
            'level': 'DEBUG'
        }
    }
    # The listener process configuration shows that the full flexibility of
    # logging configuration is available to dispatch events to handlers however
    # you want.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_listener = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            },
            'simple': {
                'class': 'logging.Formatter',
                'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'simple',
                'level': 'INFO'
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'formatter': 'detailed',
                'level': 'ERROR'
            }
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'handlers': ['console', 'file', 'errors'],
            'level': 'DEBUG'
        }
    }
    # Log some initial events, just to show that logging in the parent works
    # normally.
    logging.config.dictConfig(config_initial)
    logger = logging.getLogger('setup')
    logger.info('About to create workers ...')
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1),
                     args=(config_worker,))
        workers.append(wp)
        wp.start()
        logger.info('Started worker: %s', wp.name)
    logger.info('About to create listener ...')
    stop_event = Event()
    lp = Process(target=listener_process, name='listener',
                 args=(q, stop_event, config_listener))
    lp.start()
    logger.info('Started listener')
    # We now hang around for the workers to finish their work.
    for wp in workers:
        wp.join()
    # Workers all done, listening can now stop.
    # Logging in the parent still works normally.
    logger.info('Telling listener to stop ...')
    stop_event.set()
    lp.join()
    logger.info('All done.')

if __name__ == '__main__':
    main()

将 BOM 插入发送到 SysLogHandler 的消息中

RFC 5424 要求将 Unicode 消息作为一组字节发送到 syslog 守护程序,该字节具有以下结构:一个可选的纯 ASCII 组件,后跟一个 UTF-8 字节顺序标记 (BOM),后跟使用 UTF-8 编码的 Unicode。(请参阅 规范的相关部分。)

在 Python 3.1 中,代码被添加到 SysLogHandler 中以在消息中插入 BOM,但不幸的是,它的实现不正确,BOM 出现在消息的开头,因此不允许任何纯 ASCII 组件出现在它之前。

由于此行为已损坏,因此不正确的 BOM 插入代码将从 Python 3.2.4 及更高版本中删除。但是,它不会被替换,如果您想生成符合 RFC 5424 的消息,其中包含 BOM,其前有一个可选的纯 ASCII 序列,后跟使用 UTF-8 编码的任意 Unicode,那么您需要执行以下操作

  1. 将一个 Formatter 实例附加到您的 SysLogHandler 实例,并使用如下格式字符串

    'ASCII section\ufeffUnicode section'
    

    Unicode 代码点 U+FEFF,当使用 UTF-8 编码时,将被编码为 UTF-8 BOM – 字节字符串 b'\xef\xbb\xbf'

  2. 用您喜欢的任何占位符替换 ASCII 部分,但请确保替换后出现在其中的数据始终为 ASCII(这样,它在 UTF-8 编码后将保持不变)。

  3. 用您喜欢的任何占位符替换 Unicode 部分;如果替换后出现在那里的数据包含 ASCII 范围之外的字符,那也没关系 - 它将使用 UTF-8 编码。

格式化后的消息SysLogHandler 使用 UTF-8 编码进行编码。如果遵循上述规则,您应该能够生成符合 RFC 5424 的消息。如果您不这样做,日志记录可能不会报错,但您的消息将不符合 RFC 5424,并且您的 syslog 守护程序可能会报错。

实现结构化日志记录

尽管大多数日志消息旨在供人类阅读,因此不易被机器解析,但在某些情况下,您可能希望以结构化格式输出消息,该格式能够被程序解析的(无需复杂的正则表达式来解析日志消息)。使用 logging 包很容易实现这一点。可以通过多种方式来实现,但以下是一种简单的方法,它使用 JSON 以机器可解析的方式序列化事件

import json
import logging

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        return '%s >>> %s' % (self.message, json.dumps(self.kwargs))

_ = StructuredMessage   # optional, to improve readability

logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))

如果运行上述脚本,它会打印

message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}

请注意,项目的顺序可能因使用的 Python 版本而异。

如果您需要更专业的处理,可以使用自定义的 JSON 编码器,如下面的完整示例所示

import json
import logging


class Encoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, set):
            return tuple(o)
        elif isinstance(o, str):
            return o.encode('unicode_escape').decode('ascii')
        return super().default(o)

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        s = Encoder().encode(self.kwargs)
        return '%s >>> %s' % (self.message, s)

_ = StructuredMessage   # optional, to improve readability

def main():
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))

if __name__ == '__main__':
    main()

当运行上述脚本时,它会打印

message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}

请注意,项目的顺序可能因使用的 Python 版本而异。

使用 dictConfig() 自定义处理程序

有时您可能希望以特定的方式自定义日志处理程序,如果您使用 dictConfig(),您或许可以做到这一点而无需子类化。例如,假设您可能希望设置日志文件的所有权。在 POSIX 上,使用 shutil.chown() 可以轻松完成,但 stdlib 中的文件处理程序不提供内置支持。您可以使用如下的普通函数自定义处理程序的创建

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

然后,您可以在传递给 dictConfig() 的日志配置中指定,通过调用此函数创建日志处理程序

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

在此示例中,我只是为了说明目的,使用 pulse 用户和组设置所有权。将其组合成一个可运行的脚本,chowntest.py

import logging, logging.config, os, shutil

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')

要运行此脚本,您可能需要以 root 身份运行

$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log

请注意,此示例使用 Python 3.3,因为 shutil.chown() 在此版本中出现。此方法应适用于任何支持 dictConfig() 的 Python 版本 - 即 Python 2.7、3.2 或更高版本。对于 pre-3.3 版本,您需要使用例如 os.chown() 来实现实际的所有权更改。

实际上,处理程序创建函数可能位于项目中的某个实用程序模块中。您可以使用例如以下代码来替换配置中的行

'()': owned_file_handler,

您可以使用例如

'()': 'ext://project.util.owned_file_handler',

其中 project.util 可以替换为函数所在的包的实际名称。在上面的工作脚本中,使用 'ext://__main__.owned_file_handler' 应该可以工作。在这里,实际的可调用对象由 dictConfig()ext:// 规范中解析。

此示例有望指示您如何以相同的方式,使用 os.chmod() 实现其他类型的文件更改 - 例如设置特定的 POSIX 权限位。

当然,该方法也可以扩展到 FileHandler 以外的处理程序类型 - 例如,轮换文件处理程序之一,或完全不同类型的处理程序。

在整个应用程序中使用特定的格式样式

在 Python 3.2 中,Formatter 获得了一个 style 关键字参数,该参数在默认情况下为 % 以实现向后兼容性,但允许指定 {$ 以支持 str.format()string.Template 支持的格式化方法。请注意,这控制日志消息的格式,以便最终输出到日志,并且与如何构造单个日志消息完全正交。

日志调用(debug()info() 等)仅采用实际日志消息本身的位置参数,关键字参数仅用于确定如何处理日志调用的选项(例如,exc_info 关键字参数指示应记录回溯信息,或 extra 关键字参数指示要添加到日志的其他上下文信息)。因此,您无法直接使用 str.format()string.Template 语法进行日志调用,因为内部日志包使用 % 格式来合并格式字符串和可变参数。在保持向后兼容性的同时,无法更改此行为,因为现有代码中所有已有的日志调用都将使用 % 格式字符串。

有人建议将格式样式与特定的记录器关联,但这种方法也会遇到向后兼容性问题,因为任何现有代码都可能正在使用给定的记录器名称并使用 % 格式。

为了使日志记录在任何第三方库和您的代码之间互操作,需要在单个日志调用的级别上做出关于格式的决策。这提供了几种可以容纳替代格式样式的方法。

使用 LogRecord 工厂

在 Python 3.2 中,除了上面提到的 Formatter 更改之外,logging 包还新增了允许用户使用 setLogRecordFactory() 函数来设置自己的 LogRecord 子类的功能。 你可以使用它来设置你自己的 LogRecord 子类,通过重写 getMessage() 方法来完成正确的事情。 此方法的基本类实现是进行 msg % args 格式化的地方,你可以在这里替换你自己的格式化方式;但是,你应该小心支持所有格式化样式并允许 % 格式作为默认格式,以确保与其他代码的互操作性。还应注意调用 str(self.msg),就像基本实现一样。

有关更多信息,请参阅关于 setLogRecordFactory()LogRecord 的参考文档。

使用自定义消息对象

还有另一种可能更简单的方法,你可以使用 {}- 和 $- 格式化来构造你的单独日志消息。 你可能还记得(来自 使用任意对象作为消息),在记录日志时,你可以使用任意对象作为消息格式字符串,并且 logging 包会调用该对象的 str() 来获取实际的格式字符串。 考虑以下两个类

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

这两个类中的任何一个都可以用来代替格式字符串,以允许使用 {}- 或 $- 格式化来构建实际的“消息”部分,该部分会出现在格式化的日志输出中,代替 “%(message)s” 或 “{message}” 或 “$message”。 如果你觉得每次想要记录某些内容时都使用类名有点笨拙,那么如果你为消息使用一个别名(例如 M_,或者如果你将 _ 用于本地化,则可以使用 __),那么就可以使其更易于使用。

下面给出了这种方法的示例。 首先,使用 str.format() 进行格式化

>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)

其次,使用 string.Template 进行格式化

>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

需要注意的一点是,使用这种方法你不会付出显著的性能损失:实际的格式化不会在你进行日志记录调用时发生,而是在(并且如果)记录的消息即将由处理程序输出到日志时发生。 因此,唯一可能让你感到困惑的稍微不寻常的事情是,括号会围绕格式字符串和参数,而不仅仅是格式字符串。 这是因为 __ 表示法只是对上面显示的 XXXMessage 类之一的构造函数调用的语法糖。

使用 dictConfig() 配置过滤器

你可以使用 dictConfig() 配置过滤器,尽管乍一看可能不太明显如何操作(因此提供了此方法)。 由于 Filter 是标准库中包含的唯一过滤器类,并且它不太可能满足许多要求(它仅作为基类存在),因此你通常需要使用重写的 filter() 方法定义你自己的 Filter 子类。 为此,请在过滤器的配置字典中指定 () 键,指定一个可调用对象,该对象将用于创建过滤器(类是最明显的选择,但是你可以提供任何返回 Filter 实例的可调用对象)。 这是一个完整的示例

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

此示例显示了如何以关键字参数的形式将配置数据传递给构造实例的可调用对象。 运行时,上面的脚本将打印

changed: hello

这表明该过滤器正在按配置工作。

需要注意的几个额外点

  • 如果无法在配置中直接引用可调用对象(例如,如果它位于不同的模块中,并且你无法在配置字典所在的位置直接导入它),则可以使用 访问外部对象 中描述的 ext://... 形式。 例如,你可以在上面的示例中使用文本 'ext://__main__.MyFilter' 代替 MyFilter

  • 除了用于过滤器外,此技术还可以用于配置自定义处理程序和格式化程序。 有关 logging 如何支持在其配置中使用用户定义对象的信息,请参阅 用户定义的对象,并参阅上面的其他菜谱 使用 dictConfig() 自定义处理程序

自定义异常格式化

有时你可能想要进行自定义异常格式化,例如,假设你希望每个记录的事件都只有一行,即使存在异常信息也是如此。你可以使用自定义格式化程序类来执行此操作,如以下示例所示

import logging

class OneLineExceptionFormatter(logging.Formatter):
    def formatException(self, exc_info):
        """
        Format an exception so that it prints on a single line.
        """
        result = super().formatException(exc_info)
        return repr(result)  # or format into one line however you want to

    def format(self, record):
        s = super().format(record)
        if record.exc_text:
            s = s.replace('\n', '') + '|'
        return s

def configure_logging():
    fh = logging.FileHandler('output.txt', 'w')
    f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                  '%d/%m/%Y %H:%M:%S')
    fh.setFormatter(f)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)

def main():
    configure_logging()
    logging.info('Sample message')
    try:
        x = 1 / 0
    except ZeroDivisionError as e:
        logging.exception('ZeroDivisionError: %s', e)

if __name__ == '__main__':
    main()

运行时,这将生成一个包含两行的文件

28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n  File "logtest7.py", line 30, in main\n    x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|

虽然上述处理方法很简单,但它指明了如何根据自己的喜好格式化异常信息。 traceback 模块可能对更专业的需要有所帮助。

语音播报日志消息

在某些情况下,可能需要以可听见而非可见的格式呈现日志消息。 如果你的系统中有文本转语音 (TTS) 功能,即使它没有 Python 绑定,也很容易做到这一点。 大多数 TTS 系统都有一个可以运行的命令行程序,可以使用 subprocess 从处理程序调用它。 此处假定 TTS 命令行程序不希望与用户交互或花费很长时间才能完成,并且日志消息的频率不会太高以至于使用户被消息淹没,并且可以接受一次说出一条消息而不是并发地说出消息,以下示例实现等待一条消息被说出后才处理下一条消息,这可能会导致其他处理程序保持等待状态。 这是一个简短的示例,展示了该方法,该方法假定可以使用 espeak TTS 包

import logging
import subprocess
import sys

class TTSHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        # Speak slowly in a female English voice
        cmd = ['espeak', '-s150', '-ven+f3', msg]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # wait for the program to finish
        p.communicate()

def configure_logging():
    h = TTSHandler()
    root = logging.getLogger()
    root.addHandler(h)
    # the default formatter just returns the message
    root.setLevel(logging.DEBUG)

def main():
    logging.info('Hello')
    logging.debug('Goodbye')

if __name__ == '__main__':
    configure_logging()
    sys.exit(main())

运行时,此脚本应以女声说 “Hello”,然后说 “Goodbye”。

当然,上述方法可以适用于其他 TTS 系统,甚至可以适用于其他可以通过命令行运行的外部程序处理消息的系统。

缓冲日志消息并有条件地输出它们

在某些情况下,你可能希望将消息记录在临时区域中,并且仅在发生特定条件时才输出它们。 例如,你可能想要开始在函数中记录调试事件,如果函数完成且没有错误,则你不想用收集的调试信息来混乱日志,但是如果有错误,则你希望输出所有调试信息以及错误。

这里有一个示例,展示了如何使用装饰器来实现你希望的日志记录行为。它使用了logging.handlers.MemoryHandler,它允许缓冲记录的事件,直到满足某个条件时,缓冲的事件会被刷新 - 传递给另一个处理器(target 处理器)进行处理。默认情况下,当 MemoryHandler 的缓冲区被填满或遇到级别大于或等于指定阈值的事件时,它会刷新。如果你想要自定义刷新行为,可以将此方法与 MemoryHandler 的更专门的子类一起使用。

示例脚本有一个简单的函数 foo,它只是循环遍历所有日志级别,向 sys.stderr 写入即将记录的级别,然后在该级别实际记录一条消息。你可以向 foo 传递一个参数,如果为 true,它将在 ERROR 和 CRITICAL 级别记录 - 否则,它只会在 DEBUG、INFO 和 WARNING 级别记录。

该脚本只是安排用一个装饰器来装饰 foo,该装饰器将执行所需的条件日志记录。装饰器接收一个记录器作为参数,并在调用被装饰函数期间附加一个内存处理器。装饰器还可以使用目标处理器、应发生刷新的级别和缓冲区容量(缓冲记录的数量)进行参数化。这些参数默认分别为写入 sys.stderrStreamHandlerlogging.ERROR100

这是脚本:

import logging
from logging.handlers import MemoryHandler
import sys

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
    if target_handler is None:
        target_handler = logging.StreamHandler()
    if flush_level is None:
        flush_level = logging.ERROR
    if capacity is None:
        capacity = 100
    handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            logger.addHandler(handler)
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception('call failed')
                raise
            finally:
                super(MemoryHandler, handler).flush()
                logger.removeHandler(handler)
        return wrapper

    return decorator

def write_line(s):
    sys.stderr.write('%s\n' % s)

def foo(fail=False):
    write_line('about to log at DEBUG ...')
    logger.debug('Actually logged at DEBUG')
    write_line('about to log at INFO ...')
    logger.info('Actually logged at INFO')
    write_line('about to log at WARNING ...')
    logger.warning('Actually logged at WARNING')
    if fail:
        write_line('about to log at ERROR ...')
        logger.error('Actually logged at ERROR')
        write_line('about to log at CRITICAL ...')
        logger.critical('Actually logged at CRITICAL')
    return fail

decorated_foo = log_if_errors(logger)(foo)

if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    write_line('Calling undecorated foo with False')
    assert not foo(False)
    write_line('Calling undecorated foo with True')
    assert foo(True)
    write_line('Calling decorated foo with False')
    assert not decorated_foo(False)
    write_line('Calling decorated foo with True')
    assert decorated_foo(True)

当运行此脚本时,应观察到以下输出:

Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL

如你所见,只有当记录的事件的严重性为 ERROR 或更高时,才会发生实际的日志输出,但在这种情况下,任何先前较低严重性的事件也会被记录。

你当然可以使用传统的装饰方式:

@log_if_errors(logger)
def foo(fail=False):
    ...

将带有缓冲的日志消息发送到电子邮件

为了说明如何通过电子邮件发送日志消息,以便每封邮件发送一定数量的消息,你可以继承 BufferingHandler。在以下示例中,你可以根据你的特定需求进行调整,它提供了一个简单的测试工具,允许你使用命令行参数来运行脚本,指定通过 SMTP 发送邮件时通常需要的内容。(使用 -h 参数运行下载的脚本,查看所需的和可选的参数。)

import logging
import logging.handlers
import smtplib

class BufferingSMTPHandler(logging.handlers.BufferingHandler):
    def __init__(self, mailhost, port, username, password, fromaddr, toaddrs,
                 subject, capacity):
        logging.handlers.BufferingHandler.__init__(self, capacity)
        self.mailhost = mailhost
        self.mailport = port
        self.username = username
        self.password = password
        self.fromaddr = fromaddr
        if isinstance(toaddrs, str):
            toaddrs = [toaddrs]
        self.toaddrs = toaddrs
        self.subject = subject
        self.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))

    def flush(self):
        if len(self.buffer) > 0:
            try:
                smtp = smtplib.SMTP(self.mailhost, self.mailport)
                smtp.starttls()
                smtp.login(self.username, self.password)
                msg = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (self.fromaddr, ','.join(self.toaddrs), self.subject)
                for record in self.buffer:
                    s = self.format(record)
                    msg = msg + s + "\r\n"
                smtp.sendmail(self.fromaddr, self.toaddrs, msg)
                smtp.quit()
            except Exception:
                if logging.raiseExceptions:
                    raise
            self.buffer = []

if __name__ == '__main__':
    import argparse

    ap = argparse.ArgumentParser()
    aa = ap.add_argument
    aa('host', metavar='HOST', help='SMTP server')
    aa('--port', '-p', type=int, default=587, help='SMTP port')
    aa('user', metavar='USER', help='SMTP username')
    aa('password', metavar='PASSWORD', help='SMTP password')
    aa('to', metavar='TO', help='Addressee for emails')
    aa('sender', metavar='SENDER', help='Sender email address')
    aa('--subject', '-s',
       default='Test Logging email from Python logging module (buffering)',
       help='Subject of email')
    options = ap.parse_args()
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    h = BufferingSMTPHandler(options.host, options.port, options.user,
                             options.password, options.sender,
                             options.to, options.subject, 10)
    logger.addHandler(h)
    for i in range(102):
        logger.info("Info index = %d", i)
    h.flush()
    h.close()

如果你运行此脚本并且你的 SMTP 服务器设置正确,你应该发现它向你指定的收件人发送了 11 封电子邮件。前 10 封电子邮件将分别包含 10 条日志消息,第 11 封电子邮件将包含 2 条消息。这构成了脚本中指定的 102 条消息。

通过配置使用 UTC (GMT) 格式化时间

有时你希望使用 UTC 格式化时间,这可以使用如下所示的 UTCFormatter 类来完成:

import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

然后你可以在代码中使用 UTCFormatter 而不是 Formatter。如果你想通过配置来做到这一点,你可以使用 dictConfig() API,其方法如下面的完整示例所示:

import logging
import logging.config
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'utc': {
            '()': UTCFormatter,
            'format': '%(asctime)s %(message)s',
        },
        'local': {
            'format': '%(asctime)s %(message)s',
        }
    },
    'handlers': {
        'console1': {
            'class': 'logging.StreamHandler',
            'formatter': 'utc',
        },
        'console2': {
            'class': 'logging.StreamHandler',
            'formatter': 'local',
        },
    },
    'root': {
        'handlers': ['console1', 'console2'],
   }
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.warning('The local time is %s', time.asctime())

运行此脚本时,应打印如下内容:

2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015

显示时间如何以本地时间和 UTC 格式化,每个处理器一个。

使用上下文管理器进行选择性日志记录

有时,临时更改日志记录配置并在执行某些操作后恢复它会很有用。为此,上下文管理器是保存和恢复日志记录上下文的最明显方式。这是一个简单的上下文管理器示例,它允许你选择性地更改日志记录级别,并在上下文管理器的范围内纯粹地添加日志记录处理器:

import logging
import sys

class LoggingContext:
    def __init__(self, logger, level=None, handler=None, close=True):
        self.logger = logger
        self.level = level
        self.handler = handler
        self.close = close

    def __enter__(self):
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        if self.handler:
            self.logger.addHandler(self.handler)

    def __exit__(self, et, ev, tb):
        if self.level is not None:
            self.logger.setLevel(self.old_level)
        if self.handler:
            self.logger.removeHandler(self.handler)
        if self.handler and self.close:
            self.handler.close()
        # implicit return of None => don't swallow exceptions

如果你指定了级别值,则在上下文管理器覆盖的 with 块的范围内,记录器的级别将设置为该值。如果你指定了一个处理器,则在进入该块时将其添加到记录器,并在退出该块时将其删除。你还可以要求管理器在块退出时为你关闭处理器 - 如果你不再需要该处理器,则可以这样做。

为了说明它是如何工作的,我们可以将以下代码块添加到上面:

if __name__ == '__main__':
    logger = logging.getLogger('foo')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    logger.info('1. This should appear just once on stderr.')
    logger.debug('2. This should not appear.')
    with LoggingContext(logger, level=logging.DEBUG):
        logger.debug('3. This should appear once on stderr.')
    logger.debug('4. This should not appear.')
    h = logging.StreamHandler(sys.stdout)
    with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
        logger.debug('5. This should appear twice - once on stderr and once on stdout.')
    logger.info('6. This should appear just once on stderr.')
    logger.debug('7. This should not appear.')

我们最初将记录器的级别设置为 INFO,因此显示消息 #1,而不显示消息 #2。然后,我们在下面的 with 块中临时将级别更改为 DEBUG,因此显示消息 #3。在该块退出后,记录器的级别将恢复为 INFO,因此不显示消息 #4。在下一个 with 块中,我们再次将级别设置为 DEBUG,但还添加了一个写入 sys.stdout 的处理器。因此,消息 #5 在控制台上显示两次(一次通过 stderr,一次通过 stdout)。在 with 语句完成后,状态恢复到之前的状态,因此显示消息 #6(如消息 #1),而消息 #7 不显示(如消息 #2)。

如果我们运行生成的脚本,结果如下:

$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

如果我们再次运行它,但将 stderr 管道到 /dev/null,我们会看到以下内容,这是唯一写入 stdout 的消息:

$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.

再次,但将 stdout 管道到 /dev/null,我们会得到:

$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

在这种情况下,按预期,打印到 stdout 的消息 #5 不会显示。

当然,这里描述的方法可以推广,例如临时附加日志记录过滤器。请注意,上面的代码在 Python 2 和 Python 3 中都可以工作。

CLI 应用程序启动模板

这是一个示例,展示了如何:

  • 使用基于命令行参数的日志级别

  • 在单独的文件中调度到多个子命令,所有子命令都以一致的方式在同一级别进行日志记录

  • 利用简单、最小的配置

假设我们有一个命令行应用程序,其任务是停止、启动或重新启动某些服务。为了便于说明,可以将其组织为一个文件 app.py,它是应用程序的主脚本,各个命令在 start.pystop.pyrestart.py 中实现。进一步假设我们想通过命令行参数控制应用程序的详细程度,默认为 logging.INFO。以下是编写 app.py 的一种方法:

import argparse
import importlib
import logging
import os
import sys

def main(args=None):
    scriptname = os.path.basename(__file__)
    parser = argparse.ArgumentParser(scriptname)
    levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    parser.add_argument('--log-level', default='INFO', choices=levels)
    subparsers = parser.add_subparsers(dest='command',
                                       help='Available commands:')
    start_cmd = subparsers.add_parser('start', help='Start a service')
    start_cmd.add_argument('name', metavar='NAME',
                           help='Name of service to start')
    stop_cmd = subparsers.add_parser('stop',
                                     help='Stop one or more services')
    stop_cmd.add_argument('names', metavar='NAME', nargs='+',
                          help='Name of service to stop')
    restart_cmd = subparsers.add_parser('restart',
                                        help='Restart one or more services')
    restart_cmd.add_argument('names', metavar='NAME', nargs='+',
                             help='Name of service to restart')
    options = parser.parse_args()
    # the code to dispatch commands could all be in this file. For the purposes
    # of illustration only, we implement each command in a separate module.
    try:
        mod = importlib.import_module(options.command)
        cmd = getattr(mod, 'command')
    except (ImportError, AttributeError):
        print('Unable to find the code for command \'%s\'' % options.command)
        return 1
    # Could get fancy here and load configuration from file or dictionary
    logging.basicConfig(level=options.log_level,
                        format='%(levelname)s %(name)s %(message)s')
    cmd(options)

if __name__ == '__main__':
    sys.exit(main())

并且 startstoprestart 命令可以在单独的模块中实现,如下所示用于启动:

# start.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    logger.debug('About to start %s', options.name)
    # actually do the command processing here ...
    logger.info('Started the \'%s\' service.', options.name)

用于停止的:

# stop.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to stop %s', services)
    # actually do the command processing here ...
    logger.info('Stopped the %s service%s.', services, plural)

同样用于重新启动:

# restart.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to restart %s', services)
    # actually do the command processing here ...
    logger.info('Restarted the %s service%s.', services, plural)

如果我们以默认日志级别运行此应用程序,我们将得到如下输出:

$ python app.py start foo
INFO start Started the 'foo' service.

$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

第一个单词是日志级别,第二个单词是记录事件的位置的模块或包名称。

如果我们更改日志级别,则可以更改发送到日志的信息。例如,如果我们想要更多信息:

$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.

$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

如果我们想要更少的信息:

$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz

在这种情况下,这些命令不会向控制台打印任何内容,因为它们没有记录任何 WARNING 级别或更高级别的消息。

用于日志记录的 Qt GUI

不时出现的一个问题是如何记录到 GUI 应用程序。 Qt 框架是一个流行的跨平台 UI 框架,它使用 PySide2PyQt5 库进行 Python 绑定。

以下示例展示了如何记录到 Qt GUI。这引入了一个简单的 QtHandler 类,它接受一个可调用对象,该对象应是主线程中执行 GUI 更新的槽。还创建了一个工作线程,以展示如何从 UI 本身(通过按钮进行手动日志记录)以及在后台执行工作的工作线程(此处只是以随机级别记录消息,并在其间随机短暂延迟)向 GUI 记录日志。

工作线程使用 Qt 的 QThread 类实现,而不是 threading 模块,因为在某些情况下必须使用 QThread,它可以更好地与其他 Qt 组件集成。

此代码应适用于 PySide6PyQt6PySide2PyQt5 的最新版本。您应该能够将此方法应用于早期版本的 Qt。请参阅代码片段中的注释以获取更详细的信息。

import datetime
import logging
import random
import sys
import time

# Deal with minor differences between different Qt packages
try:
    from PySide6 import QtCore, QtGui, QtWidgets
    Signal = QtCore.Signal
    Slot = QtCore.Slot
except ImportError:
    try:
        from PyQt6 import QtCore, QtGui, QtWidgets
        Signal = QtCore.pyqtSignal
        Slot = QtCore.pyqtSlot
    except ImportError:
        try:
            from PySide2 import QtCore, QtGui, QtWidgets
            Signal = QtCore.Signal
            Slot = QtCore.Slot
        except ImportError:
            from PyQt5 import QtCore, QtGui, QtWidgets
            Signal = QtCore.pyqtSignal
            Slot = QtCore.pyqtSlot

logger = logging.getLogger(__name__)


#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
    signal = Signal(str, logging.LogRecord)

#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
    def __init__(self, slotfunc, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.signaller = Signaller()
        self.signaller.signal.connect(slotfunc)

    def emit(self, record):
        s = self.format(record)
        self.signaller.signal.emit(s, record)

#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
    return QtCore.QThread.currentThread().objectName()


#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)

#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
    @Slot()
    def start(self):
        extra = {'qThreadName': ctname() }
        logger.debug('Started work', extra=extra)
        i = 1
        # Let the thread run until interrupted. This allows reasonably clean
        # thread termination.
        while not QtCore.QThread.currentThread().isInterruptionRequested():
            delay = 0.5 + random.random() * 2
            time.sleep(delay)
            try:
                if random.random() < 0.1:
                    raise ValueError('Exception raised: %d' % i)
                else:
                    level = random.choice(LEVELS)
                    logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
            except ValueError as e:
                logger.exception('Failed: %s', e, extra=extra)
            i += 1

#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):

    COLORS = {
        logging.DEBUG: 'black',
        logging.INFO: 'blue',
        logging.WARNING: 'orange',
        logging.ERROR: 'red',
        logging.CRITICAL: 'purple',
    }

    def __init__(self, app):
        super().__init__()
        self.app = app
        self.textedit = te = QtWidgets.QPlainTextEdit(self)
        # Set whatever the default monospace font is for the platform
        f = QtGui.QFont('nosuchfont')
        if hasattr(f, 'Monospace'):
            f.setStyleHint(f.Monospace)
        else:
            f.setStyleHint(f.StyleHint.Monospace)  # for Qt6
        te.setFont(f)
        te.setReadOnly(True)
        PB = QtWidgets.QPushButton
        self.work_button = PB('Start background work', self)
        self.log_button = PB('Log a message at a random level', self)
        self.clear_button = PB('Clear log window', self)
        self.handler = h = QtHandler(self.update_status)
        # Remember to use qThreadName rather than threadName in the format string.
        fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
        formatter = logging.Formatter(fs)
        h.setFormatter(formatter)
        logger.addHandler(h)
        # Set up to terminate the QThread when we exit
        app.aboutToQuit.connect(self.force_quit)

        # Lay out all the widgets
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(te)
        layout.addWidget(self.work_button)
        layout.addWidget(self.log_button)
        layout.addWidget(self.clear_button)
        self.setFixedSize(900, 400)

        # Connect the non-worker slots and signals
        self.log_button.clicked.connect(self.manual_update)
        self.clear_button.clicked.connect(self.clear_display)

        # Start a new worker thread and connect the slots for the worker
        self.start_thread()
        self.work_button.clicked.connect(self.worker.start)
        # Once started, the button should be disabled
        self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))

    def start_thread(self):
        self.worker = Worker()
        self.worker_thread = QtCore.QThread()
        self.worker.setObjectName('Worker')
        self.worker_thread.setObjectName('WorkerThread')  # for qThreadName
        self.worker.moveToThread(self.worker_thread)
        # This will start an event loop in the worker thread
        self.worker_thread.start()

    def kill_thread(self):
        # Just tell the worker to stop, then tell it to quit and wait for that
        # to happen
        self.worker_thread.requestInterruption()
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()
        else:
            print('worker has already exited.')

    def force_quit(self):
        # For use when the window is closed
        if self.worker_thread.isRunning():
            self.kill_thread()

    # The functions below update the UI and run in the main thread because
    # that's where the slots are set up

    @Slot(str, logging.LogRecord)
    def update_status(self, status, record):
        color = self.COLORS.get(record.levelno, 'black')
        s = '<pre><font color="%s">%s</font></pre>' % (color, status)
        self.textedit.appendHtml(s)

    @Slot()
    def manual_update(self):
        # This function uses the formatted message passed in, but also uses
        # information from the record to format the message in an appropriate
        # color according to its severity (level).
        level = random.choice(LEVELS)
        extra = {'qThreadName': ctname() }
        logger.log(level, 'Manually logged!', extra=extra)

    @Slot()
    def clear_display(self):
        self.textedit.clear()


def main():
    QtCore.QThread.currentThread().setObjectName('MainThread')
    logging.getLogger().setLevel(logging.DEBUG)
    app = QtWidgets.QApplication(sys.argv)
    example = Window(app)
    example.show()
    if hasattr(app, 'exec'):
        rc = app.exec()
    else:
        rc = app.exec_()
    sys.exit(rc)

if __name__=='__main__':
    main()

使用 RFC5424 支持记录到 syslog

尽管 RFC 5424 的日期可以追溯到 2009 年,但大多数 syslog 服务器默认配置为使用较旧的 RFC 3164,后者可以追溯到 2001 年。当 logging 于 2003 年添加到 Python 时,它支持当时较早的(也是唯一的)协议。自从 RFC5424 发布以来,由于它在 syslog 服务器中没有得到广泛部署,因此 SysLogHandler 功能尚未更新。

RFC 5424 包含一些有用的功能,例如对结构化数据的支持,如果您需要能够记录到支持它的 syslog 服务器,则可以使用类似于以下内容的子类处理程序来实现

import datetime
import logging.handlers
import re
import socket
import time

class SysLogHandler5424(logging.handlers.SysLogHandler):

    tz_offset = re.compile(r'([+-]\d{2})(\d{2})$')
    escaped = re.compile(r'([\]"\\])')

    def __init__(self, *args, **kwargs):
        self.msgid = kwargs.pop('msgid', None)
        self.appname = kwargs.pop('appname', None)
        super().__init__(*args, **kwargs)

    def format(self, record):
        version = 1
        asctime = datetime.datetime.fromtimestamp(record.created).isoformat()
        m = self.tz_offset.match(time.strftime('%z'))
        has_offset = False
        if m and time.timezone:
            hrs, mins = m.groups()
            if int(hrs) or int(mins):
                has_offset = True
        if not has_offset:
            asctime += 'Z'
        else:
            asctime += f'{hrs}:{mins}'
        try:
            hostname = socket.gethostname()
        except Exception:
            hostname = '-'
        appname = self.appname or '-'
        procid = record.process
        msgid = '-'
        msg = super().format(record)
        sdata = '-'
        if hasattr(record, 'structured_data'):
            sd = record.structured_data
            # This should be a dict where the keys are SD-ID and the value is a
            # dict mapping PARAM-NAME to PARAM-VALUE (refer to the RFC for what these
            # mean)
            # There's no error checking here - it's purely for illustration, and you
            # can adapt this code for use in production environments
            parts = []

            def replacer(m):
                g = m.groups()
                return '\\' + g[0]

            for sdid, dv in sd.items():
                part = f'[{sdid}'
                for k, v in dv.items():
                    s = str(v)
                    s = self.escaped.sub(replacer, s)
                    part += f' {k}="{s}"'
                part += ']'
                parts.append(part)
            sdata = ''.join(parts)
        return f'{version} {asctime} {hostname} {appname} {procid} {msgid} {sdata} {msg}'

您需要熟悉 RFC 5424 才能完全理解以上代码,并且您可能有一些略有不同的需求(例如,如何将结构化数据传递给日志)。但是,以上内容应该可以根据您的具体需求进行调整。使用上述处理程序,您可以使用类似以下方式传递结构化数据

sd = {
    'foo@12345': {'bar': 'baz', 'baz': 'bozz', 'fizz': r'buzz'},
    'foo@54321': {'rab': 'baz', 'zab': 'bozz', 'zzif': r'buzz'}
}
extra = {'structured_data': sd}
i = 1
logger.debug('Message %d', i, extra=extra)

如何将记录器视为输出流

有时,您需要与期望写入类似文件的对象的第三方 API 接口,但您希望将 API 的输出定向到记录器。您可以使用一个类来包装具有类似文件 API 的记录器来实现此目的。这是一个演示此类类的简短脚本

import logging

class LoggerWriter:
    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        if message != '\n':  # avoid printing bare newlines, if you like
            self.logger.log(self.level, message)

    def flush(self):
        # doesn't actually do anything, but might be expected of a file-like
        # object - so optional depending on your situation
        pass

    def close(self):
        # doesn't actually do anything, but might be expected of a file-like
        # object - so optional depending on your situation. You might want
        # to set a flag so that later calls to write raise an exception
        pass

def main():
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('demo')
    info_fp = LoggerWriter(logger, logging.INFO)
    debug_fp = LoggerWriter(logger, logging.DEBUG)
    print('An INFO message', file=info_fp)
    print('A DEBUG message', file=debug_fp)

if __name__ == "__main__":
    main()

运行此脚本时,它会打印

INFO:demo:An INFO message
DEBUG:demo:A DEBUG message

您还可以使用 LoggerWriter 通过执行类似以下的操作来重定向 sys.stdoutsys.stderr

import sys

sys.stdout = LoggerWriter(logger, logging.INFO)
sys.stderr = LoggerWriter(logger, logging.WARNING)

您应该在为您的需求配置日志记录之后执行此操作。在上面的示例中,basicConfig() 调用执行此操作(使用 sys.stderr 值,在它被 LoggerWriter 实例覆盖之前)。然后,您将获得这样的结果

>>> print('Foo')
INFO:demo:Foo
>>> print('Bar', file=sys.stderr)
WARNING:demo:Bar
>>>

当然,上面的示例显示了根据 basicConfig() 使用的格式的输出,但是您可以在配置日志记录时使用不同的格式化程序。

请注意,使用上述方案,您在某种程度上会受到缓冲和您正在拦截的写入调用序列的影响。例如,对于上述 LoggerWriter 的定义,如果您有以下代码片段

sys.stderr = LoggerWriter(logger, logging.WARNING)
1 / 0

然后运行该脚本会导致

WARNING:demo:Traceback (most recent call last):

WARNING:demo:  File "/home/runner/cookbook-loggerwriter/test.py", line 53, in <module>

WARNING:demo:
WARNING:demo:main()
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/test.py", line 49, in main

WARNING:demo:
WARNING:demo:1 / 0
WARNING:demo:ZeroDivisionError
WARNING:demo::
WARNING:demo:division by zero

如您所见,此输出并不理想。这是因为写入 sys.stderr 的底层代码会进行多次写入,每次写入都会导致单独的记录行(例如,上面的最后三行)。要解决此问题,您需要缓冲数据,并且仅在新行出现时输出日志行。让我们使用一个稍微好一点的 LoggerWriter 实现

class BufferingLoggerWriter(LoggerWriter):
    def __init__(self, logger, level):
        super().__init__(logger, level)
        self.buffer = ''

    def write(self, message):
        if '\n' not in message:
            self.buffer += message
        else:
            parts = message.split('\n')
            if self.buffer:
                s = self.buffer + parts.pop(0)
                self.logger.log(self.level, s)
            self.buffer = parts.pop()
            for part in parts:
                self.logger.log(self.level, part)

这只是缓冲数据直到看到新行,然后记录完整的行。使用这种方法,您将获得更好的输出

WARNING:demo:Traceback (most recent call last):
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/main.py", line 55, in <module>
WARNING:demo:    main()
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/main.py", line 52, in main
WARNING:demo:    1/0
WARNING:demo:ZeroDivisionError: division by zero

要避免的模式

尽管前面的章节描述了您可能需要执行或处理的方法,但值得一提的是一些无益的使用模式,因此在大多数情况下应避免这些模式。以下各节没有特定的顺序。

多次打开同一个日志文件

在 Windows 上,您通常无法多次打开同一个文件,因为这会导致“文件正在被另一个进程使用”的错误。但是,在 POSIX 平台上,如果您多次打开同一个文件,则不会收到任何错误。这可能是意外发生的,例如通过

  • 多次添加引用同一个文件的文件处理程序(例如,通过复制/粘贴/忘记更改错误)。

  • 打开两个看起来不同的文件,因为它们具有不同的名称,但是它们是相同的,因为一个是另一个的符号链接。

  • 派生一个进程,之后父进程和子进程都具有对同一个文件的引用。例如,这可能是通过使用 multiprocessing 模块来实现的。

多次打开一个文件可能在大多数情况下看起来可以工作,但实际上会导致许多问题

  • 日志输出可能会被搞乱,因为多个线程或进程尝试写入同一个文件。尽管日志记录可以防止多个线程同时使用同一个处理程序实例,但是如果两个不同的线程使用两个不同的处理程序实例(恰好指向同一个文件)尝试同时写入,则不会有这种保护。

  • 删除文件(例如,在文件轮换期间)的尝试会静默失败,因为还有另一个引用指向它。这会导致混乱和浪费调试时间 - 日志条目最终出现在意外的位置,或者完全丢失。或者,本应移动的文件仍保留在原位,并且尽管应该进行基于大小的轮换,但其大小却出乎意料地增长。

使用 从多个进程记录到单个文件 中概述的技术来规避此类问题。

在类中将记录器用作属性或将它们作为参数传递

尽管在某些不寻常的情况下您可能需要这样做,但通常没有必要这样做,因为记录器是单例。代码始终可以使用 logging.getLogger(name) 按名称访问给定的记录器实例,因此传递实例并将它们保留为实例属性毫无意义。请注意,在其他语言(例如 Java 和 C#)中,记录器通常是静态类属性。但是,这种模式在 Python 中没有意义,因为模块(而不是类)是软件分解的单位。

在库中将 NullHandler 之外的处理程序添加到记录器

通过添加处理程序、格式化程序和筛选器来配置日志记录是应用程序开发人员的责任,而不是库开发人员的责任。如果您正在维护库,请确保除了 NullHandler 实例之外,不要将处理程序添加到任何记录器。

创建大量记录器

记录器是脚本执行期间永远不会释放的单例,因此创建大量记录器会占用无法释放的内存。与其为每个已处理的文件或建立的网络连接创建一个记录器,不如使用用于将上下文信息传递到日志中的现有机制,并将创建的记录器限制为那些描述应用程序内区域的记录器(通常是模块,但有时比模块更精细)。

其他资源

另请参阅

模块 logging

日志记录模块的 API 参考。

模块 logging.config

日志记录模块的配置 API。

模块 logging.handlers

日志记录模块中包含的有用处理程序。

基本教程

高级教程