日志记录烹饪手册¶
- 作者:
Vinay Sajip <vinay_sajip at red-dove dot com>
此页面包含一些与日志记录相关的食谱,这些食谱在过去被证明很有用。有关教程和参考信息的链接,请参见 其他资源.
在多个模块中使用日志记录¶
对 logging.getLogger('someLogger')
的多次调用将返回对同一个记录器对象的引用。这不仅在同一个模块内有效,而且在同一个 Python 解释器进程中的不同模块之间也有效。这对于对同一个对象的引用是正确的;此外,应用程序代码可以在一个模块中定义和配置一个父记录器,并在另一个模块中创建一个子记录器(但不要配置它),所有对子记录器的记录器调用都将传递到父记录器。这是一个主模块
import logging
import auxiliary_module
# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')
这是一个辅助模块
import logging
# create logger
module_logger = logging.getLogger('spam_application.auxiliary')
class Auxiliary:
def __init__(self):
self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
self.logger.info('creating an instance of Auxiliary')
def do_something(self):
self.logger.info('doing something')
a = 1 + 1
self.logger.info('done doing something')
def some_function():
module_logger.info('received a call to "some_function"')
输出如下所示
2005-03-23 23:47:11,663 - spam_application - INFO -
creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
done with auxiliary_module.some_function()
多线程日志记录¶
多线程日志记录不需要任何特殊操作。以下示例展示了主线程(初始线程)和另一个线程的日志记录
import logging
import threading
import time
def worker(arg):
while not arg['stop']:
logging.debug('Hi from myfunc')
time.sleep(0.5)
def main():
logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
info = {'stop': False}
thread = threading.Thread(target=worker, args=(info,))
thread.start()
while True:
try:
logging.debug('Hello from main')
time.sleep(0.75)
except KeyboardInterrupt:
info['stop'] = True
break
thread.join()
if __name__ == '__main__':
main()
运行时,脚本应该打印类似以下内容
0 Thread-1 Hi from myfunc
3 MainThread Hello from main
505 Thread-1 Hi from myfunc
755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc
这展示了日志输出的交织,正如预期的那样。当然,这种方法适用于比这里展示的更多线程。
多个处理器和格式化器¶
记录器是普通的 Python 对象。 addHandler()
方法对可以添加的处理器数量没有最小或最大配额。有时,对于应用程序来说,将所有级别的所有消息记录到文本文件,同时将错误或以上级别的消息记录到控制台将是有益的。要设置此功能,只需配置相应的处理器即可。应用程序代码中的日志记录调用将保持不变。以下是之前基于模块的简单配置示例的略微修改
import logging
logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)
# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
请注意,“应用程序”代码不关心多个处理器。唯一改变的是添加和配置了一个名为 *fh* 的新处理器。
创建具有更高或更低严重性过滤器的新处理器的能力在编写和测试应用程序时非常有用。在使用许多 print
语句进行调试时,请使用 logger.debug
:与您以后必须删除或注释掉的 print 语句不同,logger.debug 语句可以保留在源代码中,并在您需要它们之前处于休眠状态。此时,唯一需要更改的是修改记录器和/或处理器的严重性级别以进行调试。
记录到多个目的地¶
假设您想以不同的消息格式和不同的情况下记录到控制台和文件。假设您想将 DEBUG 及以上级别的消息记录到文件,并将 INFO 及以上级别的消息记录到控制台。另外,假设文件应该包含时间戳,但控制台消息不应该包含时间戳。以下是如何实现这一点
import logging
# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M',
filename='/tmp/myapp.log',
filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
运行此代码时,您将在控制台上看到
root : INFO Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO How quickly daft jumping zebras vex.
myapp.area2 : WARNING Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR The five boxing wizards jump quickly.
在文件中,您将看到类似以下内容
10-22 22:19 root INFO Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1 INFO How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2 ERROR The five boxing wizards jump quickly.
如您所见,DEBUG 消息只显示在文件中。其他消息被发送到两个目的地。
此示例使用控制台和文件处理器,但您可以使用任意数量和组合的处理器。
请注意,上面选择的日志文件名 /tmp/myapp.log
意味着在 POSIX 系统上使用标准的临时文件位置。在 Windows 上,您可能需要为日志选择不同的目录名 - 只需确保目录存在,并且您有权限在其中创建和更新文件。
自定义处理级别¶
有时,您可能希望对处理器中级别的标准处理方式进行一些不同的操作,在标准处理方式中,所有高于阈值的级别都会被处理器处理。为此,您需要使用过滤器。让我们来看一个您想按以下方式安排事物的场景
将
INFO
和WARNING
严重性的消息发送到sys.stdout
将
ERROR
及以上严重性的消息发送到sys.stderr
将
DEBUG
及以上严重性的消息发送到文件app.log
假设您使用以下 JSON 配置日志记录
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(levelname)-8s - %(message)s"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout"
},
"stderr": {
"class": "logging.StreamHandler",
"level": "ERROR",
"formatter": "simple",
"stream": "ext://sys.stderr"
},
"file": {
"class": "logging.FileHandler",
"formatter": "simple",
"filename": "app.log",
"mode": "w"
}
},
"root": {
"level": "DEBUG",
"handlers": [
"stderr",
"stdout",
"file"
]
}
}
此配置几乎满足了我们的需求,除了 sys.stdout
也会显示 ERROR
及以上严重性的消息,以及 INFO
和 WARNING
消息。为了防止这种情况,我们可以设置一个过滤器来排除这些消息,并将其添加到相关的处理器中。这可以通过添加一个 filters
部分来配置,该部分与 formatters
和 handlers
平行
{
"filters": {
"warnings_and_below": {
"()" : "__main__.filter_maker",
"level": "WARNING"
}
}
}
并将 stdout
处理器部分更改为添加它
{
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
"filters": ["warnings_and_below"]
}
}
过滤器只是一个函数,因此我们可以定义 filter_maker
(一个工厂函数)如下
def filter_maker(level):
level = getattr(logging, level)
def filter(record):
return record.levelno <= level
return filter
此函数将传入的字符串参数转换为数值级别,并返回一个函数,该函数仅在传入记录的级别等于或低于指定级别时才返回 True
。请注意,在本例中,我在命令行运行的测试脚本 main.py
中定义了 filter_maker
,因此其模块将为 __main__
- 因此在过滤器配置中为 __main__.filter_maker
。如果您在不同的模块中定义它,则需要更改它。
添加过滤器后,我们可以运行 main.py
,其完整内容为
import json
import logging
import logging.config
CONFIG = '''
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(levelname)-8s - %(message)s"
}
},
"filters": {
"warnings_and_below": {
"()" : "__main__.filter_maker",
"level": "WARNING"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
"filters": ["warnings_and_below"]
},
"stderr": {
"class": "logging.StreamHandler",
"level": "ERROR",
"formatter": "simple",
"stream": "ext://sys.stderr"
},
"file": {
"class": "logging.FileHandler",
"formatter": "simple",
"filename": "app.log",
"mode": "w"
}
},
"root": {
"level": "DEBUG",
"handlers": [
"stderr",
"stdout",
"file"
]
}
}
'''
def filter_maker(level):
level = getattr(logging, level)
def filter(record):
return record.levelno <= level
return filter
logging.config.dictConfig(json.loads(CONFIG))
logging.debug('A DEBUG message')
logging.info('An INFO message')
logging.warning('A WARNING message')
logging.error('An ERROR message')
logging.critical('A CRITICAL message')
像这样运行它之后
python main.py 2>stderr.log >stdout.log
我们可以看到结果符合预期
$ more *.log
::::::::::::::
app.log
::::::::::::::
DEBUG - A DEBUG message
INFO - An INFO message
WARNING - A WARNING message
ERROR - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stderr.log
::::::::::::::
ERROR - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stdout.log
::::::::::::::
INFO - An INFO message
WARNING - A WARNING message
配置服务器示例¶
这是一个使用日志记录配置服务器的模块示例
import logging
import logging.config
import time
import os
# read initial config file
logging.config.fileConfig('logging.conf')
# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()
logger = logging.getLogger('simpleExample')
try:
# loop through logging calls to see the difference
# new configurations make, until Ctrl+C is pressed
while True:
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
time.sleep(5)
except KeyboardInterrupt:
# cleanup
logging.config.stopListening()
t.join()
这是一个脚本,它接受一个文件名并将该文件发送到服务器,在前面加上二进制编码的长度,作为新的日志记录配置
#!/usr/bin/env python
import socket, sys, struct
with open(sys.argv[1], 'rb') as f:
data_to_send = f.read()
HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')
处理阻塞的处理程序¶
有时您需要让日志记录处理程序完成工作,而不会阻塞您正在记录的线程。这在 Web 应用程序中很常见,当然也会在其他场景中发生。
一个常见的罪魁祸首是 SMTPHandler
:发送电子邮件可能需要很长时间,原因有很多,超出了开发人员的控制范围(例如,性能不佳的邮件或网络基础设施)。但几乎任何基于网络的处理程序都可能阻塞:即使是 SocketHandler
操作也可能在幕后执行 DNS 查询,这太慢了(并且此查询可能位于套接字库代码的深处,低于 Python 层,并且超出您的控制范围)。
一种解决方案是使用两部分方法。对于第一部分,只将 QueueHandler
附加到从性能关键线程访问的那些记录器。它们只是写入它们的队列,该队列可以调整到足够大的容量或初始化为没有上限的容量。写入队列通常会很快被接受,尽管您可能需要在代码中捕获 queue.Full
异常作为预防措施。如果您是库开发人员,并且代码中存在性能关键线程,请务必记录此信息(以及仅将 QueueHandlers
附加到记录器的建议),以便其他将使用您代码的开发人员受益。
解决方案的第二部分是 QueueListener
,它被设计为 QueueHandler
的对应物。一个 QueueListener
非常简单:它接收一个队列和一些处理程序,并启动一个内部线程,该线程监听其队列以获取从 QueueHandlers
(或任何其他 LogRecords
源)发送的 LogRecords
。这些 LogRecords
将从队列中删除并传递给处理程序以进行处理。
使用单独的 QueueListener
类的好处是,您可以使用同一个实例来为多个 QueueHandlers
提供服务。这比使用现有处理程序类的线程版本更节省资源,后者会为每个处理程序消耗一个线程,而没有特别的益处。
以下是一个使用这两个类的示例(省略导入)
que = queue.Queue(-1) # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()
运行时,将产生
MainThread: Look out!
注意
虽然之前的讨论没有专门针对异步代码,而是关于缓慢的日志处理程序,但需要注意的是,当从异步代码中记录日志时,网络甚至文件处理程序都可能导致问题(阻塞事件循环),因为一些日志记录是从 asyncio
内部完成的。如果应用程序中使用了任何异步代码,最好使用上述方法进行日志记录,以便任何阻塞代码只在 QueueListener
线程中运行。
在版本 3.5 中更改: 在 Python 3.5 之前,QueueListener
始终将从队列接收到的每条消息传递给它初始化的每个处理程序。(这是因为假设所有级别过滤都在另一端完成,即队列被填充的地方。)从 3.5 开始,可以通过将关键字参数 respect_handler_level=True
传递给侦听器的构造函数来更改此行为。当这样做时,侦听器会将每条消息的级别与处理程序的级别进行比较,并且只有在适当的情况下才会将消息传递给处理程序。
通过网络发送和接收日志事件¶
假设您想通过网络发送日志事件,并在接收端处理它们。一个简单的方法是在发送端的根记录器上附加一个 SocketHandler
实例
import logging, logging.handlers
rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
在接收端,您可以使用 socketserver
模块设置接收器。这是一个基本的示例
import pickle
import logging
import logging.handlers
import socketserver
import struct
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handler for a streaming logging request.
This basically logs the record using whatever logging policy is
configured locally.
"""
def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""
allow_reuse_address = True
def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None
def serve_until_stopped(self):
import select
abort = 0
while not abort:
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
self.handle_request()
abort = self.abort
def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server...')
tcpserver.serve_until_stopped()
if __name__ == '__main__':
main()
首先运行服务器,然后运行客户端。在客户端,控制台上不会打印任何内容;在服务器端,您应该看到类似的内容
About to start TCP server...
59 root INFO Jackdaws love my big sphinx of quartz.
59 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
69 myapp.area1 INFO How quickly daft jumping zebras vex.
69 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
69 myapp.area2 ERROR The five boxing wizards jump quickly.
请注意,在某些情况下,pickle 存在一些安全问题。如果这些问题影响到您,您可以通过覆盖 makePickle()
方法并实现您的替代方案来使用替代序列化方案,以及调整上述脚本以使用您的替代序列化方案。
在生产环境中运行日志套接字侦听器¶
要在生产环境中运行日志侦听器,您可能需要使用进程管理工具,例如 Supervisor。 这是一个 Gist,它提供了使用 Supervisor 运行上述功能的基本文件。它包含以下文件
文件 |
目的 |
---|---|
|
一个 Bash 脚本,用于准备测试环境 |
|
Supervisor 配置文件,其中包含侦听器和多进程 Web 应用程序的条目 |
|
一个 Bash 脚本,用于确保 Supervisor 使用上述配置运行 |
|
套接字侦听器程序,它接收日志事件并将它们记录到文件中 |
|
一个简单的 Web 应用程序,通过连接到侦听器的套接字执行日志记录 |
|
Web 应用程序的 JSON 配置文件 |
|
一个 Python 脚本,用于练习 Web 应用程序 |
该 Web 应用程序使用 Gunicorn,这是一个流行的 Web 应用程序服务器,它启动多个工作进程来处理请求。此示例设置显示了工作进程如何写入同一个日志文件而不会相互冲突——它们都通过套接字侦听器。
要测试这些文件,请在 POSIX 环境中执行以下操作
使用 下载 ZIP 按钮下载 Gist 作为 ZIP 存档。
从存档中将上述文件解压缩到一个临时目录中。
在 scratch 目录中,运行
bash prepare.sh
来准备工作。这将创建一个run
子目录来存放与 Supervisor 相关的文件和日志文件,以及一个venv
子目录来存放一个虚拟环境,其中安装了bottle
、gunicorn
和supervisor
。运行
bash ensure_app.sh
来确保 Supervisor 正在使用上述配置运行。运行
venv/bin/python client.py
来测试 Web 应用程序,这将导致记录写入日志。检查
run
子目录中的日志文件。您应该在与模式app.log*
匹配的文件中看到最新的日志行。它们不会按任何特定顺序排列,因为它们是由不同的工作进程以非确定性方式并发处理的。您可以通过运行
venv/bin/supervisorctl -c supervisor.conf shutdown
来关闭监听器和 Web 应用程序。
在配置的端口与测试环境中的其他内容冲突的极不可能的情况下,您可能需要调整配置文件。
在您的日志输出中添加上下文信息¶
有时您希望日志输出除了传递给日志调用的参数外还包含上下文信息。例如,在网络应用程序中,可能需要在日志中记录特定于客户端的信息(例如远程客户端的用户名或 IP 地址)。虽然您可以使用 extra 参数来实现这一点,但这并不总是方便以这种方式传递信息。虽然可能很想在每个连接的基础上创建 Logger
实例,但这并不是一个好主意,因为这些实例不会被垃圾回收。虽然在实践中这不是问题,但当 Logger
实例的数量取决于您想要在日志记录应用程序中使用的粒度级别时,如果 Logger
实例的数量变得实际上无界,那么它可能难以管理。
使用 LoggerAdapters 传递上下文信息¶
您可以轻松地将上下文信息传递到与日志事件信息一起输出的一种方法是使用 LoggerAdapter
类。此类旨在看起来像 Logger
,以便您可以调用 debug()
、info()
、warning()
、error()
、exception()
、critical()
和 log()
。这些方法与 Logger
中的对应方法具有相同的签名,因此您可以互换使用这两种类型的实例。
当您创建 LoggerAdapter
的实例时,您将向它传递一个 Logger
实例和一个包含您的上下文信息的类似字典的对象。当您在 LoggerAdapter
的实例上调用其中一个日志记录方法时,它会将调用委托给传递给其构造函数的 Logger
的基础实例,并安排在委托调用中传递上下文信息。以下是 LoggerAdapter
代码中的一个片段
def debug(self, msg, /, *args, **kwargs):
"""
Delegate a debug call to the underlying logger, after adding
contextual information from this adapter instance.
"""
msg, kwargs = self.process(msg, kwargs)
self.logger.debug(msg, *args, **kwargs)
process()
方法是 LoggerAdapter
类中添加上下文信息到日志输出的地方。它接收日志调用的消息和关键字参数,并返回这些参数的(可能已修改的)版本,用于调用底层日志记录器。此方法的默认实现保留消息不变,但在关键字参数中插入一个名为“extra”的键,其值为传递给构造函数的类字典对象。当然,如果您在调用适配器时传递了“extra”关键字参数,它将被静默覆盖。
使用“extra”的优点是,类字典对象中的值会被合并到 LogRecord
实例的 __dict__ 中,允许您使用自定义字符串与您的 Formatter
实例一起使用,这些实例了解类字典对象的键。如果您需要不同的方法,例如,如果您想在消息字符串的开头或结尾添加上下文信息,您只需要子类化 LoggerAdapter
并重写 process()
来完成您需要的操作。以下是一个简单的示例
class CustomAdapter(logging.LoggerAdapter):
"""
This example adapter expects the passed in dict-like object to have a
'connid' key, whose value in brackets is prepended to the log message.
"""
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
您可以像这样使用它
logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})
然后,您记录到适配器的任何事件都将在日志消息的开头添加 some_conn_id
的值。
使用除字典以外的对象传递上下文信息¶
您不必将实际的字典传递给 LoggerAdapter
- 您可以传递一个实现了 __getitem__
和 __iter__
的类的实例,以便它看起来像日志记录的字典。如果您想动态生成值(而字典中的值是常量),这将很有用。
使用过滤器传递上下文信息¶
您还可以使用用户定义的 Filter
将上下文信息添加到日志输出。 Filter
实例被允许修改传递给它们的 LogRecords
,包括添加额外的属性,这些属性可以使用合适的格式字符串输出,或者如果需要,可以使用自定义的 Formatter
。
例如,在 Web 应用程序中,正在处理的请求(或至少是其有趣的部分)可以存储在 threadlocal (threading.local
) 变量中,然后从 Filter
中访问,以添加例如来自请求的信息 - 例如,远程 IP 地址和远程用户的用户名 - 到 LogRecord
中,使用属性名称“ip”和“user”,如上面的 LoggerAdapter
示例。在这种情况下,可以使用相同的格式字符串来获取与上面显示的类似输出。以下是一个示例脚本
import logging
from random import choice
class ContextFilter(logging.Filter):
"""
This is a filter which injects contextual information into the log.
Rather than use actual contextual information, we just use random
data in this demo.
"""
USERS = ['jim', 'fred', 'sheila']
IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']
def filter(self, record):
record.ip = choice(ContextFilter.IPS)
record.user = choice(ContextFilter.USERS)
return True
if __name__ == '__main__':
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
a1 = logging.getLogger('a.b.c')
a2 = logging.getLogger('d.e.f')
f = ContextFilter()
a1.addFilter(f)
a2.addFilter(f)
a1.debug('A debug message')
a1.info('An info message with %s', 'some parameters')
for x in range(10):
lvl = choice(levels)
lvlname = logging.getLevelName(lvl)
a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
运行时,会产生类似以下内容的输出
2010-09-06 22:38:15,292 a.b.c DEBUG IP: 123.231.231.123 User: fred A debug message
2010-09-06 22:38:15,300 a.b.c INFO IP: 192.168.0.1 User: sheila An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 127.0.0.1 User: jim A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 127.0.0.1 User: sheila A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 123.231.231.123 User: fred A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1 User: jim A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 192.168.0.1 User: jim A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR IP: 127.0.0.1 User: sheila A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG IP: 123.231.231.123 User: fred A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO IP: 123.231.231.123 User: fred A message at INFO level with 2 parameters
使用 contextvars
¶
从 Python 3.7 开始,contextvars
模块提供了上下文局部存储,适用于 threading
和 asyncio
处理需求。因此,这种类型的存储通常比线程局部变量更可取。以下示例展示了如何在多线程环境中,使用上下文信息(例如,Web 应用程序处理的请求属性)填充日志。
为了说明,假设您有不同的 Web 应用程序,每个应用程序彼此独立,但在同一个 Python 进程中运行,并使用它们共有的库。如何让这些应用程序分别拥有自己的日志,将库(和其他请求处理代码)的所有日志消息定向到相应的应用程序日志文件,同时在日志中包含其他上下文信息,例如客户端 IP、HTTP 请求方法和客户端用户名?
假设库可以用以下代码模拟
# webapplib.py
import logging
import time
logger = logging.getLogger(__name__)
def useful():
# Just a representative event logged from the library
logger.debug('Hello from webapplib!')
# Just sleep for a bit so other threads get to run
time.sleep(0.01)
我们可以通过两个简单的类 Request
和 WebApp
来模拟多个 Web 应用程序。这些模拟了真实的多线程 Web 应用程序的工作方式 - 每个请求都由一个线程处理
# main.py
import argparse
from contextvars import ContextVar
import logging
import os
from random import choice
import threading
import webapplib
logger = logging.getLogger(__name__)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
class Request:
"""
A simple dummy request class which just holds dummy HTTP request method,
client IP address and client username
"""
def __init__(self, method, ip, user):
self.method = method
self.ip = ip
self.user = user
# A dummy set of requests which will be used in the simulation - we'll just pick
# from this list randomly. Note that all GET requests are from 192.168.2.XXX
# addresses, whereas POST requests are from 192.16.3.XXX addresses. Three users
# are represented in the sample requests.
REQUESTS = [
Request('GET', '192.168.2.20', 'jim'),
Request('POST', '192.168.3.20', 'fred'),
Request('GET', '192.168.2.21', 'sheila'),
Request('POST', '192.168.3.21', 'jim'),
Request('GET', '192.168.2.22', 'fred'),
Request('POST', '192.168.3.22', 'sheila'),
]
# Note that the format string includes references to request context information
# such as HTTP method, client IP and username
formatter = logging.Formatter('%(threadName)-11s %(appName)s %(name)-9s %(user)-6s %(ip)s %(method)-4s %(message)s')
# Create our context variables. These will be filled at the start of request
# processing, and used in the logging that happens during that processing
ctx_request = ContextVar('request')
ctx_appname = ContextVar('appname')
class InjectingFilter(logging.Filter):
"""
A filter which injects context-specific information into logs and ensures
that only information for a specific webapp is included in its log
"""
def __init__(self, app):
self.app = app
def filter(self, record):
request = ctx_request.get()
record.method = request.method
record.ip = request.ip
record.user = request.user
record.appName = appName = ctx_appname.get()
return appName == self.app.name
class WebApp:
"""
A dummy web application class which has its own handler and filter for a
webapp-specific log.
"""
def __init__(self, name):
self.name = name
handler = logging.FileHandler(name + '.log', 'w')
f = InjectingFilter(self)
handler.setFormatter(formatter)
handler.addFilter(f)
root.addHandler(handler)
self.num_requests = 0
def process_request(self, request):
"""
This is the dummy method for processing a request. It's called on a
different thread for every request. We store the context information into
the context vars before doing anything else.
"""
ctx_request.set(request)
ctx_appname.set(self.name)
self.num_requests += 1
logger.debug('Request processing started')
webapplib.useful()
logger.debug('Request processing finished')
def main():
fn = os.path.splitext(os.path.basename(__file__))[0]
adhf = argparse.ArgumentDefaultsHelpFormatter
ap = argparse.ArgumentParser(formatter_class=adhf, prog=fn,
description='Simulate a couple of web '
'applications handling some '
'requests, showing how request '
'context can be used to '
'populate logs')
aa = ap.add_argument
aa('--count', '-c', type=int, default=100, help='How many requests to simulate')
options = ap.parse_args()
# Create the dummy webapps and put them in a list which we can use to select
# from randomly
app1 = WebApp('app1')
app2 = WebApp('app2')
apps = [app1, app2]
threads = []
# Add a common handler which will capture all events
handler = logging.FileHandler('app.log', 'w')
handler.setFormatter(formatter)
root.addHandler(handler)
# Generate calls to process requests
for i in range(options.count):
try:
# Pick an app at random and a request for it to process
app = choice(apps)
request = choice(REQUESTS)
# Process the request in its own thread
t = threading.Thread(target=app.process_request, args=(request,))
threads.append(t)
t.start()
except KeyboardInterrupt:
break
# Wait for the threads to terminate
for t in threads:
t.join()
for app in apps:
print('%s processed %s requests' % (app.name, app.num_requests))
if __name__ == '__main__':
main()
如果您运行上面的代码,您应该会发现大约一半的请求进入 app1.log
,其余的进入 app2.log
,并且所有请求都记录到 app.log
。每个特定于 Web 应用程序的日志将只包含该 Web 应用程序的日志条目,并且请求信息将在日志中一致地显示(即,每个虚拟请求中的信息将始终在日志行中一起出现)。以下 shell 输出说明了这一点
~/logging-contextual-webapp$ python main.py
app1 processed 51 requests
app2 processed 49 requests
~/logging-contextual-webapp$ wc -l *.log
153 app1.log
147 app2.log
300 app.log
600 total
~/logging-contextual-webapp$ head -3 app1.log
Thread-3 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-3 (process_request) app1 webapplib jim 192.168.3.21 POST Hello from webapplib!
Thread-5 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ head -3 app2.log
Thread-1 (process_request) app2 __main__ sheila 192.168.2.21 GET Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET Hello from webapplib!
Thread-2 (process_request) app2 __main__ jim 192.168.2.20 GET Request processing started
~/logging-contextual-webapp$ head app.log
Thread-1 (process_request) app2 __main__ sheila 192.168.2.21 GET Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET Hello from webapplib!
Thread-2 (process_request) app2 __main__ jim 192.168.2.20 GET Request processing started
Thread-3 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-2 (process_request) app2 webapplib jim 192.168.2.20 GET Hello from webapplib!
Thread-3 (process_request) app1 webapplib jim 192.168.3.21 POST Hello from webapplib!
Thread-4 (process_request) app2 __main__ fred 192.168.2.22 GET Request processing started
Thread-5 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-4 (process_request) app2 webapplib fred 192.168.2.22 GET Hello from webapplib!
Thread-6 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ grep app1 app1.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app2.log | wc -l
147
~/logging-contextual-webapp$ grep app1 app.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app.log | wc -l
147
在处理程序中添加上下文信息¶
每个 Handler
都有自己的过滤器链。如果您想在不将其泄露给其他处理程序的情况下向 LogRecord
添加上下文信息,您可以使用一个过滤器,该过滤器返回一个新的 LogRecord
,而不是就地修改它,如以下脚本所示
import copy
import logging
def filter(record: logging.LogRecord):
record = copy.copy(record)
record.user = 'jim'
return record
if __name__ == '__main__':
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(message)s from %(user)-8s')
handler.setFormatter(formatter)
handler.addFilter(filter)
logger.addHandler(handler)
logger.info('A log message')
从多个进程记录到单个文件¶
虽然日志记录是线程安全的,并且从单个进程中的多个线程记录到单个文件是支持的,但从多个进程记录到单个文件是不支持的,因为在 Python 中没有标准方法可以跨多个进程序列化对单个文件的访问。如果您需要从多个进程记录到单个文件,一种方法是让所有进程都记录到 SocketHandler
,并让一个单独的进程实现一个套接字服务器,该服务器从套接字读取并记录到文件。(如果您愿意,您可以在现有进程中的一个线程中专门执行此功能。) 本节更详细地介绍了这种方法,并包含一个可工作的套接字接收器,您可以将其用作在自己的应用程序中进行调整的起点。
您也可以编写自己的处理程序,该处理程序使用 Lock
类从 multiprocessing
模块序列化对文件的访问。现有的 FileHandler
和子类目前没有使用 multiprocessing
,尽管它们将来可能会这样做。请注意,目前,multiprocessing
模块在所有平台上都没有提供可用的锁功能(请参阅 https://bugs.python.org/issue3770)。
或者,您可以使用 Queue
和 QueueHandler
将所有日志事件发送到多进程应用程序中的一个进程。以下示例脚本演示了如何执行此操作;在示例中,一个单独的监听器进程监听其他进程发送的事件,并根据其自身的日志配置对其进行记录。虽然示例只演示了一种方法(例如,您可能希望使用监听器线程而不是单独的监听器进程 - 实现将类似),但它确实允许监听器和其他进程在您的应用程序中具有完全不同的日志配置,并且可以用作满足您自己特定需求的代码的基础。
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
# Next two import lines for this demo only
from random import choice, random
import time
#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue) # Just the one handler needed
root = logging.getLogger()
root.addHandler(h)
# send all messages, for demo; no other level or filter logic applied.
root.setLevel(logging.DEBUG)
# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
print('Worker finished: %s' % name)
# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
上述脚本的一个变体将日志记录保留在主进程中,在一个单独的线程中。
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
if __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# At this point, the main process could do some useful work of its own
# Once it's done that, it can wait for the workers to terminate...
for wp in workers:
wp.join()
# And now tell the logging thread to finish up, too
q.put(None)
lp.join()
此变体展示了如何例如为特定记录器应用配置 - 例如 foo
记录器有一个特殊的处理程序,它将 foo
子系统中的所有事件存储在文件 mplog-foo.log
中。这将被主进程中的日志记录机制使用(即使日志记录事件是在工作进程中生成的)来将消息定向到适当的目标。
使用 concurrent.futures.ProcessPoolExecutor¶
如果您想使用 concurrent.futures.ProcessPoolExecutor
启动您的工作进程,您需要以稍微不同的方式创建队列。而不是
queue = multiprocessing.Queue(-1)
您应该使用
queue = multiprocessing.Manager().Queue(-1) # also works with the examples above
然后您可以将工作进程创建从这个
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
更改为这个(记住首先导入 concurrent.futures
)
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for i in range(10):
executor.submit(worker_process, queue, worker_configurer)
使用 Gunicorn 和 uWSGI 部署 Web 应用程序¶
在使用 Gunicorn 或 uWSGI(或类似工具)部署 Web 应用程序时,会创建多个工作进程来处理客户端请求。在这样的环境中,避免在您的 Web 应用程序中直接创建基于文件的处理程序。相反,使用 SocketHandler
从 Web 应用程序记录到单独进程中的监听器。这可以使用 Supervisor 等进程管理工具进行设置 - 有关更多详细信息,请参阅 在生产环境中运行日志记录套接字监听器。
使用文件轮换¶
有时您希望让日志文件增长到一定大小,然后打开一个新文件并记录到该文件。您可能希望保留一定数量的这些文件,当创建了这么多文件时,轮换这些文件,以便文件的数量和文件的大小都保持在一定范围内。对于这种使用模式,日志记录包提供了一个 RotatingFileHandler
import glob
import logging
import logging.handlers
LOG_FILENAME = 'logging_rotatingfile_example.out'
# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)
# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=20, backupCount=5)
my_logger.addHandler(handler)
# Log some messages
for i in range(20):
my_logger.debug('i = %d' % i)
# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)
for filename in logfiles:
print(filename)
结果应该是 6 个单独的文件,每个文件都包含应用程序日志历史记录的一部分。
logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5
最新的文件始终是 logging_rotatingfile_example.out
,并且每次它达到大小限制时,它都会使用后缀 .1
重命名。每个现有的备份文件都被重命名以增加后缀(.1
变为 .2
等),并且 .6
文件被删除。
显然,此示例将日志长度设置得过小,只是一个极端的例子。您需要将 maxBytes 设置为适当的值。
使用替代格式样式¶
当日志记录被添加到 Python 标准库时,格式化包含可变内容的消息的唯一方法是使用 %-格式化方法。从那时起,Python 获得了两种新的格式化方法:string.Template
(在 Python 2.4 中添加)和 str.format()
(在 Python 2.6 中添加)。
日志记录(截至 3.2 版本)对这两种额外的格式化样式提供了更好的支持。 Formatter
类已增强,可以接受一个额外的可选关键字参数,名为 style
。默认值为 '%'
,但其他可能的值为 '{'
和 '$'
,它们对应于另外两种格式化样式。向后兼容性默认情况下得以保留(正如您所期望的那样),但通过显式指定样式参数,您可以指定与 str.format()
或 string.Template
一起使用的格式字符串。以下是一个示例控制台会话,展示了各种可能性
>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
... style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
... style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>
请注意,日志消息的最终输出格式与单个日志消息的构建方式完全独立。它仍然可以使用 %-格式化,如这里所示
>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>
日志记录调用(logger.debug()
、logger.info()
等)仅针对实际的日志消息本身接受位置参数,关键字参数仅用于确定如何处理实际日志记录调用的选项(例如,exc_info
关键字参数表示应记录跟踪信息,或者 extra
关键字参数表示要添加到日志中的其他上下文信息)。因此,您不能直接使用 str.format()
或 string.Template
语法进行日志记录调用,因为在内部,日志记录包使用 %-格式化来合并格式字符串和可变参数。在保留向后兼容性的同时,无法更改这一点,因为所有现有的代码中存在的日志记录调用都将使用 %-格式字符串。
但是,您可以使用 {} 和 $-格式化来构建您的单个日志消息。请记住,对于消息,您可以使用任意对象作为消息格式字符串,并且日志记录包将对该对象调用 str()
以获取实际的格式字符串。请考虑以下两个类
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
这两个类都可以用作格式字符串的替代,以允许使用 {} 或 $-格式化来构建实际的“消息”部分,该部分出现在格式化日志输出中,以代替“%(message)s”或“{message}”或“$message”。在您想要记录某些内容时使用类名有点笨拙,但如果您使用别名,例如 __(双下划线 - 不要与 _ 混淆,_ 是 gettext.gettext()
或其同类的同义词/别名),则会变得非常容易接受。
上面的类不包含在 Python 中,尽管它们很容易复制粘贴到您自己的代码中。它们可以按如下方式使用(假设它们在名为 wherever
的模块中声明)
>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
... point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
虽然上面的示例使用 print()
来展示格式化的工作原理,但您当然可以使用 logger.debug()
或类似方法来实际使用这种方法进行日志记录。
需要注意的是,使用这种方法不会造成明显的性能损失:实际的格式化不是在您进行日志记录调用时发生的,而是在(如果)日志消息实际上要由处理程序输出到日志时发生的。因此,唯一可能让您感到困惑的是,圆括号围绕着格式字符串和参数,而不仅仅是格式字符串。这是因为 __ 符号只是对 XXXMessage
类之一的构造函数调用的语法糖。
如果您愿意,可以使用 LoggerAdapter
来实现与上述类似的效果,如下例所示
import logging
class Message:
def __init__(self, fmt, args):
self.fmt = fmt
self.args = args
def __str__(self):
return self.fmt.format(*self.args)
class StyleAdapter(logging.LoggerAdapter):
def log(self, level, msg, /, *args, stacklevel=1, **kwargs):
if self.isEnabledFor(level):
msg, kwargs = self.process(msg, kwargs)
self.logger.log(level, Message(msg, args), **kwargs,
stacklevel=stacklevel+1)
logger = StyleAdapter(logging.getLogger(__name__))
def main():
logger.debug('Hello, {}', 'world!')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
上面的脚本在使用 Python 3.8 或更高版本运行时,应该记录消息 Hello, world!
。
自定义 LogRecord
¶
每个日志事件都由一个 LogRecord
实例表示。当记录事件且未被记录器的级别过滤掉时,会创建一个 LogRecord
,并填充有关该事件的信息,然后将其传递给该记录器的处理程序(及其祖先,直到且包括禁用进一步向上传播层次结构的记录器)。在 Python 3.2 之前,只有两个地方会进行此创建
Logger.makeRecord()
,它在记录事件的正常过程中被调用。这直接调用了LogRecord
来创建一个实例。makeLogRecord()
,它使用包含要添加到 LogRecord 的属性的字典调用。这通常在通过网络收到合适的字典时调用(例如,通过SocketHandler
以 pickle 格式,或通过HTTPHandler
以 JSON 格式)。
这通常意味着,如果您需要对 LogRecord
做任何特殊操作,您必须执行以下操作之一。
创建您自己的
Logger
子类,它覆盖了Logger.makeRecord()
,并在您关心的任何记录器实例化之前使用setLoggerClass()
设置它。
第一种方法在以下情况下会有点笨拙(例如)几个不同的库想要做不同的事情。每个库都会尝试设置自己的 Logger
子类,而最后执行此操作的库将获胜。
第二种方法在许多情况下都能很好地工作,但它不允许您例如使用 LogRecord
的专用子类。库开发人员可以在其记录器上设置合适的过滤器,但他们必须记住每次引入新记录器时都要这样做(他们只需添加新包或模块并执行
logger = logging.getLogger(__name__)
(在模块级别)。这可能有点过于复杂了。开发人员也可以将过滤器添加到一个 NullHandler
,该过滤器附加到他们的顶级记录器,但这不会在应用程序开发人员将处理程序附加到更低级别的库记录器时被调用 - 因此来自该处理程序的输出不会反映库开发人员的意图。
在 Python 3.2 及更高版本中,LogRecord
的创建是通过一个工厂完成的,你可以指定它。工厂只是一个可调用对象,你可以使用 setLogRecordFactory()
设置它,并使用 getLogRecordFactory()
查询它。工厂使用与 LogRecord
构造函数相同的签名调用,因为 LogRecord
是工厂的默认设置。
这种方法允许自定义工厂控制 LogRecord 创建的所有方面。例如,你可以返回一个子类,或者在创建记录后只添加一些额外的属性,使用类似于以下模式
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.custom_attribute = 0xdecafbad
return record
logging.setLogRecordFactory(record_factory)
这种模式允许不同的库将工厂链接在一起,只要它们不覆盖彼此的属性或意外地覆盖作为标准提供的属性,就不会出现任何意外。但是,应该记住,链中的每个环节都会为所有日志记录操作增加运行时开销,并且这种技术只应在使用 Filter
不能提供所需结果时使用。
子类化 QueueHandler 和 QueueListener - 一个 ZeroMQ 示例¶
子类化 QueueHandler
¶
你可以使用 QueueHandler
子类将消息发送到其他类型的队列,例如 ZeroMQ 的“发布”套接字。在下面的示例中,套接字是单独创建的,并传递给处理程序(作为其“队列”)
import zmq # using pyzmq, the Python binding for ZeroMQ
import json # for serializing records portably
ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB) # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556') # or wherever
class ZeroMQSocketHandler(QueueHandler):
def enqueue(self, record):
self.queue.send_json(record.__dict__)
handler = ZeroMQSocketHandler(sock)
当然,还有其他方法可以组织它,例如将处理程序创建套接字所需的数据传递进来
class ZeroMQSocketHandler(QueueHandler):
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
self.ctx = ctx or zmq.Context()
socket = zmq.Socket(self.ctx, socktype)
socket.bind(uri)
super().__init__(socket)
def enqueue(self, record):
self.queue.send_json(record.__dict__)
def close(self):
self.queue.close()
子类化 QueueListener
¶
你也可以子类化 QueueListener
从其他类型的队列接收消息,例如 ZeroMQ 的“订阅”套接字。以下是一个示例
class ZeroMQSocketListener(QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
self.ctx = kwargs.get('ctx') or zmq.Context()
socket = zmq.Socket(self.ctx, zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '') # subscribe to everything
socket.connect(uri)
super().__init__(socket, *handlers, **kwargs)
def dequeue(self):
msg = self.queue.recv_json()
return logging.makeLogRecord(msg)
子类化 QueueHandler 和 QueueListener - 一个 pynng
示例¶
与上一节类似,我们可以使用 pynng 实现侦听器和处理程序,它是一个 Python 绑定到 NNG,被誉为 ZeroMQ 的精神继承者。以下代码片段说明了这一点 - 你可以在安装了 pynng
的环境中测试它们。为了多样性,我们首先展示侦听器。
子类化 QueueListener
¶
# listener.py
import json
import logging
import logging.handlers
import pynng
DEFAULT_ADDR = "tcp://localhost:13232"
interrupted = False
class NNGSocketListener(logging.handlers.QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
# Have a timeout for interruptability, and open a
# subscriber socket
socket = pynng.Sub0(listen=uri, recv_timeout=500)
# The b'' subscription matches all topics
topics = kwargs.pop('topics', None) or b''
socket.subscribe(topics)
# We treat the socket as a queue
super().__init__(socket, *handlers, **kwargs)
def dequeue(self, block):
data = None
# Keep looping while not interrupted and no data received over the
# socket
while not interrupted:
try:
data = self.queue.recv(block=block)
break
except pynng.Timeout:
pass
except pynng.Closed: # sometimes happens when you hit Ctrl-C
break
if data is None:
return None
# Get the logging event sent from a publisher
event = json.loads(data.decode('utf-8'))
return logging.makeLogRecord(event)
def enqueue_sentinel(self):
# Not used in this implementation, as the socket isn't really a
# queue
pass
logging.getLogger('pynng').propagate = False
listener = NNGSocketListener(DEFAULT_ADDR, logging.StreamHandler(), topics=b'')
listener.start()
print('Press Ctrl-C to stop.')
try:
while True:
pass
except KeyboardInterrupt:
interrupted = True
finally:
listener.stop()
子类化 QueueHandler
¶
# sender.py
import json
import logging
import logging.handlers
import time
import random
import pynng
DEFAULT_ADDR = "tcp://localhost:13232"
class NNGSocketHandler(logging.handlers.QueueHandler):
def __init__(self, uri):
socket = pynng.Pub0(dial=uri, send_timeout=500)
super().__init__(socket)
def enqueue(self, record):
# Send the record as UTF-8 encoded JSON
d = dict(record.__dict__)
data = json.dumps(d)
self.queue.send(data.encode('utf-8'))
def close(self):
self.queue.close()
logging.getLogger('pynng').propagate = False
handler = NNGSocketHandler(DEFAULT_ADDR)
# Make sure the process ID is in the output
logging.basicConfig(level=logging.DEBUG,
handlers=[logging.StreamHandler(), handler],
format='%(levelname)-8s %(name)10s %(process)6s %(message)s')
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
logger_names = ('myapp', 'myapp.lib1', 'myapp.lib2')
msgno = 1
while True:
# Just randomly select some loggers and levels and log away
level = random.choice(levels)
logger = logging.getLogger(random.choice(logger_names))
logger.log(level, 'Message no. %5d' % msgno)
msgno += 1
delay = random.random() * 2 + 0.5
time.sleep(delay)
您可以在单独的命令行中运行以上两个代码片段。如果我们在一个 shell 中运行监听器,在另外两个 shell 中运行发送器,我们应该会看到类似以下内容。在第一个发送器 shell 中
$ python sender.py
DEBUG myapp 613 Message no. 1
WARNING myapp.lib2 613 Message no. 2
CRITICAL myapp.lib2 613 Message no. 3
WARNING myapp.lib2 613 Message no. 4
CRITICAL myapp.lib1 613 Message no. 5
DEBUG myapp 613 Message no. 6
CRITICAL myapp.lib1 613 Message no. 7
INFO myapp.lib1 613 Message no. 8
(and so on)
在第二个发送器 shell 中
$ python sender.py
INFO myapp.lib2 657 Message no. 1
CRITICAL myapp.lib2 657 Message no. 2
CRITICAL myapp 657 Message no. 3
CRITICAL myapp.lib1 657 Message no. 4
INFO myapp.lib1 657 Message no. 5
WARNING myapp.lib2 657 Message no. 6
CRITICAL myapp 657 Message no. 7
DEBUG myapp.lib1 657 Message no. 8
(and so on)
在监听器 shell 中
$ python listener.py
Press Ctrl-C to stop.
DEBUG myapp 613 Message no. 1
WARNING myapp.lib2 613 Message no. 2
INFO myapp.lib2 657 Message no. 1
CRITICAL myapp.lib2 613 Message no. 3
CRITICAL myapp.lib2 657 Message no. 2
CRITICAL myapp 657 Message no. 3
WARNING myapp.lib2 613 Message no. 4
CRITICAL myapp.lib1 613 Message no. 5
CRITICAL myapp.lib1 657 Message no. 4
INFO myapp.lib1 657 Message no. 5
DEBUG myapp 613 Message no. 6
WARNING myapp.lib2 657 Message no. 6
CRITICAL myapp 657 Message no. 7
CRITICAL myapp.lib1 613 Message no. 7
INFO myapp.lib1 613 Message no. 8
DEBUG myapp.lib1 657 Message no. 8
(and so on)
如您所见,来自两个发送器进程的日志在监听器的输出中交织在一起。
基于字典的配置示例¶
下面是一个日志配置字典的示例 - 它取自Django 项目的文档。此字典传递给dictConfig()
以使配置生效
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
},
'filters': {
'special': {
'()': 'project.logging.SpecialFilter',
'foo': 'bar',
},
},
'handlers': {
'console': {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'simple',
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['special']
}
},
'loggers': {
'django': {
'handlers': ['console'],
'propagate': True,
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myproject.custom': {
'handlers': ['console', 'mail_admins'],
'level': 'INFO',
'filters': ['special']
}
}
}
有关此配置的更多信息,您可以查看 Django 文档的相关部分。
使用 rotator 和 namer 自定义日志轮转处理¶
以下可运行脚本给出了如何定义 namer 和 rotator 的示例,它展示了日志文件的 gzip 压缩
import gzip
import logging
import logging.handlers
import os
import shutil
def namer(name):
return name + ".gz"
def rotator(source, dest):
with open(source, 'rb') as f_in:
with gzip.open(dest, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(source)
rh = logging.handlers.RotatingFileHandler('rotated.log', maxBytes=128, backupCount=5)
rh.rotator = rotator
rh.namer = namer
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(rh)
f = logging.Formatter('%(asctime)s %(message)s')
rh.setFormatter(f)
for i in range(1000):
root.info(f'Message no. {i + 1}')
运行完此脚本后,您将看到六个新文件,其中五个被压缩了
$ ls rotated.log*
rotated.log rotated.log.2.gz rotated.log.4.gz
rotated.log.1.gz rotated.log.3.gz rotated.log.5.gz
$ zcat rotated.log.1.gz
2023-01-20 02:28:17,767 Message no. 996
2023-01-20 02:28:17,767 Message no. 997
2023-01-20 02:28:17,767 Message no. 998
更详细的多进程示例¶
以下工作示例展示了如何使用配置文件将日志记录与多进程一起使用。这些配置相当简单,但用于说明如何在实际的多进程场景中实现更复杂的配置。
在示例中,主进程生成一个监听器进程和一些工作进程。主进程、监听器和工作进程每个都有三个单独的配置(所有工作进程共享相同的配置)。我们可以看到主进程中的日志记录,工作进程如何将日志记录到 QueueHandler,以及监听器如何实现 QueueListener 和更复杂的日志记录配置,并安排通过队列接收的事件分派到配置中指定的处理程序。请注意,这些配置纯粹是说明性的,但您应该能够将此示例调整到您自己的场景中。
这是脚本 - 文档字符串和注释希望解释了它的工作原理
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time
class MyHandler:
"""
A simple handler for logging events. It runs in the listener process and
dispatches events to loggers based on the name in the received record,
which then get dispatched, by the logging system, to the handlers
configured for those loggers.
"""
def handle(self, record):
if record.name == "root":
logger = logging.getLogger()
else:
logger = logging.getLogger(record.name)
if logger.isEnabledFor(record.levelno):
# The process name is transformed just to show that it's the listener
# doing the logging to files and console
record.processName = '%s (for %s)' % (current_process().name, record.processName)
logger.handle(record)
def listener_process(q, stop_event, config):
"""
This could be done in the main process, but is just done in a separate
process for illustrative purposes.
This initialises logging according to the specified configuration,
starts the listener and waits for the main process to signal completion
via the event. The listener is then stopped, and the process exits.
"""
logging.config.dictConfig(config)
listener = logging.handlers.QueueListener(q, MyHandler())
listener.start()
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
stop_event.wait()
listener.stop()
def worker_process(config):
"""
A number of these are spawned for the purpose of illustration. In
practice, they could be a heterogeneous bunch of processes rather than
ones which are identical to each other.
This initialises logging according to the specified configuration,
and logs a hundred messages with random levels to randomly selected
loggers.
A small sleep is added to allow other processes a chance to run. This
is not strictly needed, but it mixes the output from the different
processes a bit more than if it's left out.
"""
logging.config.dictConfig(config)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
time.sleep(0.01)
def main():
q = Queue()
# The main process gets a simple configuration which prints to the console.
config_initial = {
'version': 1,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO'
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG'
}
}
# The worker process configuration is just a QueueHandler attached to the
# root logger, which allows all messages to be sent to the queue.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_worker = {
'version': 1,
'disable_existing_loggers': True,
'handlers': {
'queue': {
'class': 'logging.handlers.QueueHandler',
'queue': q
}
},
'root': {
'handlers': ['queue'],
'level': 'DEBUG'
}
}
# The listener process configuration shows that the full flexibility of
# logging configuration is available to dispatch events to handlers however
# you want.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_listener = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
},
'simple': {
'class': 'logging.Formatter',
'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'level': 'INFO'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed'
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed'
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'formatter': 'detailed',
'level': 'ERROR'
}
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'handlers': ['console', 'file', 'errors'],
'level': 'DEBUG'
}
}
# Log some initial events, just to show that logging in the parent works
# normally.
logging.config.dictConfig(config_initial)
logger = logging.getLogger('setup')
logger.info('About to create workers ...')
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1),
args=(config_worker,))
workers.append(wp)
wp.start()
logger.info('Started worker: %s', wp.name)
logger.info('About to create listener ...')
stop_event = Event()
lp = Process(target=listener_process, name='listener',
args=(q, stop_event, config_listener))
lp.start()
logger.info('Started listener')
# We now hang around for the workers to finish their work.
for wp in workers:
wp.join()
# Workers all done, listening can now stop.
# Logging in the parent still works normally.
logger.info('Telling listener to stop ...')
stop_event.set()
lp.join()
logger.info('All done.')
if __name__ == '__main__':
main()
在发送到 SysLogHandler 的消息中插入 BOM¶
RFC 5424 要求将 Unicode 消息发送到 syslog 守护进程作为一组字节,这些字节具有以下结构:可选的纯 ASCII 组件,后跟 UTF-8 字节顺序标记 (BOM),后跟使用 UTF-8 编码的 Unicode。(参见规范的相关部分。)
在 Python 3.1 中,代码被添加到 SysLogHandler
中以在消息中插入 BOM,但不幸的是,它实现不正确,BOM 出现在消息的开头,因此不允许任何纯 ASCII 组件出现在它之前。
由于此行为已损坏,因此不正确的 BOM 插入代码已从 Python 3.2.4 及更高版本中删除。但是,它没有被替换,如果您想生成RFC 5424 兼容的消息,其中包含 BOM、它之前的可选纯 ASCII 序列以及使用 UTF-8 编码的任意 Unicode,那么您需要执行以下操作
将
Formatter
实例附加到您的SysLogHandler
实例,使用以下格式字符串'ASCII section\ufeffUnicode section'
Unicode 代码点 U+FEFF 在使用 UTF-8 编码时,将被编码为 UTF-8 BOM - 字节串
b'\xef\xbb\xbf'
。用您喜欢的任何占位符替换 ASCII 部分,但请确保替换后出现在其中的数据始终是 ASCII(这样,它在 UTF-8 编码后将保持不变)。
用您喜欢的任何占位符替换 Unicode 部分;如果替换后出现在其中的数据包含 ASCII 范围之外的字符,那也没关系 - 它将使用 UTF-8 编码。
格式化的消息将使用 UTF-8 编码由 SysLogHandler
编码。如果您遵循上述规则,您应该能够生成符合 RFC 5424 的消息。如果您没有,日志记录可能不会抱怨,但您的消息将不符合 RFC 5424,并且您的 syslog 守护进程可能会抱怨。
实现结构化日志记录¶
虽然大多数日志消息旨在供人类阅读,因此不容易被机器解析,但可能存在您希望以结构化格式输出消息的情况,该格式能够被程序解析(无需使用复杂的正则表达式来解析日志消息)。使用日志记录包可以轻松实现这一点。有许多方法可以实现这一点,但以下是一种使用 JSON 以机器可解析的方式序列化事件的简单方法
import json
import logging
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
_ = StructuredMessage # optional, to improve readability
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
如果运行上述脚本,它将打印
message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
请注意,项目的顺序可能因使用的 Python 版本而异。
如果您需要更专业的处理,可以使用自定义 JSON 编码器,如下面的完整示例所示
import json
import logging
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, set):
return tuple(o)
elif isinstance(o, str):
return o.encode('unicode_escape').decode('ascii')
return super().default(o)
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
s = Encoder().encode(self.kwargs)
return '%s >>> %s' % (self.message, s)
_ = StructuredMessage # optional, to improve readability
def main():
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))
if __name__ == '__main__':
main()
当运行上述脚本时,它将打印
message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
请注意,项目的顺序可能因使用的 Python 版本而异。
使用 dictConfig()
自定义处理程序¶
有时您想以特定方式自定义日志记录处理程序,如果您使用 dictConfig()
,您可能能够在不进行子类化的情况下做到这一点。例如,您可能希望设置日志文件的拥有者。在 POSIX 上,这很容易使用 shutil.chown()
完成,但 stdlib 中的文件处理程序不提供内置支持。您可以使用以下简单的函数自定义处理程序创建
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
然后,您可以在传递给 dictConfig()
的日志记录配置中指定通过调用此函数来创建日志记录处理程序
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
在这个例子中,我使用 pulse
用户和组设置所有权,仅用于说明目的。将它组合成一个工作脚本,chowntest.py
import logging, logging.config, os, shutil
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')
要运行它,您可能需要以 root
身份运行
$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log
请注意,此示例使用 Python 3.3,因为这是 shutil.chown()
出现的地方。这种方法应该适用于任何支持 dictConfig()
的 Python 版本 - 即 Python 2.7、3.2 或更高版本。对于 3.3 之前的版本,您需要使用例如 os.chown()
来实现实际的所有权更改。
在实践中,创建处理程序的函数可能位于项目中的某个实用程序模块中。而不是配置中的行
'()': owned_file_handler,
您可以使用例如
'()': 'ext://project.util.owned_file_handler',
其中 project.util
可以替换为函数所在的包的实际名称。在上面的工作脚本中,使用 'ext://__main__.owned_file_handler'
应该可以工作。在这里,实际的可调用对象由 dictConfig()
从 ext://
规范中解析。
此示例希望也指明了如何以相同的方式使用 os.chmod()
实现其他类型的文件更改 - 例如设置特定的 POSIX 权限位。
当然,这种方法也可以扩展到除 FileHandler
之外的其他类型的处理程序 - 例如,其中一个旋转文件处理程序,或完全不同的处理程序类型。
在整个应用程序中使用特定的格式样式¶
在 Python 3.2 中,Formatter
获得了一个 style
关键字参数,虽然默认值为 %
以保持向后兼容性,但它允许指定 {
或 $
来支持 str.format()
和 string.Template
支持的格式化方法。请注意,这控制了日志消息的最终输出到日志的格式,并且与单个日志消息的构建方式完全正交。
日志调用(debug()
、info()
等)只接受实际日志消息本身的位置参数,关键字参数仅用于确定如何处理日志调用的选项(例如,exc_info
关键字参数指示应记录跟踪信息,或 extra
关键字参数指示要添加到日志中的其他上下文信息)。因此,您不能直接使用 str.format()
或 string.Template
语法进行日志调用,因为在内部,日志包使用 %-格式化来合并格式字符串和可变参数。在保持向后兼容性的同时,不会改变这一点,因为所有现有的代码中的日志调用都将使用 %-格式字符串。
有人建议将格式样式与特定记录器相关联,但这种方法也会遇到向后兼容性问题,因为任何现有代码都可能使用给定的记录器名称并使用 %-格式化。
为了使日志记录在任何第三方库和您的代码之间可互操作,需要在单个日志调用级别做出有关格式化的决定。这为适应替代格式样式提供了几种方法。
使用 LogRecord 工厂¶
在 Python 3.2 中,除了上面提到的 Formatter
的变化之外,logging 包还获得了允许用户设置自己的 LogRecord
子类的能力,使用 setLogRecordFactory()
函数。您可以使用它来设置您自己的 LogRecord
子类,通过覆盖 getMessage()
方法来实现正确的事情。该方法的基类实现是 msg % args
格式化发生的地方,也是您可以替换您的备用格式的地方;但是,您应该注意支持所有格式化样式并允许 %-格式化作为默认格式,以确保与其他代码的互操作性。还应注意调用 str(self.msg)
,就像基类实现一样。
有关更多信息,请参阅 setLogRecordFactory()
和 LogRecord
的参考文档。
使用自定义消息对象¶
还有另一种可能更简单的方法,您可以使用 {} 和 $-格式化来构建您的单个日志消息。您可能还记得(从 使用任意对象作为消息)在记录时可以使用任意对象作为消息格式字符串,并且 logging 包将调用 str()
该对象以获取实际的格式字符串。考虑以下两个类
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
这两个类都可以用来代替格式字符串,允许使用 {} 或 $-格式化来构建实际的“消息”部分,该部分出现在格式化的日志输出中,以代替“%(message)s”或“{message}”或“$message”。如果您发现每次想要记录某些内容时使用类名有点笨拙,您可以使用别名(如 M
或 _
)来表示消息(或者可能是 __
,如果您使用 _
用于本地化)。
下面给出这种方法的示例。首先,使用 str.format()
进行格式化
>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)
其次,使用 string.Template
进行格式化
>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
需要注意的是,这种方法不会带来明显的性能损失:实际的格式化不是在您进行记录调用时发生的,而是在(如果)日志消息实际上要由处理程序输出到日志时发生的。因此,唯一可能让您感到困惑的是,括号围绕着格式字符串和参数,而不仅仅是格式字符串。这是因为 __ 符号只是对上面显示的 XXXMessage
类之一的构造函数调用的语法糖。
使用 dictConfig()
配置过滤器¶
您可以使用 dictConfig()
配置过滤器,尽管乍一看可能不太明显如何操作(因此有了这个食谱)。由于 Filter
是标准库中包含的唯一过滤器类,并且不太可能满足许多需求(它只是作为基类存在),您通常需要定义自己的 Filter
子类,并覆盖 filter()
方法。为此,请在过滤器配置字典中指定 ()
键,并指定一个可调用对象,该对象将用于创建过滤器(类是最明显的,但您可以提供任何返回 Filter
实例的可调用对象)。以下是一个完整的示例
import logging
import logging.config
import sys
class MyFilter(logging.Filter):
def __init__(self, param=None):
self.param = param
def filter(self, record):
if self.param is None:
allow = True
else:
allow = self.param not in record.msg
if allow:
record.msg = 'changed: ' + record.msg
return allow
LOGGING = {
'version': 1,
'filters': {
'myfilter': {
'()': MyFilter,
'param': 'noshow',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'filters': ['myfilter']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console']
},
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.debug('hello')
logging.debug('hello - noshow')
此示例展示了如何以关键字参数的形式将配置数据传递给构造实例的可调用对象。运行后,上面的脚本将打印
changed: hello
这表明过滤器按配置工作。
需要注意的几个额外要点
如果您无法在配置中直接引用可调用对象(例如,如果它位于不同的模块中,并且您无法在配置字典所在的模块中直接导入它),您可以使用
ext://...
形式,如 访问外部对象 中所述。例如,您可以在上面的示例中使用文本'ext://__main__.MyFilter'
而不是MyFilter
。除了过滤器之外,此技术还可以用于配置自定义处理程序和格式化程序。有关日志记录如何支持在其配置中使用用户定义对象的更多信息,请参阅 用户定义的对象,并参阅上面的另一个食谱 使用 dictConfig() 自定义处理程序。
自定义异常格式化¶
有时您可能希望进行自定义异常格式化 - 为了论证起见,假设您希望每个记录的事件只有一行,即使存在异常信息。您可以使用自定义格式化程序类来实现,如以下示例所示
import logging
class OneLineExceptionFormatter(logging.Formatter):
def formatException(self, exc_info):
"""
Format an exception so that it prints on a single line.
"""
result = super().formatException(exc_info)
return repr(result) # or format into one line however you want to
def format(self, record):
s = super().format(record)
if record.exc_text:
s = s.replace('\n', '') + '|'
return s
def configure_logging():
fh = logging.FileHandler('output.txt', 'w')
f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
'%d/%m/%Y %H:%M:%S')
fh.setFormatter(f)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(fh)
def main():
configure_logging()
logging.info('Sample message')
try:
x = 1 / 0
except ZeroDivisionError as e:
logging.exception('ZeroDivisionError: %s', e)
if __name__ == '__main__':
main()
运行后,这将生成一个只有两行的文件
28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n File "logtest7.py", line 30, in main\n x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|
虽然上面的处理方式很简单,但它指明了如何根据您的喜好格式化异常信息。对于更专业的需求,traceback
模块可能会有所帮助。
语音日志消息¶
在某些情况下,可能希望以可听的方式而不是以可见的方式呈现日志消息。如果您在系统中可以使用文本转语音 (TTS) 功能,即使它没有 Python 绑定,这也很容易做到。大多数 TTS 系统都有一个可以运行的命令行程序,并且可以使用 subprocess
从处理程序中调用它。这里假设 TTS 命令行程序不会期望与用户交互或花费很长时间才能完成,并且日志消息的频率不会高到用消息淹没用户,并且可以接受一次说出一条消息而不是同时说出一条消息。下面的示例实现等待一条消息说完后再处理下一条消息,这可能会导致其他处理程序处于等待状态。以下是一个简短的示例展示了这种方法,该方法假设 espeak
TTS 包可用
import logging
import subprocess
import sys
class TTSHandler(logging.Handler):
def emit(self, record):
msg = self.format(record)
# Speak slowly in a female English voice
cmd = ['espeak', '-s150', '-ven+f3', msg]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# wait for the program to finish
p.communicate()
def configure_logging():
h = TTSHandler()
root = logging.getLogger()
root.addHandler(h)
# the default formatter just returns the message
root.setLevel(logging.DEBUG)
def main():
logging.info('Hello')
logging.debug('Goodbye')
if __name__ == '__main__':
configure_logging()
sys.exit(main())
运行后,此脚本应该用女声说出“Hello”,然后说出“Goodbye”。
当然,上述方法可以适用于其他 TTS 系统,甚至可以适用于其他完全不同的系统,这些系统可以通过从命令行运行的外部程序处理消息。
缓冲日志消息并有条件地输出它们¶
在某些情况下,您可能希望将日志消息记录到临时区域,并且仅在满足特定条件时才输出它们。例如,您可能希望在函数中开始记录调试事件,如果函数在没有错误的情况下完成,您不希望用收集的调试信息来使日志混乱,但是如果有错误,您希望所有调试信息以及错误都输出。
以下是一个示例,它展示了如何使用您希望日志记录以这种方式执行的函数的装饰器来实现这一点。它使用了 logging.handlers.MemoryHandler
,它允许缓冲记录的事件,直到满足某些条件,此时缓冲的事件被 flushed
- 传递给另一个处理程序(target
处理程序)进行处理。默认情况下,MemoryHandler
在其缓冲区填满或看到级别大于或等于指定阈值的事件时刷新。如果您希望自定义刷新行为,可以使用此配方与 MemoryHandler
的更专门的子类一起使用。
示例脚本有一个简单的函数 foo
,它只是循环遍历所有日志记录级别,写入 sys.stderr
以说明它将要记录的级别,然后实际记录该级别的消息。您可以将参数传递给 foo
,如果为真,它将记录 ERROR 和 CRITICAL 级别 - 否则,它只记录 DEBUG、INFO 和 WARNING 级别。
该脚本只是安排用一个装饰器来装饰 foo
,该装饰器将执行所需的条件日志记录。装饰器将记录器作为参数,并在调用装饰函数期间附加内存处理程序。装饰器可以使用目标处理程序、应发生刷新的级别以及缓冲区的容量(缓冲的记录数量)进行额外参数化。这些默认设置为 StreamHandler
,它写入 sys.stderr
、logging.ERROR
和 100
。
以下是脚本
import logging
from logging.handlers import MemoryHandler
import sys
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
if target_handler is None:
target_handler = logging.StreamHandler()
if flush_level is None:
flush_level = logging.ERROR
if capacity is None:
capacity = 100
handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
def decorator(fn):
def wrapper(*args, **kwargs):
logger.addHandler(handler)
try:
return fn(*args, **kwargs)
except Exception:
logger.exception('call failed')
raise
finally:
super(MemoryHandler, handler).flush()
logger.removeHandler(handler)
return wrapper
return decorator
def write_line(s):
sys.stderr.write('%s\n' % s)
def foo(fail=False):
write_line('about to log at DEBUG ...')
logger.debug('Actually logged at DEBUG')
write_line('about to log at INFO ...')
logger.info('Actually logged at INFO')
write_line('about to log at WARNING ...')
logger.warning('Actually logged at WARNING')
if fail:
write_line('about to log at ERROR ...')
logger.error('Actually logged at ERROR')
write_line('about to log at CRITICAL ...')
logger.critical('Actually logged at CRITICAL')
return fail
decorated_foo = log_if_errors(logger)(foo)
if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
write_line('Calling undecorated foo with False')
assert not foo(False)
write_line('Calling undecorated foo with True')
assert foo(True)
write_line('Calling decorated foo with False')
assert not decorated_foo(False)
write_line('Calling decorated foo with True')
assert decorated_foo(True)
运行此脚本时,应观察到以下输出
Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL
如您所见,实际的日志记录输出仅在记录的事件的严重性为 ERROR 或更高时才会发生,但在这种情况下,任何先前较低严重性的事件也会被记录。
当然,您可以使用传统的装饰方式
@log_if_errors(logger)
def foo(fail=False):
...
将日志消息发送到电子邮件,并进行缓冲¶
为了说明如何通过电子邮件发送日志消息,以便每封电子邮件发送一定数量的消息,您可以对 BufferingHandler
进行子类化。在以下示例中,您可以根据自己的具体需求进行调整,提供了一个简单的测试工具,允许您使用命令行参数运行脚本,指定您通常需要通过 SMTP 发送内容所需的条件。(使用 -h
参数运行下载的脚本以查看必需和可选参数。)
import logging
import logging.handlers
import smtplib
class BufferingSMTPHandler(logging.handlers.BufferingHandler):
def __init__(self, mailhost, port, username, password, fromaddr, toaddrs,
subject, capacity):
logging.handlers.BufferingHandler.__init__(self, capacity)
self.mailhost = mailhost
self.mailport = port
self.username = username
self.password = password
self.fromaddr = fromaddr
if isinstance(toaddrs, str):
toaddrs = [toaddrs]
self.toaddrs = toaddrs
self.subject = subject
self.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))
def flush(self):
if len(self.buffer) > 0:
try:
smtp = smtplib.SMTP(self.mailhost, self.mailport)
smtp.starttls()
smtp.login(self.username, self.password)
msg = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (self.fromaddr, ','.join(self.toaddrs), self.subject)
for record in self.buffer:
s = self.format(record)
msg = msg + s + "\r\n"
smtp.sendmail(self.fromaddr, self.toaddrs, msg)
smtp.quit()
except Exception:
if logging.raiseExceptions:
raise
self.buffer = []
if __name__ == '__main__':
import argparse
ap = argparse.ArgumentParser()
aa = ap.add_argument
aa('host', metavar='HOST', help='SMTP server')
aa('--port', '-p', type=int, default=587, help='SMTP port')
aa('user', metavar='USER', help='SMTP username')
aa('password', metavar='PASSWORD', help='SMTP password')
aa('to', metavar='TO', help='Addressee for emails')
aa('sender', metavar='SENDER', help='Sender email address')
aa('--subject', '-s',
default='Test Logging email from Python logging module (buffering)',
help='Subject of email')
options = ap.parse_args()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
h = BufferingSMTPHandler(options.host, options.port, options.user,
options.password, options.sender,
options.to, options.subject, 10)
logger.addHandler(h)
for i in range(102):
logger.info("Info index = %d", i)
h.flush()
h.close()
如果您运行此脚本并且您的 SMTP 服务器已正确设置,您应该会发现它向您指定的收件人发送了 11 封电子邮件。前十封电子邮件将分别包含十条日志消息,而第十一封将包含两条消息。这构成了脚本中指定的 102 条消息。
通过配置使用 UTC (GMT) 格式化时间¶
有时您希望使用 UTC 格式化时间,这可以使用诸如 UTCFormatter
之类的类来完成,如下所示
import logging
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
然后,您可以在代码中使用 UTCFormatter
而不是 Formatter
。如果您想通过配置来实现,可以使用 dictConfig()
API,其方法由以下完整示例说明
import logging
import logging.config
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'utc': {
'()': UTCFormatter,
'format': '%(asctime)s %(message)s',
},
'local': {
'format': '%(asctime)s %(message)s',
}
},
'handlers': {
'console1': {
'class': 'logging.StreamHandler',
'formatter': 'utc',
},
'console2': {
'class': 'logging.StreamHandler',
'formatter': 'local',
},
},
'root': {
'handlers': ['console1', 'console2'],
}
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.warning('The local time is %s', time.asctime())
运行此脚本时,它应该打印类似以下内容
2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015
显示时间如何以本地时间和 UTC 格式化,每个处理程序一个。
使用上下文管理器进行选择性日志记录¶
有时,临时更改日志记录配置并在执行某些操作后将其恢复将非常有用。为此,上下文管理器是保存和恢复日志记录上下文的最佳方法。以下是一个简单的上下文管理器示例,它允许您选择性地更改日志记录级别并在上下文管理器的范围内添加日志记录处理程序
import logging
import sys
class LoggingContext:
def __init__(self, logger, level=None, handler=None, close=True):
self.logger = logger
self.level = level
self.handler = handler
self.close = close
def __enter__(self):
if self.level is not None:
self.old_level = self.logger.level
self.logger.setLevel(self.level)
if self.handler:
self.logger.addHandler(self.handler)
def __exit__(self, et, ev, tb):
if self.level is not None:
self.logger.setLevel(self.old_level)
if self.handler:
self.logger.removeHandler(self.handler)
if self.handler and self.close:
self.handler.close()
# implicit return of None => don't swallow exceptions
如果您指定了级别值,则记录器的级别将在上下文管理器覆盖的 with 块的范围内设置为该值。如果您指定了处理程序,则它将在进入块时添加到记录器,并在退出块时从记录器中删除。您还可以要求管理器在块退出时为您关闭处理程序 - 如果您不再需要处理程序,可以这样做。
为了说明它的工作原理,我们可以将以下代码块添加到上面
if __name__ == '__main__':
logger = logging.getLogger('foo')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
logger.info('1. This should appear just once on stderr.')
logger.debug('2. This should not appear.')
with LoggingContext(logger, level=logging.DEBUG):
logger.debug('3. This should appear once on stderr.')
logger.debug('4. This should not appear.')
h = logging.StreamHandler(sys.stdout)
with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
logger.debug('5. This should appear twice - once on stderr and once on stdout.')
logger.info('6. This should appear just once on stderr.')
logger.debug('7. This should not appear.')
我们最初将记录器的级别设置为 INFO
,因此消息 #1 出现,而消息 #2 没有出现。然后,我们在接下来的 with
块中将级别临时更改为 DEBUG
,因此消息 #3 出现。块退出后,记录器的级别恢复为 INFO
,因此消息 #4 没有出现。在接下来的 with
块中,我们将级别再次设置为 DEBUG
,但还添加了一个写入 sys.stdout
的处理程序。因此,消息 #5 在控制台上出现两次(一次通过 stderr
,一次通过 stdout
)。在 with
语句完成之后,状态与之前相同,因此消息 #6 出现(与消息 #1 相同),而消息 #7 没有出现(与消息 #2 相同)。
如果我们运行生成的脚本,结果如下
$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
如果我们再次运行它,但将 stderr
管道到 /dev/null
,我们会看到以下内容,这是唯一写入 stdout
的消息
$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.
再次,但将 stdout
管道到 /dev/null
,我们得到
$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
在这种情况下,打印到 stdout
的消息 #5 不会出现,正如预期的那样。
当然,这里描述的方法可以推广,例如临时附加日志记录过滤器。请注意,上面的代码在 Python 2 和 Python 3 中都可以工作。
CLI 应用程序启动器模板¶
以下是一个示例,它展示了如何
根据命令行参数使用日志级别
将多个子命令分派到单独的文件中,所有子命令都以一致的方式在同一级别进行日志记录
使用简单、最小的配置
假设我们有一个命令行应用程序,其工作是停止、启动或重启一些服务。为了说明目的,这可以组织为一个文件 app.py
,它是应用程序的主脚本,各个命令在 start.py
、stop.py
和 restart.py
中实现。进一步假设我们希望通过命令行参数控制应用程序的详细程度,默认值为 logging.INFO
。以下是一种编写 app.py
的方法
import argparse
import importlib
import logging
import os
import sys
def main(args=None):
scriptname = os.path.basename(__file__)
parser = argparse.ArgumentParser(scriptname)
levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
parser.add_argument('--log-level', default='INFO', choices=levels)
subparsers = parser.add_subparsers(dest='command',
help='Available commands:')
start_cmd = subparsers.add_parser('start', help='Start a service')
start_cmd.add_argument('name', metavar='NAME',
help='Name of service to start')
stop_cmd = subparsers.add_parser('stop',
help='Stop one or more services')
stop_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to stop')
restart_cmd = subparsers.add_parser('restart',
help='Restart one or more services')
restart_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to restart')
options = parser.parse_args()
# the code to dispatch commands could all be in this file. For the purposes
# of illustration only, we implement each command in a separate module.
try:
mod = importlib.import_module(options.command)
cmd = getattr(mod, 'command')
except (ImportError, AttributeError):
print('Unable to find the code for command \'%s\'' % options.command)
return 1
# Could get fancy here and load configuration from file or dictionary
logging.basicConfig(level=options.log_level,
format='%(levelname)s %(name)s %(message)s')
cmd(options)
if __name__ == '__main__':
sys.exit(main())
而 start
、stop
和 restart
命令可以在单独的模块中实现,例如,以下是如何启动的
# start.py
import logging
logger = logging.getLogger(__name__)
def command(options):
logger.debug('About to start %s', options.name)
# actually do the command processing here ...
logger.info('Started the \'%s\' service.', options.name)
因此,停止
# stop.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to stop %s', services)
# actually do the command processing here ...
logger.info('Stopped the %s service%s.', services, plural)
重启类似
# restart.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to restart %s', services)
# actually do the command processing here ...
logger.info('Restarted the %s service%s.', services, plural)
如果我们使用默认日志级别运行此应用程序,我们会得到如下输出
$ python app.py start foo
INFO start Started the 'foo' service.
$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
第一个词是日志级别,第二个词是记录事件的位置的模块或包名称。
如果我们更改日志级别,那么我们可以更改发送到日志的信息。例如,如果我们想要更多信息
$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.
$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
如果我们想要更少的信息
$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz
在这种情况下,命令不会向控制台打印任何内容,因为它们不会记录 WARNING
级别或更高的任何内容。
用于日志记录的 Qt GUI¶
一个经常出现的问题是如何将日志记录到 GUI 应用程序。 Qt 框架是一个流行的跨平台 UI 框架,使用 PySide2 或 PyQt5 库具有 Python 绑定。
以下示例展示了如何将日志记录到 Qt GUI。这引入了一个简单的 QtHandler
类,它接受一个可调用对象,该可调用对象应该是主线程中的一个槽,用于执行 GUI 更新。还创建了一个工作线程来展示如何从 UI 本身(通过用于手动日志记录的按钮)以及在后台执行工作的 worker 线程(这里,只是以随机级别记录消息,并在消息之间随机延迟很短的时间)向 GUI 记录日志。
工作线程使用 Qt 的 QThread
类实现,而不是 threading
模块,因为在某些情况下必须使用 QThread
,它提供了与其他 Qt
组件更好的集成。
该代码应该适用于 PySide6
、PyQt6
、PySide2
或 PyQt5
的最新版本。您应该能够将这种方法调整为早期版本的 Qt。有关更详细的信息,请参阅代码片段中的注释。
import datetime
import logging
import random
import sys
import time
# Deal with minor differences between different Qt packages
try:
from PySide6 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
try:
from PyQt6 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
except ImportError:
try:
from PySide2 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
from PyQt5 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
logger = logging.getLogger(__name__)
#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
signal = Signal(str, logging.LogRecord)
#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
def __init__(self, slotfunc, *args, **kwargs):
super().__init__(*args, **kwargs)
self.signaller = Signaller()
self.signaller.signal.connect(slotfunc)
def emit(self, record):
s = self.format(record)
self.signaller.signal.emit(s, record)
#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
return QtCore.QThread.currentThread().objectName()
#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
@Slot()
def start(self):
extra = {'qThreadName': ctname() }
logger.debug('Started work', extra=extra)
i = 1
# Let the thread run until interrupted. This allows reasonably clean
# thread termination.
while not QtCore.QThread.currentThread().isInterruptionRequested():
delay = 0.5 + random.random() * 2
time.sleep(delay)
try:
if random.random() < 0.1:
raise ValueError('Exception raised: %d' % i)
else:
level = random.choice(LEVELS)
logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
except ValueError as e:
logger.exception('Failed: %s', e, extra=extra)
i += 1
#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):
COLORS = {
logging.DEBUG: 'black',
logging.INFO: 'blue',
logging.WARNING: 'orange',
logging.ERROR: 'red',
logging.CRITICAL: 'purple',
}
def __init__(self, app):
super().__init__()
self.app = app
self.textedit = te = QtWidgets.QPlainTextEdit(self)
# Set whatever the default monospace font is for the platform
f = QtGui.QFont('nosuchfont')
if hasattr(f, 'Monospace'):
f.setStyleHint(f.Monospace)
else:
f.setStyleHint(f.StyleHint.Monospace) # for Qt6
te.setFont(f)
te.setReadOnly(True)
PB = QtWidgets.QPushButton
self.work_button = PB('Start background work', self)
self.log_button = PB('Log a message at a random level', self)
self.clear_button = PB('Clear log window', self)
self.handler = h = QtHandler(self.update_status)
# Remember to use qThreadName rather than threadName in the format string.
fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
formatter = logging.Formatter(fs)
h.setFormatter(formatter)
logger.addHandler(h)
# Set up to terminate the QThread when we exit
app.aboutToQuit.connect(self.force_quit)
# Lay out all the widgets
layout = QtWidgets.QVBoxLayout(self)
layout.addWidget(te)
layout.addWidget(self.work_button)
layout.addWidget(self.log_button)
layout.addWidget(self.clear_button)
self.setFixedSize(900, 400)
# Connect the non-worker slots and signals
self.log_button.clicked.connect(self.manual_update)
self.clear_button.clicked.connect(self.clear_display)
# Start a new worker thread and connect the slots for the worker
self.start_thread()
self.work_button.clicked.connect(self.worker.start)
# Once started, the button should be disabled
self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))
def start_thread(self):
self.worker = Worker()
self.worker_thread = QtCore.QThread()
self.worker.setObjectName('Worker')
self.worker_thread.setObjectName('WorkerThread') # for qThreadName
self.worker.moveToThread(self.worker_thread)
# This will start an event loop in the worker thread
self.worker_thread.start()
def kill_thread(self):
# Just tell the worker to stop, then tell it to quit and wait for that
# to happen
self.worker_thread.requestInterruption()
if self.worker_thread.isRunning():
self.worker_thread.quit()
self.worker_thread.wait()
else:
print('worker has already exited.')
def force_quit(self):
# For use when the window is closed
if self.worker_thread.isRunning():
self.kill_thread()
# The functions below update the UI and run in the main thread because
# that's where the slots are set up
@Slot(str, logging.LogRecord)
def update_status(self, status, record):
color = self.COLORS.get(record.levelno, 'black')
s = '<pre><font color="%s">%s</font></pre>' % (color, status)
self.textedit.appendHtml(s)
@Slot()
def manual_update(self):
# This function uses the formatted message passed in, but also uses
# information from the record to format the message in an appropriate
# color according to its severity (level).
level = random.choice(LEVELS)
extra = {'qThreadName': ctname() }
logger.log(level, 'Manually logged!', extra=extra)
@Slot()
def clear_display(self):
self.textedit.clear()
def main():
QtCore.QThread.currentThread().setObjectName('MainThread')
logging.getLogger().setLevel(logging.DEBUG)
app = QtWidgets.QApplication(sys.argv)
example = Window(app)
example.show()
if hasattr(app, 'exec'):
rc = app.exec()
else:
rc = app.exec_()
sys.exit(rc)
if __name__=='__main__':
main()
使用 RFC5424 支持将日志记录到 syslog¶
尽管 RFC 5424 的日期是 2009 年,但大多数 syslog 服务器默认配置为使用较旧的 RFC 3164,该协议的日期是 2001 年。当 logging
在 2003 年添加到 Python 时,它支持当时较早(也是唯一存在的)协议。自从 RFC5424 发布以来,由于它在 syslog 服务器中没有得到广泛部署,因此 SysLogHandler
功能没有更新。
RFC 5424 包含一些有用的功能,例如对结构化数据的支持,如果您需要能够将日志记录到支持它的 syslog 服务器,您可以使用类似于以下内容的子类化处理程序来实现
import datetime
import logging.handlers
import re
import socket
import time
class SysLogHandler5424(logging.handlers.SysLogHandler):
tz_offset = re.compile(r'([+-]\d{2})(\d{2})$')
escaped = re.compile(r'([\]"\\])')
def __init__(self, *args, **kwargs):
self.msgid = kwargs.pop('msgid', None)
self.appname = kwargs.pop('appname', None)
super().__init__(*args, **kwargs)
def format(self, record):
version = 1
asctime = datetime.datetime.fromtimestamp(record.created).isoformat()
m = self.tz_offset.match(time.strftime('%z'))
has_offset = False
if m and time.timezone:
hrs, mins = m.groups()
if int(hrs) or int(mins):
has_offset = True
if not has_offset:
asctime += 'Z'
else:
asctime += f'{hrs}:{mins}'
try:
hostname = socket.gethostname()
except Exception:
hostname = '-'
appname = self.appname or '-'
procid = record.process
msgid = '-'
msg = super().format(record)
sdata = '-'
if hasattr(record, 'structured_data'):
sd = record.structured_data
# This should be a dict where the keys are SD-ID and the value is a
# dict mapping PARAM-NAME to PARAM-VALUE (refer to the RFC for what these
# mean)
# There's no error checking here - it's purely for illustration, and you
# can adapt this code for use in production environments
parts = []
def replacer(m):
g = m.groups()
return '\\' + g[0]
for sdid, dv in sd.items():
part = f'[{sdid}'
for k, v in dv.items():
s = str(v)
s = self.escaped.sub(replacer, s)
part += f' {k}="{s}"'
part += ']'
parts.append(part)
sdata = ''.join(parts)
return f'{version} {asctime} {hostname} {appname} {procid} {msgid} {sdata} {msg}'
您需要熟悉 RFC 5424 才能完全理解上面的代码,并且您可能会有略微不同的需求(例如,如何将结构化数据传递给日志)。但是,以上内容应该可以适应您的特定需求。使用上面的处理程序,您可以使用类似以下内容传递结构化数据
sd = {
'foo@12345': {'bar': 'baz', 'baz': 'bozz', 'fizz': r'buzz'},
'foo@54321': {'rab': 'baz', 'zab': 'bozz', 'zzif': r'buzz'}
}
extra = {'structured_data': sd}
i = 1
logger.debug('Message %d', i, extra=extra)
如何将记录器视为输出流¶
有时,您需要与期望文件类对象写入的第三方 API 交互,但您希望将 API 的输出定向到记录器。您可以使用将记录器包装在文件类 API 中的类来做到这一点。以下是一个简短的脚本,说明了这样的类
import logging
class LoggerWriter:
def __init__(self, logger, level):
self.logger = logger
self.level = level
def write(self, message):
if message != '\n': # avoid printing bare newlines, if you like
self.logger.log(self.level, message)
def flush(self):
# doesn't actually do anything, but might be expected of a file-like
# object - so optional depending on your situation
pass
def close(self):
# doesn't actually do anything, but might be expected of a file-like
# object - so optional depending on your situation. You might want
# to set a flag so that later calls to write raise an exception
pass
def main():
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('demo')
info_fp = LoggerWriter(logger, logging.INFO)
debug_fp = LoggerWriter(logger, logging.DEBUG)
print('An INFO message', file=info_fp)
print('A DEBUG message', file=debug_fp)
if __name__ == "__main__":
main()
运行此脚本时,它将打印
INFO:demo:An INFO message
DEBUG:demo:A DEBUG message
您还可以使用 LoggerWriter
通过以下操作重定向 sys.stdout
和 sys.stderr
import sys
sys.stdout = LoggerWriter(logger, logging.INFO)
sys.stderr = LoggerWriter(logger, logging.WARNING)
您应该在为您的需求配置日志记录之后执行此操作。在上面的示例中,basicConfig()
调用执行此操作(使用 sys.stderr
值在被 LoggerWriter
实例覆盖之前)。然后,您将获得这种结果
>>> print('Foo')
INFO:demo:Foo
>>> print('Bar', file=sys.stderr)
WARNING:demo:Bar
>>>
当然,上面的示例显示了根据 basicConfig()
使用的格式进行的输出,但您可以在配置日志记录时使用不同的格式化程序。
请注意,使用上述方案,您在一定程度上受制于缓冲和您正在拦截的写入调用的顺序。例如,使用上面的 LoggerWriter
定义,如果您有以下代码段
sys.stderr = LoggerWriter(logger, logging.WARNING)
1 / 0
那么运行脚本会导致
WARNING:demo:Traceback (most recent call last):
WARNING:demo: File "/home/runner/cookbook-loggerwriter/test.py", line 53, in <module>
WARNING:demo:
WARNING:demo:main()
WARNING:demo: File "/home/runner/cookbook-loggerwriter/test.py", line 49, in main
WARNING:demo:
WARNING:demo:1 / 0
WARNING:demo:ZeroDivisionError
WARNING:demo::
WARNING:demo:division by zero
如您所见,此输出并不理想。这是因为写入 sys.stderr
的底层代码执行多次写入,每次写入都会导致单独的日志记录行(例如,上面的最后三行)。为了解决这个问题,您需要缓冲内容,并且只有在看到换行符时才输出日志行。让我们使用 LoggerWriter
的稍微更好的实现
class BufferingLoggerWriter(LoggerWriter):
def __init__(self, logger, level):
super().__init__(logger, level)
self.buffer = ''
def write(self, message):
if '\n' not in message:
self.buffer += message
else:
parts = message.split('\n')
if self.buffer:
s = self.buffer + parts.pop(0)
self.logger.log(self.level, s)
self.buffer = parts.pop()
for part in parts:
self.logger.log(self.level, part)
这只是缓冲内容,直到看到换行符,然后记录完整的行。使用这种方法,您可以获得更好的输出
WARNING:demo:Traceback (most recent call last):
WARNING:demo: File "/home/runner/cookbook-loggerwriter/main.py", line 55, in <module>
WARNING:demo: main()
WARNING:demo: File "/home/runner/cookbook-loggerwriter/main.py", line 52, in main
WARNING:demo: 1/0
WARNING:demo:ZeroDivisionError: division by zero
要避免的模式¶
尽管前面的部分描述了您可能需要执行或处理的操作方法,但值得一提的是一些无用的使用模式,因此在大多数情况下应避免这些模式。以下部分没有特定的顺序。
多次打开同一个日志文件¶
在 Windows 上,您通常无法多次打开同一个文件,因为这会导致“文件正在被另一个进程使用”错误。但是,在 POSIX 平台上,如果您多次打开同一个文件,您不会收到任何错误。这可能是意外发生的,例如通过
多次添加引用同一个文件的日志记录器处理程序(例如,通过复制/粘贴/忘记更改错误)。
打开两个看起来不同的文件,因为它们具有不同的名称,但实际上是相同的,因为其中一个是另一个文件的符号链接。
派生一个进程,之后父进程和子进程都引用同一个文件。这可能是通过使用
multiprocessing
模块实现的,例如。
多次打开文件可能看起来在大多数情况下都能正常工作,但在实践中会导致许多问题
日志输出可能出现乱码,因为多个线程或进程尝试写入同一个文件。虽然日志记录可以防止多个线程同时使用同一个处理程序实例,但如果两个不同的线程使用两个不同的处理程序实例(恰好指向同一个文件)尝试同时写入,则没有这样的保护。
尝试删除文件(例如在文件轮转期间)会静默失败,因为存在另一个指向它的引用。这会导致混乱和浪费调试时间 - 日志条目最终出现在意外的位置,或者完全丢失。或者,应该移动的文件仍然保留在原位,并且即使启用了基于大小的轮转,文件的大小也会意外地增长。
使用从多个进程写入单个文件中概述的技术来规避此类问题。
将记录器用作类中的属性或将其作为参数传递¶
虽然可能存在需要这样做的情况,但通常没有必要,因为记录器是单例。代码始终可以使用logging.getLogger(name)
按名称访问给定的记录器实例,因此传递实例和将其作为实例属性毫无意义。请注意,在 Java 和 C# 等其他语言中,记录器通常是静态类属性。但是,这种模式在 Python 中没有意义,因为模块(而不是类)是软件分解的单位。
在库中向记录器添加除NullHandler
之外的处理程序¶
通过添加处理程序、格式化程序和过滤器来配置日志记录是应用程序开发人员的责任,而不是库开发人员的责任。如果您维护一个库,请确保您不会向任何记录器添加除NullHandler
实例之外的任何处理程序。
创建大量记录器¶
记录器是单例,在脚本执行期间永远不会释放,因此创建大量记录器会占用内存,这些内存无法释放。与其为每个处理的文件或建立的网络连接创建记录器,不如使用现有机制将上下文信息传递到日志中,并将创建的记录器限制为描述应用程序内区域的记录器(通常是模块,但有时比这更细粒度)。
其他资源¶
另请参见
- 模块
logging
日志记录模块的 API 参考。
- 模块
logging.config
日志记录模块的配置 API。
- 模块
logging.handlers
日志记录模块中包含的有用处理程序。