日志处理手册

作者:

Vinay Sajip <vinay_sajip at red-dove dot com>

本页面包含一些与日志处理相关的实用技巧,它们在过去已被证明很有用。有关教程和参考信息的链接,请参阅 其他资源

在多个模块中使用日志

多次调用 logging.getLogger('someLogger') 会返回对同一个日志记录器对象的引用。这不仅在同一个模块中如此,在同一个 Python 解释器进程中跨模块也是如此。它适用于对同一个对象的引用;此外,应用程序代码可以在一个模块中定义和配置父日志记录器,并在一个单独的模块中创建(但不配置)子日志记录器,所有对子日志记录器的调用都将传递给父日志记录器。这是一个主模块

import logging
import auxiliary_module

# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')

这是辅助模块

import logging

# create logger
module_logger = logging.getLogger('spam_application.auxiliary')

class Auxiliary:
    def __init__(self):
        self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
        self.logger.info('creating an instance of Auxiliary')

    def do_something(self):
        self.logger.info('doing something')
        a = 1 + 1
        self.logger.info('done doing something')

def some_function():
    module_logger.info('received a call to "some_function"')

输出如下所示

2005-03-23 23:47:11,663 - spam_application - INFO -
   creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
   creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
   created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
   calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
   doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
   done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
   finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
   calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
   received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
   done with auxiliary_module.some_function()

从多个线程记录日志

从多个线程记录日志不需要特殊操作。以下示例显示了从主(初始)线程和另一个线程记录日志

import logging
import threading
import time

def worker(arg):
    while not arg['stop']:
        logging.debug('Hi from myfunc')
        time.sleep(0.5)

def main():
    logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
    info = {'stop': False}
    thread = threading.Thread(target=worker, args=(info,))
    thread.start()
    while True:
        try:
            logging.debug('Hello from main')
            time.sleep(0.75)
        except KeyboardInterrupt:
            info['stop'] = True
            break
    thread.join()

if __name__ == '__main__':
    main()

运行时,脚本应该打印出类似以下内容

   0 Thread-1 Hi from myfunc
   3 MainThread Hello from main
 505 Thread-1 Hi from myfunc
 755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc

这显示了日志输出如预期般交错。当然,这种方法适用于比此处所示更多的线程。

多个处理程序和格式化程序

日志记录器是普通的 Python 对象。addHandler() 方法对您可以添加的处理程序数量没有最低或最高配额。有时,应用程序将所有严重级别的所有消息记录到文本文件,同时将错误或更高级别的消息记录到控制台,这会很有益。要设置此功能,只需配置适当的处理程序即可。应用程序代码中的日志记录调用将保持不变。这是对以前基于模块的简单配置示例的稍微修改

import logging

logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)

# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')

请注意,“应用程序”代码不关心多个处理程序。唯一改变的是添加和配置了一个名为 *fh* 的新处理程序。

创建具有更高或更低严重性过滤器的新处理程序的能力在编写和测试应用程序时非常有用。与其使用许多 print 语句进行调试,不如使用 logger.debug:与您以后必须删除或注释掉的 print 语句不同,logger.debug 语句可以在源代码中保持完整并保持休眠状态,直到您再次需要它们。届时,唯一需要做的更改是修改日志记录器和/或处理程序的严重性级别以进行调试。

日志记录到多个目的地

假设您想在不同情况下以不同的消息格式将日志记录到控制台和文件。假设您想将 DEBUG 及更高级别的消息记录到文件,并将 INFO 及更高级别的消息记录到控制台。我们还假设文件应包含时间戳,但控制台消息不应包含。以下是您实现此目的的方法

import logging

# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename='/tmp/myapp.log',
                    filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

运行此程序时,您将在控制台上看到

root        : INFO     Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO     How quickly daft jumping zebras vex.
myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR    The five boxing wizards jump quickly.

在文件中,您将看到类似以下内容

10-22 22:19 root         INFO     Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1  INFO     How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2  ERROR    The five boxing wizards jump quickly.

如您所见,DEBUG 消息只显示在文件中。其他消息则发送到两个目的地。

此示例使用控制台和文件处理程序,但您可以使用任意数量和组合的处理程序。

请注意,上面选择的日志文件名 /tmp/myapp.log 意味着在 POSIX 系统上使用临时文件的标准位置。在 Windows 上,您可能需要为日志选择一个不同的目录名 - 只需确保该目录存在,并且您有权限在该目录中创建和更新文件。

自定义级别处理

有时,您可能希望在处理程序中对级别进行与标准处理略有不同的操作,即所有高于某个阈值的级别都由处理程序处理。为此,您需要使用过滤器。让我们看一个您希望按以下方式安排场景的示例

  • 将严重性为 INFOWARNING 的消息发送到 sys.stdout

  • 将严重性为 ERROR 及以上的信息发送到 sys.stderr

  • DEBUG 及以上严重级别的消息发送到文件 app.log

假设您使用以下 JSON 配置日志记录

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "simple": {
            "format": "%(levelname)-8s - %(message)s"
        }
    },
    "handlers": {
        "stdout": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "simple",
            "stream": "ext://sys.stdout"
        },
        "stderr": {
            "class": "logging.StreamHandler",
            "level": "ERROR",
            "formatter": "simple",
            "stream": "ext://sys.stderr"
        },
        "file": {
            "class": "logging.FileHandler",
            "formatter": "simple",
            "filename": "app.log",
            "mode": "w"
        }
    },
    "root": {
        "level": "DEBUG",
        "handlers": [
            "stderr",
            "stdout",
            "file"
        ]
    }
}

这个配置 几乎 达到了我们想要的效果,除了 sys.stdout 会显示严重级别为 ERROR 的消息,并且只有此严重级别及更高的事件以及 INFOWARNING 消息才会被跟踪。为了防止这种情况,我们可以设置一个过滤器来排除这些消息,并将其添加到相关的处理程序中。这可以通过在 formattershandlers 旁边添加一个 filters 部分来配置

{
    "filters": {
        "warnings_and_below": {
            "()" : "__main__.filter_maker",
            "level": "WARNING"
        }
    }
}

并修改 stdout 处理程序的部分以添加它

{
    "stdout": {
        "class": "logging.StreamHandler",
        "level": "INFO",
        "formatter": "simple",
        "stream": "ext://sys.stdout",
        "filters": ["warnings_and_below"]
    }
}

过滤器只是一个函数,所以我们可以将 filter_maker(一个工厂函数)定义如下

def filter_maker(level):
    level = getattr(logging, level)

    def filter(record):
        return record.levelno <= level

    return filter

这将传入的字符串参数转换为数字级别,并返回一个函数,该函数仅在传入记录的级别等于或低于指定级别时返回 True。请注意,在此示例中,我在一个从命令行运行的测试脚本 main.py 中定义了 filter_maker,因此其模块将是 __main__ - 因此过滤器配置中的 __main__.filter_maker。如果您在不同的模块中定义它,则需要更改它。

添加过滤器后,我们可以运行完整的 main.py 脚本,如下所示

import json
import logging
import logging.config

CONFIG = '''
{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "simple": {
            "format": "%(levelname)-8s - %(message)s"
        }
    },
    "filters": {
        "warnings_and_below": {
            "()" : "__main__.filter_maker",
            "level": "WARNING"
        }
    },
    "handlers": {
        "stdout": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
            "filters": ["warnings_and_below"]
        },
        "stderr": {
            "class": "logging.StreamHandler",
            "level": "ERROR",
            "formatter": "simple",
            "stream": "ext://sys.stderr"
        },
        "file": {
            "class": "logging.FileHandler",
            "formatter": "simple",
            "filename": "app.log",
            "mode": "w"
        }
    },
    "root": {
        "level": "DEBUG",
        "handlers": [
            "stderr",
            "stdout",
            "file"
        ]
    }
}
'''

def filter_maker(level):
    level = getattr(logging, level)

    def filter(record):
        return record.levelno <= level

    return filter

logging.config.dictConfig(json.loads(CONFIG))
logging.debug('A DEBUG message')
logging.info('An INFO message')
logging.warning('A WARNING message')
logging.error('An ERROR message')
logging.critical('A CRITICAL message')

并像这样运行它之后

python main.py 2>stderr.log >stdout.log

我们可以看到结果符合预期

$ more *.log
::::::::::::::
app.log
::::::::::::::
DEBUG    - A DEBUG message
INFO     - An INFO message
WARNING  - A WARNING message
ERROR    - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stderr.log
::::::::::::::
ERROR    - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stdout.log
::::::::::::::
INFO     - An INFO message
WARNING  - A WARNING message

配置服务器示例

这是一个使用日志配置服务器的模块示例

import logging
import logging.config
import time
import os

# read initial config file
logging.config.fileConfig('logging.conf')

# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()

logger = logging.getLogger('simpleExample')

try:
    # loop through logging calls to see the difference
    # new configurations make, until Ctrl+C is pressed
    while True:
        logger.debug('debug message')
        logger.info('info message')
        logger.warning('warn message')
        logger.error('error message')
        logger.critical('critical message')
        time.sleep(5)
except KeyboardInterrupt:
    # cleanup
    logging.config.stopListening()
    t.join()

这是一个脚本,它接受一个文件名,并将该文件发送到服务器,前面适当地加上二进制编码的长度,作为新的日志配置

#!/usr/bin/env python
import socket, sys, struct

with open(sys.argv[1], 'rb') as f:
    data_to_send = f.read()

HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')

处理阻塞的处理程序

有时您必须让日志处理程序完成其工作,而不会阻塞您正在记录的线程。这在 Web 应用程序中很常见,当然也发生在其他场景中。

一个常见的罪魁祸首,表现出缓慢行为的是 SMTPHandler:发送电子邮件可能需要很长时间,原因有很多,超出了开发人员的控制范围(例如,性能不佳的邮件或网络基础设施)。但几乎任何基于网络的处理程序都可能阻塞:即使是 SocketHandler 操作也可能会在底层执行过慢的 DNS 查询(此查询可能深入到套接字库代码中,在 Python 层之下,并且超出您的控制)。

一种解决方案是采用两步法。第一步,仅将 QueueHandler 附加到那些从性能关键型线程访问的日志记录器。它们只是写入它们的队列,队列的大小可以设置得足够大,或者初始化时没有大小上限。写入队列通常会很快被接受,尽管您可能需要在代码中捕获 queue.Full 异常作为预防措施。如果您是库开发人员,并且代码中存在性能关键型线程,请务必记录这一点(以及建议仅将 QueueHandlers 附加到您的日志记录器),以造福使用您代码的其他开发人员。

解决方案的第二部分是 QueueListener,它被设计为 QueueHandler 的对应物。QueueListener 非常简单:它被传递一个队列和一些处理程序,然后启动一个内部线程,该线程监听其队列中来自 QueueHandlers(或任何其他 LogRecords 来源)发送的 LogRecords。LogRecords 从队列中移除并传递给处理程序进行处理。

拥有一个单独的 QueueListener 类的好处是,您可以使用同一个实例来服务多个 QueueHandlers。这比拥有现有处理程序类的线程版本(每个处理程序将消耗一个线程而没有任何特殊好处)更节省资源。

下面是一个使用这两个类的示例(省略了导入)

que = queue.Queue(-1)  # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()

运行时将生成

MainThread: Look out!

备注

尽管前面的讨论并非专门针对异步代码,而是关于慢速日志处理程序,但应该注意的是,从异步代码记录日志时,网络甚至文件处理程序都可能导致问题(阻塞事件循环),因为某些日志记录是从 asyncio 内部完成的。如果应用程序中使用任何异步代码,最好采用上述方法进行日志记录,这样任何阻塞代码只在 QueueListener 线程中运行。

版本 3.5 中的变更: 在 Python 3.5 之前,QueueListener 总是将从队列接收到的每条消息传递给它所初始化的每个处理程序。(这是因为假设所有级别过滤都在另一端完成,即队列被填充的地方。)从 3.5 开始,可以通过向监听器的构造函数传递关键字参数 respect_handler_level=True 来改变此行为。这样做后,监听器会将每条消息的级别与处理程序的级别进行比较,并且只有在适当的情况下才将消息传递给处理程序。

版本 3.14 中的变更: QueueListener 可以通过 with 语句启动(和停止)。例如

with QueueListener(que, handler) as listener:
    # The queue listener automatically starts
    # when the 'with' block is entered.
    pass
# The queue listener automatically stops once
# the 'with' block is exited.

跨网络发送和接收日志事件

假设您想通过网络发送日志事件,并在接收端进行处理。一种简单的方法是在发送端将 SocketHandler 实例附加到根日志记录器

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

在接收端,您可以使用 socketserver 模块设置接收器。这是一个基本的实用示例

import pickle
import logging
import logging.handlers
import socketserver
import struct


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

首先运行服务器,然后运行客户端。在客户端,控制台上不打印任何内容;在服务器端,您应该会看到类似以下内容

About to start TCP server...
   59 root            INFO     Jackdaws love my big sphinx of quartz.
   59 myapp.area1     DEBUG    Quick zephyrs blow, vexing daft Jim.
   69 myapp.area1     INFO     How quickly daft jumping zebras vex.
   69 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
   69 myapp.area2     ERROR    The five boxing wizards jump quickly.

请注意,在某些场景中,pickle 存在一些安全问题。如果这些问题影响到您,您可以通过覆盖 makePickle() 方法并在其中实现您的替代方案,以及调整上述脚本以使用您的替代序列化方案,来使用替代的序列化方案。

在生产环境中运行日志套接字监听器

要在生产环境中运行日志监听器,您可能需要使用像 Supervisor 这样的进程管理工具。这里是一个 Gist,它提供了使用 Supervisor 运行上述功能的骨干文件。它包含以下文件

文件

用途

prepare.sh

一个用于准备测试环境的 Bash 脚本

supervisor.conf

Supervisor 配置文件,其中包含监听器和多进程 Web 应用程序的条目

ensure_app.sh

一个 Bash 脚本,用于确保 Supervisor 正在运行上述配置

log_listener.py

接收日志事件并将其记录到文件的套接字监听程序

main.py

一个通过连接到监听器的套接字执行日志记录的简单 Web 应用程序

webapp.json

Web 应用程序的 JSON 配置文件

client.py

一个用于运行 Web 应用程序的 Python 脚本

这个网络应用程序使用 Gunicorn,它是一个流行的网络应用程序服务器,会启动多个工作进程来处理请求。这个示例设置展示了工作进程如何写入同一个日志文件而不会相互冲突——它们都通过套接字监听器。

要在 POSIX 环境中测试这些文件,请执行以下操作

  1. 使用 Download ZIP 按钮将 Gist 下载为 ZIP 归档文件。

  2. 将上述文件从归档文件中解压到一个临时目录中。

  3. 在临时目录中,运行 bash prepare.sh 进行准备。这将创建一个 run 子目录,用于存放 Supervisor 相关文件和日志文件,以及一个 venv 子目录,用于存放一个虚拟环境,其中安装了 bottlegunicornsupervisor

  4. 运行 bash ensure_app.sh 以确保 Supervisor 正在运行上述配置。

  5. 运行 venv/bin/python client.py 来运行 Web 应用程序,这将导致记录被写入日志。

  6. 检查 run 子目录中的日志文件。您应该会在与模式 app.log* 匹配的文件中看到最新的日志行。它们不会以任何特定顺序排列,因为它们已由不同的工作进程以非确定性方式并发处理。

  7. 您可以通过运行 venv/bin/supervisorctl -c supervisor.conf shutdown 来关闭监听器和 Web 应用程序。

万一配置的端口与测试环境中的其他东西发生冲突,您可能需要调整配置文件。

默认配置使用端口 9020 上的 TCP 套接字。您可以改为使用 Unix 域套接字而不是 TCP 套接字,方法如下

  1. listener.json 中,添加一个 socket 键,其值为您要使用的域套接字路径。如果存在此键,则监听器将监听相应的域套接字而不是 TCP 套接字(port 键将被忽略)。

  2. webapp.json 中,更改套接字处理程序配置字典,使 host 值为域套接字路径,并将 port 值设置为 null

向日志输出添加上下文信息

有时,您希望日志输出除了传递给日志记录调用的参数外,还包含上下文信息。例如,在网络应用程序中,可能希望在日志中记录客户端特定的信息(例如,远程客户端的用户名或 IP 地址)。尽管您可以使用 *extra* 参数来实现此目的,但以这种方式传递信息并不总是方便。虽然创建每个连接的 Logger 实例可能很诱人,但这不是一个好主意,因为这些实例不会被垃圾回收。虽然这在实践中不是问题,但当 Logger 实例的数量取决于您希望在日志记录应用程序中使用的粒度级别时,如果 Logger 实例的数量变得实际上无限大,则可能难以管理。

使用 LoggerAdapters 传递上下文信息

将上下文信息与日志事件信息一起输出的一种简单方法是使用 LoggerAdapter 类。此类的设计类似于 Logger,因此您可以调用 debug()info()warning()error()exception()critical()log()。这些方法具有与其在 Logger 中的对应方法相同的签名,因此您可以互换使用这两种类型的实例。

当您创建 LoggerAdapter 实例时,您会传递给它一个 Logger 实例和一个包含您的上下文信息的类字典对象。当您在 LoggerAdapter 实例上调用其中一个日志记录方法时,它会将调用委托给传递给其构造函数的底层 Logger 实例,并安排在委托调用中传递上下文信息。以下是 LoggerAdapter 代码片段

def debug(self, msg, /, *args, **kwargs):
    """
    Delegate a debug call to the underlying logger, after adding
    contextual information from this adapter instance.
    """
    msg, kwargs = self.process(msg, kwargs)
    self.logger.debug(msg, *args, **kwargs)

LoggerAdapterprocess() 方法是上下文信息被添加到日志输出的地方。它被传递日志调用消息和关键字参数,并传回(可能)修改后的版本,以便在调用底层日志记录器时使用。此方法的默认实现保持消息不变,但在关键字参数中插入一个 'extra' 键,其值是传递给构造函数的类字典对象。当然,如果您在调用适配器时传递了 'extra' 关键字参数,它将被静默覆盖。

使用 'extra' 的优点是,类字典对象中的值会合并到 LogRecord 实例的 __dict__ 中,从而允许您使用自定义字符串与了解类字典对象键的 Formatter 实例。如果您需要不同的方法,例如,如果您想将上下文信息前置或附加到消息字符串,您只需要子类化 LoggerAdapter 并重写 process() 以实现您所需的功能。这是一个简单的示例

class CustomAdapter(logging.LoggerAdapter):
    """
    This example adapter expects the passed in dict-like object to have a
    'connid' key, whose value in brackets is prepended to the log message.
    """
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

你可以这样使用它

logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})

然后,您记录到适配器的所有事件都将把 some_conn_id 的值前置到日志消息中。

使用非字典对象传递上下文信息

您不需要将实际的字典传递给 LoggerAdapter - 您可以传递一个实现了 __getitem____iter__ 的类的实例,这样它在日志记录看来就像一个字典。如果您想动态生成值(而字典中的值将是常量),这将很有用。

使用过滤器传递上下文信息

您还可以使用用户定义的 Filter 向日志输出添加上下文信息。Filter 实例允许修改传递给它们的 LogRecords,包括添加额外的属性,然后可以使用合适的格式字符串输出这些属性,或者如果需要,可以使用自定义 Formatter

例如,在 Web 应用程序中,正在处理的请求(或至少是其中有趣的部分)可以存储在线程本地 (threading.local) 变量中,然后从 Filter 中访问,以添加例如来自请求的信息——例如,远程 IP 地址和远程用户的用户名——到 LogRecord 中,使用属性名 'ip' 和 'user',如上面的 LoggerAdapter 示例所示。在这种情况下,可以使用相同的格式字符串获得与上面所示类似的输出。这是一个示例脚本

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    This is a filter which injects contextual information into the log.

    Rather than use actual contextual information, we just use random
    data in this demo.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')
    for x in range(10):
        lvl = choice(levels)
        lvlname = logging.getLevelName(lvl)
        a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')

运行时,会生成类似如下的内容

2010-09-06 22:38:15,292 a.b.c DEBUG    IP: 123.231.231.123 User: fred     A debug message
2010-09-06 22:38:15,300 a.b.c INFO     IP: 192.168.0.1     User: sheila   An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 127.0.0.1       User: jim      A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 127.0.0.1       User: sheila   A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 123.231.231.123 User: fred     A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1     User: jim      A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 192.168.0.1     User: jim      A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR    IP: 127.0.0.1       User: sheila   A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG    IP: 123.231.231.123 User: fred     A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO     IP: 123.231.231.123 User: fred     A message at INFO level with 2 parameters

使用 contextvars

自 Python 3.7 起,contextvars 模块提供了上下文本地存储,它适用于 threadingasyncio 处理需求。因此,这种类型的存储通常优于线程本地存储。以下示例展示了在多线程环境中,日志如何通过上下文信息(例如,Web 应用程序处理的请求属性)填充。

为了说明目的,假设您有不同的 Web 应用程序,它们彼此独立但在同一个 Python 进程中运行,并使用它们共用的一个库。这些应用程序如何拥有自己的日志,其中所有来自库(和其他请求处理代码)的日志消息都指向适当的应用程序日志文件,同时在日志中包含额外的上下文信息,例如客户端 IP、HTTP 请求方法和客户端用户名?

我们假设库可以通过以下代码进行模拟

# webapplib.py
import logging
import time

logger = logging.getLogger(__name__)

def useful():
    # Just a representative event logged from the library
    logger.debug('Hello from webapplib!')
    # Just sleep for a bit so other threads get to run
    time.sleep(0.01)

我们可以通过两个简单的类 RequestWebApp 来模拟多个 Web 应用程序。这些类模拟了真实的线程化 Web 应用程序的工作方式——每个请求都由一个线程处理

# main.py
import argparse
from contextvars import ContextVar
import logging
import os
from random import choice
import threading
import webapplib

logger = logging.getLogger(__name__)
root = logging.getLogger()
root.setLevel(logging.DEBUG)

class Request:
    """
    A simple dummy request class which just holds dummy HTTP request method,
    client IP address and client username
    """
    def __init__(self, method, ip, user):
        self.method = method
        self.ip = ip
        self.user = user

# A dummy set of requests which will be used in the simulation - we'll just pick
# from this list randomly. Note that all GET requests are from 192.168.2.XXX
# addresses, whereas POST requests are from 192.16.3.XXX addresses. Three users
# are represented in the sample requests.

REQUESTS = [
    Request('GET', '192.168.2.20', 'jim'),
    Request('POST', '192.168.3.20', 'fred'),
    Request('GET', '192.168.2.21', 'sheila'),
    Request('POST', '192.168.3.21', 'jim'),
    Request('GET', '192.168.2.22', 'fred'),
    Request('POST', '192.168.3.22', 'sheila'),
]

# Note that the format string includes references to request context information
# such as HTTP method, client IP and username

formatter = logging.Formatter('%(threadName)-11s %(appName)s %(name)-9s %(user)-6s %(ip)s %(method)-4s %(message)s')

# Create our context variables. These will be filled at the start of request
# processing, and used in the logging that happens during that processing

ctx_request = ContextVar('request')
ctx_appname = ContextVar('appname')

class InjectingFilter(logging.Filter):
    """
    A filter which injects context-specific information into logs and ensures
    that only information for a specific webapp is included in its log
    """
    def __init__(self, app):
        self.app = app

    def filter(self, record):
        request = ctx_request.get()
        record.method = request.method
        record.ip = request.ip
        record.user = request.user
        record.appName = appName = ctx_appname.get()
        return appName == self.app.name

class WebApp:
    """
    A dummy web application class which has its own handler and filter for a
    webapp-specific log.
    """
    def __init__(self, name):
        self.name = name
        handler = logging.FileHandler(name + '.log', 'w')
        f = InjectingFilter(self)
        handler.setFormatter(formatter)
        handler.addFilter(f)
        root.addHandler(handler)
        self.num_requests = 0

    def process_request(self, request):
        """
        This is the dummy method for processing a request. It's called on a
        different thread for every request. We store the context information into
        the context vars before doing anything else.
        """
        ctx_request.set(request)
        ctx_appname.set(self.name)
        self.num_requests += 1
        logger.debug('Request processing started')
        webapplib.useful()
        logger.debug('Request processing finished')

def main():
    fn = os.path.splitext(os.path.basename(__file__))[0]
    adhf = argparse.ArgumentDefaultsHelpFormatter
    ap = argparse.ArgumentParser(formatter_class=adhf, prog=fn,
                                 description='Simulate a couple of web '
                                             'applications handling some '
                                             'requests, showing how request '
                                             'context can be used to '
                                             'populate logs')
    aa = ap.add_argument
    aa('--count', '-c', type=int, default=100, help='How many requests to simulate')
    options = ap.parse_args()

    # Create the dummy webapps and put them in a list which we can use to select
    # from randomly
    app1 = WebApp('app1')
    app2 = WebApp('app2')
    apps = [app1, app2]
    threads = []
    # Add a common handler which will capture all events
    handler = logging.FileHandler('app.log', 'w')
    handler.setFormatter(formatter)
    root.addHandler(handler)

    # Generate calls to process requests
    for i in range(options.count):
        try:
            # Pick an app at random and a request for it to process
            app = choice(apps)
            request = choice(REQUESTS)
            # Process the request in its own thread
            t = threading.Thread(target=app.process_request, args=(request,))
            threads.append(t)
            t.start()
        except KeyboardInterrupt:
            break

    # Wait for the threads to terminate
    for t in threads:
        t.join()

    for app in apps:
        print('%s processed %s requests' % (app.name, app.num_requests))

if __name__ == '__main__':
    main()

如果您运行上述代码,您会发现大约一半的请求进入 app1.log,其余的进入 app2.log,并且所有请求都记录到 app.log。每个特定于 Web 应用程序的日志将只包含该 Web 应用程序的日志条目,并且请求信息将一致地显示在日志中(即,每个模拟请求中的信息将始终一起出现在日志行中)。这由以下 shell 输出所示

~/logging-contextual-webapp$ python main.py
app1 processed 51 requests
app2 processed 49 requests
~/logging-contextual-webapp$ wc -l *.log
  153 app1.log
  147 app2.log
  300 app.log
  600 total
~/logging-contextual-webapp$ head -3 app1.log
Thread-3 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-3 (process_request) app1 webapplib jim    192.168.3.21 POST Hello from webapplib!
Thread-5 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ head -3 app2.log
Thread-1 (process_request) app2 __main__  sheila 192.168.2.21 GET  Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET  Hello from webapplib!
Thread-2 (process_request) app2 __main__  jim    192.168.2.20 GET  Request processing started
~/logging-contextual-webapp$ head app.log
Thread-1 (process_request) app2 __main__  sheila 192.168.2.21 GET  Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET  Hello from webapplib!
Thread-2 (process_request) app2 __main__  jim    192.168.2.20 GET  Request processing started
Thread-3 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-2 (process_request) app2 webapplib jim    192.168.2.20 GET  Hello from webapplib!
Thread-3 (process_request) app1 webapplib jim    192.168.3.21 POST Hello from webapplib!
Thread-4 (process_request) app2 __main__  fred   192.168.2.22 GET  Request processing started
Thread-5 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-4 (process_request) app2 webapplib fred   192.168.2.22 GET  Hello from webapplib!
Thread-6 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ grep app1 app1.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app2.log | wc -l
147
~/logging-contextual-webapp$ grep app1 app.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app.log | wc -l
147

在处理程序中传递上下文信息

每个 Handler 都有自己的过滤器链。如果您想向 LogRecord 添加上下文信息而不将其泄漏给其他处理程序,您可以使用一个返回新的 LogRecord 而不是就地修改它的过滤器,如下面的脚本所示

import copy
import logging

def filter(record: logging.LogRecord):
    record = copy.copy(record)
    record.user = 'jim'
    return record

if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s from %(user)-8s')
    handler.setFormatter(formatter)
    handler.addFilter(filter)
    logger.addHandler(handler)

    logger.info('A log message')

从多个进程记录到单个文件

尽管日志记录是线程安全的,并且在一个进程中从多个线程记录到单个文件 支持的,但从 多个进程 记录到单个文件 受支持,因为在 Python 中没有标准方法可以跨多个进程序列化对单个文件的访问。如果您需要从多个进程记录到单个文件,一种方法是让所有进程记录到 SocketHandler,并有一个单独的进程实现一个套接字服务器,该服务器从套接字读取并记录到文件。(如果您愿意,可以将其中一个现有进程中的一个线程专门用于执行此功能。)本节 更详细地记录了这种方法,并包含一个可工作的套接字接收器,可以作为您在自己的应用程序中进行调整的起点。

您也可以编写自己的处理程序,该处理程序使用 Lock 类从 multiprocessing 模块来序列化进程对文件的访问。标准库 FileHandler 及其子类不使用 multiprocessing

或者,您可以使用 QueueQueueHandler 将所有日志事件发送到多进程应用程序中的一个进程。以下示例脚本演示了如何做到这一点;在示例中,一个单独的监听器进程监听其他进程发送的事件,并根据自己的日志配置进行记录。尽管该示例只演示了一种方法(例如,您可能希望使用监听器线程而不是单独的监听器进程——实现方式类似),但它允许监听器和应用程序中的其他进程使用完全不同的日志配置,并可以作为满足您特定要求的代码的基础

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

上述脚本的一个变体将日志记录保存在主进程中,在一个单独的线程中

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

这个变体展示了您如何例如为特定的日志记录器应用配置——例如,foo 日志记录器有一个特殊的处理程序,它将 foo 子系统中的所有事件存储在一个文件 mplog-foo.log 中。这将由主进程中的日志记录机制使用(即使日志事件是在工作进程中生成的)来将消息定向到适当的目的地。

使用 concurrent.futures.ProcessPoolExecutor

如果您想使用 concurrent.futures.ProcessPoolExecutor 来启动您的工作进程,您需要稍微不同地创建队列。而不是

queue = multiprocessing.Queue(-1)

你应该使用

queue = multiprocessing.Manager().Queue(-1)  # also works with the examples above

然后你可以将 worker 创建从这里替换

workers = []
for i in range(10):
    worker = multiprocessing.Process(target=worker_process,
                                     args=(queue, worker_configurer))
    workers.append(worker)
    worker.start()
for w in workers:
    w.join()

到这里(记得首先导入 concurrent.futures

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for i in range(10):
        executor.submit(worker_process, queue, worker_configurer)

使用 Gunicorn 和 uWSGI 部署 Web 应用程序

当使用 GunicornuWSGI(或类似工具)部署 Web 应用程序时,会创建多个工作进程来处理客户端请求。在这种环境中,避免直接在 Web 应用程序中创建基于文件的处理程序。相反,使用 SocketHandler 从 Web 应用程序向单独进程中的监听器记录日志。这可以使用 Supervisor 等进程管理工具进行设置——有关详细信息,请参阅 在生产环境中运行日志套接字监听器

使用文件轮换

有时,您希望让日志文件增长到一定大小,然后打开一个新文件并记录到其中。您可能希望保留一定数量的这些文件,并且当创建了这么多文件时,轮换文件,以便文件数量和文件大小都保持有界。对于这种使用模式,日志记录包提供了 RotatingFileHandler

import glob
import logging
import logging.handlers

LOG_FILENAME = 'logging_rotatingfile_example.out'

# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)

# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
              LOG_FILENAME, maxBytes=20, backupCount=5)

my_logger.addHandler(handler)

# Log some messages
for i in range(20):
    my_logger.debug('i = %d' % i)

# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)

for filename in logfiles:
    print(filename)

结果应该是 6 个单独的文件,每个文件都包含应用程序日志历史的一部分

logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5

最新文件始终是 logging_rotatingfile_example.out,每次达到大小限制时,它都会重命名并加上后缀 .1。每个现有备份文件都会重命名以递增后缀(.1 变为 .2,依此类推),而 .6 文件则被删除。

显然,这个例子将日志长度设置得太小,作为一个极端的例子。您需要将 *maxBytes* 设置为适当的值。

使用替代格式样式

当日志记录被添加到 Python 标准库时,唯一一种用于格式化带有可变内容的消息的方法是使用 %-格式化方法。从那时起,Python 获得了两种新的格式化方法:string.Template(在 Python 2.4 中添加)和 str.format()(在 Python 2.6 中添加)。

日志记录(从 3.2 版开始)为这两种额外的格式样式提供了改进的支持。Formatter 类已得到增强,可以接受一个额外的可选关键字参数 style。它默认为 '%',但其他可能的值是 '{''$',它们对应于另外两种格式样式。默认情况下保持向后兼容性(如您所料),但通过显式指定样式参数,您可以指定与 str.format()string.Template 配合使用的格式字符串。这是一个示例控制台会话,以展示可能性

>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
...                        style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG    This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
...                        style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>

请注意,日志消息的最终输出格式与单个日志消息的构建方式完全独立。它仍然可以使用 %-格式化,如下所示

>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>

日志调用(logger.debug()logger.info() 等)仅接受实际日志消息本身的位置参数,关键字参数仅用于确定如何处理实际日志调用的选项(例如,exc_info 关键字参数表示应记录回溯信息,或 extra 关键字参数表示要添加到日志中的额外上下文信息)。因此,您不能直接使用 str.format()string.Template 语法进行日志调用,因为在内部日志包使用 %-格式化来合并格式字符串和可变参数。在保持向后兼容性的情况下无法更改这一点,因为现有代码中的所有日志调用都将使用 %-格式字符串。

然而,有一种方法可以使用 {} 和 $ 格式来构造您的单个日志消息。请记住,对于消息,您可以使用任意对象作为消息格式字符串,并且日志包将对该对象调用 str() 以获取实际的格式字符串。考虑以下两个类

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

这些类中的任何一个都可以代替格式字符串使用,以允许使用 {} 或 $ 格式来构建在格式化日志输出中代替 “%(message)s” 或 “{message}” 或 “$message” 出现的实际“消息”部分。每次您想记录一些东西时都使用类名会有点笨拙,但是如果您使用别名(例如 __(双下划线——不要与 _ 混淆,_ 是 gettext.gettext() 或其同类的同义词/别名)),那将非常可口。

上述类不包含在 Python 中,尽管它们很容易复制粘贴到您自己的代码中。它们可以按如下方式使用(假设它们在名为 wherever 的模块中声明)

>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
...       point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

虽然上述示例使用 print() 来展示格式化如何工作,但您当然会使用 logger.debug() 或类似方法来实际使用这种方法进行日志记录。

需要注意的一点是,这种方法不会带来显著的性能损失:实际的格式化并不是在您进行日志记录调用时发生的,而是在(如果)日志消息实际即将由处理程序输出到日志时发生的。因此,唯一可能让您感到有点不寻常的是括号环绕着格式字符串和参数,而不仅仅是格式字符串。这是因为 __ 符号只是对其中一个 XXXMessage 类的构造函数调用的语法糖。

如果您愿意,可以使用 LoggerAdapter 来达到与上述类似的效果,如以下示例所示

import logging

class Message:
    def __init__(self, fmt, args):
        self.fmt = fmt
        self.args = args

    def __str__(self):
        return self.fmt.format(*self.args)

class StyleAdapter(logging.LoggerAdapter):
    def log(self, level, msg, /, *args, stacklevel=1, **kwargs):
        if self.isEnabledFor(level):
            msg, kwargs = self.process(msg, kwargs)
            self.logger.log(level, Message(msg, args), **kwargs,
                            stacklevel=stacklevel+1)

logger = StyleAdapter(logging.getLogger(__name__))

def main():
    logger.debug('Hello, {}', 'world!')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()

当使用 Python 3.8 或更高版本运行上述脚本时,它应该记录消息 Hello, world!

自定义 LogRecord

每个日志事件都由一个 LogRecord 实例表示。当一个事件被记录并且未被日志记录器级别过滤掉时,会创建一个 LogRecord,其中填充了有关事件的信息,然后传递给该日志记录器(及其祖先,包括禁用进一步向上级传播的日志记录器)的处理程序。在 Python 3.2 之前,此创建仅在两个地方完成

  • Logger.makeRecord(),在正常日志记录事件过程中调用。这直接调用 LogRecord 来创建实例。

  • makeLogRecord(),用一个包含要添加到 LogRecord 的属性的字典调用。这通常在通过网络接收到合适的字典时调用(例如,通过 SocketHandler 以 pickle 形式,或通过 HTTPHandler 以 JSON 形式)。

这通常意味着如果您需要对 LogRecord 进行任何特殊操作,您必须执行以下操作之一。

  • 创建您自己的 Logger 子类,它覆盖 Logger.makeRecord(),并在实例化任何您关心的日志记录器之前使用 setLoggerClass() 设置它。

  • 向日志记录器或处理程序添加 Filter,当其 filter() 方法被调用时,执行必要的特殊操作。

第一种方法在(例如)几个不同的库想要做不同的事情的场景中会有点笨拙。每个库都会尝试设置自己的 Logger 子类,而最后设置的那个会获胜。

第二种方法对于许多情况来说效果很好,但不允许您例如使用 LogRecord 的专门子类。库开发人员可以在其日志记录器上设置合适的过滤器,但他们必须记住每次引入新的日志记录器时都这样做(他们只需通过添加新的包或模块并执行

logger = logging.getLogger(__name__)

在模块级别)。这可能需要考虑的事情太多了。开发人员还可以将过滤器添加到附加到其顶级日志记录器的 NullHandler,但如果应用程序开发人员将处理程序附加到较低级别的库日志记录器,则不会调用此过滤器——因此该处理程序的输出将无法反映库开发人员的意图。

在 Python 3.2 及更高版本中,LogRecord 的创建是通过工厂完成的,您可以指定该工厂。工厂只是一个您可以使用 setLogRecordFactory() 设置并使用 getLogRecordFactory() 查询的可调用对象。工厂以与 LogRecord 构造函数相同的签名被调用,因为 LogRecord 是工厂的默认设置。

这种方法允许自定义工厂控制 LogRecord 创建的所有方面。例如,您可以返回一个子类,或者在创建后向记录添加一些额外的属性,使用类似以下的模式

old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
    record = old_factory(*args, **kwargs)
    record.custom_attribute = 0xdecafbad
    return record

logging.setLogRecordFactory(record_factory)

这种模式允许不同的库将工厂链接在一起,只要它们不覆盖彼此的属性或无意中覆盖标准提供的属性,就不会出现意外。但是,应该记住,链中的每个链接都会增加所有日志操作的运行时开销,并且该技术只应在使用 Filter 无法提供所需结果时才使用。

子类化 QueueHandler 和 QueueListener - 一个 ZeroMQ 示例

子类化 QueueHandler

您可以使用 QueueHandler 子类将消息发送到其他类型的队列,例如 ZeroMQ “发布”套接字。在下面的示例中,套接字是单独创建的,并作为其“队列”传递给处理程序

import zmq   # using pyzmq, the Python binding for ZeroMQ
import json  # for serializing records portably

ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB)  # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556')        # or wherever

class ZeroMQSocketHandler(QueueHandler):
    def enqueue(self, record):
        self.queue.send_json(record.__dict__)


handler = ZeroMQSocketHandler(sock)

当然,还有其他组织方式,例如传入处理程序创建套接字所需的数据

class ZeroMQSocketHandler(QueueHandler):
    def __init__(self, uri, socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = zmq.Socket(self.ctx, socktype)
        socket.bind(uri)
        super().__init__(socket)

    def enqueue(self, record):
        self.queue.send_json(record.__dict__)

    def close(self):
        self.queue.close()

子类化 QueueListener

您还可以子类化 QueueListener 以从其他类型的队列获取消息,例如 ZeroMQ “订阅”套接字。这是一个示例

class ZeroMQSocketListener(QueueListener):
    def __init__(self, uri, /, *handlers, **kwargs):
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = zmq.Socket(self.ctx, zmq.SUB)
        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # subscribe to everything
        socket.connect(uri)
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self):
        msg = self.queue.recv_json()
        return logging.makeLogRecord(msg)

子类化 QueueHandler 和 QueueListener - 一个 pynng 示例

与上述部分类似,我们可以使用 pynng(一个 Python 绑定到 NNG,被誉为 ZeroMQ 的精神继承者)实现一个监听器和处理程序。以下代码片段展示了这一点——您可以在安装了 pynng 的环境中测试它们。为了多样化,我们首先介绍监听器。

子类化 QueueListener

# listener.py
import json
import logging
import logging.handlers

import pynng

DEFAULT_ADDR = "tcp://:13232"

interrupted = False

class NNGSocketListener(logging.handlers.QueueListener):

    def __init__(self, uri, /, *handlers, **kwargs):
        # Have a timeout for interruptability, and open a
        # subscriber socket
        socket = pynng.Sub0(listen=uri, recv_timeout=500)
        # The b'' subscription matches all topics
        topics = kwargs.pop('topics', None) or b''
        socket.subscribe(topics)
        # We treat the socket as a queue
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self, block):
        data = None
        # Keep looping while not interrupted and no data received over the
        # socket
        while not interrupted:
            try:
                data = self.queue.recv(block=block)
                break
            except pynng.Timeout:
                pass
            except pynng.Closed:  # sometimes happens when you hit Ctrl-C
                break
        if data is None:
            return None
        # Get the logging event sent from a publisher
        event = json.loads(data.decode('utf-8'))
        return logging.makeLogRecord(event)

    def enqueue_sentinel(self):
        # Not used in this implementation, as the socket isn't really a
        # queue
        pass

logging.getLogger('pynng').propagate = False
listener = NNGSocketListener(DEFAULT_ADDR, logging.StreamHandler(), topics=b'')
listener.start()
print('Press Ctrl-C to stop.')
try:
    while True:
        pass
except KeyboardInterrupt:
    interrupted = True
finally:
    listener.stop()

子类化 QueueHandler

# sender.py
import json
import logging
import logging.handlers
import time
import random

import pynng

DEFAULT_ADDR = "tcp://:13232"

class NNGSocketHandler(logging.handlers.QueueHandler):

    def __init__(self, uri):
        socket = pynng.Pub0(dial=uri, send_timeout=500)
        super().__init__(socket)

    def enqueue(self, record):
        # Send the record as UTF-8 encoded JSON
        d = dict(record.__dict__)
        data = json.dumps(d)
        self.queue.send(data.encode('utf-8'))

    def close(self):
        self.queue.close()

logging.getLogger('pynng').propagate = False
handler = NNGSocketHandler(DEFAULT_ADDR)
# Make sure the process ID is in the output
logging.basicConfig(level=logging.DEBUG,
                    handlers=[logging.StreamHandler(), handler],
                    format='%(levelname)-8s %(name)10s %(process)6s %(message)s')
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)
logger_names = ('myapp', 'myapp.lib1', 'myapp.lib2')
msgno = 1
while True:
    # Just randomly select some loggers and levels and log away
    level = random.choice(levels)
    logger = logging.getLogger(random.choice(logger_names))
    logger.log(level, 'Message no. %5d' % msgno)
    msgno += 1
    delay = random.random() * 2 + 0.5
    time.sleep(delay)

您可以在不同的命令行 shell 中运行上述两个代码片段。如果我们在一个 shell 中运行监听器,并在两个不同的 shell 中运行发送器,我们应该会看到类似以下的内容。在第一个发送器 shell 中

$ python sender.py
DEBUG         myapp    613 Message no.     1
WARNING  myapp.lib2    613 Message no.     2
CRITICAL myapp.lib2    613 Message no.     3
WARNING  myapp.lib2    613 Message no.     4
CRITICAL myapp.lib1    613 Message no.     5
DEBUG         myapp    613 Message no.     6
CRITICAL myapp.lib1    613 Message no.     7
INFO     myapp.lib1    613 Message no.     8
(and so on)

在第二个发送器 shell 中

$ python sender.py
INFO     myapp.lib2    657 Message no.     1
CRITICAL myapp.lib2    657 Message no.     2
CRITICAL      myapp    657 Message no.     3
CRITICAL myapp.lib1    657 Message no.     4
INFO     myapp.lib1    657 Message no.     5
WARNING  myapp.lib2    657 Message no.     6
CRITICAL      myapp    657 Message no.     7
DEBUG    myapp.lib1    657 Message no.     8
(and so on)

在监听器 shell 中

$ python listener.py
Press Ctrl-C to stop.
DEBUG         myapp    613 Message no.     1
WARNING  myapp.lib2    613 Message no.     2
INFO     myapp.lib2    657 Message no.     1
CRITICAL myapp.lib2    613 Message no.     3
CRITICAL myapp.lib2    657 Message no.     2
CRITICAL      myapp    657 Message no.     3
WARNING  myapp.lib2    613 Message no.     4
CRITICAL myapp.lib1    613 Message no.     5
CRITICAL myapp.lib1    657 Message no.     4
INFO     myapp.lib1    657 Message no.     5
DEBUG         myapp    613 Message no.     6
WARNING  myapp.lib2    657 Message no.     6
CRITICAL      myapp    657 Message no.     7
CRITICAL myapp.lib1    613 Message no.     7
INFO     myapp.lib1    613 Message no.     8
DEBUG    myapp.lib1    657 Message no.     8
(and so on)

如您所见,来自两个发送进程的日志记录在监听器的输出中交错。

一个基于字典的配置示例

下面是一个日志配置字典的示例——它取自 Django 项目的文档。此字典被传递给 dictConfig() 以使配置生效

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'filters': {
        'special': {
            '()': 'project.logging.SpecialFilter',
            'foo': 'bar',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'filters': ['special']
        }
    },
    'loggers': {
        'django': {
            'handlers': ['console'],
            'propagate': True,
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special']
        }
    }
}

有关此配置的更多信息,您可以查看 Django 文档的 相关部分

使用轮换器和命名器自定义日志轮换处理

以下可运行脚本提供了一个如何定义命名器和轮换器的示例,该脚本演示了日志文件的 gzip 压缩

import gzip
import logging
import logging.handlers
import os
import shutil

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    with open(source, 'rb') as f_in:
        with gzip.open(dest, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    os.remove(source)


rh = logging.handlers.RotatingFileHandler('rotated.log', maxBytes=128, backupCount=5)
rh.rotator = rotator
rh.namer = namer

root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(rh)
f = logging.Formatter('%(asctime)s %(message)s')
rh.setFormatter(f)
for i in range(1000):
    root.info(f'Message no. {i + 1}')

运行此程序后,您将看到六个新文件,其中五个已压缩

$ ls rotated.log*
rotated.log       rotated.log.2.gz  rotated.log.4.gz
rotated.log.1.gz  rotated.log.3.gz  rotated.log.5.gz
$ zcat rotated.log.1.gz
2023-01-20 02:28:17,767 Message no. 996
2023-01-20 02:28:17,767 Message no. 997
2023-01-20 02:28:17,767 Message no. 998

一个更精细的多进程示例

以下工作示例展示了如何使用配置文件在多进程中使用日志记录。配置相当简单,但足以说明在真实的多进程场景中如何实现更复杂的配置。

在示例中,主进程生成一个监听进程和一些工作进程。主进程、监听器和工作进程都有三个独立的配置(所有工作进程共享相同的配置)。我们可以看到主进程中的日志记录、工作进程如何记录到 QueueHandler 以及监听器如何实现 QueueListener 和更复杂的日志配置,并安排将通过队列接收的事件分派给配置中指定的处理程序。请注意,这些配置纯粹是说明性的,但您应该能够根据自己的场景调整此示例。

这是脚本——文档字符串和注释希望能够解释它是如何工作的

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time

class MyHandler:
    """
    A simple handler for logging events. It runs in the listener process and
    dispatches events to loggers based on the name in the received record,
    which then get dispatched, by the logging system, to the handlers
    configured for those loggers.
    """

    def handle(self, record):
        if record.name == "root":
            logger = logging.getLogger()
        else:
            logger = logging.getLogger(record.name)

        if logger.isEnabledFor(record.levelno):
            # The process name is transformed just to show that it's the listener
            # doing the logging to files and console
            record.processName = '%s (for %s)' % (current_process().name, record.processName)
            logger.handle(record)

def listener_process(q, stop_event, config):
    """
    This could be done in the main process, but is just done in a separate
    process for illustrative purposes.

    This initialises logging according to the specified configuration,
    starts the listener and waits for the main process to signal completion
    via the event. The listener is then stopped, and the process exits.
    """
    logging.config.dictConfig(config)
    listener = logging.handlers.QueueListener(q, MyHandler())
    listener.start()
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    stop_event.wait()
    listener.stop()

def worker_process(config):
    """
    A number of these are spawned for the purpose of illustration. In
    practice, they could be a heterogeneous bunch of processes rather than
    ones which are identical to each other.

    This initialises logging according to the specified configuration,
    and logs a hundred messages with random levels to randomly selected
    loggers.

    A small sleep is added to allow other processes a chance to run. This
    is not strictly needed, but it mixes the output from the different
    processes a bit more than if it's left out.
    """
    logging.config.dictConfig(config)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)
        time.sleep(0.01)

def main():
    q = Queue()
    # The main process gets a simple configuration which prints to the console.
    config_initial = {
        'version': 1,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO'
            }
        },
        'root': {
            'handlers': ['console'],
            'level': 'DEBUG'
        }
    }
    # The worker process configuration is just a QueueHandler attached to the
    # root logger, which allows all messages to be sent to the queue.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_worker = {
        'version': 1,
        'disable_existing_loggers': True,
        'handlers': {
            'queue': {
                'class': 'logging.handlers.QueueHandler',
                'queue': q
            }
        },
        'root': {
            'handlers': ['queue'],
            'level': 'DEBUG'
        }
    }
    # The listener process configuration shows that the full flexibility of
    # logging configuration is available to dispatch events to handlers however
    # you want.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_listener = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            },
            'simple': {
                'class': 'logging.Formatter',
                'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'simple',
                'level': 'INFO'
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'formatter': 'detailed',
                'level': 'ERROR'
            }
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'handlers': ['console', 'file', 'errors'],
            'level': 'DEBUG'
        }
    }
    # Log some initial events, just to show that logging in the parent works
    # normally.
    logging.config.dictConfig(config_initial)
    logger = logging.getLogger('setup')
    logger.info('About to create workers ...')
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1),
                     args=(config_worker,))
        workers.append(wp)
        wp.start()
        logger.info('Started worker: %s', wp.name)
    logger.info('About to create listener ...')
    stop_event = Event()
    lp = Process(target=listener_process, name='listener',
                 args=(q, stop_event, config_listener))
    lp.start()
    logger.info('Started listener')
    # We now hang around for the workers to finish their work.
    for wp in workers:
        wp.join()
    # Workers all done, listening can now stop.
    # Logging in the parent still works normally.
    logger.info('Telling listener to stop ...')
    stop_event.set()
    lp.join()
    logger.info('All done.')

if __name__ == '__main__':
    main()

在发送给 SysLogHandler 的消息中插入 BOM

RFC 5424 要求将 Unicode 消息作为一组字节发送到 syslog 守护程序,这些字节具有以下结构:一个可选的纯 ASCII 组件,后跟一个 UTF-8 字节序标记 (BOM),后跟使用 UTF-8 编码的 Unicode。(请参阅规范的相关部分。)

在 Python 3.1 中,代码被添加到 SysLogHandler 以在消息中插入 BOM,但不幸的是,它的实现不正确,BOM 出现在消息的开头,因此不允许在它之前出现任何纯 ASCII 组件。

由于这种行为已损坏,不正确的 BOM 插入代码将从 Python 3.2.4 及更高版本中移除。但是,它不会被替换,如果您想生成包含 BOM、之前可选的纯 ASCII 序列以及之后任意 Unicode 的 RFC 5424 兼容消息,并使用 UTF-8 编码,那么您需要执行以下操作

  1. Formatter 实例附加到您的 SysLogHandler 实例,格式字符串如下

    'ASCII section\ufeffUnicode section'
    

    Unicode 代码点 U+FEFF,当使用 UTF-8 编码时,将被编码为 UTF-8 BOM——字节字符串 b'\xef\xbb\xbf'

  2. 用您喜欢的任何占位符替换 ASCII 部分,但请确保替换后出现在其中的数据始终是 ASCII(这样,它在 UTF-8 编码后将保持不变)。

  3. 用您喜欢的任何占位符替换 Unicode 部分;如果替换后出现的数据包含 ASCII 范围之外的字符,那也没关系——它将使用 UTF-8 编码。

格式化后的消息 SysLogHandler 使用 UTF-8 编码。如果您遵循上述规则,您应该能够生成 RFC 5424 兼容消息。如果您不这样做,日志记录可能不会抱怨,但您的消息将不符合 RFC 5424,并且您的 syslog 守护程序可能会抱怨。

实现结构化日志

尽管大多数日志消息旨在供人阅读,因此不易被机器解析,但在某些情况下,您可能希望以结构化格式输出消息,该格式 能够 被程序解析(无需复杂的正则表达式来解析日志消息)。使用日志包可以轻松实现这一点。实现此目的有多种方法,但以下是一种简单的方法,它使用 JSON 以机器可解析的方式序列化事件

import json
import logging

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        return '%s >>> %s' % (self.message, json.dumps(self.kwargs))

_ = StructuredMessage   # optional, to improve readability

logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))

如果运行上述脚本,它会打印

message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}

请注意,项目顺序可能因使用的 Python 版本而异。

如果您需要更专业的处理,您可以使用自定义 JSON 编码器,如以下完整示例所示

import json
import logging


class Encoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, set):
            return tuple(o)
        elif isinstance(o, str):
            return o.encode('unicode_escape').decode('ascii')
        return super().default(o)

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        s = Encoder().encode(self.kwargs)
        return '%s >>> %s' % (self.message, s)

_ = StructuredMessage   # optional, to improve readability

def main():
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))

if __name__ == '__main__':
    main()

当运行上述脚本时,它会打印

message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}

请注意,项目顺序可能因使用的 Python 版本而异。

使用 dictConfig() 定制处理程序

有时您希望以特定方式定制日志处理程序,如果您使用 dictConfig(),您可能无需子类化即可实现此目的。例如,考虑您可能希望设置日志文件的所有权。在 POSIX 上,这可以通过使用 shutil.chown() 轻松完成,但 stdlib 中的文件处理程序不提供内置支持。您可以使用简单的函数(例如)自定义处理程序创建:

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

然后,您可以在传递给 dictConfig() 的日志配置中指定通过调用此函数来创建日志处理程序:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

在此示例中,为了说明目的,我使用 pulse 用户和组来设置所有权。将其整合到一个可运行的脚本 chowntest.py 中:

import logging, logging.config, os, shutil

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')

要运行此程序,您可能需要以 root 身份运行:

$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log

请注意,此示例使用 Python 3.3,因为 shutil.chown() 是在该版本中出现的。此方法应该适用于任何支持 dictConfig() 的 Python 版本——即 Python 2.7、3.2 或更高版本。对于 3.3 之前的版本,您需要使用例如 os.chown() 来实现实际的所有权更改。

实际上,处理程序创建函数可能位于项目中的某个实用程序模块中。配置中的那一行:

'()': owned_file_handler,

您可以使用例如:

'()': 'ext://project.util.owned_file_handler',

其中 project.util 可以替换为函数所在的包的实际名称。在上面的工作脚本中,使用 'ext://__main__.owned_file_handler' 应该可以工作。在这里,实际的可调用对象由 dictConfig()ext:// 规范解析。

此示例也希望能指明如何以相同的方式实现其他类型的文件更改——例如设置特定的 POSIX 权限位——使用 os.chmod()

当然,这种方法也可以扩展到 FileHandler 以外的其他类型的处理程序——例如,一个轮转文件处理程序,或完全不同类型的处理程序。

在整个应用程序中使用特定的格式样式

在 Python 3.2 中,Formatter 获得了一个 style 关键字参数,它在默认情况下为了向后兼容性设置为 %,但允许指定 {$ 以支持 str.format()string.Template 支持的格式化方法。请注意,这控制了日志消息的最终输出格式,并且与单个日志消息的构建方式完全正交。

日志调用(debug()info() 等)只接受实际日志消息本身的位置参数,关键字参数仅用于确定如何处理日志调用的选项(例如,exc_info 关键字参数用于指示应记录回溯信息,或 extra 关键字参数用于指示要添加到日志的额外上下文信息)。因此,您不能直接使用 str.format()string.Template 语法进行日志调用,因为日志包内部使用 %-格式化来合并格式字符串和可变参数。在保持向后兼容性的情况下,无法更改此行为,因为现有代码中的所有日志调用都将使用 %-格式字符串。

有人建议将格式样式与特定日志记录器关联起来,但这种方法也遇到了向后兼容性问题,因为任何现有代码都可能正在使用给定的日志记录器名称并使用 % 格式化。

为了使日志记录在任何第三方库和您的代码之间可互操作,关于格式的决定需要在单个日志调用级别做出。这提供了几种容纳替代格式样式的方法。

使用 LogRecord 工厂

在 Python 3.2 中,除了上面提到的 Formatter 更改之外,日志包还增加了允许用户使用 setLogRecordFactory() 函数设置自己的 LogRecord 子类的功能。您可以利用此功能设置自己的 LogRecord 子类,通过重写 getMessage() 方法来做正确的事情。此方法的基类实现是 msg % args 格式化发生的地方,您可以在此处替换您的替代格式;但是,您应该注意支持所有格式样式并将 % 格式化作为默认值,以确保与其他代码的互操作性。还应注意调用 str(self.msg),就像基类实现一样。

有关更多信息,请参阅 setLogRecordFactory()LogRecord 的参考文档。

使用自定义消息对象

还有另一种可能更简单的方法,您可以使用 {} 和 $ 格式来构造您的单个日志消息。您可能还记得(从 使用任意对象作为消息)在日志记录时,您可以使用任意对象作为消息格式字符串,日志包将对该对象调用 str() 以获取实际的格式字符串。考虑以下两个类:

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

其中任何一个都可以代替格式字符串使用,以允许使用 {}- 或 $- 格式来构建实际的“消息”部分,该部分在格式化日志输出中出现在“%(message)s”或“{message}”或“$message”的位置。如果您觉得每次要记录某些内容时都使用类名有点笨拙,那么如果您为消息使用别名,例如 M_(或者可能是 __,如果您将 _ 用于本地化),则可以使其更易于接受。

下面给出了这种方法的例子。首先,使用 str.format() 进行格式化:

>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)

其次,使用 string.Template 进行格式化:

>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

需要注意的一点是,这种方法不会造成明显的性能损失:实际的格式化操作不是在您进行日志调用时发生的,而是在(如果)日志消息即将由处理程序输出到日志时发生的。因此,唯一可能让您感到有点不寻常的是,括号环绕着格式字符串和参数,而不仅仅是格式字符串。这是因为 __ 符号只是上面所示的 XXXMessage 类之一的构造函数调用的语法糖。

使用 dictConfig() 配置过滤器

可以 使用 dictConfig() 配置过滤器,尽管乍一看可能不明显如何操作(因此有了这个示例)。由于 Filter 是标准库中唯一的过滤器类,并且它不太可能满足许多要求(它只作为基类存在),您通常需要定义自己的 Filter 子类并重写 filter() 方法。为此,请在过滤器的配置字典中指定 () 键,指定一个可调用对象,该对象将用于创建过滤器(类是最明显的选择,但您可以提供任何返回 Filter 实例的可调用对象)。以下是一个完整的示例:

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

此示例展示了如何以关键字参数的形式将配置数据传递给构造实例的可调用对象。运行时,上述脚本将打印:

changed: hello

这表明过滤器正在按配置工作。

另外几点注意事项:

  • 如果无法在配置中直接引用可调用对象(例如,如果它位于不同的模块中,并且您无法在配置字典中直接导入它),则可以使用 访问外部对象 中描述的 ext://... 形式。例如,在上面的示例中,您可以使用文本 'ext://__main__.MyFilter' 代替 MyFilter

  • 除了过滤器之外,此技术还可用于配置自定义处理程序和格式化程序。有关日志记录如何支持在其配置中使用用户定义对象的更多信息,请参阅 用户定义对象,并参阅上面的另一本食谱 使用 dictConfig() 定制处理程序

自定义异常格式化

有时您可能希望进行自定义异常格式化——例如,假设您希望每个日志事件只有一行,即使存在异常信息。您可以使用自定义格式化程序类来实现这一点,如下例所示:

import logging

class OneLineExceptionFormatter(logging.Formatter):
    def formatException(self, exc_info):
        """
        Format an exception so that it prints on a single line.
        """
        result = super().formatException(exc_info)
        return repr(result)  # or format into one line however you want to

    def format(self, record):
        s = super().format(record)
        if record.exc_text:
            s = s.replace('\n', '') + '|'
        return s

def configure_logging():
    fh = logging.FileHandler('output.txt', 'w')
    f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                  '%d/%m/%Y %H:%M:%S')
    fh.setFormatter(f)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)

def main():
    configure_logging()
    logging.info('Sample message')
    try:
        x = 1 / 0
    except ZeroDivisionError as e:
        logging.exception('ZeroDivisionError: %s', e)

if __name__ == '__main__':
    main()

运行时,这将生成一个只有两行的文件:

28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: division by zero|'Traceback (most recent call last):\n  File "logtest7.py", line 30, in main\n    x = 1 / 0\nZeroDivisionError: division by zero'|

虽然上述处理方式很简单,但它指明了如何根据您的喜好格式化异常信息。traceback 模块可能对更专业的需要有所帮助。

播报日志消息

在某些情况下,希望日志消息以可听而非可见的格式呈现。如果您的系统中有文本转语音(TTS)功能,即使它没有 Python 绑定,这也很容易实现。大多数 TTS 系统都有一个可以运行的命令行程序,可以通过处理程序使用 subprocess 来调用。这里假设 TTS 命令行程序不会期望与用户交互或花费很长时间才能完成,并且日志消息的频率不会太高以至于让用户消息泛滥,并且可以接受消息一个接一个地播报而不是同时播报。下面的示例实现等待一个消息播报完毕后才处理下一个消息,这可能会导致其他处理程序等待。这是一个简短的示例,展示了这种方法,它假设 espeak TTS 包可用:

import logging
import subprocess
import sys

class TTSHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        # Speak slowly in a female English voice
        cmd = ['espeak', '-s150', '-ven+f3', msg]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # wait for the program to finish
        p.communicate()

def configure_logging():
    h = TTSHandler()
    root = logging.getLogger()
    root.addHandler(h)
    # the default formatter just returns the message
    root.setLevel(logging.DEBUG)

def main():
    logging.info('Hello')
    logging.debug('Goodbye')

if __name__ == '__main__':
    configure_logging()
    sys.exit(main())

运行此脚本时,它应该以女性声音说出“Hello”和“Goodbye”。

当然,上述方法可以适用于其他 TTS 系统,甚至可以适用于通过命令行运行外部程序来处理消息的完全不同的系统。

缓冲日志消息并有条件地输出它们

有时,您可能希望将日志消息存储在临时区域中,并且只有在满足特定条件时才输出它们。例如,您可能希望在函数中开始记录调试事件,如果函数无错误完成,您不想用收集到的调试信息来杂乱日志,但如果发生错误,您希望将所有调试信息以及错误信息都输出。

下面是一个示例,展示了如何使用装饰器来实现这种日志行为。它利用了 logging.handlers.MemoryHandler,它允许缓冲日志事件直到某个条件发生,此时缓冲的事件会被 flush——传递给另一个处理程序(target 处理程序)进行处理。默认情况下,MemoryHandler 在其缓冲区满或遇到级别大于或等于指定阈值的事件时进行 flush。如果您希望自定义 flush 行为,可以使用 MemoryHandler 的更专业的子类。

示例脚本有一个简单的函数 foo,它只是遍历所有日志级别,向 sys.stderr 写入要记录的级别,然后实际在该级别记录一条消息。您可以向 foo 传递一个参数,如果为真,将记录 ERROR 和 CRITICAL 级别——否则,它只记录 DEBUG、INFO 和 WARNING 级别。

该脚本只安排用一个装饰器来装饰 foo,这个装饰器将完成所需的条件日志记录。装饰器接受一个日志记录器作为参数,并在被装饰函数调用的持续时间内附加一个内存处理程序。装饰器可以额外使用目标处理程序、触发 flush 的级别和缓冲区容量(记录数量)进行参数化。这些参数默认分别为写入 sys.stderrStreamHandlerlogging.ERROR100

这是脚本:

import logging
from logging.handlers import MemoryHandler
import sys

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
    if target_handler is None:
        target_handler = logging.StreamHandler()
    if flush_level is None:
        flush_level = logging.ERROR
    if capacity is None:
        capacity = 100
    handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            logger.addHandler(handler)
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception('call failed')
                raise
            finally:
                super(MemoryHandler, handler).flush()
                logger.removeHandler(handler)
        return wrapper

    return decorator

def write_line(s):
    sys.stderr.write('%s\n' % s)

def foo(fail=False):
    write_line('about to log at DEBUG ...')
    logger.debug('Actually logged at DEBUG')
    write_line('about to log at INFO ...')
    logger.info('Actually logged at INFO')
    write_line('about to log at WARNING ...')
    logger.warning('Actually logged at WARNING')
    if fail:
        write_line('about to log at ERROR ...')
        logger.error('Actually logged at ERROR')
        write_line('about to log at CRITICAL ...')
        logger.critical('Actually logged at CRITICAL')
    return fail

decorated_foo = log_if_errors(logger)(foo)

if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    write_line('Calling undecorated foo with False')
    assert not foo(False)
    write_line('Calling undecorated foo with True')
    assert foo(True)
    write_line('Calling decorated foo with False')
    assert not decorated_foo(False)
    write_line('Calling decorated foo with True')
    assert decorated_foo(True)

运行此脚本时,应观察到以下输出:

Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL

如您所见,只有当事件的严重性为 ERROR 或更高时,才会发生实际的日志输出,但在这种情况下,之前所有严重性较低的事件也会被记录。

当然,您可以使用传统的装饰方法:

@log_if_errors(logger)
def foo(fail=False):
    ...

将日志消息通过电子邮件发送,并进行缓冲

为了说明如何通过电子邮件发送日志消息,以便每封电子邮件发送指定数量的消息,您可以子类化 BufferingHandler。在以下示例中,您可以根据自己的特定需求进行调整,提供了一个简单的测试工具,允许您使用命令行参数运行脚本,这些参数指定您通常需要通过 SMTP 发送邮件的内容。(运行下载的脚本时带上 -h 参数以查看所需和可选参数。)

import logging
import logging.handlers
import smtplib

class BufferingSMTPHandler(logging.handlers.BufferingHandler):
    def __init__(self, mailhost, port, username, password, fromaddr, toaddrs,
                 subject, capacity):
        logging.handlers.BufferingHandler.__init__(self, capacity)
        self.mailhost = mailhost
        self.mailport = port
        self.username = username
        self.password = password
        self.fromaddr = fromaddr
        if isinstance(toaddrs, str):
            toaddrs = [toaddrs]
        self.toaddrs = toaddrs
        self.subject = subject
        self.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))

    def flush(self):
        if len(self.buffer) > 0:
            try:
                smtp = smtplib.SMTP(self.mailhost, self.mailport)
                smtp.starttls()
                smtp.login(self.username, self.password)
                msg = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (self.fromaddr, ','.join(self.toaddrs), self.subject)
                for record in self.buffer:
                    s = self.format(record)
                    msg = msg + s + "\r\n"
                smtp.sendmail(self.fromaddr, self.toaddrs, msg)
                smtp.quit()
            except Exception:
                if logging.raiseExceptions:
                    raise
            self.buffer = []

if __name__ == '__main__':
    import argparse

    ap = argparse.ArgumentParser()
    aa = ap.add_argument
    aa('host', metavar='HOST', help='SMTP server')
    aa('--port', '-p', type=int, default=587, help='SMTP port')
    aa('user', metavar='USER', help='SMTP username')
    aa('password', metavar='PASSWORD', help='SMTP password')
    aa('to', metavar='TO', help='Addressee for emails')
    aa('sender', metavar='SENDER', help='Sender email address')
    aa('--subject', '-s',
       default='Test Logging email from Python logging module (buffering)',
       help='Subject of email')
    options = ap.parse_args()
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    h = BufferingSMTPHandler(options.host, options.port, options.user,
                             options.password, options.sender,
                             options.to, options.subject, 10)
    logger.addHandler(h)
    for i in range(102):
        logger.info("Info index = %d", i)
    h.flush()
    h.close()

如果您运行此脚本并且您的 SMTP 服务器设置正确,您应该会发现它会向您指定的收件人发送十一封电子邮件。前十封电子邮件每封将包含十条日志消息,第十一封将包含两条消息。这构成了脚本中指定的 102 条消息。

通过配置使用 UTC(GMT)格式化时间

有时您希望使用 UTC 格式化时间,这可以通过使用 UTCFormatter 这样的类来完成,如下所示:

import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

然后您可以在代码中使用 UTCFormatter 代替 Formatter。如果您想通过配置来实现这一点,您可以使用 dictConfig() API,其方法示例如下:

import logging
import logging.config
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'utc': {
            '()': UTCFormatter,
            'format': '%(asctime)s %(message)s',
        },
        'local': {
            'format': '%(asctime)s %(message)s',
        }
    },
    'handlers': {
        'console1': {
            'class': 'logging.StreamHandler',
            'formatter': 'utc',
        },
        'console2': {
            'class': 'logging.StreamHandler',
            'formatter': 'local',
        },
    },
    'root': {
        'handlers': ['console1', 'console2'],
   }
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.warning('The local time is %s', time.asctime())

运行此脚本时,它应该打印类似以下内容:

2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015

显示时间如何以本地时间和 UTC 两种格式显示,每个处理程序各一个。

使用上下文管理器进行选择性日志记录

有时,暂时更改日志配置并在完成某些操作后将其恢复会很有用。为此,上下文管理器是保存和恢复日志上下文最明显的方式。下面是这样一个上下文管理器的简单示例,它允许您纯粹在上下文管理器的范围内可选地更改日志级别并添加日志处理程序:

import logging
import sys

class LoggingContext:
    def __init__(self, logger, level=None, handler=None, close=True):
        self.logger = logger
        self.level = level
        self.handler = handler
        self.close = close

    def __enter__(self):
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        if self.handler:
            self.logger.addHandler(self.handler)

    def __exit__(self, et, ev, tb):
        if self.level is not None:
            self.logger.setLevel(self.old_level)
        if self.handler:
            self.logger.removeHandler(self.handler)
        if self.handler and self.close:
            self.handler.close()
        # implicit return of None => don't swallow exceptions

如果您指定了级别值,则在上下文管理器所覆盖的 with 块范围内,日志记录器的级别将设置为该值。如果您指定了处理程序,则在进入块时将其添加到日志记录器,并在退出块时将其删除。您还可以要求管理器在块退出时为您关闭处理程序——如果您不再需要该处理程序,就可以这样做。

为了说明它的工作原理,我们可以在上面添加以下代码块:

if __name__ == '__main__':
    logger = logging.getLogger('foo')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    logger.info('1. This should appear just once on stderr.')
    logger.debug('2. This should not appear.')
    with LoggingContext(logger, level=logging.DEBUG):
        logger.debug('3. This should appear once on stderr.')
    logger.debug('4. This should not appear.')
    h = logging.StreamHandler(sys.stdout)
    with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
        logger.debug('5. This should appear twice - once on stderr and once on stdout.')
    logger.info('6. This should appear just once on stderr.')
    logger.debug('7. This should not appear.')

我们最初将日志记录器的级别设置为 INFO,因此消息 #1 出现而消息 #2 不出现。然后,我们在接下来的 with 块中临时将级别更改为 DEBUG,因此消息 #3 出现。块退出后,日志记录器的级别恢复到 INFO,因此消息 #4 不出现。在下一个 with 块中,我们再次将级别设置为 DEBUG,但同时也添加了一个写入 sys.stdout 的处理程序。因此,消息 #5 在控制台上出现两次(一次通过 stderr,一次通过 stdout)。with 语句完成后,状态恢复到以前的样子,因此消息 #6 出现(像消息 #1),而消息 #7 不出现(就像消息 #2)。

如果我们运行生成的脚本,结果如下:

$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

如果我们再次运行它,但将 stderr 重定向到 /dev/null,我们将看到以下内容,这是唯一写入 stdout 的消息:

$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.

再次运行,但将 stdout 重定向到 /dev/null,我们得到:

$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

在这种情况下,打印到 stdout 的消息 #5 不会出现,正如预期的那样。

当然,这里描述的方法可以泛化,例如暂时附加日志过滤器。请注意,上述代码在 Python 2 和 Python 3 中都适用。

CLI 应用程序入门模板

这是一个示例,展示了如何:

  • 根据命令行参数使用日志级别

  • 以一致的方式将请求分派到单独文件中的多个子命令,所有子命令都以相同的级别记录日志

  • 利用简单、最少的配置

假设我们有一个命令行应用程序,其任务是停止、启动或重启某些服务。为了说明目的,这可以组织为一个文件 app.py,它是应用程序的主脚本,各个命令在 start.pystop.pyrestart.py 中实现。进一步假设我们希望通过命令行参数控制应用程序的详细程度,默认值为 logging.INFO。这是 app.py 的一种编写方式:

import argparse
import importlib
import logging
import os
import sys

def main(args=None):
    scriptname = os.path.basename(__file__)
    parser = argparse.ArgumentParser(scriptname)
    levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    parser.add_argument('--log-level', default='INFO', choices=levels)
    subparsers = parser.add_subparsers(dest='command',
                                       help='Available commands:')
    start_cmd = subparsers.add_parser('start', help='Start a service')
    start_cmd.add_argument('name', metavar='NAME',
                           help='Name of service to start')
    stop_cmd = subparsers.add_parser('stop',
                                     help='Stop one or more services')
    stop_cmd.add_argument('names', metavar='NAME', nargs='+',
                          help='Name of service to stop')
    restart_cmd = subparsers.add_parser('restart',
                                        help='Restart one or more services')
    restart_cmd.add_argument('names', metavar='NAME', nargs='+',
                             help='Name of service to restart')
    options = parser.parse_args()
    # the code to dispatch commands could all be in this file. For the purposes
    # of illustration only, we implement each command in a separate module.
    try:
        mod = importlib.import_module(options.command)
        cmd = getattr(mod, 'command')
    except (ImportError, AttributeError):
        print('Unable to find the code for command \'%s\'' % options.command)
        return 1
    # Could get fancy here and load configuration from file or dictionary
    logging.basicConfig(level=options.log_level,
                        format='%(levelname)s %(name)s %(message)s')
    cmd(options)

if __name__ == '__main__':
    sys.exit(main())

startstoprestart 命令可以在单独的模块中实现,例如启动模块如下:

# start.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    logger.debug('About to start %s', options.name)
    # actually do the command processing here ...
    logger.info('Started the \'%s\' service.', options.name)

因此,停止模块如下:

# stop.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to stop %s', services)
    # actually do the command processing here ...
    logger.info('Stopped the %s service%s.', services, plural)

重启模块也类似:

# restart.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to restart %s', services)
    # actually do the command processing here ...
    logger.info('Restarted the %s service%s.', services, plural)

如果我们使用默认日志级别运行此应用程序,我们将获得如下输出:

$ python app.py start foo
INFO start Started the 'foo' service.

$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

第一个词是日志级别,第二个词是事件记录位置的模块或包名称。

如果我们更改日志级别,那么我们可以更改发送到日志的信息。例如,如果我们想要更多信息:

$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.

$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

如果我们想要更少的信息:

$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz

在这种情况下,命令不会向控制台打印任何内容,因为它们没有记录任何 WARNING 级别或更高级别的日志。

日志记录的 Qt GUI

一个时常出现的问题是如何将日志记录到 GUI 应用程序中。Qt 框架是一个流行的跨平台 UI 框架,它通过 PySide2PyQt5 库提供 Python 绑定。

以下示例展示了如何将日志记录到 Qt GUI。这引入了一个简单的 QtHandler 类,它接受一个可调用对象,该对象应该是主线程中进行 GUI 更新的槽。还创建了一个工作线程,以展示如何从 UI 本身(通过按钮进行手动日志记录)以及在后台进行工作的工作线程(此处只是以随机级别和随机短延迟记录消息)向 GUI 记录日志。

工作线程是使用 Qt 的 QThread 类而不是 threading 模块实现的,因为在某些情况下必须使用 QThread,它提供了与其它 Qt 组件更好的集成。

该代码应适用于 PySide6PyQt6PySide2PyQt5 的任何最新版本。您应该能够将此方法应用于早期版本的 Qt。请参阅代码片段中的注释以获取更详细的信息。

import datetime
import logging
import random
import sys
import time

# Deal with minor differences between different Qt packages
try:
    from PySide6 import QtCore, QtGui, QtWidgets
    Signal = QtCore.Signal
    Slot = QtCore.Slot
except ImportError:
    try:
        from PyQt6 import QtCore, QtGui, QtWidgets
        Signal = QtCore.pyqtSignal
        Slot = QtCore.pyqtSlot
    except ImportError:
        try:
            from PySide2 import QtCore, QtGui, QtWidgets
            Signal = QtCore.Signal
            Slot = QtCore.Slot
        except ImportError:
            from PyQt5 import QtCore, QtGui, QtWidgets
            Signal = QtCore.pyqtSignal
            Slot = QtCore.pyqtSlot

logger = logging.getLogger(__name__)


#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
    signal = Signal(str, logging.LogRecord)

#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
    def __init__(self, slotfunc, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.signaller = Signaller()
        self.signaller.signal.connect(slotfunc)

    def emit(self, record):
        s = self.format(record)
        self.signaller.signal.emit(s, record)

#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
    return QtCore.QThread.currentThread().objectName()


#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)

#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
    @Slot()
    def start(self):
        extra = {'qThreadName': ctname() }
        logger.debug('Started work', extra=extra)
        i = 1
        # Let the thread run until interrupted. This allows reasonably clean
        # thread termination.
        while not QtCore.QThread.currentThread().isInterruptionRequested():
            delay = 0.5 + random.random() * 2
            time.sleep(delay)
            try:
                if random.random() < 0.1:
                    raise ValueError('Exception raised: %d' % i)
                else:
                    level = random.choice(LEVELS)
                    logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
            except ValueError as e:
                logger.exception('Failed: %s', e, extra=extra)
            i += 1

#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):

    COLORS = {
        logging.DEBUG: 'black',
        logging.INFO: 'blue',
        logging.WARNING: 'orange',
        logging.ERROR: 'red',
        logging.CRITICAL: 'purple',
    }

    def __init__(self, app):
        super().__init__()
        self.app = app
        self.textedit = te = QtWidgets.QPlainTextEdit(self)
        # Set whatever the default monospace font is for the platform
        f = QtGui.QFont('nosuchfont')
        if hasattr(f, 'Monospace'):
            f.setStyleHint(f.Monospace)
        else:
            f.setStyleHint(f.StyleHint.Monospace)  # for Qt6
        te.setFont(f)
        te.setReadOnly(True)
        PB = QtWidgets.QPushButton
        self.work_button = PB('Start background work', self)
        self.log_button = PB('Log a message at a random level', self)
        self.clear_button = PB('Clear log window', self)
        self.handler = h = QtHandler(self.update_status)
        # Remember to use qThreadName rather than threadName in the format string.
        fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
        formatter = logging.Formatter(fs)
        h.setFormatter(formatter)
        logger.addHandler(h)
        # Set up to terminate the QThread when we exit
        app.aboutToQuit.connect(self.force_quit)

        # Lay out all the widgets
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(te)
        layout.addWidget(self.work_button)
        layout.addWidget(self.log_button)
        layout.addWidget(self.clear_button)
        self.setFixedSize(900, 400)

        # Connect the non-worker slots and signals
        self.log_button.clicked.connect(self.manual_update)
        self.clear_button.clicked.connect(self.clear_display)

        # Start a new worker thread and connect the slots for the worker
        self.start_thread()
        self.work_button.clicked.connect(self.worker.start)
        # Once started, the button should be disabled
        self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))

    def start_thread(self):
        self.worker = Worker()
        self.worker_thread = QtCore.QThread()
        self.worker.setObjectName('Worker')
        self.worker_thread.setObjectName('WorkerThread')  # for qThreadName
        self.worker.moveToThread(self.worker_thread)
        # This will start an event loop in the worker thread
        self.worker_thread.start()

    def kill_thread(self):
        # Just tell the worker to stop, then tell it to quit and wait for that
        # to happen
        self.worker_thread.requestInterruption()
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()
        else:
            print('worker has already exited.')

    def force_quit(self):
        # For use when the window is closed
        if self.worker_thread.isRunning():
            self.kill_thread()

    # The functions below update the UI and run in the main thread because
    # that's where the slots are set up

    @Slot(str, logging.LogRecord)
    def update_status(self, status, record):
        color = self.COLORS.get(record.levelno, 'black')
        s = '<pre><font color="%s">%s</font></pre>' % (color, status)
        self.textedit.appendHtml(s)

    @Slot()
    def manual_update(self):
        # This function uses the formatted message passed in, but also uses
        # information from the record to format the message in an appropriate
        # color according to its severity (level).
        level = random.choice(LEVELS)
        extra = {'qThreadName': ctname() }
        logger.log(level, 'Manually logged!', extra=extra)

    @Slot()
    def clear_display(self):
        self.textedit.clear()


def main():
    QtCore.QThread.currentThread().setObjectName('MainThread')
    logging.getLogger().setLevel(logging.DEBUG)
    app = QtWidgets.QApplication(sys.argv)
    example = Window(app)
    example.show()
    if hasattr(app, 'exec'):
        rc = app.exec()
    else:
        rc = app.exec_()
    sys.exit(rc)

if __name__=='__main__':
    main()

支持 RFC5424 的 syslog 日志记录

尽管 RFC 5424 发布于 2009 年,但大多数 syslog 服务器默认配置为使用较旧的 RFC 3164,该协议可追溯到 2001 年。当 Python 在 2003 年添加 logging 时,它支持当时较早(也是唯一存在)的协议。由于 RFC5424 发布后,它在 syslog 服务器中并未得到广泛部署,因此 SysLogHandler 的功能尚未更新。

RFC 5424 包含一些有用的功能,例如支持结构化数据,如果您需要能够向支持它的 syslog 服务器记录日志,您可以使用如下所示的子类化处理程序:

import datetime
import logging.handlers
import re
import socket
import time

class SysLogHandler5424(logging.handlers.SysLogHandler):

    tz_offset = re.compile(r'([+-]\d{2})(\d{2})$')
    escaped = re.compile(r'([\]"\\])')

    def __init__(self, *args, **kwargs):
        self.msgid = kwargs.pop('msgid', None)
        self.appname = kwargs.pop('appname', None)
        super().__init__(*args, **kwargs)

    def format(self, record):
        version = 1
        asctime = datetime.datetime.fromtimestamp(record.created).isoformat()
        m = self.tz_offset.match(time.strftime('%z'))
        has_offset = False
        if m and time.timezone:
            hrs, mins = m.groups()
            if int(hrs) or int(mins):
                has_offset = True
        if not has_offset:
            asctime += 'Z'
        else:
            asctime += f'{hrs}:{mins}'
        try:
            hostname = socket.gethostname()
        except Exception:
            hostname = '-'
        appname = self.appname or '-'
        procid = record.process
        msgid = '-'
        msg = super().format(record)
        sdata = '-'
        if hasattr(record, 'structured_data'):
            sd = record.structured_data
            # This should be a dict where the keys are SD-ID and the value is a
            # dict mapping PARAM-NAME to PARAM-VALUE (refer to the RFC for what these
            # mean)
            # There's no error checking here - it's purely for illustration, and you
            # can adapt this code for use in production environments
            parts = []

            def replacer(m):
                g = m.groups()
                return '\\' + g[0]

            for sdid, dv in sd.items():
                part = f'[{sdid}'
                for k, v in dv.items():
                    s = str(v)
                    s = self.escaped.sub(replacer, s)
                    part += f' {k}="{s}"'
                part += ']'
                parts.append(part)
            sdata = ''.join(parts)
        return f'{version} {asctime} {hostname} {appname} {procid} {msgid} {sdata} {msg}'

您需要熟悉 RFC 5424 才能完全理解上述代码,而且您可能有一些稍微不同的需求(例如,如何将结构化数据传递到日志中)。尽管如此,上述内容应该能够适应您的特定需求。使用上述处理程序,您可以通过以下方式传递结构化数据:

sd = {
    'foo@12345': {'bar': 'baz', 'baz': 'bozz', 'fizz': r'buzz'},
    'foo@54321': {'rab': 'baz', 'zab': 'bozz', 'zzif': r'buzz'}
}
extra = {'structured_data': sd}
i = 1
logger.debug('Message %d', i, extra=extra)

如何将日志记录器视为输出流

有时,您需要与需要写入文件类对象的第三方 API 进行接口,但您希望将 API 的输出定向到日志记录器。您可以使用一个将日志记录器封装在文件类 API 中的类来实现此目的。以下是一个简短的脚本,演示了这样一个类:

import logging

class LoggerWriter:
    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        if message != '\n':  # avoid printing bare newlines, if you like
            self.logger.log(self.level, message)

    def flush(self):
        # doesn't actually do anything, but might be expected of a file-like
        # object - so optional depending on your situation
        pass

    def close(self):
        # doesn't actually do anything, but might be expected of a file-like
        # object - so optional depending on your situation. You might want
        # to set a flag so that later calls to write raise an exception
        pass

def main():
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('demo')
    info_fp = LoggerWriter(logger, logging.INFO)
    debug_fp = LoggerWriter(logger, logging.DEBUG)
    print('An INFO message', file=info_fp)
    print('A DEBUG message', file=debug_fp)

if __name__ == "__main__":
    main()

运行此脚本时,它会打印:

INFO:demo:An INFO message
DEBUG:demo:A DEBUG message

您还可以使用 LoggerWriter 来重定向 sys.stdoutsys.stderr,方法如下:

import sys

sys.stdout = LoggerWriter(logger, logging.INFO)
sys.stderr = LoggerWriter(logger, logging.WARNING)

您应该在根据需要配置日志记录 之后 执行此操作。在上面的示例中,basicConfig() 调用执行此操作(使用 sys.stderr 值,在其被 LoggerWriter 实例覆盖之前)。然后,您将得到如下结果:

>>> print('Foo')
INFO:demo:Foo
>>> print('Bar', file=sys.stderr)
WARNING:demo:Bar
>>>

当然,上面的示例显示了根据 basicConfig() 使用的格式的输出,但您可以在配置日志记录时使用不同的格式化程序。

请注意,使用上述方案,您在某种程度上受制于正在拦截的缓冲和写入调用序列。例如,使用上面 LoggerWriter 的定义,如果您有以下代码片段:

sys.stderr = LoggerWriter(logger, logging.WARNING)
1 / 0

然后运行脚本的结果是:

WARNING:demo:Traceback (most recent call last):

WARNING:demo:  File "/home/runner/cookbook-loggerwriter/test.py", line 53, in <module>

WARNING:demo:
WARNING:demo:main()
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/test.py", line 49, in main

WARNING:demo:
WARNING:demo:1 / 0
WARNING:demo:ZeroDivisionError
WARNING:demo::
WARNING:demo:division by zero

如您所见,此输出并不理想。这是因为写入 sys.stderr 的底层代码进行了多次写入,每次写入都会产生一个单独的日志行(例如,上面的最后三行)。为了解决这个问题,您需要缓冲内容,并且只在看到换行符时才输出日志行。让我们使用一个稍微更好的 LoggerWriter 实现:

class BufferingLoggerWriter(LoggerWriter):
    def __init__(self, logger, level):
        super().__init__(logger, level)
        self.buffer = ''

    def write(self, message):
        if '\n' not in message:
            self.buffer += message
        else:
            parts = message.split('\n')
            if self.buffer:
                s = self.buffer + parts.pop(0)
                self.logger.log(self.level, s)
            self.buffer = parts.pop()
            for part in parts:
                self.logger.log(self.level, part)

这只是将内容缓冲起来直到看到换行符,然后记录完整的行。使用这种方法,您可以获得更好的输出:

WARNING:demo:Traceback (most recent call last):
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/main.py", line 55, in <module>
WARNING:demo:    main()
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/main.py", line 52, in main
WARNING:demo:    1/0
WARNING:demo:ZeroDivisionError: division by zero

如何在日志输出中统一处理换行符

通常,记录的消息(例如到控制台或文件)由单行文本组成。但是,有时需要处理多行消息——无论是由于日志格式字符串包含换行符,还是记录的数据包含换行符。如果您想统一处理此类消息,以便记录消息中的每一行都像单独记录一样统一格式化,您可以使用处理程序 mixin 来实现,如下面的代码片段所示:

# Assume this is in a module mymixins.py
import copy

class MultilineMixin:
    def emit(self, record):
        s = record.getMessage()
        if '\n' not in s:
            super().emit(record)
        else:
            lines = s.splitlines()
            rec = copy.copy(record)
            rec.args = None
            for line in lines:
                rec.msg = line
                super().emit(rec)

您可以按如下脚本使用 mixin:

import logging

from mymixins import MultilineMixin

logger = logging.getLogger(__name__)

class StreamHandler(MultilineMixin, logging.StreamHandler):
    pass

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-9s %(message)s',
                        handlers = [StreamHandler()])
    logger.debug('Single line')
    logger.debug('Multiple lines:\nfool me once ...')
    logger.debug('Another single line')
    logger.debug('Multiple lines:\n%s', 'fool me ...\ncan\'t get fooled again')

该脚本运行时,会打印出类似以下内容:

2025-07-02 13:54:47,234 DEBUG     Single line
2025-07-02 13:54:47,234 DEBUG     Multiple lines:
2025-07-02 13:54:47,234 DEBUG     fool me once ...
2025-07-02 13:54:47,234 DEBUG     Another single line
2025-07-02 13:54:47,234 DEBUG     Multiple lines:
2025-07-02 13:54:47,234 DEBUG     fool me ...
2025-07-02 13:54:47,234 DEBUG     can't get fooled again

另一方面,如果您担心 日志注入,您可以使用一个转义换行符的格式化程序,如下例所示:

import logging

logger = logging.getLogger(__name__)

class EscapingFormatter(logging.Formatter):
    def format(self, record):
        s = super().format(record)
        return s.replace('\n', r'\n')

if __name__ == '__main__':
    h = logging.StreamHandler()
    h.setFormatter(EscapingFormatter('%(asctime)s %(levelname)-9s %(message)s'))
    logging.basicConfig(level=logging.DEBUG, handlers = [h])
    logger.debug('Single line')
    logger.debug('Multiple lines:\nfool me once ...')
    logger.debug('Another single line')
    logger.debug('Multiple lines:\n%s', 'fool me ...\ncan\'t get fooled again')

当然,您可以使用任何对您来说最有意义的转义方案。该脚本运行后,应该会产生如下输出:

2025-07-09 06:47:33,783 DEBUG     Single line
2025-07-09 06:47:33,783 DEBUG     Multiple lines:\nfool me once ...
2025-07-09 06:47:33,783 DEBUG     Another single line
2025-07-09 06:47:33,783 DEBUG     Multiple lines:\nfool me ...\ncan't get fooled again

转义行为不能作为标准库的默认行为,因为它会破坏向后兼容性。

需要避免的模式

尽管前面的章节描述了您可能需要做或处理事情的方法,但值得一提的是一些 无益 的使用模式,因此在大多数情况下应避免这些模式。以下章节没有特定顺序。

多次打开同一个日志文件

在 Windows 上,您通常无法多次打开同一个文件,因为这会导致“文件被其他进程使用”错误。然而,在 POSIX 平台上,如果您多次打开同一个文件,则不会收到任何错误。这可能是意外发生的,例如:

  • 多次添加引用同一文件的文件处理程序(例如,由于复制/粘贴/忘记更改错误)。

  • 打开两个看起来不同的文件,因为它们有不同的名称,但由于一个是另一个的符号链接而实际上是同一个文件。

  • 派生一个进程后,父进程和子进程都引用同一个文件。这可能通过使用 multiprocessing 模块来实现。

多次打开文件可能 看起来 大部分时间都能正常工作,但在实践中会导致许多问题:

  • 日志输出可能会混乱,因为多个线程或进程尝试写入同一个文件。尽管日志记录会防止多个线程并发使用同一个处理程序实例,但如果两个不同的线程使用两个不同的处理程序实例(恰好指向同一个文件)并发尝试写入,则没有这种保护。

  • 删除文件的尝试(例如在文件轮转期间)静默失败,因为有另一个引用指向它。这可能导致混乱和浪费调试时间——日志条目最终出现在意想不到的位置,或完全丢失。或者一个本应移动的文件仍然保留在原位,并且尽管 supposedly 有基于大小的轮转,但其大小意外增长。

使用 从多个进程记录到单个文件 中概述的技术来规避此类问题。

将日志记录器用作类中的属性或将其作为参数传递

虽然在某些不寻常的情况下可能需要这样做,但一般来说没有意义,因为日志记录器是单例。代码始终可以通过名称使用 logging.getLogger(name) 访问给定的日志记录器实例,因此传递实例并将其作为实例属性保存是毫无意义的。请注意,在其他语言(如 Java 和 C#)中,日志记录器通常是静态类属性。但是,这种模式在 Python 中没有意义,因为模块(而不是类)是软件分解的单位。

向库中的日志记录器添加除 NullHandler 之外的处理程序

配置日志记录(通过添加处理程序、格式化程序和过滤器)是应用程序开发者的责任,而不是库开发者的责任。如果您正在维护一个库,请确保除了 NullHandler 实例之外,不要向您的任何日志记录器添加处理程序。

创建大量日志记录器

日志记录器是单例,在脚本执行期间永不释放,因此创建大量日志记录器将耗尽无法释放的内存。与其为每个(例如)处理的文件或建立的网络连接创建一个日志记录器,不如使用 现有机制 将上下文信息传递到您的日志中,并将创建的日志记录器限制为描述应用程序内区域的日志记录器(通常是模块,但偶尔会更细粒度)。

其他资源

参见

模块 logging

logging 模块的 API 参考。

模块 logging.config

日志模块的配置 API。

模块 logging.handlers

日志模块附带的有用的处理器。

基础教程

高级教程