threading — 基于线程的并行

源代码: Lib/threading.py


此模块在低级 _thread 模块之上构建了更高级的线程接口。

可用性:非 WASI。

此模块在 WebAssembly 上不起作用或不可用。有关更多信息,请参阅 WebAssembly 平台

引言

threading 模块提供了一种在单个进程中并发运行多个线程(进程的更小单元)的方法。它允许创建和管理线程,从而可以并行执行任务并共享内存空间。当任务是 I/O 密集型时(例如文件操作或网络请求,大部分时间都在等待外部资源),线程特别有用。

threading 的典型用例包括管理一个工作线程池,这些工作线程可以并发处理多个任务。以下是使用 Thread 创建和启动线程的基本示例:

import threading
import time

def crawl(link, delay=3):
    print(f"crawl started for {link}")
    time.sleep(delay)  # Blocking I/O (simulating a network request)
    print(f"crawl ended for {link}")

links = [
    "https://pythonlang.cn",
    "https://docs.pythonlang.cn",
    "https://peps.pythonlang.cn",
]

# Start threads for each link
threads = []
for link in links:
    # Using `args` to pass positional arguments and `kwargs` for keyword arguments
    t = threading.Thread(target=crawl, args=(link,), kwargs={"delay": 2})
    threads.append(t)

# Start each thread
for t in threads:
    t.start()

# Wait for all threads to finish
for t in threads:
    t.join()

版本 3.7 中的变化: 该模块以前是可选的,现在始终可用。

参见

concurrent.futures.ThreadPoolExecutor 提供了一个更高级的接口,可以将任务推送到后台线程而不会阻塞调用线程的执行,同时仍然可以在需要时检索其结果。

queue 提供了一个线程安全接口,用于在运行中的线程之间交换数据。

asyncio 提供了一种替代方法来实现任务级别的并发,而无需使用多个操作系统线程。

备注

在 Python 2.x 系列中,此模块包含一些方法和函数的 camelCase 名称。从 Python 3.10 开始,这些名称已被弃用,但为了兼容 Python 2.5 及更低版本,它们仍然受支持。

CPython 实现细节: 在 CPython 中,由于 全局解释器锁,一次只能有一个线程执行 Python 代码(尽管某些注重性能的库可能会克服此限制)。如果希望应用程序更好地利用多核机器的计算资源,建议使用 multiprocessingconcurrent.futures.ProcessPoolExecutor。但是,如果要同时运行多个 I/O 密集型任务,线程仍然是一个合适的模型。

GIL 和性能考量

与使用单独进程来绕过 全局解释器锁 (GIL) 的 multiprocessing 模块不同,`threading` 模块在一个进程内运行,这意味着所有线程共享相同的内存空间。然而,当涉及到 CPU 密集型任务时,GIL 限制了线程的性能增益,因为一次只能有一个线程执行 Python 字节码。尽管如此,线程在许多场景中仍然是实现并发的有用工具。

自 Python 3.13 起,自由线程 构建可以禁用 GIL,从而实现真正的线程并行执行,但此功能默认不可用(请参阅 PEP 703)。

参考

该模块定义了以下函数:

threading.active_count()

返回当前存活的 Thread 对象的数量。返回的数量等于 enumerate() 返回的列表的长度。

函数 activeCount 是此函数的已弃用别名。

threading.current_thread()

返回当前 Thread 对象,对应于调用者的控制线程。如果调用者的控制线程不是通过 threading 模块创建的,则返回一个功能有限的虚拟线程对象。

函数 currentThread 是此函数的已弃用别名。

threading.excepthook(args, /)

处理由 Thread.run() 引发的未捕获异常。

args 参数具有以下属性

  • exc_type: 异常类型。

  • exc_value: 异常值,可以为 None

  • exc_traceback: 异常回溯,可以为 None

  • thread: 引发异常的线程,可以为 None

如果 exc_typeSystemExit,则异常会被静默忽略。否则,异常会打印到 sys.stderr

如果此函数引发异常,则会调用 sys.excepthook() 来处理它。

threading.excepthook() 可以被重写以控制如何处理由 Thread.run() 引发的未捕获异常。

使用自定义钩子存储 exc_value 可能会创建引用循环。当不再需要异常时,应明确清除它以打破引用循环。

使用自定义钩子存储 thread 可能会在对象正在终结时使其复活。为避免对象复活,请在自定义钩子完成后避免存储 thread

参见

sys.excepthook() 处理未捕获的异常。

在 3.8 版本加入。

threading.__excepthook__

保存 threading.excepthook() 的原始值。它被保存下来,以便在原始值可能被损坏或替代对象替换时可以恢复。

在 3.10 版本加入。

threading.get_ident()

返回当前线程的“线程标识符”。这是一个非零整数。其值没有直接意义;它旨在用作一个神奇的“cookie”,例如用于索引线程特定数据的字典。当一个线程退出并创建另一个线程时,线程标识符可能会被回收。

在 3.3 版本加入。

threading.get_native_id()

返回内核分配的当前线程的本地整数线程 ID。这是一个非负整数。其值可用于在系统范围内唯一标识此特定线程(直到线程终止,之后该值可能被操作系统回收)。

可用性: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD, GNU/kFreeBSD。

在 3.8 版本加入。

版本 3.13 中的变化: 添加了对 GNU/kFreeBSD 的支持。

threading.enumerate()

返回当前所有活跃的 Thread 对象的列表。该列表包括守护线程和由 current_thread() 创建的虚拟线程对象。它不包括已终止的线程和尚未启动的线程。然而,主线程总是结果的一部分,即使它已终止。

threading.main_thread()

返回主 Thread 对象。在正常情况下,主线程是 Python 解释器启动的线程。

在 3.4 版本加入。

threading.settrace(func)

为从 threading 模块启动的所有线程设置跟踪函数。在每个线程的 run() 方法被调用之前,func 将被传递给每个线程的 sys.settrace()

threading.settrace_all_threads(func)

为从 threading 模块启动的所有线程以及所有当前正在执行的 Python 线程设置跟踪函数。

在每个线程的 run() 方法被调用之前,func 将被传递给每个线程的 sys.settrace()

3.12 新版功能.

threading.gettrace()

获取由 settrace() 设置的跟踪函数。

在 3.10 版本加入。

threading.setprofile(func)

为从 threading 模块启动的所有线程设置配置文件函数。在每个线程的 run() 方法被调用之前,func 将被传递给每个线程的 sys.setprofile()

threading.setprofile_all_threads(func)

为从 threading 模块启动的所有线程以及所有当前正在执行的 Python 线程设置配置文件函数。

在每个线程的 run() 方法被调用之前,func 将被传递给每个线程的 sys.setprofile()

3.12 新版功能.

threading.getprofile()

获取由 setprofile() 设置的分析器函数。

在 3.10 版本加入。

threading.stack_size([size])

返回创建新线程时使用的线程栈大小。可选的 size 参数指定后续创建的线程将使用的栈大小,必须为 0 (使用平台或配置的默认值) 或至少 32,768 (32 KiB) 的正整数值。如果未指定 size,则使用 0。如果不支持更改线程栈大小,则会引发 RuntimeError。如果指定的栈大小无效,则会引发 ValueError 且栈大小不会修改。32 KiB 是当前支持的最小栈大小值,以保证解释器本身有足够的栈空间。请注意,某些平台可能对栈大小值有特定限制,例如要求最小栈大小 > 32 KiB 或要求以系统内存页大小的倍数进行分配 - 有关更多信息应查阅平台文档(4 KiB 页很常见;在没有更具体信息的情况下,建议使用 4096 的倍数作为栈大小)。

可用性: Windows, pthreads。

支持 POSIX 线程的 Unix 平台。

此模块还定义了以下常量

threading.TIMEOUT_MAX

阻塞函数(Lock.acquire()RLock.acquire()Condition.wait() 等)的 timeout 参数允许的最大值。指定大于此值的超时将引发 OverflowError

在 3.2 版本加入。

此模块定义了许多类,这些类在以下部分中详细介绍。

此模块的设计大致基于 Java 的线程模型。然而,Java 将锁和条件变量作为每个对象的基本行为,而在 Python 中它们是单独的对象。Python 的 Thread 类支持 Java 的 Thread 类行为的一个子集;目前,没有优先级,没有线程组,并且线程不能被销毁、停止、暂停、恢复或中断。Java 的 Thread 类的静态方法在实现时被映射到模块级函数。

下面描述的所有方法都是原子地执行的。

线程局部数据

线程局部数据是其值特定于线程的数据。如果您有希望对线程局部的数据,请创建一个 local 对象并使用其属性

>>> mydata = local()
>>> mydata.number = 42
>>> mydata.number
42

您还可以访问 local 对象的字典

>>> mydata.__dict__
{'number': 42}
>>> mydata.__dict__.setdefault('widgets', [])
[]
>>> mydata.widgets
[]

如果我们在不同的线程中访问数据

>>> log = []
>>> def f():
...     items = sorted(mydata.__dict__.items())
...     log.append(items)
...     mydata.number = 11
...     log.append(mydata.number)

>>> import threading
>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> thread.join()
>>> log
[[], 11]

我们得到不同的数据。此外,在其他线程中进行的更改不会影响此线程中看到的数据

>>> mydata.number
42

当然,您从 local 对象获得的值,包括它们的 __dict__ 属性,都适用于读取该属性时当前所在的线程。因此,您通常不希望在线程之间保存这些值,因为它们仅适用于它们所来自的线程。

您可以通过子类化 local 类来创建自定义 local 对象

>>> class MyLocal(local):
...     number = 2
...     def __init__(self, /, **kw):
...         self.__dict__.update(kw)
...     def squared(self):
...         return self.number ** 2

这对于支持默认值、方法和初始化很有用。请注意,如果您定义了一个 __init__() 方法,则每次在单独的线程中使用 local 对象时都会调用它。这是初始化每个线程字典所必需的。

现在如果我们创建一个 local 对象

>>> mydata = MyLocal(color='red')

我们有一个默认数字

>>> mydata.number
2

一个初始颜色

>>> mydata.color
'red'
>>> del mydata.color

以及一个对数据进行操作的方法

>>> mydata.squared()
4

像以前一样,我们可以在单独的线程中访问数据

>>> log = []
>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> thread.join()
>>> log
[[('color', 'red')], 11]

而不会影响此线程的数据

>>> mydata.number
2
>>> mydata.color
Traceback (most recent call last):
...
AttributeError: 'MyLocal' object has no attribute 'color'

请注意,子类可以定义 __slots__,但它们不是线程局部的。它们在线程之间共享

>>> class MyLocal(local):
...     __slots__ = 'number'

>>> mydata = MyLocal()
>>> mydata.number = 42
>>> mydata.color = 'red'

所以,单独的线程

>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> thread.join()

影响我们所看到的

>>> mydata.number
11
class threading.local

一个表示线程局部数据的类。

线程对象

Thread 类表示在单独控制线程中运行的活动。有两种方法可以指定活动:通过将可调用对象传递给构造函数,或通过在子类中重写 run() 方法。在子类中不应重写其他方法(构造函数除外)。换句话说,*仅*重写此类的 __init__()run() 方法。

一旦创建了线程对象,就必须通过调用线程的 start() 方法来启动其活动。这会在单独的控制线程中调用 run() 方法。

一旦线程的活动开始,线程就被认为是“活跃的”。当其 run() 方法终止时(无论是正常终止还是通过引发未处理的异常),它就不再活跃。is_alive() 方法测试线程是否活跃。

其他线程可以调用线程的 join() 方法。这会阻塞调用线程,直到调用了其 join() 方法的线程终止。

线程有一个名称。该名称可以传递给构造函数,并通过 name 属性读取或更改。

如果 run() 方法引发异常,则会调用 threading.excepthook() 来处理它。默认情况下,threading.excepthook() 会静默忽略 SystemExit

线程可以标记为“守护线程”。此标志的意义在于,当只剩下守护线程时,整个 Python 程序都会退出。初始值从创建线程继承。该标志可以通过 daemon 属性或 daemon 构造函数参数设置。

备注

守护线程在关闭时会突然停止。它们的资源(如打开的文件、数据库事务等)可能无法正确释放。如果您希望线程优雅地停止,请将它们设置为非守护线程,并使用合适的信号机制,例如 Event

存在一个“主线程”对象;它对应于 Python 程序中初始的控制线程。它不是守护线程。

有可能创建“虚拟线程对象”。这些是与“外部线程”对应的线程对象,外部线程是在 `threading` 模块之外启动的控制线程,例如直接从 C 代码启动。虚拟线程对象功能有限;它们总是被认为是活跃的和守护的,并且不能被 连接。它们永远不会被删除,因为无法检测外部线程的终止。

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None, context=None)

此构造函数应始终使用关键字参数调用。参数为

group 应为 None;保留用于将来实现 ThreadGroup 类时的扩展。

target 是由 run() 方法调用的可调用对象。默认为 None,表示不调用任何内容。

name 是线程名称。默认情况下,会构造一个唯一的名称,形式为“Thread-N”,其中 N 是一个小的十进制数字,或者如果指定了 target 参数,则为“Thread-N (target)”,其中“target”是 target.__name__

args 是用于目标调用的参数列表或元组。默认为 ()

kwargs 是用于目标调用的关键字参数字典。默认为 {}

如果不是 Nonedaemon 明确设置线程是否为守护线程。如果为 None(默认值),则守护属性从当前线程继承。

context 是启动线程时使用的 Context 值。默认值为 None,表示 sys.flags.thread_inherit_context 标志控制行为。如果该标志为真,线程将以 start() 调用者的上下文副本启动。如果为假,它们将以空上下文启动。要明确地以空上下文启动,请传递 Context() 的新实例。要明确地以当前上下文的副本启动,请传递 copy_context() 的值。在自由线程构建中,该标志默认为真,否则为假。

如果子类重写了构造函数,它必须确保在对线程执行任何其他操作之前调用基类构造函数(Thread.__init__())。

版本 3.3 中的变化: 添加了 daemon 参数。

版本 3.10 中的变化: 如果省略 name 参数,则使用 target 名称。

版本 3.14 中的变化: 添加了 context 参数。

start()

启动线程的活动。

它必须每个线程对象最多调用一次。它安排在单独的控制线程中调用对象的 run() 方法。

如果对同一个线程对象多次调用此方法,将引发 RuntimeError

如果支持,将操作系统线程名称设置为 threading.Thread.name。该名称可能会根据操作系统线程名称限制而被截断。

版本 3.14 中的变化: 设置操作系统线程名称。

run()

表示线程活动的方法。

您可以在子类中重写此方法。标准的 run() 方法调用传递给对象构造函数作为 target 参数的可调用对象(如果有),并分别使用 argskwargs 参数中的位置参数和关键字参数。

使用列表或元组作为传递给 Threadargs 参数可以达到相同的效果。

示例

>>> from threading import Thread
>>> t = Thread(target=print, args=[1])
>>> t.run()
1
>>> t = Thread(target=print, args=(1,))
>>> t.run()
1
join(timeout=None)

等待直到线程终止。这将阻塞调用线程,直到调用了其 join() 方法的线程终止(无论是正常终止还是通过未处理的异常),或者直到可选的超时发生。

timeout 参数存在且不为 None 时,它应该是一个浮点数,指定操作的超时时间(以秒或其分数计)。由于 join() 总是返回 None,因此您必须在 join() 之后调用 is_alive() 来判断是否发生了超时——如果线程仍然活跃,则 join() 调用超时。

timeout 参数不存在或为 None 时,操作将阻塞直到线程终止。

一个线程可以被多次 присоединить。

join() 如果尝试连接当前线程,则会引发 RuntimeError,因为那会导致死锁。在线程启动之前连接线程也是一个错误,尝试这样做会引发相同的异常。

如果在 Python 终结化的后期阶段尝试连接正在运行的守护线程,join() 将引发 PythonFinalizationError

版本 3.14 中的变化: 可能会引发 PythonFinalizationError

name

一个仅用于标识目的的字符串。它没有语义。多个线程可以具有相同的名称。初始名称由构造函数设置。

在某些平台上,线程启动时会在操作系统级别设置线程名称,使其在任务管理器中可见。此名称可能会被截断以适应系统特定的限制(例如,Linux 上为 15 字节,macOS 上为 63 字节)。

name 的更改仅在当前运行的线程被重命名时才反映在操作系统级别。(设置不同线程的 name 属性仅更新 Python Thread 对象。)

getName()
setName()

name 的已弃用 getter/setter API;请直接将其用作属性。

3.10 版本后已弃用。

ident

此线程的“线程标识符”,如果线程尚未启动,则为 None。这是一个非零整数。请参阅 get_ident() 函数。当一个线程退出并创建另一个线程时,线程标识符可能会被回收。即使在线程退出后,标识符仍然可用。

native_id

此线程的线程 ID (TID),由操作系统(内核)分配。这是一个非负整数,如果线程尚未启动,则为 None。请参阅 get_native_id() 函数。此值可用于在系统范围内唯一标识此特定线程(直到线程终止,之后该值可能被操作系统回收)。

备注

与进程 ID 类似,线程 ID 仅在线程创建到线程终止期间有效(保证系统范围唯一)。

可用性: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD。

在 3.8 版本加入。

is_alive()

返回线程是否活跃。

此方法在 run() 方法开始之前到 run() 方法终止之后返回 True。模块函数 enumerate() 返回所有活跃线程的列表。

daemon

一个布尔值,指示此线程是否为守护线程 (True) 或不是 (False)。这必须在调用 start() 之前设置,否则会引发 RuntimeError。其初始值从创建线程继承;主线程不是守护线程,因此在主线程中创建的所有线程默认 daemon = False

当没有存活的非守护线程时,整个 Python 程序退出。

isDaemon()
setDaemon()

daemon 的已弃用 getter/setter API;请直接将其用作属性。

3.10 版本后已弃用。

锁对象

一个原始锁是一种同步原语,当它被锁定时,它不被特定的线程拥有。在 Python 中,它目前是可用的最低级别的同步原语,由 _thread 扩展模块直接实现。

一个原始锁有两种状态:“锁定”或“未锁定”。它在未锁定状态下创建。它有两个基本方法:acquire()release()。当状态为未锁定时,acquire() 会将状态更改为锁定并立即返回。当状态为锁定时,acquire() 会阻塞,直到另一个线程调用 release() 将其更改为未锁定,然后 acquire() 调用将其重置为锁定并返回。release() 方法应仅在锁定状态下调用;它将状态更改为未锁定并立即返回。如果尝试释放未锁定的锁,将引发 RuntimeError

锁还支持上下文管理协议

当多个线程在 acquire() 中阻塞,等待状态变为未锁定时,当 release() 调用将状态重置为未锁定时,只有一个线程会继续;具体是哪个等待线程会继续,这是未定义的,并且可能因实现而异。

所有方法都原子执行。

class threading.Lock

实现原始锁对象的类。一旦一个线程获取了锁,后续尝试获取它都会阻塞,直到它被释放;任何线程都可以释放它。

版本 3.13 中的变化: Lock 现在是一个类。在早期的 Python 中,Lock 是一个工厂函数,返回底层私有锁类型的一个实例。

acquire(blocking=True, timeout=-1)

获取一个锁,阻塞或非阻塞。

当以 blocking 参数设置为 True(默认值)调用时,阻塞直到锁被解锁,然后将其设置为锁定并返回 True

当以 blocking 参数设置为 False 调用时,不阻塞。如果以 blocking 设置为 True 的调用会阻塞,则立即返回 False;否则,将锁设置为锁定并返回 True

当以浮点数 timeout 参数设置为正值调用时,最多阻塞 timeout 指定的秒数(或其分数),只要锁无法获取。timeout 参数为 -1 表示无限等待。当 blockingFalse 时,禁止指定 timeout

如果锁成功获取,则返回值为 True,否则返回 False(例如,如果 timeout 过期)。

版本 3.2 中的变化: timeout 参数是新的。

版本 3.2 中的变化: 如果底层线程实现支持,锁获取现在可以被 POSIX 上的信号中断。

版本 3.14 中的变化: 锁获取现在可以被 Windows 上的信号中断。

release()

释放一个锁。这可以从任何线程调用,而不仅仅是获取锁的线程。

当锁被锁定时,将其重置为未锁定,并返回。如果有其他线程因等待锁变为未锁定而被阻塞,则允许其中恰好一个线程继续执行。

当在未锁定的锁上调用时,会引发 RuntimeError

没有返回值。

locked()

如果锁已获取,则返回 True

RLock 对象

可重入锁是一种同步原语,可以被同一线程多次获取。在内部,它除了使用原始锁的锁定/未锁定状态外,还使用“拥有线程”和“递归级别”的概念。在锁定状态下,某个线程拥有该锁;在未锁定状态下,没有线程拥有它。

线程调用锁的 acquire() 方法来锁定它,调用其 release() 方法来解锁它。

备注

可重入锁支持上下文管理协议,因此建议使用 with 而不是手动调用 acquire()release() 来处理代码块的锁获取和释放。

RLock 的 acquire()/release() 调用对可以嵌套,这与 Lock 的 acquire()/release() 不同。只有最后一个 release()(最外层对的 release())才会将锁重置为未锁定状态,并允许另一个在 acquire() 中阻塞的线程继续执行。

acquire()/release() 必须成对使用:每个 acquire 必须在获取锁的线程中有一个 release。如果未能按照获取锁的次数调用 release,可能会导致死锁。

class threading.RLock

此类实现可重入锁对象。可重入锁必须由获取它的线程释放。一旦一个线程获取了可重入锁,同一个线程可以再次获取它而不会阻塞;线程必须每次获取后释放一次。

请注意,RLock 实际上是一个工厂函数,它返回平台支持的最有效版本的具体 RLock 类实例。

acquire(blocking=True, timeout=-1)

获取一个锁,阻塞或非阻塞。

参见

将 RLock 用作上下文管理器

在实际情况下,建议使用此方法,而不是手动调用 acquire()release()

当以 blocking 参数设置为 True(默认值)调用时

  • 如果没有线程拥有锁,则获取锁并立即返回。

  • 如果另一个线程拥有锁,则阻塞直到我们能够获取锁,或者如果 timeout 设置为正浮点值,则阻塞直到超时。

  • 如果同一个线程拥有锁,则再次获取锁,并立即返回。这是 LockRLock 之间的区别;Lock 处理此情况与前一种情况相同,阻塞直到可以获取锁。

当以 blocking 参数设置为 False 调用时

  • 如果没有线程拥有锁,则获取锁并立即返回。

  • 如果另一个线程拥有锁,则立即返回。

  • 如果同一个线程拥有锁,则再次获取锁并立即返回。

在所有情况下,如果线程能够获取锁,则返回 True。如果线程无法获取锁(即如果未阻塞或达到超时),则返回 False

如果多次调用,未能多次调用 release() 可能会导致死锁。考虑将 RLock 用作上下文管理器,而不是直接调用 acquire/release。

版本 3.2 中的变化: timeout 参数是新的。

release()

释放锁,递减递归级别。如果递减后递归级别为零,则将锁重置为未锁定(不被任何线程拥有),并且如果有其他线程在等待锁变为未锁定而被阻塞,则允许其中恰好一个线程继续执行。如果递减后递归级别仍不为零,则锁保持锁定状态并由调用线程拥有。

仅当调用线程拥有锁时才调用此方法。如果在未获取锁时调用此方法,则会引发 RuntimeError

没有返回值。

locked()

返回一个布尔值,指示此对象当前是否被锁定。

在 3.14 版本加入。

条件对象

条件变量总是与某种锁相关联;这可以传入,也可以默认创建一个。当多个条件变量必须共享同一个锁时,传入一个锁很有用。锁是条件对象的一部分:您不必单独跟踪它。

条件变量遵循上下文管理协议:使用 with 语句会在封闭块的持续时间内获取关联的锁。acquire()release() 方法也调用关联锁的相应方法。

必须在持有相关联的锁时调用其他方法。wait() 方法释放锁,然后阻塞,直到另一个线程通过调用 notify()notify_all() 唤醒它。一旦被唤醒,wait() 重新获取锁并返回。也可以指定超时。

notify() 方法唤醒一个正在等待条件变量的线程(如果存在)。notify_all() 方法唤醒所有正在等待条件变量的线程。

注意:notify()notify_all() 方法不释放锁;这意味着被唤醒的线程不会立即从它们的 wait() 调用返回,而是在调用 notify()notify_all() 的线程最终放弃锁的所有权时才会返回。

使用条件变量的典型编程风格使用锁来同步对某些共享状态的访问;对特定状态变化感兴趣的线程反复调用 wait() 直到它们看到所需状态,而修改状态的线程在以可能成为某个等待者的所需状态的方式更改状态时调用 notify()notify_all()。例如,以下代码是具有无限缓冲区容量的通用生产者-消费者情况

# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

检查应用程序条件的 while 循环是必要的,因为 wait() 可能会在任意长时间后返回,并且促使 notify() 调用的条件可能不再成立。这是多线程编程固有的。可以使用 wait_for() 方法来自动化条件检查,并简化超时计算

# Consume an item
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

要选择 notify()notify_all(),请考虑一个状态变化是否只对一个或几个等待线程感兴趣。例如,在典型的生产者-消费者场景中,向缓冲区添加一个项目只需要唤醒一个消费者线程。

class threading.Condition(lock=None)

此类实现条件变量对象。条件变量允许一个或多个线程等待,直到它们被另一个线程通知。

如果给定了 lock 参数且不为 None,则它必须是一个 LockRLock 对象,并将其用作底层锁。否则,将创建一个新的 RLock 对象并将其用作底层锁。

版本 3.3 中的变化: 从工厂函数更改为类。

acquire(*args)

获取底层锁。此方法调用底层锁上的相应方法;返回值是该方法返回的任何内容。

release()

释放底层锁。此方法调用底层锁上的相应方法;没有返回值。

locked()

返回一个布尔值,指示此对象当前是否被锁定。

在 3.14 版本加入。

wait(timeout=None)

等待直到被通知或直到发生超时。如果调用此方法时调用线程尚未获取锁,则会引发 RuntimeError

此方法释放底层锁,然后阻塞,直到它被另一个线程中对同一条件变量的 notify()notify_all() 调用唤醒,或者直到可选的超时发生。一旦被唤醒或超时,它会重新获取锁并返回。

timeout 参数存在且不为 None 时,它应该是一个浮点数,指定操作的超时时间(以秒或其分数计)。

当底层锁是 RLock 时,它不会通过其 release() 方法释放,因为当它被递归多次获取时,这可能实际上不会解锁。相反,使用的是 RLock 类的一个内部接口,该接口即使在它被递归获取多次后也能真正解锁。然后使用另一个内部接口在重新获取锁时恢复递归级别。

返回值是 True,除非给定的 timeout 超时,在这种情况下,返回值是 False

3.2 版本中已更改: 以前,此方法总是返回 None

wait_for(predicate, timeout=None)

等待直到条件评估为真。 predicate 应该是一个可调用对象,其结果将被解释为布尔值。可以提供一个 timeout 值来指定最大等待时间。

此工具方法可以重复调用 wait() 直到谓词满足,或者直到超时发生。返回值是谓词的最后一个返回值,如果方法超时,则将评估为 False

忽略超时特性,调用此方法大致等同于编写

while not predicate():
    cv.wait()

因此,与 wait() 相同规则适用:调用时必须持有锁,并在返回时重新获取锁。在持有锁的情况下评估谓词。

在 3.2 版本加入。

notify(n=1)

默认情况下,唤醒等待此条件的一个线程(如果有)。如果调用线程在此方法被调用时未获取锁,则会引发 RuntimeError

此方法最多唤醒 n 个等待条件变量的线程;如果没有线程等待,则此方法不做任何操作。

当前实现会精确唤醒 n 个线程,如果至少有 n 个线程在等待。但是,不应依赖此行为。未来经过优化的实现可能会偶尔唤醒超过 n 个线程。

注意:被唤醒的线程实际上不会从其 wait() 调用返回,直到它能够重新获取锁。由于 notify() 不会释放锁,因此其调用者应该释放锁。

notify_all()

唤醒所有等待此条件的线程。此方法的行为类似于 notify(),但会唤醒所有等待线程而不是一个。如果调用线程在此方法被调用时未获取锁,则会引发 RuntimeError

方法 notifyAll 是此方法的已弃用别名。

信号量对象

这是计算机科学史上最古老的同步原语之一,由早期的荷兰计算机科学家 Edsger W. Dijkstra 发明(他使用名称 P()V() 而不是 acquire()release())。

信号量管理一个内部计数器,每次调用 acquire() 都会使计数器减一,每次调用 release() 都会使计数器增一。计数器永远不会低于零;当 acquire() 发现计数器为零时,它会阻塞,等待直到其他线程调用 release()

信号量也支持 上下文管理协议

class threading.Semaphore(value=1)

此类的实现是一个信号量对象。信号量管理一个原子计数器,该计数器表示 release() 调用次数减去 acquire() 调用次数,再加上一个初始值。如有必要,acquire() 方法会阻塞,直到它能够在不使计数器变为负数的情况下返回。如果未给出,则 value 默认为 1。

可选参数为内部计数器提供初始 value;它默认为 1。如果给定的 value 小于 0,则会引发 ValueError

版本 3.3 中的变化: 从工厂函数更改为类。

acquire(blocking=True, timeout=None)

获取一个信号量。

在不带参数调用时

  • 如果进入时内部计数器大于零,则将其减一并立即返回 True

  • 如果进入时内部计数器为零,则阻塞直到被 release() 调用唤醒。一旦被唤醒(并且计数器大于0),则将计数器减1并返回 True。每次调用 release() 都将精确唤醒一个线程。唤醒线程的顺序不应被依赖。

当调用时 blocking 设置为 False,不阻塞。如果无参数调用会阻塞,则立即返回 False;否则,执行与无参数调用相同的操作,并返回 True

当调用时 timeout 不为 None 时,它将阻塞最多 timeout 秒。如果在此间隔内 acquire 未成功完成,则返回 False。否则返回 True

版本 3.2 中的变化: timeout 参数是新的。

release(n=1)

释放一个信号量,将内部计数器增加 n。如果进入时计数器为零,并且有其他线程正在等待它再次大于零,则唤醒其中 n 个线程。

3.9 版本中已更改: 添加了 n 参数以同时释放多个等待线程。

class threading.BoundedSemaphore(value=1)

实现有界信号量对象的类。有界信号量会检查其当前值是否超过其初始值。如果超过,则会引发 ValueError。在大多数情况下,信号量用于保护容量有限的资源。如果信号量被释放的次数过多,则表明存在错误。如果未给出,则 value 默认为 1。

版本 3.3 中的变化: 从工厂函数更改为类。

Semaphore 示例

信号量通常用于保护容量有限的资源,例如数据库服务器。在资源大小固定H的任何情况下,都应使用有界信号量。在生成任何工作线程之前,您的主线程将初始化信号量

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

一旦生成,工作线程在需要连接到服务器时会调用信号量的 acquire 和 release 方法

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

有界信号量的使用减少了因程序错误导致信号量释放次数多于获取次数而未被检测到的可能性。

事件对象

这是线程间通信最简单的机制之一:一个线程发出事件信号,其他线程等待该事件。

事件对象管理一个内部标志,可以使用 set() 方法将其设置为 True,并使用 clear() 方法将其重置为 False。 wait() 方法会阻塞直到该标志为 True。

class threading.Event

实现事件对象的类。事件管理一个标志,该标志可以使用 set() 方法设置为真,并使用 clear() 方法重置为假。 wait() 方法会阻塞直到该标志为真。该标志最初为假。

版本 3.3 中的变化: 从工厂函数更改为类。

is_set()

当且仅当内部标志为真时返回 True

方法 isSet 是此方法的已弃用别名。

set()

将内部标志设置为 true。所有等待它变为 true 的线程都被唤醒。一旦标志为 true,调用 wait() 的线程将根本不会阻塞。

clear()

将内部标志重置为 false。之后,调用 wait() 的线程将阻塞,直到调用 set() 将内部标志再次设置为 true。

wait(timeout=None)

只要内部标志为假,且超时(如果给定)未过期,就阻塞。返回值表示此阻塞方法返回的原因;如果因内部标志设置为真而返回,则为 True,或者如果给定超时且内部标志在给定等待时间内未变为真,则为 False

当存在 timeout 参数且不为 None 时,它应该是一个浮点数,指定操作的超时时间(以秒为单位或其分数)。

3.1 版本中已更改: 以前,此方法总是返回 None

计时器对象

此类表示一个计时器,即只有在经过一定时间后才应运行的操作。 TimerThread 的子类,因此也可以作为创建自定义线程的示例。

计时器像线程一样,通过调用它们的 Timer.start 方法启动。可以通过调用 cancel() 方法来停止计时器(在其操作开始之前)。计时器在执行其操作之前等待的间隔可能与用户指定的间隔不完全相同。

例如:

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed
class threading.Timer(interval, function, args=None, kwargs=None)

创建一个计时器,它将在 interval 秒过后,使用参数 args 和关键字参数 kwargs 来运行 function。如果 argsNone(默认值),则将使用空列表。如果 kwargsNone(默认值),则将使用空字典。

版本 3.3 中的变化: 从工厂函数更改为类。

cancel()

停止计时器,并取消计时器操作的执行。这仅在计时器仍在等待阶段时有效。

屏障对象

在 3.2 版本加入。

此类提供了一个简单的同步原语,供固定数量的线程使用,这些线程需要彼此等待。每个线程都通过调用 wait() 方法来尝试通过屏障,并将阻塞直到所有线程都调用了 wait()。此时,这些线程同时释放。

该屏障可以为相同数量的线程重复使用任意次数。

例如,这是一种同步客户端和服务器线程的简单方法

b = Barrier(2, timeout=5)

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)
class threading.Barrier(parties, action=None, timeout=None)

parties 个线程创建一个屏障对象。如果提供了 action,则它是一个可调用对象,由其中一个线程在它们被释放时调用。 timeoutwait() 方法未指定时的默认超时值。

wait(timeout=None)

通过屏障。当所有参与屏障的线程都调用此函数时,它们将同时释放。如果提供了 timeout,则优先使用它而不是提供给类构造函数的任何超时。

返回值是一个 0 到 parties – 1 范围内的整数,每个线程不同。这可用于选择一个线程来做一些特殊的内务处理,例如

i = barrier.wait()
if i == 0:
    # Only one thread needs to print this
    print("passed the barrier")

如果构造函数中提供了 action,其中一个线程会在被释放之前调用它。如果此调用引发错误,则屏障将进入 broken 状态。

如果调用超时,屏障将进入 broken 状态。

如果屏障在线程等待时被重置或损坏,此方法可能会引发 BrokenBarrierError 异常。

reset()

将屏障恢复到默认的空状态。任何等待它的线程都将收到 BrokenBarrierError 异常。

请注意,如果存在状态未知的其他线程,使用此函数可能需要一些外部同步。如果屏障损坏,最好直接放弃并创建一个新的。

abort()

将屏障置于 broken 状态。这将导致任何正在进行的或未来的 wait() 调用因 BrokenBarrierError 而失败。例如,如果其中一个线程需要中止,则使用此方法以避免应用程序死锁。

最好直接为屏障创建一个合理的 timeout 值,以自动防止其中一个线程出错。

parties

通过屏障所需的线程数。

n_waiting

当前在屏障中等待的线程数。

broken

一个布尔值,如果屏障处于 broken 状态,则为 True

exception threading.BrokenBarrierError

Barrier 对象被重置或损坏时,会引发此异常,它是 RuntimeError 的子类。

with 语句中使用锁、条件和信号量

此模块提供的所有具有 acquirerelease 方法的对象都可以用作 with 语句的上下文管理器。 acquire 方法将在进入块时调用, release 将在退出块时调用。因此,以下代码片段

with some_lock:
    # do something...

等价于

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

目前,LockRLockConditionSemaphoreBoundedSemaphore 对象可以用作 with 语句上下文管理器。