concurrent.futures
— 启动并行任务¶
3.2 版本新增。
源代码: Lib/concurrent/futures/thread.py 和 Lib/concurrent/futures/process.py
concurrent.futures
模块为异步执行可调用对象提供了高级接口。
异步执行可以使用线程(使用 ThreadPoolExecutor
)或单独的进程(使用 ProcessPoolExecutor
)来执行。两者都实现了相同的接口,该接口由抽象的 Executor
类定义。
可用性:不适用于 WASI。
此模块在 WebAssembly 上不起作用或不可用。有关更多信息,请参阅 WebAssembly 平台。
执行器对象¶
- class concurrent.futures.Executor¶
一个抽象类,提供异步执行调用的方法。它不应直接使用,而是通过其具体的子类使用。
- submit(fn, /, *args, **kwargs)¶
安排可调用对象 fn 作为
fn(*args, **kwargs)
执行,并返回一个Future
对象,表示可调用对象的执行。with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
类似于
map(fn, *iterables)
,区别在于iterables 会立即收集,而不是惰性地收集;
fn 异步执行,并且可以同时进行多次对 fn 的调用。
如果
__next__()
被调用且在从原始调用Executor.map()
后 timeout 秒后结果不可用,则返回的迭代器会引发TimeoutError
。timeout 可以是整数或浮点数。如果未指定 timeout 或为None
,则等待时间没有限制。如果 fn 调用引发异常,则当从迭代器检索其值时,将引发该异常。
当使用
ProcessPoolExecutor
时,此方法会将 iterables 切分成多个块,并将其作为单独的任务提交到池中。可以通过将 chunksize 设置为正整数来指定这些块的(近似)大小。对于非常长的可迭代对象,与默认大小 1 相比,使用较大的 chunksize 值可以显著提高性能。对于ThreadPoolExecutor
,chunksize 没有影响。在 3.5 版本中做了更改: 添加了 chunksize 参数。
- shutdown(wait=True, *, cancel_futures=False)¶
向执行器发出信号,表示当当前挂起的 future 执行完成后,它应释放其正在使用的任何资源。在关闭后调用
Executor.submit()
和Executor.map()
将引发RuntimeError
。如果 wait 为
True
,则此方法将不会返回,直到所有挂起的 future 执行完成并且与执行器关联的资源已被释放。如果 wait 为False
,则此方法将立即返回,并且当所有挂起的 future 执行完成时,将释放与执行器关联的资源。无论 wait 的值如何,整个 Python 程序都不会退出,直到所有挂起的 future 执行完成。如果 cancel_futures 为
True
,则此方法将取消执行器尚未开始运行的所有挂起的 future。无论 cancel_futures 的值如何,任何已完成或正在运行的 future 都不会被取消。如果 cancel_futures 和 wait 均为
True
,则执行器已开始运行的所有 future 将在此方法返回之前完成。剩余的 future 将被取消。如果您使用
with
语句,则可以避免必须显式调用此方法,该语句将关闭Executor
(等待,就好像Executor.shutdown()
的 wait 设置为True
一样)import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
在 3.9 版本中做了更改: 添加了 cancel_futures。
ThreadPoolExecutor¶
ThreadPoolExecutor
是一个 Executor
子类,它使用线程池来异步执行调用。
当与 Future
关联的可调用对象等待另一个 Future
的结果时,可能会发生死锁。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
并且
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
一个
Executor
子类,它使用最多 max_workers 个线程的池来异步执行调用。在解释器退出之前,将加入所有排队到
ThreadPoolExecutor
的线程。请注意,执行此操作的退出处理程序在任何使用atexit
添加的退出处理程序之前执行。这意味着必须捕获并处理主线程中的异常,以便通知线程正常退出。因此,建议不要将ThreadPoolExecutor
用于长时间运行的任务。initializer 是一个可选的可调用对象,在每个工作线程启动时调用;initargs 是传递给初始化器的参数元组。如果 initializer 引发异常,则所有当前挂起的作业以及任何向池提交更多作业的尝试都将引发
BrokenThreadPool
。在 3.5 版本中更改: 如果 max_workers 为
None
或未给出,则它将默认为机器上的处理器数量乘以5
,假设ThreadPoolExecutor
通常用于重叠 I/O 而不是 CPU 工作,并且工作线程的数量应该高于ProcessPoolExecutor
的工作线程数量。在 3.6 版本中更改: 添加了 thread_name_prefix 参数,以允许用户控制池创建的工作线程的
threading.Thread
名称,以便更轻松地进行调试。在 3.7 版本中更改: 添加了 initializer 和 initargs 参数。
在 3.8 版本中更改: max_workers 的默认值更改为
min(32, os.cpu_count() + 4)
。此默认值保留至少 5 个用于 I/O 绑定任务的工作线程。它最多利用 32 个 CPU 内核来执行释放 GIL 的 CPU 绑定任务。并且它避免在多核机器上隐式使用非常大的资源。ThreadPoolExecutor 现在也会重用空闲工作线程,然后再启动 max_workers 个工作线程。
在 3.13 版本中更改: max_workers 的默认值更改为
min(32, (os.process_cpu_count() or 1) + 4)
。
ThreadPoolExecutor 示例¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistent-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
ProcessPoolExecutor
类是一个 Executor
子类,它使用进程池来异步执行调用。ProcessPoolExecutor
使用 multiprocessing
模块,这允许它绕过 全局解释器锁,但也意味着只有可pickle的对象才能被执行和返回。
必须使工作子进程可导入 __main__
模块。这意味着 ProcessPoolExecutor
在交互式解释器中不起作用。
从提交给 ProcessPoolExecutor
的可调用对象调用 Executor
或 Future
方法将导致死锁。
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
一个
Executor
子类,它使用最多 max_workers 个进程的池来异步执行调用。如果 max_workers 为None
或未给出,则它将默认为os.process_cpu_count()
。如果 max_workers 小于或等于0
,则会引发ValueError
。在 Windows 上,max_workers 必须小于或等于61
。如果不是,则会引发ValueError
。如果 max_workers 为None
,则选择的默认值将最多为61
,即使有更多处理器可用。mp_context 可以是一个multiprocessing
上下文或None
。它将用于启动工作进程。如果 mp_context 为None
或未给出,则使用默认的multiprocessing
上下文。请参阅 上下文和启动方法。initializer 是一个可选的可调用对象,在每个工作进程启动时调用;initargs 是传递给初始化器的参数元组。如果 initializer 引发异常,则所有当前挂起的作业以及任何向池提交更多作业的尝试都将引发
BrokenProcessPool
。max_tasks_per_child 是一个可选参数,用于指定单个进程在退出并被新的工作进程替换之前可以执行的最大任务数。默认情况下,max_tasks_per_child 为
None
,这意味着工作进程将与池的生命周期一样长。当指定最大值时,在缺少 mp_context 参数的情况下,默认将使用“spawn”多进程启动方法。此功能与“fork”启动方法不兼容。在 3.3 版本中更改: 当其中一个工作进程突然终止时,现在会引发
BrokenProcessPool
错误。以前,行为未定义,但对执行器或其 future 的操作通常会冻结或死锁。在 3.7 版本中更改: 添加了 mp_context 参数,以允许用户控制池创建的工作进程的 start_method。
添加了 initializer 和 initargs 参数。
注意
默认的
multiprocessing
启动方法(请参阅 上下文和启动方法)将在 Python 3.14 中更改为不使用 fork。 需要使用 fork 来执行其ProcessPoolExecutor
的代码应通过传递mp_context=multiprocessing.get_context("fork")
参数来显式指定。在 3.11 版本中更改: 添加了 max_tasks_per_child 参数,以允许用户控制池中工作进程的生命周期。
3.12 版本更改: 在 POSIX 系统上,如果您的应用程序有多个线程,并且
multiprocessing
上下文使用"fork"
启动方法:内部调用来生成工作进程的os.fork()
函数可能会引发DeprecationWarning
。传递配置为使用不同启动方法的 *mp_context*。有关更多说明,请参阅os.fork()
文档。3.13 版本更改: max_workers 默认使用
os.process_cpu_count()
,而不是os.cpu_count()
。
ProcessPoolExecutor 示例¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Future 对象¶
Future
类封装了可调用对象的异步执行。Future
实例由 Executor.submit()
创建。
- class concurrent.futures.Future¶
封装了可调用对象的异步执行。
Future
实例由Executor.submit()
创建,除非用于测试,否则不应直接创建。- cancel()¶
尝试取消调用。如果当前正在执行调用或已完成运行且无法取消,则该方法将返回
False
,否则将取消调用,并且该方法将返回True
。
- cancelled()¶
如果成功取消调用,则返回
True
。
- running()¶
如果当前正在执行调用且无法取消,则返回
True
。
- done()¶
如果成功取消调用或已完成运行,则返回
True
。
- result(timeout=None)¶
返回调用返回的值。 如果调用尚未完成,则此方法将等待最多 *timeout* 秒。 如果调用在 *timeout* 秒内未完成,则会引发
TimeoutError
。 *timeout* 可以是 int 或 float。 如果未指定 *timeout* 或为None
,则等待时间没有限制。如果 future 在完成之前被取消,则会引发
CancelledError
。如果调用引发异常,则此方法将引发相同的异常。
- exception(timeout=None)¶
返回调用引发的异常。 如果调用尚未完成,则此方法将等待最多 *timeout* 秒。 如果调用在 *timeout* 秒内未完成,则会引发
TimeoutError
。 *timeout* 可以是 int 或 float。 如果未指定 *timeout* 或为None
,则等待时间没有限制。如果 future 在完成之前被取消,则会引发
CancelledError
。如果调用完成时没有引发异常,则返回
None
。
- add_done_callback(fn)¶
将可调用对象 *fn* 附加到 future。当 future 被取消或完成运行时,将调用 *fn*,并将 future 作为其唯一参数。
添加的可调用对象按添加顺序调用,并且始终在属于添加它们的进程的线程中调用。 如果可调用对象引发
Exception
子类,则会记录并忽略它。 如果可调用对象引发BaseException
子类,则行为未定义。如果 future 已完成或已取消,则会立即调用 *fn*。
以下
Future
方法旨在用于单元测试和Executor
实现。- set_running_or_notify_cancel()¶
仅应由
Executor
实现,在执行与Future
关联的工作之前以及单元测试调用此方法。如果该方法返回
False
,则Future
已被取消,即调用了Future.cancel()
并返回了True
。任何等待Future
完成(即通过as_completed()
或wait()
)的线程都将被唤醒。如果该方法返回
True
,则Future
未被取消,并且已设置为运行状态,即调用Future.running()
将返回True
。此方法只能调用一次,并且不能在调用
Future.set_result()
或Future.set_exception()
之后调用。
- set_result(result)¶
将与
Future
关联的工作的结果设置为 *result*。此方法仅应由
Executor
实现和单元测试使用。3.8 版本更改: 如果
Future
已完成,则此方法引发concurrent.futures.InvalidStateError
。
模块函数¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
等待由 fs 给出的
Future
实例(可能是由不同的Executor
实例创建)完成。传递给 fs 的重复 futures 将被删除,并且只会返回一次。返回一个包含两个集合的命名元组。第一个集合名为done
,包含在等待完成之前完成(已完成或已取消的 futures)的 futures。第二个集合名为not_done
,包含未完成的 futures(挂起或正在运行的 futures)。timeout 可用于控制返回前等待的最大秒数。timeout 可以是 int 或 float。如果未指定 timeout 或为
None
,则等待时间没有限制。return_when 指示此函数应何时返回。它必须是以下常量之一
常量
描述
- concurrent.futures.FIRST_COMPLETED¶
当任何 future 完成或被取消时,该函数将返回。
- concurrent.futures.FIRST_EXCEPTION¶
当任何 future 通过引发异常完成时,该函数将返回。如果没有 future 引发异常,则它等效于
ALL_COMPLETED
。- concurrent.futures.ALL_COMPLETED¶
当所有 futures 完成或被取消时,该函数将返回。
- concurrent.futures.as_completed(fs, timeout=None)¶
返回一个迭代器,该迭代器遍历由 fs 给出的
Future
实例(可能是由不同的Executor
实例创建),当这些 futures 完成时(已完成或已取消的 futures)产生 futures。fs 给出的任何重复的 futures 将返回一次。在调用as_completed()
之前完成的任何 futures 将首先产生。如果调用__next__()
并且从最初调用as_completed()
后经过 timeout 秒结果仍然不可用,则返回的迭代器会引发TimeoutError
。timeout 可以是 int 或 float。如果未指定 timeout 或为None
,则等待时间没有限制。
参见
- PEP 3148 – futures - 异步执行计算
该提案描述了将此功能包含在 Python 标准库中的情况。
异常类¶
- exception concurrent.futures.CancelledError¶
当 future 被取消时引发。
- exception concurrent.futures.TimeoutError¶
TimeoutError
的已弃用别名,当 future 操作超出给定的超时时间时引发。在 3.11 版本中更改:此类被设为
TimeoutError
的别名。
- exception concurrent.futures.BrokenExecutor¶
从此类派生自
RuntimeError
,当执行器因某种原因损坏,无法用于提交或执行新任务时,会引发此异常类。3.7 版本中新增。
- exception concurrent.futures.InvalidStateError¶
当在 future 上执行在当前状态下不允许的操作时引发。
3.8 版本中新增。
- exception concurrent.futures.thread.BrokenThreadPool¶
从此类派生自
BrokenExecutor
,当ThreadPoolExecutor
的一个工作线程初始化失败时,会引发此异常类。3.7 版本中新增。
- exception concurrent.futures.process.BrokenProcessPool¶
从此类派生自
BrokenExecutor
(以前是RuntimeError
),当ProcessPoolExecutor
的一个工作线程以非干净的方式终止时(例如,如果它是从外部杀死的),会引发此异常类。3.3 版本中新增。