concurrent.futures — 启动并行任务

3.2 版本新增。

源代码: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模块为异步执行可调用对象提供了高级接口。

异步执行可以使用线程(使用 ThreadPoolExecutor)或单独的进程(使用 ProcessPoolExecutor)来执行。两者都实现了相同的接口,该接口由抽象的 Executor 类定义。

可用性:不适用于 WASI。

此模块在 WebAssembly 上不起作用或不可用。有关更多信息,请参阅 WebAssembly 平台

执行器对象

class concurrent.futures.Executor

一个抽象类,提供异步执行调用的方法。它不应直接使用,而是通过其具体的子类使用。

submit(fn, /, *args, **kwargs)

安排可调用对象 fn 作为 fn(*args, **kwargs) 执行,并返回一个 Future 对象,表示可调用对象的执行。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1)

类似于 map(fn, *iterables),区别在于

  • iterables 会立即收集,而不是惰性地收集;

  • fn 异步执行,并且可以同时进行多次对 fn 的调用。

如果 __next__() 被调用且在从原始调用 Executor.map()timeout 秒后结果不可用,则返回的迭代器会引发 TimeoutErrortimeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则等待时间没有限制。

如果 fn 调用引发异常,则当从迭代器检索其值时,将引发该异常。

当使用 ProcessPoolExecutor 时,此方法会将 iterables 切分成多个块,并将其作为单独的任务提交到池中。可以通过将 chunksize 设置为正整数来指定这些块的(近似)大小。对于非常长的可迭代对象,与默认大小 1 相比,使用较大的 chunksize 值可以显著提高性能。对于 ThreadPoolExecutorchunksize 没有影响。

在 3.5 版本中做了更改: 添加了 chunksize 参数。

shutdown(wait=True, *, cancel_futures=False)

向执行器发出信号,表示当当前挂起的 future 执行完成后,它应释放其正在使用的任何资源。在关闭后调用 Executor.submit()Executor.map() 将引发 RuntimeError

如果 waitTrue,则此方法将不会返回,直到所有挂起的 future 执行完成并且与执行器关联的资源已被释放。如果 waitFalse,则此方法将立即返回,并且当所有挂起的 future 执行完成时,将释放与执行器关联的资源。无论 wait 的值如何,整个 Python 程序都不会退出,直到所有挂起的 future 执行完成。

如果 cancel_futuresTrue,则此方法将取消执行器尚未开始运行的所有挂起的 future。无论 cancel_futures 的值如何,任何已完成或正在运行的 future 都不会被取消。

如果 cancel_futureswait 均为 True,则执行器已开始运行的所有 future 将在此方法返回之前完成。剩余的 future 将被取消。

如果您使用 with 语句,则可以避免必须显式调用此方法,该语句将关闭 Executor(等待,就好像 Executor.shutdown()wait 设置为 True 一样)

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

在 3.9 版本中做了更改: 添加了 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutor 是一个 Executor 子类,它使用线程池来异步执行调用。

当与 Future 关联的可调用对象等待另一个 Future 的结果时,可能会发生死锁。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

并且

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一个 Executor 子类,它使用最多 max_workers 个线程的池来异步执行调用。

在解释器退出之前,将加入所有排队到 ThreadPoolExecutor 的线程。请注意,执行此操作的退出处理程序在任何使用 atexit 添加的退出处理程序之前执行。这意味着必须捕获并处理主线程中的异常,以便通知线程正常退出。因此,建议不要将 ThreadPoolExecutor 用于长时间运行的任务。

initializer 是一个可选的可调用对象,在每个工作线程启动时调用;initargs 是传递给初始化器的参数元组。如果 initializer 引发异常,则所有当前挂起的作业以及任何向池提交更多作业的尝试都将引发 BrokenThreadPool

在 3.5 版本中更改: 如果 max_workersNone 或未给出,则它将默认为机器上的处理器数量乘以 5,假设 ThreadPoolExecutor 通常用于重叠 I/O 而不是 CPU 工作,并且工作线程的数量应该高于 ProcessPoolExecutor 的工作线程数量。

在 3.6 版本中更改: 添加了 thread_name_prefix 参数,以允许用户控制池创建的工作线程的 threading.Thread 名称,以便更轻松地进行调试。

在 3.7 版本中更改: 添加了 initializerinitargs 参数。

在 3.8 版本中更改: max_workers 的默认值更改为 min(32, os.cpu_count() + 4)。此默认值保留至少 5 个用于 I/O 绑定任务的工作线程。它最多利用 32 个 CPU 内核来执行释放 GIL 的 CPU 绑定任务。并且它避免在多核机器上隐式使用非常大的资源。

ThreadPoolExecutor 现在也会重用空闲工作线程,然后再启动 max_workers 个工作线程。

在 3.13 版本中更改: max_workers 的默认值更改为 min(32, (os.process_cpu_count() or 1) + 4)

ThreadPoolExecutor 示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 类是一个 Executor 子类,它使用进程池来异步执行调用。ProcessPoolExecutor 使用 multiprocessing 模块,这允许它绕过 全局解释器锁,但也意味着只有可pickle的对象才能被执行和返回。

必须使工作子进程可导入 __main__ 模块。这意味着 ProcessPoolExecutor 在交互式解释器中不起作用。

从提交给 ProcessPoolExecutor 的可调用对象调用 ExecutorFuture 方法将导致死锁。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

一个 Executor 子类,它使用最多 max_workers 个进程的池来异步执行调用。如果 max_workersNone 或未给出,则它将默认为 os.process_cpu_count()。如果 max_workers 小于或等于 0,则会引发 ValueError。在 Windows 上,max_workers 必须小于或等于 61。如果不是,则会引发 ValueError。如果 max_workersNone,则选择的默认值将最多为 61,即使有更多处理器可用。mp_context 可以是一个 multiprocessing 上下文或 None。它将用于启动工作进程。如果 mp_contextNone 或未给出,则使用默认的 multiprocessing 上下文。请参阅 上下文和启动方法

initializer 是一个可选的可调用对象,在每个工作进程启动时调用;initargs 是传递给初始化器的参数元组。如果 initializer 引发异常,则所有当前挂起的作业以及任何向池提交更多作业的尝试都将引发 BrokenProcessPool

max_tasks_per_child 是一个可选参数,用于指定单个进程在退出并被新的工作进程替换之前可以执行的最大任务数。默认情况下,max_tasks_per_childNone,这意味着工作进程将与池的生命周期一样长。当指定最大值时,在缺少 mp_context 参数的情况下,默认将使用“spawn”多进程启动方法。此功能与“fork”启动方法不兼容。

在 3.3 版本中更改: 当其中一个工作进程突然终止时,现在会引发 BrokenProcessPool 错误。以前,行为未定义,但对执行器或其 future 的操作通常会冻结或死锁。

在 3.7 版本中更改: 添加了 mp_context 参数,以允许用户控制池创建的工作进程的 start_method。

添加了 initializerinitargs 参数。

注意

默认的 multiprocessing 启动方法(请参阅 上下文和启动方法)将在 Python 3.14 中更改为不使用 fork。 需要使用 fork 来执行其 ProcessPoolExecutor 的代码应通过传递 mp_context=multiprocessing.get_context("fork") 参数来显式指定。

在 3.11 版本中更改: 添加了 max_tasks_per_child 参数,以允许用户控制池中工作进程的生命周期。

3.12 版本更改: 在 POSIX 系统上,如果您的应用程序有多个线程,并且 multiprocessing 上下文使用 "fork" 启动方法:内部调用来生成工作进程的 os.fork() 函数可能会引发 DeprecationWarning。传递配置为使用不同启动方法的 *mp_context*。有关更多说明,请参阅 os.fork() 文档。

3.13 版本更改: max_workers 默认使用 os.process_cpu_count(),而不是 os.cpu_count()

ProcessPoolExecutor 示例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 对象

Future 类封装了可调用对象的异步执行。Future 实例由 Executor.submit() 创建。

class concurrent.futures.Future

封装了可调用对象的异步执行。Future 实例由 Executor.submit() 创建,除非用于测试,否则不应直接创建。

cancel()

尝试取消调用。如果当前正在执行调用或已完成运行且无法取消,则该方法将返回 False,否则将取消调用,并且该方法将返回 True

cancelled()

如果成功取消调用,则返回 True

running()

如果当前正在执行调用且无法取消,则返回 True

done()

如果成功取消调用或已完成运行,则返回 True

result(timeout=None)

返回调用返回的值。 如果调用尚未完成,则此方法将等待最多 *timeout* 秒。 如果调用在 *timeout* 秒内未完成,则会引发 TimeoutError。 *timeout* 可以是 int 或 float。 如果未指定 *timeout* 或为 None,则等待时间没有限制。

如果 future 在完成之前被取消,则会引发 CancelledError

如果调用引发异常,则此方法将引发相同的异常。

exception(timeout=None)

返回调用引发的异常。 如果调用尚未完成,则此方法将等待最多 *timeout* 秒。 如果调用在 *timeout* 秒内未完成,则会引发 TimeoutError。 *timeout* 可以是 int 或 float。 如果未指定 *timeout* 或为 None,则等待时间没有限制。

如果 future 在完成之前被取消,则会引发 CancelledError

如果调用完成时没有引发异常,则返回 None

add_done_callback(fn)

将可调用对象 *fn* 附加到 future。当 future 被取消或完成运行时,将调用 *fn*,并将 future 作为其唯一参数。

添加的可调用对象按添加顺序调用,并且始终在属于添加它们的进程的线程中调用。 如果可调用对象引发 Exception 子类,则会记录并忽略它。 如果可调用对象引发 BaseException 子类,则行为未定义。

如果 future 已完成或已取消,则会立即调用 *fn*。

以下 Future 方法旨在用于单元测试和 Executor 实现。

set_running_or_notify_cancel()

仅应由 Executor 实现,在执行与 Future 关联的工作之前以及单元测试调用此方法。

如果该方法返回 False,则 Future 已被取消,即调用了 Future.cancel() 并返回了 True。任何等待 Future 完成(即通过 as_completed()wait())的线程都将被唤醒。

如果该方法返回 True,则 Future 未被取消,并且已设置为运行状态,即调用 Future.running() 将返回 True

此方法只能调用一次,并且不能在调用 Future.set_result()Future.set_exception() 之后调用。

set_result(result)

将与 Future 关联的工作的结果设置为 *result*。

此方法仅应由 Executor 实现和单元测试使用。

3.8 版本更改: 如果 Future 已完成,则此方法引发 concurrent.futures.InvalidStateError

set_exception(exception)

将与 Future 相关联的工作结果设置为 Exception exception

此方法仅应由 Executor 实现和单元测试使用。

3.8 版本更改: 如果 Future 已完成,则此方法引发 concurrent.futures.InvalidStateError

模块函数

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 fs 给出的 Future 实例(可能是由不同的 Executor 实例创建)完成。传递给 fs 的重复 futures 将被删除,并且只会返回一次。返回一个包含两个集合的命名元组。第一个集合名为 done,包含在等待完成之前完成(已完成或已取消的 futures)的 futures。第二个集合名为 not_done,包含未完成的 futures(挂起或正在运行的 futures)。

timeout 可用于控制返回前等待的最大秒数。timeout 可以是 int 或 float。如果未指定 timeout 或为 None,则等待时间没有限制。

return_when 指示此函数应何时返回。它必须是以下常量之一

常量

描述

concurrent.futures.FIRST_COMPLETED

当任何 future 完成或被取消时,该函数将返回。

concurrent.futures.FIRST_EXCEPTION

当任何 future 通过引发异常完成时,该函数将返回。如果没有 future 引发异常,则它等效于 ALL_COMPLETED

concurrent.futures.ALL_COMPLETED

当所有 futures 完成或被取消时,该函数将返回。

concurrent.futures.as_completed(fs, timeout=None)

返回一个迭代器,该迭代器遍历由 fs 给出的 Future 实例(可能是由不同的 Executor 实例创建),当这些 futures 完成时(已完成或已取消的 futures)产生 futures。fs 给出的任何重复的 futures 将返回一次。在调用 as_completed() 之前完成的任何 futures 将首先产生。如果调用 __next__() 并且从最初调用 as_completed() 后经过 timeout 秒结果仍然不可用,则返回的迭代器会引发 TimeoutErrortimeout 可以是 int 或 float。如果未指定 timeout 或为 None,则等待时间没有限制。

参见

PEP 3148 – futures - 异步执行计算

该提案描述了将此功能包含在 Python 标准库中的情况。

异常类

exception concurrent.futures.CancelledError

当 future 被取消时引发。

exception concurrent.futures.TimeoutError

TimeoutError 的已弃用别名,当 future 操作超出给定的超时时间时引发。

在 3.11 版本中更改:此类被设为 TimeoutError 的别名。

exception concurrent.futures.BrokenExecutor

从此类派生自 RuntimeError,当执行器因某种原因损坏,无法用于提交或执行新任务时,会引发此异常类。

3.7 版本中新增。

exception concurrent.futures.InvalidStateError

当在 future 上执行在当前状态下不允许的操作时引发。

3.8 版本中新增。

exception concurrent.futures.thread.BrokenThreadPool

从此类派生自 BrokenExecutor,当 ThreadPoolExecutor 的一个工作线程初始化失败时,会引发此异常类。

3.7 版本中新增。

exception concurrent.futures.process.BrokenProcessPool

从此类派生自 BrokenExecutor (以前是 RuntimeError),当 ProcessPoolExecutor 的一个工作线程以非干净的方式终止时(例如,如果它是从外部杀死的),会引发此异常类。

3.3 版本中新增。