concurrent.futures — 启动并行任务

3.2 版新增。

源代码: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模块为异步执行可调用对象提供了一个高级接口。

异步执行可以使用线程(使用 ThreadPoolExecutor)或单独的进程(使用 ProcessPoolExecutor)来执行。两者都实现了相同的接口,该接口由抽象类 Executor 定义。

可用性:不可用于 Emscripten,不可用于 WASI。

此模块在 WebAssembly 平台 wasm32-emscriptenwasm32-wasi 上不可用或无法工作。有关更多信息,请参阅 WebAssembly 平台

执行器对象

class concurrent.futures.Executor

一个抽象类,提供了异步执行调用的方法。它不应该直接使用,而应该通过它的具体子类使用。

submit(fn, /, *args, **kwargs)

安排可调用对象 fn 作为 fn(*args, **kwargs) 执行,并返回一个表示可调用对象执行的 Future 对象。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1)

类似于 map(fn, *iterables),除了

  • iterables 是立即收集的,而不是延迟收集的;

  • fn 是异步执行的,并且可以同时进行对 fn 的多次调用。

如果调用 __next__() 并且在最初调用 Executor.map()timeout 秒内结果不可用,则返回的迭代器将引发 TimeoutErrortimeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则等待时间没有限制。

如果 fn 调用引发异常,则在从迭代器检索其值时将引发该异常。

使用 ProcessPoolExecutor 时,此方法会将 iterables 切割成多个块,并将这些块作为单独的任务提交给池。可以通过将 chunksize 设置为正整数来指定这些块的(近似)大小。对于非常长的可迭代对象,与默认大小 1 相比,使用较大的 chunksize 值可以显著提高性能。对于 ThreadPoolExecutorchunksize 无效。

3.5 版更动: 添加了 chunksize 参数。

shutdown(wait=True, *, cancel_futures=False)

向执行器发送信号,指示它应该在当前挂起的期货执行完毕后释放其正在使用的任何资源。在关闭后调用 Executor.submit()Executor.map() 将引发 RuntimeError

如果 waitTrue,则此方法将一直等到所有挂起的期货执行完毕并且与执行器关联的资源已释放后才会返回。如果 waitFalse,则此方法将立即返回,并且与执行器关联的资源将在所有挂起的期货执行完毕后释放。无论 wait 的值是什么,整个 Python 程序都只有在所有挂起的期货执行完毕后才会退出。

如果 cancel_futuresTrue,则此方法将取消执行器尚未开始运行的所有挂起的期货。无论 cancel_futures 的值是什么,已完成或正在运行的任何期货都不会被取消。

如果 cancel_futureswait 都为 True,则执行器已开始运行的所有期货都将在该方法返回之前完成。其余的期货将被取消。

如果使用 with 语句,则可以避免必须显式调用此方法,该语句将关闭 Executor(等待,就好像使用 wait 设置为 True 调用了 Executor.shutdown()

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

3.9 版更动: 添加了 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutorExecutor 的一个子类,它使用线程池来异步执行调用。

当与 Future 关联的可调用对象等待另一个 Future 的结果时,可能会发生死锁。例如

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一个 Executor 子类,它使用最多 max_workers 个线程的池来异步执行调用。

排队到 ThreadPoolExecutor 的所有线程将在解释器退出之前被连接。请注意,执行此操作的退出处理程序是在使用 atexit 添加的任何退出处理程序*之前*执行的。这意味着必须捕获和处理主线程中的异常,以便通知线程正常退出。因此,建议不要将 ThreadPoolExecutor 用于长时间运行的任务。

initializer 是一个可选的可调用对象,它在每个工作线程启动时被调用;initargs 是传递给初始化器的参数元组。如果 initializer 引发异常,所有当前挂起的作业都将引发 BrokenThreadPool,任何向池中提交更多作业的尝试也将如此。

在 3.5 版更改: 如果 max_workersNone 或未给出,它将默认为机器上的处理器数量乘以 5,假设 ThreadPoolExecutor 通常用于重叠 I/O 而不是 CPU 工作,并且工作线程的数量应该高于 ProcessPoolExecutor 的工作线程数量。

在 3.6 版更改: 添加了 thread_name_prefix 参数,允许用户控制池创建的工作线程的 threading.Thread 名称,以便于调试。

在 3.7 版更改: 添加了 initializerinitargs 参数。

在 3.8 版更改: max_workers 的默认值更改为 min(32, os.cpu_count() + 4)。此默认值至少为 I/O 密集型任务保留 5 个工作线程。它最多使用 32 个 CPU 内核来处理释放 GIL 的 CPU 密集型任务。并且它避免了在多核机器上隐式使用非常大的资源。

ThreadPoolExecutor 现在会在启动 max_workers 个工作线程之前重用空闲的工作线程。

ThreadPoolExecutor 示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 类是一个 Executor 子类,它使用进程池来异步执行调用。 ProcessPoolExecutor 使用 multiprocessing 模块,这使得它可以避开 全局解释器锁,但也意味着只能执行和返回可腌制的对象。

工作子进程必须能够导入 __main__ 模块。这意味着 ProcessPoolExecutor 在交互式解释器中不起作用。

从提交到 ProcessPoolExecutor 的可调用对象调用 ExecutorFuture 方法将导致死锁。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

一个 Executor 子类,它使用最多 max_workers 个进程的池来异步执行调用。如果 max_workersNone 或未给出,它将默认为机器上的处理器数量。如果 max_workers 小于或等于 0,则会引发 ValueError。在 Windows 上,max_workers 必须小于或等于 61。如果不是,则会引发 ValueError。如果 max_workersNone,则即使有更多处理器可用,选择的默认值最多为 61mp_context 可以是 multiprocessing 上下文或 None。它将用于启动工作进程。如果 mp_contextNone 或未给出,则使用默认的 multiprocessing 上下文。请参阅 上下文和启动方法

initializer 是一个可选的可调用对象,它在每个工作进程启动时被调用;initargs 是传递给初始化器的参数元组。如果 initializer 引发异常,所有当前挂起的作业都将引发 BrokenProcessPool,任何向池中提交更多作业的尝试也将如此。

max_tasks_per_child 是一个可选参数,用于指定单个进程在退出并被新的工作进程替换之前可以执行的最大任务数。默认情况下,max_tasks_per_childNone,这意味着工作进程将与池一样长久运行。如果指定了最大值,则在没有 mp_context 参数的情况下,默认将使用“spawn”多处理启动方法。此功能与“fork”启动方法不兼容。

版本 3.3 中的变化: 当其中一个工作进程异常终止时,现在会引发 BrokenProcessPool 错误。以前,行为未定义,但对执行器或其 Future 的操作通常会冻结或死锁。

版本 3.7 中的变化: 添加了 mp_context 参数,以允许用户控制池创建的工作进程的 start_method。

添加了 initializerinitargs 参数。

注意

默认的 multiprocessing 启动方法(请参阅 上下文和启动方法)将在 Python 3.14 中从 fork 更改。需要为其 ProcessPoolExecutor 使用 fork 的代码应通过传递 mp_context=multiprocessing.get_context("fork") 参数来明确指定。

版本 3.11 中的变化: 添加了 max_tasks_per_child 参数,以允许用户控制池中工作进程的生命周期。

版本 3.12 中的变化: 在 POSIX 系统上,如果您的应用程序有多个线程并且 multiprocessing 上下文使用 "fork" 启动方法:内部调用以生成工作进程的 os.fork() 函数可能会引发 DeprecationWarning。传递配置为使用不同启动方法的 mp_context。有关进一步的解释,请参阅 os.fork() 文档。

ProcessPoolExecutor 示例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 对象

Future 类封装了可调用对象的异步执行。Future 实例由 Executor.submit() 创建。

class concurrent.futures.Future

封装可调用对象的异步执行。Future 实例由 Executor.submit() 创建,并且不应直接创建,除非是为了测试。

cancel()

尝试取消调用。如果调用当前正在执行或已完成运行且无法取消,则该方法将返回 False,否则调用将被取消,并且该方法将返回 True

cancelled()

如果调用已成功取消,则返回 True

running()

如果调用当前正在执行且无法取消,则返回 True

done()

如果调用已成功取消或已完成运行,则返回 True

result(timeout=None)

返回调用返回的值。如果调用尚未完成,则此方法将等待最多 timeout 秒。如果调用在 timeout 秒内未完成,则将引发 TimeoutErrortimeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则等待时间没有限制。

如果 Future 在完成之前被取消,则将引发 CancelledError

如果调用引发了异常,则此方法将引发相同的异常。

exception(timeout=None)

返回调用引发的异常。如果调用尚未完成,则此方法将等待最多 timeout 秒。如果调用在 timeout 秒内未完成,则将引发 TimeoutErrortimeout 可以是整数或浮点数。如果未指定 timeout 或为 None,则等待时间没有限制。

如果 Future 在完成之前被取消,则将引发 CancelledError

如果调用完成时没有引发异常,则返回 None

add_done_callback(fn)

将可调用对象 fn 附加到 Future。当 Future 被取消或完成运行时,将调用 fn,并将 Future 作为其唯一参数。

添加的可调用对象将按照添加顺序调用,并且始终在添加它们的进程所属的线程中调用。如果可调用对象引发 Exception 子类,则将记录并忽略它。如果可调用对象引发 BaseException 子类,则行为未定义。

如果 Future 已完成或已被取消,则将立即调用 fn

以下 Future 方法旨在用于单元测试和 Executor 实现。

set_running_or_notify_cancel()

此方法只能由 Executor 实现调用,在执行与 Future 相关的任务之前,以及由单元测试调用。

如果该方法返回 False,则表示 Future 已被取消,即调用了 Future.cancel() 并返回了 True。任何等待 Future 完成的线程(例如,通过 as_completed()wait())都将被唤醒。

如果该方法返回 True,则表示 Future 未被取消,并且已进入运行状态,即调用 Future.running() 将返回 True

此方法只能调用一次,并且不能在调用 Future.set_result()Future.set_exception() 之后调用。

set_result(result)

将与 Future 关联的任务的结果设置为 *result*。

此方法只能由 Executor 实现和单元测试使用。

在 3.8 版更改: 如果 Future 已完成,此方法将引发 concurrent.futures.InvalidStateError

set_exception(exception)

将与 Future 关联的任务的结果设置为 Exception *exception*。

此方法只能由 Executor 实现和单元测试使用。

在 3.8 版更改: 如果 Future 已完成,此方法将引发 concurrent.futures.InvalidStateError

模块函数

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 *fs* 给出的 Future 实例(可能由不同的 Executor 实例创建)完成。*fs* 中给出的重复期货将被移除,并且只返回一次。返回一个命名的 2 元组集合。第一个集合名为 done,包含在等待完成之前已完成的期货(已完成或已取消的期货)。第二个集合名为 not_done,包含未完成的期货(待处理或正在运行的期货)。

*timeout* 可用于控制返回之前的最大等待秒数。*timeout* 可以是整数或浮点数。如果未指定 *timeout* 或为 None,则等待时间没有限制。

*return_when* 指示此函数应何时返回。它必须是以下常量之一

常量

描述

concurrent.futures.FIRST_COMPLETED

当任何期货完成或被取消时,该函数将返回。

concurrent.futures.FIRST_EXCEPTION

当任何期货因引发异常而完成时,该函数将返回。如果没有期货引发异常,则它等效于 ALL_COMPLETED

concurrent.futures.ALL_COMPLETED

当所有期货完成或被取消时,该函数将返回。

concurrent.futures.as_completed(fs, timeout=None)

返回一个迭代器,该迭代器遍历由 *fs* 给出的 Future 实例(可能由不同的 Executor 实例创建),并在期货完成(已完成或已取消的期货)时生成它们。*fs* 中给出的任何重复期货都将返回一次。在调用 as_completed() 之前已完成的任何期货将首先生成。如果调用 __next__() 并且在最初调用 as_completed() 后的 *timeout* 秒内结果不可用,则返回的迭代器将引发 TimeoutError。*timeout* 可以是整数或浮点数。如果未指定 *timeout* 或为 None,则等待时间没有限制。

另请参阅

PEP 3148 – 期货 - 异步执行计算

建议将此功能包含在 Python 标准库中。

异常类

exception concurrent.futures.CancelledError

当期货被取消时引发。

exception concurrent.futures.TimeoutError

TimeoutError 的已弃用别名,当 future 操作超过给定超时时引发。

在 3.11 版更改: 此类已成为 TimeoutError 的别名。

exception concurrent.futures.BrokenExecutor

此异常类派生自 RuntimeError,当执行器因某种原因损坏且无法用于提交或执行新任务时引发。

3.7 版的新增功能。

exception concurrent.futures.InvalidStateError

当对当前状态不允许的 future 执行操作时引发。

3.8 版的新增功能。

exception concurrent.futures.thread.BrokenThreadPool

此异常类派生自 BrokenExecutor,当 ThreadPoolExecutor 的某个工作线程初始化失败时引发。

3.7 版的新增功能。

exception concurrent.futures.process.BrokenProcessPool

此异常类派生自 BrokenExecutor(以前为 RuntimeError),当 ProcessPoolExecutor 的某个工作进程以非正常方式终止时引发(例如,如果它从外部被杀死)。

3.3 版的新增功能。