concurrent.futures --- 启动并行任务

在 3.2 版本加入。

源代码: Lib/concurrent/futures/thread.py, Lib/concurrent/futures/process.py, and Lib/concurrent/futures/interpreter.py


concurrent.futures 模块提供了一个高级接口,用于异步执行可调用对象。

异步执行可以通过线程池(使用 ThreadPoolExecutorInterpreterPoolExecutor)或独立的进程池(使用 ProcessPoolExecutor)来执行。它们都实现了相同的接口,该接口由抽象类 Executor 定义。

可用性:非 WASI。

此模块在 WebAssembly 上不起作用或不可用。有关更多信息,请参阅 WebAssembly 平台

执行器对象

class concurrent.futures.Executor

一个提供异步执行方法的抽象类。它不应该被直接使用,而应通过其具体子类使用。

submit(fn, /, *args, **kwargs)

安排可调用对象 fnfn(*args, **kwargs) 的方式执行,并返回一个表示该可调用对象执行情况的 Future 对象。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)

类似于 map(fn, *iterables),不同之处在于:

  • iterables 是立即收集的,而不是惰性收集的,除非指定了 buffersize 来限制尚未产出结果的已提交任务数量。如果缓冲区已满,对 iterables 的迭代将暂停,直到从缓冲区中产出一个结果。

  • fn 是异步执行的,对 fn 的多次调用可能会并发进行。

如果调用了 __next__(),并且在从最初调用 Executor.map() 算起的 timeout 秒后结果仍不可用,则返回的迭代器将引发 TimeoutErrortimeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则没有等待时间限制。

如果一个 fn 调用引发了异常,那么当从迭代器中获取其值时,该异常将被引发。

当使用 ProcessPoolExecutor 时,此方法会将 iterables 切分成多个块,并将它们作为单独的任务提交到池中。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。对于非常长的可迭代对象,使用较大的 chunksize 值可以显著提高性能,相比于默认大小 1。对于 ThreadPoolExecutorInterpreterPoolExecutorchunksize 没有效果。

在 3.5 版更改: 增加了 chunksize 形参。

在 3.14 版更改: 增加了 buffersize 参数。

shutdown(wait=True, *, cancel_futures=False)

通知执行器,在当前待处理的 future 执行完毕后,应释放其正在使用的任何资源。在 shutdown 后调用 Executor.submit()Executor.map() 将会引发 RuntimeError

如果 waitTrue,则此方法将不会返回,直到所有待处理的 future 都执行完毕并且与执行器关联的资源都已释放。如果 waitFalse,则此方法将立即返回,与执行器关联的资源将在所有待处理的 future 执行完毕后释放。无论 wait 的值如何,整个 Python 程序都不会退出,直到所有待处理的 future 都执行完毕。

如果 cancel_futuresTrue,此方法将取消所有执行器尚未开始运行的待处理 future。任何已完成或正在运行的 future 都不会被取消,无论 cancel_futures 的值如何。

如果 cancel_futureswait 均为 True,则执行器已开始运行的所有 future 将在此方法返回之前完成。剩余的 future 将被取消。

如果你通过 with 语句将执行器用作上下文管理器,则可以避免显式调用此方法。这会自动关闭 Executor(等待方式如同调用 Executor.shutdown() 时将 wait 设置为 True)。

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

在 3.9 版更改: 添加了 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutor 是一个 Executor 子类,它使用线程池来异步执行调用。

当与一个 Future 关联的可调用对象等待另一个 Future 的结果时,可能会发生死锁。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一个 Executor 子类,它使用一个最多包含 max_workers 个线程的池来异步执行调用。

所有加入 ThreadPoolExecutor 队列的线程都会在解释器退出前被 join。请注意,执行此操作的退出处理程序在任何使用 atexit 添加的退出处理程序 *之前* 执行。这意味着主线程中的异常必须被捕获和处理,以便向线程发送优雅退出的信号。因此,建议不要将 ThreadPoolExecutor 用于长时间运行的任务。

initializer 是一个可选的可调用对象,在每个工作线程启动时被调用;initargs 是传递给初始化器的参数元组。如果 initializer 引发异常,所有当前待处理的作业都将引发 BrokenThreadPool,任何向池中提交更多作业的尝试也将如此。

在 3.5 版更改: 如果 max_workersNone 或未给出,它将默认为机器上的处理器数量乘以 5,假设 ThreadPoolExecutor 通常用于重叠 I/O 而不是 CPU 工作,因此工作线程数应高于 ProcessPoolExecutor 的工作线程数。

在 3.6 版更改: 添加了 thread_name_prefix 参数,允许用户控制由池创建的工作线程的 threading.Thread 名称,以便于调试。

在 3.7 版更改: 添加了 initializerinitargs 参数。

在 3.8 版更改: max_workers 的默认值更改为 min(32, os.cpu_count() + 4)。此默认值至少保留 5 个工作线程用于 I/O 密集型任务。它最多利用 32 个 CPU 核心用于释放 GIL 的 CPU 密集型任务。并且它避免了在多核机器上隐式使用非常大的资源。

ThreadPoolExecutor 现在会在启动 max_workers 个工作线程之前重用空闲的工作线程。

在 3.13 版更改: max_workers 的默认值更改为 min(32, (os.process_cpu_count() or 1) + 4)

ThreadPoolExecutor 示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

InterpreterPoolExecutor

InterpreterPoolExecutor 类使用解释器池来异步执行调用。它是 ThreadPoolExecutor 的子类,这意味着每个工作者都在自己的线程中运行。这里的区别在于每个工作者都有自己的解释器,并使用该解释器运行每个任务。

使用解释器而非仅用线程的最大好处是实现了真正的多核并行。每个解释器都有自己的全局解释器锁,因此在一个解释器中运行的代码可以在一个 CPU 核心上运行,而另一个解释器中的代码可以在不同的核心上无阻塞地运行。

权衡之处在于,为多解释器编写并发代码可能需要额外的努力。然而,这是因为它迫使你谨慎地考虑解释器如何以及何时交互,并明确指定哪些数据在解释器之间共享。这带来了一些好处,有助于平衡额外的努力,包括真正的多核并行。例如,以这种方式编写的代码可以更容易地推理并发性。另一个主要好处是,你不必处理使用线程时的一些主要痛点,比如竞争条件。

每个工作者的解释器都与所有其他解释器隔离。“隔离”意味着每个解释器都有自己的运行时状态,并且完全独立运行。例如,如果你在一个解释器中重定向 sys.stdout,它不会自动重定向到任何其他解释器。如果你在一个解释器中导入一个模块,它不会自动在任何其他解释器中导入。你需要在需要它的解释器中单独导入该模块。事实上,在一个解释器中导入的每个模块都是一个与不同解释器中相同模块完全独立的对象,包括 sysbuiltins,甚至 __main__

隔离意味着一个可变对象或其他数据不能同时被多个解释器使用。这实际上意味着解释器之间无法真正共享这类对象或数据。相反,每个解释器必须有自己的副本,并且你必须手动同步副本之间的任何更改。不可变对象和数据,如内置单例、字符串和不可变对象的元组,则没有这些限制。

在解释器之间进行通信和同步最有效的方法是使用专用工具,例如 PEP 734 中提出的工具。一种效率较低的替代方法是使用 pickle 进行序列化,然后通过共享的 socketpipe 发送字节。

class concurrent.futures.InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一个 ThreadPoolExecutor 的子类,它使用一个最多包含 max_workers 个线程的池来异步执行调用。每个线程都在自己的解释器中运行任务。工作解释器彼此隔离,这意味着每个解释器都有自己的运行时状态,并且它们不能共享任何可变对象或其他数据。每个解释器都有自己的全局解释器锁,这意味着使用此执行器运行的代码具有真正的多核并行性。

可选的 initializerinitargs 参数与 ThreadPoolExecutor 的含义相同:初始化器在每个工作者创建时运行,但在这种情况下,它是在工作者的解释器中运行的。执行器在将 initializerinitargs 发送给工作者的解释器时,会使用 pickle 对它们进行序列化。

备注

执行器可能会将来自 initializer 的未捕获异常替换为 ExecutionFailed

来自父类 ThreadPoolExecutor 的其他注意事项也适用于此。

submit()map() 的工作方式与正常情况类似,只是工作者在将可调用对象和参数发送到其解释器时会使用 pickle 进行序列化。同样,工作者在返回结果时也会序列化返回值。

当工作者的当前任务引发未捕获的异常时,工作者总是尝试按原样保留该异常。如果成功,它还会将 __cause__ 设置为相应的 ExecutionFailed 实例,该实例包含原始异常的摘要。在少数情况下,如果工作者无法按原样保留原始异常,它会直接保留相应的 ExecutionFailed 实例。

ProcessPoolExecutor

ProcessPoolExecutor 类是 Executor 的一个子类,它使用进程池来异步执行调用。ProcessPoolExecutor 使用 multiprocessing 模块,这使得它能够绕过全局解释器锁,但这也意味着只有可 pickle 的对象才能被执行和返回。

__main__ 模块必须能被工作者子进程导入。这意味着 ProcessPoolExecutor 在交互式解释器中将无法工作。

从提交给 ProcessPoolExecutor 的可调用对象中调用 ExecutorFuture 的方法将导致死锁。

请注意,根据 multiprocessing.Process 的要求,函数和参数需要是可 pickle 的限制,在使用 submit()map()ProcessPoolExecutor 时同样适用。在 REPL 中定义的函数或 lambda 表达式预计将无法工作。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

一个 Executor 子类,它使用一个最多包含 max_workers 个进程的池来异步执行调用。如果 max_workersNone 或未给出,它将默认为 os.process_cpu_count()。如果 max_workers 小于或等于 0,则将引发 ValueError。在 Windows 上,max_workers 必须小于或等于 61。否则将引发 ValueError。如果 max_workersNone,则即使有更多可用的处理器,选择的默认值也最多为 61mp_context 可以是 multiprocessing 上下文或 None。它将用于启动工作进程。如果 mp_contextNone 或未给出,则使用默认的 multiprocessing 上下文。参见上下文和启动方法

initializer 是一个可选的可调用对象,在每个工作进程启动时被调用;initargs 是传递给初始化器的参数元组。如果 initializer 引发异常,所有当前待处理的作业都将引发 BrokenProcessPool,任何向池中提交更多作业的尝试也将如此。

max_tasks_per_child 是一个可选参数,指定单个进程在退出并被一个新的工作进程替换之前可以执行的最大任务数。默认情况下,max_tasks_per_childNone,这意味着工作进程将与池一样长寿。当指定了最大值时,在没有 mp_context 参数的情况下,默认将使用 "spawn" 多进程启动方法。此功能与 "fork" 启动方法不兼容。

在 3.3 版更改: 当其中一个工作进程突然终止时,现在会引发 BrokenProcessPool 错误。以前,行为是未定义的,但对执行器或其 future 的操作通常会冻结或死锁。

在 3.7 版更改: 添加了 mp_context 参数,允许用户控制由池创建的工作进程的 start_method。

添加了 initializerinitargs 参数。

在 3.11 版更改: 添加了 max_tasks_per_child 参数,允许用户控制池中工作进程的生命周期。

在 3.12 版更改: 在 POSIX 系统上,如果您的应用程序有多个线程,并且 multiprocessing 上下文使用 "fork" 启动方法:内部调用以生成工作进程的 os.fork() 函数可能会引发 DeprecationWarning。请传递一个配置为使用不同启动方法的 mp_context。有关进一步解释,请参阅 os.fork() 文档。

在 3.13 版更改: max_workers 默认使用 os.process_cpu_count(),而不是 os.cpu_count()

在 3.14 版更改: 默认进程启动方法(见上下文和启动方法)已不再是 fork。如果你需要为 ProcessPoolExecutor 使用 fork 启动方法,你必须显式传递 mp_context=multiprocessing.get_context("fork")

terminate_workers()

通过对每个活动工作进程调用 Process.terminate,尝试立即终止所有活动的工作进程。在内部,它还将调用 Executor.shutdown() 以确保与执行器关联的所有其他资源都被释放。

调用此方法后,调用者不应再向执行器提交任务。

在 3.14 版本加入。

kill_workers()

通过对每个活动工作进程调用 Process.kill,尝试立即杀死所有活动的工作进程。在内部,它还将调用 Executor.shutdown() 以确保与执行器关联的所有其他资源都被释放。

调用此方法后,调用者不应再向执行器提交任务。

在 3.14 版本加入。

ProcessPoolExecutor 示例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 对象

Future 类封装了可调用对象的异步执行。Future 实例由 Executor.submit() 创建。

class concurrent.futures.Future

封装可调用对象的异步执行。Future 实例由 Executor.submit() 创建,除了用于测试外,不应直接创建。

cancel()

尝试取消调用。如果调用当前正在执行或已完成运行且无法取消,则该方法将返回 False,否则调用将被取消,并且该方法将返回 True

cancelled()

如果调用被成功取消,则返回 True

running()

如果调用当前正在执行且无法取消,则返回 True

done()

如果调用被成功取消或完成运行,则返回 True

result(timeout=None)

返回调用返回的值。如果调用尚未完成,则此方法将等待最多 timeout 秒。如果调用在 timeout 秒内未完成,则将引发 TimeoutErrortimeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则没有等待时间限制。

如果在完成前 future 被取消,则将引发 CancelledError

如果调用引发了异常,此方法将引发相同的异常。

exception(timeout=None)

返回调用引发的异常。如果调用尚未完成,则此方法将等待最多 timeout 秒。如果调用在 timeout 秒内未完成,则将引发 TimeoutErrortimeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则没有等待时间限制。

如果在完成前 future 被取消,则将引发 CancelledError

如果调用完成而没有引发异常,则返回 None

add_done_callback(fn)

将可调用对象 fn 附加到 future。当 future 被取消或完成运行时,将以 future 作为其唯一参数调用 fn

添加的可调用对象按照添加的顺序被调用,并且总是在添加它们的进程的线程中被调用。如果可调用对象引发 Exception 子类,它将被记录并忽略。如果可调用对象引发 BaseException 子类,则行为是未定义的。

如果 future 已经完成或被取消,fn 将立即被调用。

以下 Future 方法旨在用于单元测试和 Executor 实现。

set_running_or_notify_cancel()

此方法只应由 Executor 实现和单元测试在执行与 Future 相关的工作之前调用。

如果方法返回 False,则 Future 已被取消,即 Future.cancel() 被调用并返回了 True。任何等待 Future 完成的线程(即通过 as_completed()wait())都将被唤醒。

如果方法返回 True,则 Future 未被取消,并已进入运行状态,即调用 Future.running() 将返回 True

此方法只能调用一次,并且不能在 Future.set_result()Future.set_exception() 被调用后调用。

set_result(result)

将与 Future 关联的工作结果设置为 result

此方法只应由 Executor 实现和单元测试使用。

在 3.8 版更改: 如果 Future 已经完成,此方法将引发 concurrent.futures.InvalidStateError

set_exception(exception)

将与 Future 关联的工作结果设置为 Exception exception

此方法只应由 Executor 实现和单元测试使用。

在 3.8 版更改: 如果 Future 已经完成,此方法将引发 concurrent.futures.InvalidStateError

模块函数

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 fs 给出的 Future 实例(可能由不同的 Executor 实例创建)完成。给 fs 的重复 future 会被移除,且只返回一次。返回一个由两个集合组成的命名元组。第一个集合名为 done,包含在等待完成前完成的 future(已完成或已取消的 future)。第二个集合名为 not_done,包含未完成的 future(待处理或正在运行的 future)。

timeout 可用于控制返回前等待的最大秒数。timeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则没有等待时间限制。

return_when 指示此函数何时应返回。它必须是以下常量之一:

常量

描述

concurrent.futures.FIRST_COMPLETED

当任何一个 future 完成或被取消时,该函数将返回。

concurrent.futures.FIRST_EXCEPTION

当任何一个 future 因引发异常而完成时,该函数将返回。如果没有 future 引发异常,则它等同于 ALL_COMPLETED

concurrent.futures.ALL_COMPLETED

当所有 future 完成或被取消时,该函数将返回。

concurrent.futures.as_completed(fs, timeout=None)

返回一个迭代器,该迭代器遍历由 fs 给出的 Future 实例(可能由不同的 Executor 实例创建),并在 future 完成(完成或取消)时产生它们。由 fs 提供的任何重复 future 将只返回一次。在调用 as_completed() 之前已完成的任何 future 将首先被产生。如果调用了 __next__(),并且在从最初调用 as_completed() 算起的 timeout 秒后结果仍不可用,则返回的迭代器将引发 TimeoutErrortimeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则没有等待时间限制。

参见

PEP 3148 – future - 异步执行计算

描述将此功能纳入 Python 标准库的提案。

异常类

exception concurrent.futures.CancelledError

当 future 被取消时引发。

exception concurrent.futures.TimeoutError

TimeoutError 的已弃用别名,当 future 操作超过给定超时时引发。

在 3.11 版更改: 这个类成为了 TimeoutError 的别名。

exception concurrent.futures.BrokenExecutor

派生自 RuntimeError,当执行器因某种原因损坏,无法用于提交或执行新任务时,会引发此异常类。

在 3.7 版本加入。

exception concurrent.futures.InvalidStateError

在对一个处于不允许该操作的当前状态的 future 执行操作时引发。

在 3.8 版本加入。

exception concurrent.futures.thread.BrokenThreadPool

派生自 BrokenExecutor,当 ThreadPoolExecutor 的一个工作线程初始化失败时,会引发此异常类。

在 3.7 版本加入。

exception concurrent.futures.interpreter.BrokenInterpreterPool

派生自 BrokenThreadPool,当 InterpreterPoolExecutor 的一个工作线程初始化失败时,会引发此异常类。

在 3.14 版本加入。

exception concurrent.futures.interpreter.ExecutionFailed

当给定的初始化器失败时,从 InterpreterPoolExecutor 中引发;或者当提交的任务中存在未捕获的异常时,从 submit() 中引发。

在 3.14 版本加入。

exception concurrent.futures.process.BrokenProcessPool

派生自 BrokenExecutor(以前是 RuntimeError),当 ProcessPoolExecutor 的一个工作进程以非正常方式终止时(例如,如果它被从外部杀死),会引发此异常类。

在 3.3 版本加入。