multiprocessing — 基于进程的并行

源代码: Lib/multiprocessing/


可用性: 不支持 Emscripten 和 WASI。

此模块在 WebAssembly 平台 wasm32-emscriptenwasm32-wasi 上无法工作或不可用。有关更多信息,请参见 WebAssembly 平台

简介

multiprocessing 是一个支持使用类似于 threading 模块的 API 来生成进程的包。 multiprocessing 包提供本地和远程并发,通过使用子进程而不是线程,有效地绕过了 全局解释器锁。因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它在 POSIX 和 Windows 上运行。

multiprocessing 模块还引入了在 threading 模块中没有类似物的 API。一个主要例子是 Pool 对象,它提供了一种方便的方法来跨多个输入值并行执行函数,将输入数据分布到多个进程(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个使用 Pool 的数据并行基本示例,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

将打印到标准输出

[1, 4, 9]

另请参见

concurrent.futures.ProcessPoolExecutor 提供了一个更高级别的接口,用于将任务推送到后台进程,而不会阻塞调用进程的执行。与直接使用 Pool 接口相比,concurrent.futures API 更容易地将工作提交到底层进程池,并与等待结果分开。

Process

multiprocessing 中,进程是通过创建 Process 对象,然后调用其 start() 方法来生成的。 Process 遵循 threading.Thread 的 API。一个简单的多进程程序示例是

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

为了显示涉及的各个进程 ID,这里有一个扩展的示例

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

有关 if __name__ == '__main__' 部分为何必要的解释,请参见 编程指南

上下文和启动方法

根据平台的不同,multiprocessing 支持三种启动进程的方式。这些启动方法

spawn

父进程启动一个新的 Python 解释器进程。子进程只继承运行进程对象 run() 方法所需的资源。特别是,父进程中不必要的文件描述符和句柄不会被继承。与使用 forkforkserver 相比,使用此方法启动进程相当慢。

在 POSIX 和 Windows 平台上可用。Windows 和 macOS 上的默认值。

fork

父进程使用 os.fork() 来派生 Python 解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地派生多线程进程是有问题的。

在 POSIX 系统上可用。目前在 POSIX 上是默认值,除了 macOS。

注意

Python 3.14 中的默认启动方法将从 fork 更改。需要 fork 的代码应通过 get_context()set_start_method() 明确指定。

在 3.12 版本中更改: 如果 Python 能够检测到您的进程有多个线程,则此启动方法在内部调用的 os.fork() 函数将引发 DeprecationWarning。使用其他启动方法。有关更多解释,请参见 os.fork() 文档。

forkserver

当程序启动并选择 forkserver 启动方法时,将生成一个服务器进程。从那时起,每当需要一个新进程时,父进程都会连接到服务器并请求它派生一个新进程。fork 服务器进程是单线程的,除非系统库或预加载的导入作为副作用生成线程,因此它通常可以安全地使用 os.fork()。不会继承任何不必要的资源。

在支持通过 Unix 管道传递文件描述符的 POSIX 平台上可用,例如 Linux。

版本 3.4 中的变更: spawn 在所有 POSIX 平台上添加,forkserver 在某些 POSIX 平台上添加。子进程不再继承 Windows 上所有父进程的可继承句柄。

版本 3.8 中的变更: 在 macOS 上,spawn 启动方法现在是默认方法。fork 启动方法应被视为不安全,因为它会导致子进程崩溃,因为 macOS 系统库可能会启动线程。参见 bpo-33725

在 POSIX 上,使用 spawnforkserver 启动方法也会启动一个 资源跟踪器 进程,该进程跟踪程序进程创建的未链接的命名系统资源(例如命名信号量或 SharedMemory 对象)。当所有进程都退出时,资源跟踪器会取消链接任何剩余的跟踪对象。通常应该没有,但如果进程被信号杀死,可能会有一些“泄漏”的资源。(泄漏的信号量或共享内存段在下次重启之前不会自动取消链接。这对这两个对象来说都是问题,因为系统只允许有限数量的命名信号量,并且共享内存段会占用主内存中的某些空间。)

要选择启动方法,请在主模块的 if __name__ == '__main__' 子句中使用 set_start_method()。例如

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() 在程序中不应使用超过一次。

或者,您可以使用 get_context() 获取上下文对象。上下文对象具有与 multiprocessing 模块相同的 API,并允许在同一个程序中使用多个启动方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用 fork 上下文创建的锁不能传递给使用 spawnforkserver 启动方法启动的进程。

想要使用特定启动方法的库可能应该使用 get_context() 以避免干扰库用户的选择。

警告

'spawn''forkserver' 启动方法通常不能与 POSIX 系统上的“冻结”可执行文件(即由 PyInstallercx_Freeze 等包生成的二进制文件)一起使用。如果代码不使用线程,则 'fork' 启动方法可能会起作用。

进程间交换对象

multiprocessing 支持进程之间两种类型的通信通道

队列

Queue 类是 queue.Queue 的近似克隆。例如

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

队列是线程和进程安全的。

管道

Pipe() 函数返回一对由管道连接的连接对象,默认情况下是双工(双向)的。例如

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Pipe() 返回的两个连接对象代表管道的两端。每个连接对象都有 send()recv() 方法(以及其他方法)。请注意,如果两个进程(或线程)尝试同时从管道的同一端读取或写入,则管道中的数据可能会损坏。当然,如果进程同时使用管道的不同端,则不会存在损坏风险。

进程间同步

multiprocessing 包含来自 threading 的所有同步原语的等效项。例如,可以使用锁来确保一次只有一个进程打印到标准输出

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用锁,来自不同进程的输出可能会全部混在一起。

进程间共享状态

如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。在使用多个进程时尤其如此。

但是,如果您确实需要使用一些共享数据,那么 multiprocessing 提供了几种方法来实现这一点。

共享内存

可以使用 ValueArray 将数据存储在共享内存映射中。例如,以下代码

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

在创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型代码:'d' 表示双精度浮点数,'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意 ctypes 对象。

服务器进程

Manager() 返回的管理器对象控制一个服务器进程,该进程保存 Python 对象并允许其他进程使用代理来操作它们。

Manager() 返回的管理器将支持类型 listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray。例如,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

将打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由不同计算机上的进程通过网络共享。但是,它们比使用共享内存慢。

使用工作进程池

Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。

例如

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

请注意,池的方法只能由创建它的进程使用。

注意

此包中的功能要求子进程能够导入 __main__ 模块。这在 编程指南 中有介绍,但在这里值得一提。这意味着某些示例(例如 multiprocessing.pool.Pool 示例)在交互式解释器中将无法运行。例如

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(如果您尝试这样做,它实际上会输出三个完整的跟踪,以半随机的方式交织在一起,然后您可能需要以某种方式停止父进程。)

参考

multiprocessing 包主要复制了 threading 模块的 API。

Process 和异常

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process 对象表示在单独进程中运行的活动。 Process 类等效于 threading.Thread 的所有方法。

构造函数应始终使用关键字参数调用。group 应始终为 None;它仅存在于与 threading.Thread 的兼容性。target 是由 run() 方法调用的可调用对象。它默认为 None,这意味着不调用任何内容。name 是进程名称(有关更多详细信息,请参见 name)。args 是目标调用的参数元组。kwargs 是目标调用的关键字参数字典。如果提供,关键字参数 daemon 将进程 daemon 标志设置为 TrueFalse。如果为 None(默认值),此标志将从创建进程继承。

默认情况下,不会将任何参数传递给 targetargs 参数默认为 (),可用于指定要传递给 target 的参数列表或元组。

如果子类覆盖了构造函数,则它必须确保在对进程执行任何其他操作之前调用基类构造函数 (Process.__init__())。

在版本 3.3 中更改: 添加了 daemon 参数。

run()

表示进程活动的方法。

您可以在子类中覆盖此方法。标准 run() 方法调用传递给对象构造函数作为目标参数的可调用对象(如果有),并使用分别从 argskwargs 参数获取的顺序和关键字参数。

使用列表或元组作为传递给 Processargs 参数可以实现相同的效果。

示例

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

启动进程的活动。

每个进程对象最多只能调用一次。它安排在单独的进程中调用对象的 run() 方法。

join([timeout])

如果可选参数 timeoutNone(默认值),则该方法会阻塞,直到调用了其 join() 方法的进程终止。如果 timeout 是一个正数,它最多阻塞 timeout 秒。请注意,如果其进程终止或方法超时,该方法将返回 None。检查进程的 exitcode 以确定它是否已终止。

可以多次加入一个进程。

一个进程不能加入自身,因为这会导致死锁。在进程启动之前尝试加入进程是错误的。

name

进程的名称。名称是一个字符串,仅用于识别目的。它没有语义。多个进程可以被赋予相同的名称。

初始名称由构造函数设置。如果未向构造函数提供显式名称,则会构造一个形式为“Process-N1:N2:…:Nk”的名称,其中每个 Nk 是其父级的第 N 个子级。

is_alive()

返回进程是否存活。

粗略地说,进程对象从 start() 方法返回的那一刻起就存活,直到子进程终止。

daemon

进程的守护进程标志,一个布尔值。这必须在调用 start() 之前设置。

初始值从创建进程继承。

当进程退出时,它会尝试终止其所有守护进程子进程。

请注意,守护进程不允许创建子进程。否则,如果守护进程在父进程退出时被终止,它会使子进程成为孤儿。此外,这些不是 Unix 守护进程或服务,它们是正常的进程,如果非守护进程进程已退出,它们将被终止(并且不会加入)。

除了 threading.Thread API 之外,Process 对象还支持以下属性和方法

pid

返回进程 ID。在进程生成之前,这将为 None

exitcode

子进程的退出代码。如果进程尚未终止,这将为 None

如果子进程的 run() 方法正常返回,则退出代码将为 0。如果它通过 sys.exit() 终止,并带有整数参数 N,则退出代码将为 N

如果子进程由于在 run() 中未捕获的异常而终止,则退出代码将为 1。如果它被信号 N 终止,则退出代码将为负值 -N

authkey

进程的身份验证密钥(一个字节字符串)。

multiprocessing 初始化时,主进程使用 os.urandom() 被分配一个随机字符串。

当创建 Process 对象时,它将继承其父进程的身份验证密钥,尽管可以通过将 authkey 设置为另一个字节字符串来更改此密钥。

参见 身份验证密钥

sentinel

一个系统对象的数字句柄,当进程结束时,该句柄将变为“就绪”。

如果您想使用 multiprocessing.connection.wait() 同时等待多个事件,则可以使用此值。否则,调用 join() 更简单。

在 Windows 上,这是一个可与 WaitForSingleObjectWaitForMultipleObjects API 调用系列一起使用的操作系统句柄。在 POSIX 上,这是一个可与 select 模块中的原语一起使用的文件描述符。

在 3.3 版本中添加。

terminate()

终止进程。在 POSIX 上,这是使用 SIGTERM 信号完成的;在 Windows 上,使用 TerminateProcess()。请注意,不会执行退出处理程序和 finally 子句等。

请注意,进程的后代进程 *不会* 被终止 - 它们只会成为孤儿进程。

警告

如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并且可能无法被其他进程使用。类似地,如果进程已获取锁或信号量等,则终止它可能会导致其他进程死锁。

kill()

terminate() 相同,但在 POSIX 上使用 SIGKILL 信号。

在 3.7 版本中添加。

close()

关闭 Process 对象,释放与其关联的所有资源。如果底层进程仍在运行,则会引发 ValueError。一旦 close() 成功返回,Process 对象的大多数其他方法和属性将引发 ValueError

在 3.7 版本中添加。

请注意,start()join()is_alive()terminate()exitcode 方法只能由创建进程对象的进程调用。

一些 Process 方法的示例用法

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

所有 multiprocessing 异常的基类。

exception multiprocessing.BufferTooShort

当提供的缓冲区对象对于读取的消息太小时,由 Connection.recv_bytes_into() 引发的异常。

如果 eBufferTooShort 的实例,则 e.args[0] 将以字节字符串形式给出消息。

exception multiprocessing.AuthenticationError

当出现身份验证错误时引发。

exception multiprocessing.TimeoutError

由具有超时的具有超时的方法在超时到期时引发。

管道和队列

在使用多个进程时,通常使用消息传递来进行进程之间的通信,并避免使用任何同步原语,如锁。

为了传递消息,可以使用 Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。

QueueSimpleQueueJoinableQueue 类型是基于标准库中的 queue.Queue 类建模的多生产者、多消费者 FIFO 队列。它们的不同之处在于 Queue 缺少在 Python 2.5 的 queue.Queue 类中引入的 task_done()join() 方法。

如果您使用 JoinableQueue,则 *必须* 为从队列中删除的每个任务调用 JoinableQueue.task_done(),否则用于计算未完成任务数量的信号量最终可能会溢出,从而引发异常。

请注意,还可以使用管理器对象创建共享队列 - 请参见 管理器

注意

multiprocessing 使用常见的 queue.Emptyqueue.Full 异常来表示超时。它们在 multiprocessing 命名空间中不可用,因此您需要从 queue 中导入它们。

注意

当一个对象被放入队列时,该对象会被序列化,然后一个后台线程稍后将序列化后的数据刷新到底层管道。这有一些令人惊讶的后果,但不会造成任何实际困难 - 如果它们真的困扰了你,你可以使用使用 管理器 创建的队列。

  1. 在将一个对象放入一个空队列后,队列的 empty() 方法返回 False 以及 get_nowait() 可以返回而不引发 queue.Empty 之前,可能会有一个极小的延迟。

  2. 如果多个进程正在入队对象,则对象可能以不同的顺序在另一端被接收。但是,由同一个进程入队的对象将始终相对于彼此保持预期的顺序。

警告

如果一个进程在尝试使用 Queue 时,使用 Process.terminate()os.kill() 被杀死,那么队列中的数据很可能会被破坏。这可能会导致任何其他进程在稍后尝试使用队列时获得异常。

警告

如上所述,如果子进程已将项目放入队列(并且它没有使用 JoinableQueue.cancel_join_thread),那么该进程将不会终止,直到所有缓冲的项目都被刷新到管道。

这意味着,如果您尝试加入该进程,您可能会遇到死锁,除非您确定已使用队列的所有项目都被使用。同样,如果子进程是非守护进程,那么父进程在尝试加入其所有非守护进程子进程时可能会在退出时挂起。

请注意,使用管理器创建的队列没有这个问题。请参阅 编程指南

有关使用队列进行进程间通信的示例,请参阅 示例

multiprocessing.Pipe([duplex])

返回一对 (conn1, conn2)Connection 对象,它们代表管道的两端。

如果 duplexTrue(默认值),则管道是双向的。如果 duplexFalse,则管道是单向的:conn1 只能用于接收消息,而 conn2 只能用于发送消息。

class multiprocessing.Queue([maxsize])

返回一个使用管道和一些锁/信号量实现的进程共享队列。当一个进程第一次将一个项目放入队列时,一个馈送线程被启动,它将对象从缓冲区传输到管道。

标准库的 queue 模块中的常见 queue.Emptyqueue.Full 异常被引发以表示超时。

Queue 实现 queue.Queue 的所有方法,除了 task_done()join()

qsize()

返回队列的近似大小。由于多线程/多进程语义,此数字不可靠。

请注意,这可能会在 macOS 等平台上引发 NotImplementedError,因为 sem_getvalue() 在这些平台上没有实现。

empty()

如果队列为空,则返回 True,否则返回 False。由于多线程/多进程语义,这不可靠。

full()

如果队列已满,则返回 True,否则返回 False。由于多线程/多进程语义,这不可靠。

put(obj[, block[, timeout]])

将 obj 放入队列。如果可选参数 blockTrue(默认值)且 timeoutNone(默认值),则在必要时阻塞,直到有空闲插槽可用。如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在该时间内没有空闲插槽可用,则引发 queue.Full 异常。否则(blockFalse),如果立即有空闲插槽,则将项目放入队列,否则引发 queue.Full 异常(在这种情况下,timeout 被忽略)。

在 3.8 版中变更: 如果队列已关闭,则引发 ValueError 而不是 AssertionError

put_nowait(obj)

等效于 put(obj, False)

get([block[, timeout]])

从队列中移除并返回一个项目。如果可选参数 blockTrue(默认值)且 timeoutNone(默认值),则在必要时阻塞,直到有项目可用。如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在该时间内没有项目可用,则引发 queue.Empty 异常。否则(block 为 False),如果立即有项目可用,则返回一个项目,否则引发 queue.Empty 异常(在这种情况下,timeout 被忽略)。

在 3.8 版中变更: 如果队列已关闭,则引发 ValueError 而不是 OSError

get_nowait()

等效于 get(False)

multiprocessing.Queue 有一些在 queue.Queue 中找不到的额外方法。这些方法通常对于大多数代码来说是不必要的。

close()

指示当前进程不再向此队列添加数据。后台线程将在将所有缓冲数据刷新到管道后退出。当队列被垃圾回收时,此方法会自动调用。

join_thread()

加入后台线程。这只能在调用 close() 后使用。它会阻塞,直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。

默认情况下,如果一个进程不是队列的创建者,那么在退出时,它将尝试加入队列的后台线程。进程可以调用 cancel_join_thread() 使 join_thread() 不执行任何操作。

cancel_join_thread()

阻止 join_thread() 阻塞。特别是,这会阻止后台线程在进程退出时自动加入 - 请参阅 join_thread()

此方法的更好的名称可能是 allow_exit_without_flush()。它可能会导致排队的 数据丢失,并且您几乎肯定不需要使用它。它实际上只在您需要当前进程立即退出而无需等待将排队的 数据刷新到底层管道,并且您不关心数据丢失时才使用。

注意

此类的功能需要主机操作系统上有效的共享信号量实现。如果没有,此类中的功能将被禁用,并且尝试实例化 Queue 将导致 ImportError。有关更多信息,请参阅 bpo-3770。以下列出的任何专用队列类型也是如此。

class multiprocessing.SimpleQueue

它是一个简化的 Queue 类型,非常接近于锁定的 Pipe

close()

关闭队列:释放内部资源。

关闭队列后,队列不能再使用。例如,get()put()empty() 方法不能再调用。

在 3.9 版本中添加。

empty()

如果队列为空,则返回 True,否则返回 False

get()

从队列中移除并返回一个项目。

put(item)

item 放入队列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue 的子类,它是一个队列,此外还具有 task_done()join() 方法。

task_done()

指示以前排队的任务已完成。由队列使用者使用。对于每个用于获取任务的 get(),后续对 task_done() 的调用告诉队列任务的处理已完成。

如果当前有 join() 阻塞,则在所有项目都已处理完后(意味着对每个放入队列的项目都收到了 task_done() 调用)它将恢复。

如果调用的次数超过放入队列的项目数,则会引发 ValueError

join()

阻塞,直到队列中的所有项目都被获取并处理。

每当将项目添加到队列时,未完成任务的计数就会增加。每当使用者调用 task_done() 以指示项目已检索到并且所有工作都已完成时,计数就会减少。当未完成任务的计数降至零时,join() 将解除阻塞。

杂项

multiprocessing.active_children()

返回当前进程的所有活动子进程的列表。

调用此方法具有“加入”任何已完成的进程的副作用。

multiprocessing.cpu_count()

返回系统中的 CPU 数量。

此数字不等于当前进程可以使用的 CPU 数量。可以使用 CPU 的数量可以通过 len(os.sched_getaffinity(0)) 获得。

当无法确定 CPU 数量时,将引发 NotImplementedError

另请参见

os.cpu_count()

multiprocessing.current_process()

返回与当前进程对应的 Process 对象。

threading.current_thread() 的类似物。

multiprocessing.parent_process()

返回与 current_process() 的父进程对应的 Process 对象。对于主进程,parent_process 将为 None

在 3.8 版本中添加。

multiprocessing.freeze_support()

为使用 multiprocessing 的程序被冻结以生成 Windows 可执行文件时添加支持。(已在 py2exePyInstallercx_Freeze 中测试)。

需要在主模块的 if __name__ == '__main__' 行之后立即调用此函数。例如

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果省略了 freeze_support() 行,那么尝试运行冻结的可执行文件将引发 RuntimeError

调用 freeze_support() 在除 Windows 之外的任何操作系统上调用时都不会产生任何影响。此外,如果模块正在 Windows 上由 Python 解释器正常运行(程序未被冻结),那么 freeze_support() 不会产生任何影响。

multiprocessing.get_all_start_methods()

返回支持的启动方法列表,第一个是默认方法。可能的启动方法是 'fork''spawn''forkserver'。并非所有平台都支持所有方法。请参阅 上下文和启动方法

在版本 3.4 中添加。

multiprocessing.get_context(method=None)

返回一个上下文对象,该对象具有与 multiprocessing 模块相同的属性。

如果 methodNone,则返回默认上下文。否则,method 应为 'fork''spawn''forkserver'。如果指定的启动方法不可用,则会引发 ValueError。请参阅 上下文和启动方法

在版本 3.4 中添加。

multiprocessing.get_start_method(allow_none=False)

返回用于启动进程的启动方法的名称。

如果启动方法尚未固定且 allow_none 为 false,则将启动方法固定为默认值并返回其名称。如果启动方法尚未固定且 allow_none 为 true,则返回 None

返回值可以是 'fork''spawn''forkserver'None。请参阅 上下文和启动方法

在版本 3.4 中添加。

在版本 3.8 中更改:在 macOS 上,spawn 启动方法现在是默认方法。fork 启动方法应被视为不安全,因为它会导致子进程崩溃。请参阅 bpo-33725

multiprocessing.set_executable(executable)

设置启动子进程时要使用的 Python 解释器的路径。(默认情况下使用 sys.executable)。嵌入器可能需要执行类似的操作

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

才能创建子进程。

在版本 3.4 中更改: 现在在使用 'spawn' 启动方法的 POSIX 上受支持。

在版本 3.11 中更改:接受 路径类对象

multiprocessing.set_forkserver_preload(module_names)

为 forkserver 主进程设置一个模块名称列表,以便尝试导入这些模块,以便其已导入的状态被派生的进程继承。在执行此操作时,任何 ImportError 都会被静默忽略。这可以用作性能增强,以避免在每个进程中重复工作。

为了使此方法起作用,必须在 forkserver 进程启动之前(在创建 Pool 或启动 Process 之前)调用它。

仅在使用 'forkserver' 启动方法时才有意义。请参阅 上下文和启动方法

在版本 3.4 中添加。

multiprocessing.set_start_method(method, force=False)

设置用于启动子进程的方法。method 参数可以是 'fork''spawn''forkserver'。如果启动方法已设置且 force 不是 True,则会引发 RuntimeError。如果 methodNoneforceTrue,则启动方法将设置为 None。如果 methodNoneforceFalse,则上下文将设置为默认上下文。

请注意,此方法最多应调用一次,并且应在主模块的 if __name__ == '__main__' 子句中进行保护。

请参阅 上下文和启动方法

在版本 3.4 中添加。

连接对象

连接对象允许发送和接收可腌制对象或字符串。可以将它们视为面向消息的连接套接字。

连接对象通常使用 Pipe 创建 - 另请参阅 侦听器和客户端

class multiprocessing.connection.Connection
send(obj)

将对象发送到连接的另一端,该对象应使用 recv() 读取。

该对象必须是可腌制的。非常大的腌制数据(大约 32 MiB+,具体取决于操作系统)可能会引发 ValueError 异常。

recv()

返回从连接的另一端使用 send() 发送的对象。阻塞直到有数据可以接收。如果没有任何数据可以接收并且另一端已关闭,则会引发 EOFError

fileno()

返回连接使用的文件描述符或句柄。

close()

关闭连接。

当连接被垃圾回收时,会自动调用此方法。

poll([timeout])

返回是否有任何数据可供读取。

如果未指定 timeout,则会立即返回。如果 timeout 是一个数字,则它指定阻塞的最大时间(以秒为单位)。如果 timeoutNone,则使用无限超时。

请注意,可以使用 multiprocessing.connection.wait() 同时轮询多个连接对象。

send_bytes(buffer[, offset[, size]])

将来自 类字节对象 的字节数据作为完整消息发送。

如果给出 offset,则从 buffer 中的该位置读取数据。如果给出 size,则将从 buffer 中读取这么多字节。非常大的缓冲区(大约 32 MiB+,具体取决于操作系统)可能会引发 ValueError 异常。

recv_bytes([maxlength])

返回从连接的另一端发送的字节数据的完整消息,作为字符串。阻塞直到有数据可以接收。如果没有任何数据可以接收并且另一端已关闭,则会引发 EOFError

如果指定了 maxlength 并且消息长度超过 maxlength,则会引发 OSError,并且连接将不再可读。

在版本 3.3 中更改: 此函数以前会引发 IOError,现在它是 OSError 的别名。

recv_bytes_into(buffer[, offset])

将从连接的另一端发送的字节数据的完整消息读入 buffer 中,并返回消息中的字节数。阻塞直到有数据可以接收。如果没有任何数据可以接收并且另一端已关闭,则会引发 EOFError

buffer 必须是可写的 类字节对象。如果给出 offset,则消息将从该位置写入缓冲区。偏移量必须是非负整数,小于 buffer 的长度(以字节为单位)。

如果缓冲区太短,则会引发 BufferTooShort 异常,并且完整消息可作为 e.args[0] 使用,其中 e 是异常实例。

在版本 3.3 中更改: 连接对象本身现在可以使用 Connection.send()Connection.recv() 在进程之间传输。

连接对象现在还支持上下文管理协议 - 请参阅 上下文管理器类型__enter__() 返回连接对象,而 __exit__() 调用 close()

例如

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv() 方法会自动取消接收数据的腌制,这可能存在安全风险,除非您可以信任发送消息的进程。

因此,除非连接对象是使用 Pipe() 生成的,否则您应该仅在执行某种身份验证后使用 recv()send() 方法。请参阅 身份验证密钥

警告

如果进程在尝试读写管道时被杀死,则管道中的数据可能会损坏,因为可能无法确定消息边界在哪里。

同步原语

通常,同步原语在多进程程序中不像在多线程程序中那样必要。请参阅 threading 模块的文档。

请注意,还可以使用管理器对象创建同步原语 - 请参阅 管理器

class multiprocessing.Barrier(parties[, action[, timeout]])

屏障对象:threading.Barrier 的克隆。

在 3.3 版本中添加。

class multiprocessing.BoundedSemaphore([value])

有界信号量对象:threading.BoundedSemaphore 的近似模拟。

与它的近似模拟相比,只有一个区别:它的 acquire 方法的第一个参数名为 block,与 Lock.acquire() 一致。

注意

在 macOS 上,它与 Semaphore 没有区别,因为该平台上没有实现 sem_getvalue()

class multiprocessing.Condition([lock])

条件变量:threading.Condition 的别名。

如果指定了 lock,则它应该是来自 multiprocessingLockRLock 对象。

在版本 3.3 中更改: 添加了 wait_for() 方法。

class multiprocessing.Event

threading.Event 的克隆。

class multiprocessing.Lock

一个非递归锁对象:与 threading.Lock 非常类似。一旦一个进程或线程获取了锁,随后任何进程或线程尝试获取它都会被阻塞,直到它被释放;任何进程或线程都可以释放它。 threading.Lock 在线程中适用的概念和行为在 multiprocessing.Lock 中被复制,它适用于进程或线程,除非另有说明。

请注意,Lock 实际上是一个工厂函数,它返回一个 multiprocessing.synchronize.Lock 实例,并使用默认上下文进行初始化。

Lock 支持 上下文管理器 协议,因此可以在 with 语句中使用。

acquire(block=True, timeout=None)

获取锁,阻塞或非阻塞。

block 参数设置为 True(默认值)时,方法调用将阻塞,直到锁处于解锁状态,然后将其设置为锁定并返回 True。请注意,第一个参数的名称与 threading.Lock.acquire() 中的参数名称不同。

block 参数设置为 False 时,方法调用不会阻塞。如果锁当前处于锁定状态,则返回 False;否则将锁设置为锁定状态并返回 True

当使用 timeout 的正浮点值调用时,在最多由 timeout 指定的秒数内阻塞,只要无法获取锁。使用 timeout 的负值调用等效于 timeout 为零。使用 timeout 值为 None(默认值)的调用将超时时间设置为无限。请注意,对 timeout 的负值或 None 值的处理与 threading.Lock.acquire() 中的实现行为不同。如果 block 参数设置为 False,则 timeout 参数没有实际意义,因此被忽略。如果锁已被获取,则返回 True,如果超时时间已过,则返回 False

release()

释放锁。这可以从任何进程或线程调用,而不仅仅是从最初获取锁的进程或线程调用。

行为与 threading.Lock.release() 中的行为相同,只是当在解锁的锁上调用时,会引发 ValueError

class multiprocessing.RLock

一个递归锁对象:与 threading.RLock 非常类似。递归锁必须由获取它的进程或线程释放。一旦一个进程或线程获取了递归锁,同一个进程或线程可以再次获取它,而不会被阻塞;该进程或线程必须为每次获取释放它一次。

请注意,RLock 实际上是一个工厂函数,它返回一个 multiprocessing.synchronize.RLock 实例,并使用默认上下文进行初始化。

RLock 支持 上下文管理器 协议,因此可以在 with 语句中使用。

acquire(block=True, timeout=None)

获取锁,阻塞或非阻塞。

当使用 block 参数设置为 True 调用时,阻塞,直到锁处于解锁状态(没有被任何进程或线程拥有),除非锁已经被当前进程或线程拥有。然后,当前进程或线程将拥有锁(如果它还没有拥有),并且锁内的递归级别增加一,导致返回值为 True。请注意,与 threading.RLock.acquire() 的实现相比,第一个参数的行为存在一些差异,从参数本身的名称开始。

当使用 block 参数设置为 False 调用时,不要阻塞。如果锁已经被另一个进程或线程获取(因此被拥有),则当前进程或线程不会拥有锁,并且锁内的递归级别不会改变,导致返回值为 False。如果锁处于解锁状态,则当前进程或线程将拥有锁,并且递归级别增加,导致返回值为 True

timeout 参数的使用和行为与 Lock.acquire() 中相同。请注意,timeout 的一些行为与 threading.RLock.acquire() 中的实现行为不同。

release()

释放锁,递减递归级别。如果递减后递归级别为零,则将锁重置为解锁状态(不受任何进程或线程拥有),如果任何其他进程或线程被阻塞等待锁解锁,则允许其中一个进程或线程继续执行。如果递减后递归级别仍然不为零,则锁保持锁定状态,并由调用进程或线程拥有。

仅当调用进程或线程拥有锁时才调用此方法。如果此方法由除所有者以外的进程或线程调用,或者锁处于解锁(未拥有)状态,则会引发 AssertionError。请注意,在这种情况下引发的异常类型与 threading.RLock.release() 中的实现行为不同。

class multiprocessing.Semaphore([value])

信号量对象:与 threading.Semaphore 非常类似。

与它的近似模拟相比,只有一个区别:它的 acquire 方法的第一个参数名为 block,与 Lock.acquire() 一致。

注意

在 macOS 上,sem_timedwait 不受支持,因此使用超时调用 acquire() 将使用睡眠循环来模拟该函数的行为。

注意

如果主线程被 BoundedSemaphore.acquire()Lock.acquire()RLock.acquire()Semaphore.acquire()Condition.acquire()Condition.wait() 调用阻塞时,Ctrl-C 生成的 SIGINT 信号到达,则调用将立即中断,并引发 KeyboardInterrupt

这与 threading 的行为不同,在 threading 中,SIGINT 将在等效的阻塞调用正在进行时被忽略。

注意

此包的一些功能需要主机操作系统上正常工作的共享信号量实现。如果没有,multiprocessing.synchronize 模块将被禁用,尝试导入它将导致 ImportError。有关更多信息,请参见 bpo-3770

共享 ctypes 对象

可以使用共享内存创建共享对象,这些对象可以被子进程继承。

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回从共享内存分配的 ctypes 对象。默认情况下,返回值实际上是对象的同步包装器。可以通过 Valuevalue 属性访问对象本身。

typecode_or_type 确定返回对象的类型:它要么是 ctypes 类型,要么是 array 模块使用的单字符类型代码。*args 传递给类型的构造函数。

如果 lockTrue(默认值),则会创建一个新的递归锁对象来同步对值的访问。如果 lockLockRLock 对象,则它将用于同步对值的访问。如果 lockFalse,则对返回对象的访问不会自动由锁保护,因此它不一定是“进程安全的”。

+= 这样的涉及读写操作不是原子的。因此,例如,如果您想原子地增加共享值,仅仅这样做是不够的

counter.value += 1

假设关联的锁是递归的(默认情况下是递归的),您可以改为执行

with counter.get_lock():
    counter.value += 1

请注意,lock 是一个关键字参数。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回从共享内存分配的 ctypes 数组。默认情况下,返回值实际上是数组的同步包装器。

typecode_or_type 确定返回数组元素的类型:它要么是 ctypes 类型,要么是 array 模块使用的单字符类型代码。如果 size_or_initializer 是一个整数,则它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer 是一个序列,用于初始化数组,其长度决定数组的长度。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对值的访问。如果 lockLockRLock 对象,则它将用于同步对值的访问。如果 lockFalse,则对返回对象的访问不会自动由锁保护,因此它不一定是“进程安全的”。

请注意,lock 是一个关键字参数。

请注意,ctypes.c_char 的数组具有 valueraw 属性,允许使用它们来存储和检索字符串。

multiprocessing.sharedctypes 模块

multiprocessing.sharedctypes 模块提供用于从共享内存分配 ctypes 对象的函数,这些对象可以被子进程继承。

注意

虽然可以在共享内存中存储指针,但请记住,这将引用特定进程地址空间中的一个位置。但是,指针很可能在第二个进程的上下文中无效,尝试从第二个进程取消引用指针可能会导致崩溃。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回从共享内存分配的 ctypes 数组。

typecode_or_type 确定返回数组元素的类型:它要么是 ctypes 类型,要么是 array 模块使用的单字符类型代码。如果 size_or_initializer 是一个整数,那么它决定数组的长度,并且数组将被初始化为零。否则 size_or_initializer 是一个用于初始化数组的序列,其长度决定数组的长度。

请注意,设置和获取元素可能是非原子的 - 使用 Array() 来确保使用锁自动同步访问。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回从共享内存分配的 ctypes 对象。

typecode_or_type 确定返回对象的类型:它要么是 ctypes 类型,要么是 array 模块使用的单字符类型代码。*args 传递给类型的构造函数。

请注意,设置和获取值可能是非原子的 - 使用 Value() 来确保使用锁自动同步访问。

请注意,ctypes.c_char 数组具有 valueraw 属性,允许使用它来存储和检索字符串 - 请参阅 ctypes 的文档。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

RawArray() 相同,只是根据 lock 的值,可能会返回一个进程安全的同步包装器,而不是原始 ctypes 数组。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对值的访问。如果 lockLockRLock 对象,则它将用于同步对值的访问。如果 lockFalse,则对返回对象的访问不会自动由锁保护,因此它不一定是“进程安全的”。

请注意,lock 是一个关键字参数。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

RawValue() 相同,只是根据 lock 的值,可能会返回一个进程安全的同步包装器,而不是原始 ctypes 对象。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对值的访问。如果 lockLockRLock 对象,则它将用于同步对值的访问。如果 lockFalse,则对返回对象的访问不会自动由锁保护,因此它不一定是“进程安全的”。

请注意,lock 是一个关键字参数。

multiprocessing.sharedctypes.copy(obj)

返回从共享内存分配的 ctypes 对象,它是 ctypes 对象 obj 的副本。

multiprocessing.sharedctypes.synchronized(obj[, lock])

返回 ctypes 对象的进程安全包装器对象,它使用 lock 来同步访问。如果 lockNone(默认值),则会自动创建一个 multiprocessing.RLock 对象。

除了它包装的对象之外,同步包装器还将具有两个方法:get_obj() 返回包装的对象,而 get_lock() 返回用于同步的锁对象。

请注意,通过包装器访问 ctypes 对象可能比访问原始 ctypes 对象慢得多。

在版本 3.5 中更改: 同步对象支持 上下文管理器 协议。

下表比较了从共享内存创建共享 ctypes 对象的语法与正常的 ctypes 语法。(在表中,MyStructctypes.Structure 的某个子类。)

ctypes

使用类型的 sharedctypes

使用类型代码的 sharedctypes

c_double(2.4)

RawValue(c_double, 2.4)

RawValue(‘d’, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(‘h’, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(‘i’, (9, 2, 8))

以下是一个示例,其中多个 ctypes 对象由子进程修改

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

打印的结果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

管理器

管理器提供了一种创建数据的方法,这些数据可以在不同的进程之间共享,包括在不同机器上运行的进程之间通过网络共享。管理器对象控制一个服务器进程,该进程管理共享对象。其他进程可以使用代理访问共享对象。

multiprocessing.Manager()

返回一个已启动的 SyncManager 对象,可用于在进程之间共享对象。返回的管理器对象对应于一个生成的子进程,并且具有用于创建共享对象并返回相应代理的方法。

管理器进程将在它们被垃圾回收或它们的父进程退出时立即关闭。管理器类在 multiprocessing.managers 模块中定义

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

创建一个 BaseManager 对象。

创建后,应调用 start()get_server().serve_forever() 以确保管理器对象引用已启动的管理器进程。

address 是管理器进程监听新连接的地址。如果 addressNone,则会选择一个任意的地址。

authkey 是用于检查传入连接到服务器进程的有效性的身份验证密钥。如果 authkeyNone,则使用 current_process().authkey。否则使用 authkey,它必须是字节字符串。

serializer 必须是 'pickle'(使用 pickle 序列化)或 'xmlrpclib'(使用 xmlrpc.client 序列化)。

ctx 是一个上下文对象,或 None(使用当前上下文)。请参阅 get_context() 函数。

shutdown_timeout 是一个以秒为单位的超时时间,用于等待管理器使用的进程在 shutdown() 方法中完成。如果关闭超时,则进程将被终止。如果终止进程也超时,则进程将被杀死。

在 3.11 版本中更改: 添加了 shutdown_timeout 参数。

start([initializer[, initargs]])

启动一个子进程来启动管理器。如果 initializer 不是 None,则子进程将在启动时调用 initializer(*initargs)

get_server()

返回一个 Server 对象,该对象表示管理器控制下的实际服务器。 Server 对象支持 serve_forever() 方法

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server 另外还有一个 address 属性。

connect()

将本地管理器对象连接到远程管理器进程

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

停止管理器使用的进程。这只有在使用 start() 启动服务器进程后才可用。

这可以多次调用。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

一个类方法,可用于将类型或可调用对象注册到管理器类。

typeid 是一个“类型标识符”,用于标识特定类型的共享对象。这必须是一个字符串。

callable 是一个可调用对象,用于为该类型标识符创建对象。如果管理器实例将使用 connect() 方法连接到服务器,或者如果 create_method 参数为 False,则可以将其保留为 None

proxytypeBaseProxy 的一个子类,用于为具有此 typeid 的共享对象创建代理。如果为 None,则会自动创建一个代理类。

exposed 用于指定代理应允许使用 BaseProxy._callmethod() 访问的此 typeid 的方法名称序列。(如果 exposedNone,则如果存在,则使用 proxytype._exposed_。)在没有指定公开列表的情况下,共享对象的所有“公共方法”都将可访问。(这里“公共方法”是指任何具有 __call__() 方法且名称不以 '_' 开头的属性。)

method_to_typeid 是一个映射,用于指定应返回代理的那些公开方法的返回类型。它将方法名称映射到 typeid 字符串。(如果 method_to_typeidNone,则如果存在,则使用 proxytype._method_to_typeid_。)如果方法的名称不是此映射的键,或者如果映射为 None,则方法返回的对象将按值复制。

create_method 确定是否应使用名称 typeid 创建一个方法,该方法可用于告诉服务器进程创建新的共享对象并返回其代理。默认情况下,它是 True

BaseManager 实例还有一个只读属性

address

管理器使用的地址。

在 3.3 版本中更改: Manager 对象支持上下文管理协议 - 请参阅 上下文管理器类型__enter__() 启动服务器进程(如果尚未启动),然后返回管理器对象。 __exit__() 调用 shutdown()

在以前的版本中,如果 __enter__() 尚未启动,则不会启动管理器的服务器进程。

class multiprocessing.managers.SyncManager

一个 BaseManager 的子类,可用于进程同步。此类型的对象由 multiprocessing.Manager() 返回。

它的方法创建并返回 代理对象,用于跨进程同步许多常用数据类型。这尤其包括共享列表和字典。

Barrier(parties[, action[, timeout]])

创建一个共享的 threading.Barrier 对象并返回其代理。

在 3.3 版本中添加。

BoundedSemaphore([value])

创建一个共享的 threading.BoundedSemaphore 对象并返回其代理。

Condition([lock])

创建一个共享的 threading.Condition 对象并返回它的代理。

如果提供了 lock,那么它应该是一个 threading.Lockthreading.RLock 对象的代理。

在版本 3.3 中更改: 添加了 wait_for() 方法。

Event()

创建一个共享的 threading.Event 对象并返回它的代理。

Lock()

创建一个共享的 threading.Lock 对象并返回它的代理。

Namespace()

创建一个共享的 Namespace 对象并返回它的代理。

Queue([maxsize])

创建一个共享的 queue.Queue 对象并返回它的代理。

RLock()

创建一个共享的 threading.RLock 对象并返回它的代理。

Semaphore([value])

创建一个共享的 threading.Semaphore 对象并返回它的代理。

Array(typecode, sequence)

创建一个数组并返回它的代理。

Value(typecode, value)

创建一个具有可写 value 属性的对象并返回它的代理。

dict()
dict(mapping)
dict(sequence)

创建一个共享的 dict 对象并返回它的代理。

list()
list(sequence)

创建一个共享的 list 对象并返回它的代理。

Changed in version 3.6: 共享对象可以嵌套。例如,共享容器对象(如共享列表)可以包含其他共享对象,所有这些对象都将由 SyncManager 管理和同步。

class multiprocessing.managers.Namespace

可以注册到 SyncManager 的类型。

命名空间对象没有公共方法,但具有可写属性。它的表示显示了其属性的值。

但是,当使用命名空间对象的代理时,以 '_' 开头的属性将是代理的属性,而不是被引用对象的属性。

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

自定义管理器

要创建自己的管理器,需要创建一个 BaseManager 的子类,并使用 register() 类方法将新类型或可调用对象注册到管理器类中。例如

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

使用远程管理器

可以在一台机器上运行管理器服务器,让其他机器上的客户端使用它(假设涉及的防火墙允许这样做)。

运行以下命令会为单个共享队列创建一个服务器,远程客户端可以访问它

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个客户端可以按如下方式访问服务器

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个客户端也可以使用它

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以访问该队列,使用上面客户端代码远程访问它

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

代理对象

代理是一个对象,它引用一个共享对象,该对象(可能)存在于不同的进程中。共享对象被称为代理的被引用对象。多个代理对象可能具有相同的被引用对象。

代理对象具有调用其被引用对象对应方法的方法(尽管并非被引用对象的每个方法都一定可以通过代理使用)。这样,代理就可以像其被引用对象一样使用

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

请注意,将 str() 应用于代理将返回被引用对象的表示,而将 repr() 应用于代理将返回代理的表示。

代理对象的一个重要特性是它们是可腌制的,因此它们可以在进程之间传递。因此,被引用对象可以包含 代理对象。这允许嵌套这些管理的列表、字典和其他 代理对象

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

类似地,字典和列表代理可以相互嵌套

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

如果被引用对象中包含标准(非代理)listdict 对象,对这些可变值的修改不会通过管理器传播,因为代理无法知道何时修改了其中包含的值。但是,将值存储在容器代理中(这会在代理对象上触发 __setitem__)确实会通过管理器传播,因此要有效地修改此类项,可以将修改后的值重新分配给容器代理

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

这种方法可能不如在大多数用例中使用嵌套的 代理对象 方便,但也展示了对同步的控制级别。

注意

multiprocessing 中的代理类型不支持按值比较。因此,例如,我们有

>>> manager.list([1,2,3]) == [1,2,3]
False

在进行比较时,应该只使用被代理对象的副本。

class multiprocessing.managers.BaseProxy

代理对象是 BaseProxy 子类的实例。

_callmethod(methodname[, args[, kwds]])

调用并返回代理对象的被代理对象的某个方法的结果。

如果 proxy 是一个代理,其被代理对象是 obj,那么表达式

proxy._callmethod(methodname, args, kwds)

将计算表达式

getattr(obj, methodname)(*args, **kwds)

在管理器进程中。

返回值将是调用结果的副本或指向新共享对象的代理 - 请参阅 BaseManager.register()method_to_typeid 参数的文档。

如果调用引发异常,则由 _callmethod() 重新引发。如果在管理器进程中引发了其他异常,则将其转换为 RemoteError 异常,并由 _callmethod() 引发。

特别要注意,如果 methodname 未被 公开,则会引发异常。

一个使用 _callmethod() 的示例

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回被代理对象的副本。

如果被代理对象不可序列化,则会引发异常。

__repr__()

返回代理对象的表示。

__str__()

返回被代理对象的表示。

清理

代理对象使用弱引用回调,以便在它被垃圾回收时,它会从拥有其被代理对象的管理器中注销自己。

当不再有代理引用共享对象时,该共享对象将从管理器进程中删除。

进程池

可以使用 Pool 类创建一个进程池,该进程池将执行提交给它的任务。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一个进程池对象,它控制一个工作进程池,可以向其提交作业。它支持带有超时和回调的异步结果,并具有并行映射实现。

processes 是要使用的工作进程数。如果 processesNone,则使用 os.cpu_count() 返回的数字。

如果 initializer 不为 None,则每个工作进程在启动时都会调用 initializer(*initargs)

maxtasksperchild 是一个工作进程在退出并被新的工作进程替换之前可以完成的任务数,以使未使用的资源能够被释放。默认的 maxtasksperchildNone,这意味着工作进程将与池一样长久。

context 可用于指定用于启动工作进程的上下文。通常,池是使用函数 multiprocessing.Pool() 或上下文对象的 Pool() 方法创建的。在这两种情况下,context 都被适当地设置。

请注意,池对象的这些方法只能由创建池的进程调用。

警告

multiprocessing.pool 对象具有需要通过将池用作上下文管理器或手动调用 close()terminate() 来正确管理的内部资源(与任何其他资源一样)。如果未能执行此操作,会导致进程在最终化时挂起。

请注意,不正确的是依赖垃圾收集器来销毁池,因为 CPython 不保证会调用池的终结器(有关更多信息,请参阅 object.__del__())。

在版本 3.2 中更改: 添加了 maxtasksperchild 参数。

在版本 3.4 中更改: 添加了 context 参数。

注意

Pool 中的工作进程通常在池的工作队列的整个持续时间内保持活动状态。在其他系统(如 Apache、mod_wsgi 等)中发现的一种常见模式是允许池中的工作进程仅完成一定数量的工作,然后退出,清理,并生成一个新的进程来替换旧的进程。Poolmaxtasksperchild 参数将此功能公开给最终用户。

apply(func[, args[, kwds]])

使用参数 args 和关键字参数 kwds 调用 func。它会阻塞,直到结果准备就绪。鉴于此阻塞,apply_async() 更适合于并行执行工作。此外,func 仅在池中的一个工作进程中执行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply() 方法的一个变体,它返回一个 AsyncResult 对象。

如果指定了callback,则它应该是一个可调用对象,它接受一个参数。当结果准备就绪时,callback 将应用于它,除非调用失败,在这种情况下,error_callback 将被应用。

如果指定了error_callback,则它应该是一个可调用对象,它接受一个参数。如果目标函数失败,则error_callback 将使用异常实例调用。

回调应该立即完成,否则处理结果的线程将被阻塞。

map(func, iterable[, chunksize])

内置函数 map() 的并行等价物(它只支持一个iterable 参数,对于多个可迭代对象,请参见 starmap())。它会阻塞直到结果准备就绪。

此方法将可迭代对象分成多个块,并将这些块作为单独的任务提交给进程池。可以通过将chunksize 设置为正整数来指定这些块的(近似)大小。

请注意,对于非常长的可迭代对象,它可能会导致高内存使用率。考虑使用 imap()imap_unordered() 以及显式的chunksize 选项以提高效率。

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

map() 方法的一个变体,它返回一个 AsyncResult 对象。

如果指定了callback,则它应该是一个可调用对象,它接受一个参数。当结果准备就绪时,callback 将应用于它,除非调用失败,在这种情况下,error_callback 将被应用。

如果指定了error_callback,则它应该是一个可调用对象,它接受一个参数。如果目标函数失败,则error_callback 将使用异常实例调用。

回调应该立即完成,否则处理结果的线程将被阻塞。

imap(func, iterable[, chunksize])

map() 的更懒惰版本。

chunksize 参数与 map() 方法中使用的参数相同。对于非常长的可迭代对象,使用较大的chunksize 值可以使作业完成的速度比使用默认值 1 快得多。

此外,如果chunksize1,则 imap() 方法返回的迭代器的 next() 方法有一个可选的timeout 参数:next(timeout) 如果结果无法在timeout 秒内返回,则会引发 multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

imap() 相同,只是返回的迭代器中结果的顺序应被视为任意的。(只有当只有一个工作进程时,顺序才保证是“正确的”。)

starmap(func, iterable[, chunksize])

map() 相同,只是iterable 的元素预计是可迭代对象,这些可迭代对象将被解包为参数。

因此,[(1,2), (3, 4)]iterable 会导致 [func(1,2), func(3,4)]

在 3.3 版本中添加。

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

starmap()map_async() 的组合,它迭代iterable 的可迭代对象,并使用解包的可迭代对象调用func。返回一个结果对象。

在 3.3 版本中添加。

close()

阻止向池中提交更多任务。一旦所有任务都已完成,工作进程将退出。

terminate()

立即停止工作进程,而不完成未完成的工作。当池对象被垃圾回收时,terminate() 将立即被调用。

join()

等待工作进程退出。必须在使用 join() 之前调用 close()terminate()

在 3.3 版中更改: Pool 对象现在支持上下文管理协议 - 请参见 上下文管理器类型__enter__() 返回池对象,而 __exit__() 调用 terminate()

class multiprocessing.pool.AsyncResult

Pool.apply_async()Pool.map_async() 返回的结果的类。

get([timeout])

结果到达时返回结果。如果timeout不为None,并且结果未在timeout秒内到达,则会引发multiprocessing.TimeoutError。如果远程调用引发异常,则该异常将由get()重新引发。

wait([timeout])

等待结果可用或timeout秒过去。

ready()

返回调用是否已完成。

successful()

返回调用是否在不引发异常的情况下完成。如果结果未准备好,则会引发ValueError

在 3.7 版中更改: 如果结果未准备好,则会引发ValueError,而不是AssertionError

以下示例演示了池的使用

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

监听器和客户端

通常,进程之间的消息传递是使用队列或使用Connection对象(由Pipe()返回)完成的。

但是,multiprocessing.connection模块允许一些额外的灵活性。它基本上为处理套接字或 Windows 命名管道提供了一个高级消息面向 API。它还支持使用hmac模块进行摘要身份验证,以及同时轮询多个连接。

multiprocessing.connection.deliver_challenge(connection, authkey)

向连接的另一端发送随机生成的消息,并等待回复。

如果回复与使用authkey作为密钥的消息摘要匹配,则会向连接的另一端发送欢迎消息。否则,将引发AuthenticationError

multiprocessing.connection.answer_challenge(connection, authkey)

接收消息,使用authkey作为密钥计算消息摘要,然后将摘要发送回去。

如果没有收到欢迎消息,则会引发AuthenticationError

multiprocessing.connection.Client(address[, family[, authkey]])

尝试建立与使用地址address的监听器之间的连接,返回一个Connection

连接的类型由family参数确定,但通常可以省略,因为它通常可以从address的格式推断出来。(参见地址格式

如果给出authkey且不为None,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证挑战的密钥。如果authkeyNone,则不会进行身份验证。如果身份验证失败,则会引发AuthenticationError。参见身份验证密钥

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

绑定套接字或 Windows 命名管道的包装器,它正在“监听”连接。

address是监听器对象绑定的套接字或命名管道使用的地址。

注意

如果使用“0.0.0.0”地址,则该地址在 Windows 上不会是可连接的端点。如果您需要可连接的端点,则应使用“127.0.0.1”。

family是要使用的套接字(或命名管道)的类型。它可以是以下字符串之一:'AF_INET'(对于 TCP 套接字)、'AF_UNIX'(对于 Unix 域套接字)或'AF_PIPE'(对于 Windows 命名管道)。其中只有第一个保证可用。如果familyNone,则从address的格式推断出该族。如果address也为None,则选择默认值。此默认值是假定为最快可用的族。参见地址格式。请注意,如果family'AF_UNIX'且地址为None,则套接字将在使用tempfile.mkstemp()创建的私有临时目录中创建。

如果监听器对象使用套接字,则backlog(默认值为 1)将传递给套接字的listen()方法,一旦它被绑定。

如果给出authkey且不为None,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证挑战的密钥。如果authkeyNone,则不会进行身份验证。如果身份验证失败,则会引发AuthenticationError。参见身份验证密钥

accept()

在监听器对象的绑定套接字或命名管道上接受连接,并返回一个Connection对象。如果尝试身份验证但失败,则会引发AuthenticationError

close()

关闭监听器对象的绑定套接字或命名管道。当监听器被垃圾回收时,会自动调用它。但是,建议显式调用它。

监听器对象具有以下只读属性

address

监听器对象正在使用的地址。

last_accepted

最后一个接受的连接来自的地址。如果不可用,则为None

版本 3.3 中的变更: 监听器对象现在支持上下文管理协议 - 请参阅 上下文管理器类型__enter__() 返回监听器对象,而 __exit__() 调用 close().

multiprocessing.connection.wait(object_list, timeout=None)

等待直到 *object_list* 中的对象准备好。返回 *object_list* 中已准备好的对象的列表。如果 *timeout* 是一个浮点数,则调用最多阻塞该秒数。如果 *timeout* 是 None,则它将无限期阻塞。负超时等效于零超时。

对于 POSIX 和 Windows,如果对象是,则它可以出现在 *object_list* 中

当有数据可供从连接或套接字对象读取,或者另一端已关闭时,连接或套接字对象已准备好。

POSIX: wait(object_list, timeout) 几乎等效于 select.select(object_list, [], [], timeout)。区别在于,如果 select.select() 被信号中断,它可能会引发 OSError,错误号为 EINTR,而 wait() 不会。

Windows: *object_list* 中的项目必须是可等待的整数句柄(根据 Win32 函数 WaitForMultipleObjects() 文档中使用的定义),或者它可以是具有 fileno() 方法的对象,该方法返回套接字句柄或管道句柄。(请注意,管道句柄和套接字句柄不是可等待的句柄。)

在 3.3 版本中添加。

示例

以下服务器代码创建一个监听器,该监听器使用 'secret password' 作为身份验证密钥。然后它等待连接并向客户端发送一些数据

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

以下代码连接到服务器并从服务器接收一些数据

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

以下代码使用 wait() 来等待来自多个进程的消息

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

地址格式

  • 一个 'AF_INET' 地址是一个元组,形式为 (hostname, port),其中 *hostname* 是一个字符串,*port* 是一个整数。

  • 一个 'AF_UNIX' 地址是一个字符串,表示文件系统上的文件名。

  • 一个 'AF_PIPE' 地址是一个字符串,形式为 r'\\.\pipe\PipeName'。要使用 Client() 连接到名为 *ServerName* 的远程计算机上的命名管道,应使用形式为 r'\\ServerName\pipe\PipeName' 的地址。

请注意,默认情况下,任何以两个反斜杠开头的字符串都被假定为 'AF_PIPE' 地址,而不是 'AF_UNIX' 地址。

身份验证密钥

当使用 Connection.recv 时,接收到的数据会自动解封。不幸的是,从不受信任的来源解封数据存在安全风险。因此,ListenerClient() 使用 hmac 模块来提供摘要身份验证。

身份验证密钥是一个字节字符串,可以看作是密码:一旦建立连接,两端都会要求证明对方知道身份验证密钥。(证明两端使用的是同一个密钥不涉及通过连接发送密钥。)

如果请求身份验证但未指定身份验证密钥,则使用 current_process().authkey 的返回值(请参阅 Process)。此值将由当前进程创建的任何 Process 对象自动继承。这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,该密钥可以在设置进程之间的连接时使用。

也可以使用 os.urandom() 生成合适的身份验证密钥。

日志记录

提供了一些日志记录支持。但是,请注意,logging 包不使用进程共享锁,因此有可能(取决于处理程序类型)来自不同进程的消息会混在一起。

multiprocessing.get_logger()

返回 multiprocessing 使用的日志记录器。如有必要,将创建一个新的日志记录器。

首次创建时,日志记录器级别为 logging.NOTSET 且没有默认处理程序。默认情况下,发送到此日志记录器的消息不会传播到根日志记录器。

请注意,在 Windows 上,子进程只会继承父进程日志记录器的级别 - 日志记录器的任何其他自定义设置都不会被继承。

multiprocessing.log_to_stderr(level=None)

此函数调用 get_logger(),但除了返回由 get_logger 创建的日志记录器外,它还添加了一个处理程序,该处理程序使用格式 '[%(levelname)s/%(processName)s] %(message)s' 将输出发送到 sys.stderr。您可以通过传递 level 参数来修改日志记录器的 levelname

以下是启用日志记录的示例会话

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

有关日志记录级别的完整表格,请参阅 logging 模块。

multiprocessing.dummy 模块

multiprocessing.dummy 复制了 multiprocessing 的 API,但它只是 threading 模块的包装器。

特别是,multiprocessing.dummy 提供的 Pool 函数返回 ThreadPool 的实例,它是 Pool 的子类,支持所有相同的函数调用,但使用的是工作线程池而不是工作进程池。

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

一个线程池对象,它控制一个工作线程池,作业可以提交到该池。 ThreadPool 实例与 Pool 实例完全兼容,它们的资源也必须得到适当管理,要么使用池作为上下文管理器,要么通过调用 close()terminate() 手动管理。

processes 是要使用的工作线程数。如果 processesNone,则使用 os.cpu_count() 返回的值。

如果 initializer 不为 None,则每个工作进程在启动时都会调用 initializer(*initargs)

Pool 不同,maxtasksperchildcontext 无法提供。

注意

一个 ThreadPoolPool 共享相同的接口,该接口是围绕进程池设计的,早于 concurrent.futures 模块的引入。因此,它继承了一些对由线程支持的池没有意义的操作,并且它有自己的类型来表示异步作业的状态,AsyncResult,其他库不理解它。

用户通常应该更喜欢使用 concurrent.futures.ThreadPoolExecutor,它有一个更简单的接口,从一开始就围绕线程设计,并且返回 concurrent.futures.Future 实例,这些实例与许多其他库兼容,包括 asyncio

编程指南

使用 multiprocessing 时,应该遵守某些指南和习惯用法。

所有启动方法

以下适用于所有启动方法。

避免共享状态

尽可能地,应该尝试避免在进程之间传递大量数据。

最好坚持使用队列或管道进行进程间通信,而不是使用更低级别的同步原语。

可腌制性

确保代理方法的参数是可腌制的。

代理的线程安全性

除非使用锁保护代理对象,否则不要从多个线程使用代理对象。

(多个进程使用同一个代理永远不会有问题。)

加入僵尸进程

在 POSIX 上,当一个进程完成但尚未加入时,它会变成僵尸进程。永远不应该有太多僵尸进程,因为每次启动新进程时(或调用 active_children() 时),所有已完成但尚未加入的进程都会被加入。此外,调用已完成进程的 Process.is_alive 会加入该进程。即使如此,最好还是显式地加入你启动的所有进程。

最好继承而不是腌制/解腌制

使用 spawnforkserver 启动方法时,来自 multiprocessing 的许多类型都需要是可腌制的,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,你应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承它。

避免终止进程

使用 Process.terminate() 方法停止进程可能会导致进程当前正在使用的任何共享资源(如锁、信号量、管道和队列)损坏或无法供其他进程使用。

因此,最好只考虑对从未使用任何共享资源的进程使用 Process.terminate()

加入使用队列的进程

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都被“馈送”线程馈送到底层管道。(子进程可以调用队列的 Queue.cancel_join_thread() 方法来避免这种行为。)

这意味着,无论何时使用队列,都需要确保在加入进程之前,所有已放入队列的项目最终都会被移除。否则,你无法确定已将项目放入队列的进程是否会终止。还要记住,非守护进程会自动加入。

以下是一个会死锁的示例

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

这里的解决方法是交换最后两行(或者干脆删除 p.join() 行)。

显式地将资源传递给子进程

在 POSIX 上使用 fork 启动方法,子进程可以使用父进程中创建的共享资源,使用全局资源。但是,最好将对象作为参数传递给子进程的构造函数。

除了使代码(可能)与 Windows 和其他启动方法兼容之外,这也确保只要子进程还活着,对象就不会在父进程中被垃圾回收。如果在父进程中对象被垃圾回收时释放了一些资源,这可能很重要。

例如

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

应该改写为

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

注意不要用“类似文件对象”替换 sys.stdin

multiprocessing 最初在 multiprocessing.Process._bootstrap() 方法中无条件地调用

os.close(sys.stdin.fileno())

这导致了进程在进程中的问题。这已更改为

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

这解决了进程相互冲突导致错误文件描述符错误的基本问题,但为用“类似文件对象”替换 sys.stdin() 的应用程序带来了潜在的危险,该对象具有输出缓冲。这种危险是,如果多个进程在该类似文件对象上调用 close(),可能会导致相同的数据多次刷新到该对象,从而导致损坏。

如果你编写一个类似文件对象并实现自己的缓存,你可以通过在将数据追加到缓存时存储 pid 并当 pid 发生变化时丢弃缓存来使其成为 fork 安全的。例如

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有关更多信息,请参阅 bpo-5155bpo-5313bpo-5331

spawnforkserver 启动方法

有一些额外的限制不适用于 fork 启动方法。

更多可腌制性

确保 Process.__init__() 的所有参数都是可腌制的。此外,如果你子类化 Process,那么确保在调用 Process.start 方法时实例是可腌制的。

全局变量

请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的(如果有)值可能与调用 Process.start 时父进程中的值不同。

但是,仅仅是模块级常量的全局变量不会造成任何问题。

安全导入主模块

确保主模块可以被新的 Python 解释器安全地导入,而不会造成意外的副作用(例如启动新进程)。

例如,使用 spawnforkserver 启动方法运行以下模块将导致 RuntimeError 错误。

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

相反,应该使用 if __name__ == '__main__': 来保护程序的“入口点”,如下所示

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(如果程序正常运行而不是被冻结,则可以省略 freeze_support() 行。)

这允许新生成的 Python 解释器安全地导入模块,然后运行模块的 foo() 函数。

如果在主模块中创建了池或管理器,则也会有类似的限制。

示例

演示如何创建和使用自定义管理器和代理

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

使用 Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

一个示例,展示如何使用队列将任务馈送到一组工作进程并收集结果

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()