multiprocessing — 基于进程的并行

源代码: Lib/multiprocessing/


可用性: 非 Android、非 iOS、非 WASI。

该模块在移动平台WebAssembly 平台上不受支持。

引言

multiprocessing 是一个支持使用类似于 threading 模块的 API 来创建进程的包。multiprocessing 包同时提供本地和远程并发,通过使用子进程而非线程来有效规避 全局解释器锁。因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它可在 POSIX 和 Windows 上运行。

multiprocessing 模块还引入了在 threading 模块中没有对应项的 API。一个主要的例子是 Pool 对象,它提供了一种便捷的方式,可以在多个输入值上并行执行函数,将输入数据分发到多个进程(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个使用 Pool 的数据并行基本示例:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

将打印到标准输出

[1, 4, 9]

参见

concurrent.futures.ProcessPoolExecutor 提供了一个更高级别的接口,可以将任务推送到后台进程而不会阻塞调用进程的执行。与直接使用 Pool 接口相比,concurrent.futures API 更容易将工作提交到底层进程池与等待结果分开。

Process 类

multiprocessing 中,通过创建一个 Process 对象,然后调用其 start() 方法来生成进程。Process 遵循 threading.Thread 的 API。一个简单的多进程程序示例是

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

为了显示涉及的各个进程 ID,这里有一个扩展示例

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

有关为什么需要 if __name__ == '__main__' 部分的解释,请参阅 编程指南

Process 的参数通常需要能够从子进程内部进行反序列化。如果您尝试将上述示例直接输入 REPL,则可能导致子进程在尝试在 __main__ 模块中找到 *f* 函数时出现 AttributeError

上下文和启动方法

根据平台,multiprocessing 支持三种启动进程的方式。这些 *启动方法* 是

spawn(生成)

父进程启动一个新的 Python 解释器进程。子进程将只继承运行进程对象的 run() 方法所需的资源。特别是,父进程中不必要的文件描述符和句柄将不会被继承。与使用 *fork* 或 *forkserver* 相比,使用此方法启动进程相当慢。

在 POSIX 和 Windows 平台上可用。在 Windows 和 macOS 上是默认值。

fork(分叉)

父进程使用 os.fork() 来分叉 Python 解释器。子进程开始时,实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地分叉多线程进程存在问题。

在 POSIX 系统上可用。

3.14 版本更改: 这不再是任何平台上的默认启动方法。需要 *fork* 的代码必须通过 get_context()set_start_method() 明确指定。

3.12 版本更改: 如果 Python 能够检测到您的进程有多个线程,此启动方法内部调用的 os.fork() 函数将引发 DeprecationWarning。请使用不同的启动方法。有关进一步解释,请参阅 os.fork() 文档。

forkserver(分叉服务器)

当程序启动并选择 *forkserver* 启动方法时,会生成一个服务器进程。从那时起,每当需要一个新进程时,父进程连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,除非系统库或预加载的导入作为副作用生成线程,因此通常可以安全地使用 os.fork()。不会继承不必要的资源。

在支持通过 Unix 管道传递文件描述符的 POSIX 平台(例如 Linux)上可用。在这些平台上是默认值。

3.14 版本更改: 这成为 POSIX 平台上的默认启动方法。

3.4 版本更改: 在所有 POSIX 平台上添加了 *spawn*,并为某些 POSIX 平台添加了 *forkserver*。子进程不再继承 Windows 上父进程的所有可继承句柄。

3.8 版本更改: 在 macOS 上,*spawn* 启动方法现在是默认值。*fork* 启动方法应被视为不安全,因为它可能导致子进程崩溃,因为 macOS 系统库可能会启动线程。请参阅 bpo-33725

3.14 版本更改: 在 POSIX 平台上,默认启动方法从 *fork* 更改为 *forkserver*,以保持性能并避免常见的多线程进程不兼容性。请参阅 gh-84559

在 POSIX 上使用 *spawn* 或 *forkserver* 启动方法还将启动一个 *资源跟踪器* 进程,该进程跟踪程序进程创建的未链接的命名系统资源(例如命名信号量或 SharedMemory 对象)。当所有进程都退出时,资源跟踪器会取消链接任何剩余的已跟踪对象。通常不应该有,但如果进程被信号杀死,可能会有一些“泄漏”的资源。(无论是泄漏的信号量还是共享内存段都不会在下次重新启动之前自动取消链接。这对这两种对象都存在问题,因为系统只允许有限数量的命名信号量,并且共享内存段占用主内存中的一些空间。)

要选择启动方法,您可以在主模块的 if __name__ == '__main__' 子句中使用 set_start_method()。例如

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() 不应在程序中多次使用。

或者,您可以使用 get_context() 获取一个上下文对象。上下文对象与 multiprocessing 模块具有相同的 API,并允许在同一个程序中使用多个启动方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用 *fork* 上下文创建的锁不能传递给使用 *spawn* 或 *forkserver* 启动方法启动的进程。

使用 multiprocessingProcessPoolExecutor 的库应设计为允许其用户提供自己的多进程上下文。在库中使用自己的特定上下文可能导致与库用户应用程序的其余部分不兼容。如果您的库需要特定的启动方法,请务必记录。

警告

在 POSIX 系统上,'spawn''forkserver' 启动方法通常不能与“冻结”的可执行文件(即,由 PyInstallercx_Freeze 等包生成的可执行文件)一起使用。'fork' 启动方法可能有效,如果代码不使用线程。

进程间交换对象

multiprocessing 支持两种进程间通信通道

队列

Queue 类是 queue.Queue 的近乎克隆。例如

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

队列是线程和进程安全的。放入 multiprocessing 队列的任何对象都将被序列化。

管道

Pipe() 函数返回一对由管道连接的连接对象,该管道默认是双工的(双向)。例如

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Pipe() 返回的两个连接对象代表管道的两端。每个连接对象都有 send()recv() 方法(以及其他)。请注意,如果两个进程(或线程)同时尝试从管道的 *同一* 端读取或写入,则管道中的数据可能会损坏。当然,进程同时使用管道的不同端没有损坏的风险。

send() 方法序列化对象,recv() 重新创建对象。

进程间同步

multiprocessing 包含 threading 中的所有同步原语的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用锁,不同进程的输出可能会完全混淆。

进程间共享状态

如上所述,在进行并发编程时,最好尽可能避免使用共享状态。在使用多个进程时尤其如此。

但是,如果您确实需要使用一些共享数据,那么 multiprocessing 提供了几种实现方式。

共享内存

数据可以使用 ValueArray 存储在共享内存映射中。例如,以下代码

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型代码:'d' 表示双精度浮点数,'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意 ctypes 对象。

服务器进程

Manager() 返回的管理器对象控制一个服务器进程,该进程持有 Python 对象并允许其他进程使用代理操纵它们。

Manager() 返回的管理器将支持 listdictsetNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray 等类型。例如,

from multiprocessing import Process, Manager

def f(d, l, s):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()
    s.add('a')
    s.add('b')

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        s = manager.set()

        p = Process(target=f, args=(d, l, s))
        p.start()
        p.join()

        print(d)
        print(l)
        print(s)

将打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
{'a', 'b'}

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程之间共享。但是,它们比使用共享内存慢。

使用工作进程池

Pool 类代表一个工作进程池。它有方法可以通过几种不同的方式将任务卸载到工作进程。

例如:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

请注意,池的方法应仅由创建它的进程使用。

备注

此包中的功能要求子进程可以导入 __main__ 模块。这在 编程指南 中有所介绍,但在此处指出也很重要。这意味着一些示例,例如 multiprocessing.pool.Pool 示例,将无法在交互式解释器中运行。例如

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(如果您尝试这样做,它实际上会以半随机的方式交错输出三个完整的追溯,然后您可能需要以某种方式停止父进程。)

参考

multiprocessing 包大部分复制了 threading 模块的 API。

Process 和异常

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process 对象代表在单独进程中运行的活动。Process 类具有 threading.Thread 所有方法的等价物。

构造函数应始终使用关键字参数调用。*group* 应始终为 None;它仅为了与 threading.Thread 兼容而存在。*target* 是要由 run() 方法调用的可调用对象。它默认为 None,表示不调用任何内容。*name* 是进程名称(有关更多详细信息,请参阅 name)。*args* 是目标调用的参数元组。*kwargs* 是目标调用的关键字参数字典。如果提供,仅关键字参数 *daemon* 将进程 daemon 标志设置为 TrueFalse。如果为 None(默认值),此标志将从创建进程继承。

默认情况下,不向 *target* 传递任何参数。*args* 参数(默认为 ())可用于指定要传递给 *target* 的参数列表或元组。

如果子类重写构造函数,它必须确保在对进程执行任何其他操作之前调用基类构造函数(super().__init__())。

备注

通常,Process 的所有参数都必须是可序列化的。当尝试从 REPL 使用本地定义的 *target* 函数创建 Process 或使用 concurrent.futures.ProcessPoolExecutor 时,经常会观察到这一点。

传递在当前 REPL 会话中定义的可调用对象会导致子进程通过未捕获的 AttributeError 异常终止,因为 *target* 必须在可导入模块中定义才能在反序列化期间加载。

子进程中这种不可捕获的错误示例

>>> import multiprocessing as mp
>>> def knigit():
...     print("Ni!")
...
>>> process = mp.Process(target=knigit)
>>> process.start()
>>> Traceback (most recent call last):
  File ".../multiprocessing/spawn.py", line ..., in spawn_main
  File ".../multiprocessing/spawn.py", line ..., in _main
AttributeError: module '__main__' has no attribute 'knigit'
>>> process
<SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 stopped exitcode=1>

请参阅 spawn 和 forkserver 启动方法。尽管如果使用 "fork" 启动方法,此限制不成立,但截至 Python 3.14,它不再是任何平台上的默认值。请参阅 上下文和启动方法。另请参阅 gh-132898

3.3 版本更改: 添加了 *daemon* 参数。

run()

表示进程活动的 方法。

您可以在子类中重写此方法。标准的 run() 方法会调用传递给对象构造函数作为 *target* 参数的可调用对象(如果有),并使用 *args* 和 *kwargs* 参数中的顺序参数和关键字参数。

将列表或元组作为传递给 Process 的 *args* 参数会达到相同的效果。

示例

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

开始进程的活动。

每个进程对象最多只能调用一次此方法。它会安排对象的 run() 方法在单独的进程中被调用。

join([timeout])

如果可选参数 *timeout* 为 None(默认值),则该方法会阻塞,直到调用其 join() 方法的进程终止。如果 *timeout* 是正数,它最多阻塞 *timeout* 秒。请注意,如果其进程终止或方法超时,该方法将返回 None。检查进程的 exitcode 以确定它是否终止。

一个进程可以被多次连接。

一个进程不能连接自身,因为这会导致死锁。在进程启动之前尝试连接进程是错误的。

name

进程的名称。名称是仅用于标识目的的字符串。它没有语义。多个进程可以被赋予相同的名称。

初始名称由构造函数设置。如果未向构造函数提供显式名称,则会构造一个形式为“Process-N1:N2:…:Nk”的名称,其中每个 Nk 是其父进程的第 N 个子进程。

is_alive()

返回进程是否存活。

大致来说,一个进程对象从 start() 方法返回的那一刻起,直到子进程终止,都是存活的。

daemon

进程的守护进程标志,一个布尔值。这必须在调用 start() 之前设置。

初始值从创建进程继承。

当一个进程退出时,它会尝试终止所有其守护进程子进程。

请注意,守护进程不允许创建子进程。否则,如果守护进程在其父进程退出时被终止,它将使其子进程成为孤儿。此外,这些 不是 Unix 守护进程或服务,它们是正常的进程,如果非守护进程退出,它们将被终止(而不是连接)。

除了 threading.Thread API 之外,Process 对象还支持以下属性和方法

pid

返回进程 ID。在进程生成之前,这将是 None

exitcode

子进程的退出代码。如果进程尚未终止,则此值为 None

如果子进程的 run() 方法正常返回,退出代码将为 0。如果它通过 sys.exit() 并带有一个整数参数 N 终止,退出代码将为 N

如果子进程由于 run() 中未捕获的异常而终止,退出代码将为 1。如果它被信号 N 终止,退出代码将为负值 -N

authkey

进程的认证密钥(一个字节字符串)。

multiprocessing 初始化时,主进程使用 os.urandom() 分配一个随机字符串。

当创建 Process 对象时,它将继承其父进程的认证密钥,尽管可以通过将 authkey 设置为另一个字节字符串来更改此密钥。

参见 认证密钥

sentinel

一个系统对象的数字句柄,当进程结束时,该对象将变为“就绪”。

如果您想使用 multiprocessing.connection.wait() 同时等待多个事件,则可以使用此值。否则,调用 join() 更简单。

在 Windows 上,这是一个可与 WaitForSingleObjectWaitForMultipleObjects 系列 API 调用一起使用的操作系统句柄。在 POSIX 上,这是一个文件描述符,可与 select 模块中的原语一起使用。

在 3.3 版本加入。

interrupt()

终止进程。在 POSIX 上使用 SIGINT 信号。Windows 上的行为未定义。

默认情况下,这通过引发 KeyboardInterrupt 来终止子进程。可以通过在子进程中为 SIGINT 设置相应的信号处理程序 signal.signal() 来改变此行为。

注意:如果子进程捕获并丢弃 KeyboardInterrupt,则进程将不会被终止。

注意:默认行为还会将 exitcode 设置为 1,就像在子进程中引发了未捕获的异常一样。要获得不同的 exitcode,您可以简单地捕获 KeyboardInterrupt 并调用 exit(your_code)

在 3.14 版本加入。

terminate()

终止进程。在 POSIX 上使用 SIGTERM 信号完成;在 Windows 上使用 TerminateProcess()。请注意,退出处理程序和 finally 子句等将不会执行。

请注意,进程的后代进程将不会被终止——它们将简单地成为孤儿进程。

警告

如果当关联进程正在使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能无法被其他进程使用。同样,如果进程已获取锁或信号量等,则终止它可能会导致其他进程死锁。

kill()

terminate() 相同,但在 POSIX 上使用 SIGKILL 信号。

在 3.7 版本加入。

close()

关闭 Process 对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发 ValueError。一旦 close() 成功返回,Process 对象的其他大多数方法和属性都将引发 ValueError

在 3.7 版本加入。

请注意,start()join()is_alive()terminate()exitcode 方法只能由创建进程对象的进程调用。

Process 的一些方法的使用示例

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

所有 multiprocessing 异常的基类。

exception multiprocessing.BufferTooShort

当提供的缓冲区对象对于读取的消息来说太小时,由 Connection.recv_bytes_into() 引发的异常。

如果 eBufferTooShort 的实例,则 e.args[0] 将以字节字符串形式给出消息。

exception multiprocessing.AuthenticationError

当出现认证错误时引发。

exception multiprocessing.TimeoutError

当超时过期时,由带有超时的各种方法引发。

管道和队列

在使用多个进程时,通常使用消息传递在进程之间进行通信,并避免使用任何同步原语,例如锁。

对于传递消息,可以使用 Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。

QueueSimpleQueueJoinableQueue 类型是多生产者、多消费者的 FIFO 队列,它们模仿标准库中的 queue.Queue 类。它们的不同之处在于 Queue 缺少 Python 2.5 的 queue.Queue 类中引入的 task_done()join() 方法。

如果使用 JoinableQueue,则对于从队列中删除的每个任务,必须调用 JoinableQueue.task_done(),否则用于计算未完成任务数量的信号量最终可能会溢出,从而引发异常。

与其它 Python 队列实现的一个区别是,multiprocessing 队列使用 pickle 序列化所有放入其中的对象。通过 get 方法返回的对象是一个重新创建的对象,它不与原始对象共享内存。

请注意,也可以通过使用管理器对象创建共享队列——参见 管理器

备注

multiprocessing 使用通常的 queue.Emptyqueue.Full 异常来指示超时。它们在 multiprocessing 命名空间中不可用,因此您需要从 queue 中导入它们。

备注

当一个对象被放入队列时,该对象会被 pickle 序列化,然后一个后台线程会将序列化的数据刷新到底层管道中。这会产生一些令人惊讶的后果,但应该不会造成任何实际困难——如果它们真的困扰您,那么您可以改用由 管理器 创建的队列。

  1. 将对象放入空队列后,可能会有极小的延迟,之后队列的 empty() 方法会返回 False,并且 get_nowait() 可以返回而不会引发 queue.Empty

  2. 如果多个进程正在入队对象,则对象可能会在另一端以乱序接收。但是,由同一进程入队的对象将始终相对于彼此以预期顺序排列。

警告

如果一个进程在使用 Process.terminate()os.kill() 尝试使用 Queue 时被杀死,则队列中的数据很可能会损坏。这可能导致任何其他进程在稍后尝试使用队列时获得异常。

警告

如上所述,如果子进程已将项目放入队列中(并且未使用 JoinableQueue.cancel_join_thread),则该进程在所有缓冲项目刷新到管道之前不会终止。

这意味着如果你尝试连接该进程,你可能会死锁,除非你确定所有已放入队列的项目都已被消费。同样,如果子进程是非守护进程,则父进程在退出时尝试连接所有非守护子进程时可能会挂起。

请注意,使用管理器创建的队列没有此问题。请参阅 编程指南

有关队列用于进程间通信的示例,请参阅 示例

multiprocessing.Pipe([duplex])

返回一对 (conn1, conn2)Connection 对象,表示管道的两端。

如果 duplexTrue (默认),则管道是双向的。如果 duplexFalse,则管道是单向的:conn1 只能用于接收消息,conn2 只能用于发送消息。

send() 方法使用 pickle 序列化对象,而 recv() 则重新创建对象。

class multiprocessing.Queue([maxsize])

返回一个使用管道和一些锁/信号量实现的进程共享队列。当一个进程首次将项目放入队列时,会启动一个 feeder 线程,该线程将对象从缓冲区传输到管道中。

标准库 queue 模块中通常的 queue.Emptyqueue.Full 异常用于表示超时。

Queue 实现了 queue.Queue 的所有方法,除了 task_done()join()

qsize()

返回队列的大致大小。由于多线程/多进程语义,这个数字不可靠。

请注意,这可能会在 macOS 等平台上引发 NotImplementedError,因为 sem_getvalue() 未实现。

empty()

如果队列为空,则返回 True,否则返回 False。由于多线程/多进程语义,这不可靠。

在关闭的队列上可能引发 OSError。(不保证)

full()

如果队列已满,则返回 True,否则返回 False。由于多线程/多进程语义,这不可靠。

put(obj[, block[, timeout]])

将 obj 放入队列。如果可选参数 blockTrue(默认值)且 timeoutNone(默认值),则必要时阻塞直到有空闲槽可用。如果 timeout 是正数,它最多阻塞 timeout 秒,如果在此时间内没有空闲槽可用,则引发 queue.Full 异常。否则(blockFalse),如果立即有空闲槽可用,则将项目放入队列,否则引发 queue.Full 异常(在这种情况下,timeout 被忽略)。

3.8 版本发生变化: 如果队列已关闭,则会引发 ValueError 而不是 AssertionError

put_nowait(obj)

等同于 put(obj, False)

get([block[, timeout]])

从队列中移除并返回一个项目。如果可选参数 blockTrue(默认值)且 timeoutNone(默认值),则必要时阻塞直到有项目可用。如果 timeout 是正数,它最多阻塞 timeout 秒,如果在此时间内没有项目可用,则引发 queue.Empty 异常。否则(blockFalse),如果立即有项目可用,则返回一个项目,否则引发 queue.Empty 异常(在这种情况下,timeout 被忽略)。

3.8 版本发生变化: 如果队列已关闭,则会引发 ValueError 而不是 OSError

get_nowait()

等同于 get(False)

multiprocessing.Queue 还有一些 queue.Queue 中没有的额外方法。这些方法对于大多数代码通常是不必要的。

close()

关闭队列:释放内部资源。

队列一旦关闭,就不能再使用。例如,get()put()empty() 方法不能再调用。

后台线程在将所有缓冲数据刷新到管道后将退出。这在队列被垃圾回收时会自动调用。

join_thread()

等待后台线程。这只能在调用 close() 之后使用。它会阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。

默认情况下,如果一个进程不是队列的创建者,那么在退出时它会尝试加入队列的后台线程。该进程可以调用 cancel_join_thread() 使 join_thread() 不执行任何操作。

cancel_join_thread()

阻止 join_thread() 阻塞。特别是,这会阻止后台线程在进程退出时自动被连接——参见 join_thread()

这个方法的一个更好的名称可能是 allow_exit_without_flush()。它可能会导致入队数据丢失,而且你几乎肯定不需要使用它。它实际上只在你需要当前进程立即退出而无需等待将入队数据刷新到底层管道,并且你不关心数据丢失时才存在。

备注

此类的功能需要主机操作系统上可用的共享信号量实现。如果没有,此类的功能将被禁用,并且尝试实例化 Queue 将导致 ImportError。有关更多信息,请参见 bpo-3770。以下列出的任何专用队列类型也同样如此。

class multiprocessing.SimpleQueue

它是一种简化的 Queue 类型,与加锁的 Pipe 非常相似。

close()

关闭队列:释放内部资源。

队列一旦关闭,就不能再使用。例如,get()put()empty() 方法不能再调用。

在 3.9 版本中新增。

empty()

如果队列为空,则返回 True,否则返回 False

如果 SimpleQueue 已关闭,则总是引发 OSError

get()

从队列中移除并返回一个项目。

put(item)

item 放入队列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue 的子类,它额外具有 task_done()join() 方法。

task_done()

指示先前排队的任务已完成。由队列消费者使用。对于每个用于获取任务的 get() 调用,后续对 task_done() 的调用会告知队列该任务的处理已完成。

如果当前 join() 正在阻塞,它将在所有项目都被处理后恢复(这意味着对于队列中每个已被 put() 的项目,都收到了一个 task_done() 调用)。

如果调用次数多于队列中放置的项目数,则会引发 ValueError

join()

阻塞直到队列中的所有项目都已被获取和处理。

每当一个项目被添加到队列时,未完成任务的计数就会增加。每当消费者调用 task_done() 以指示该项目已被检索并且其所有工作都已完成时,计数就会减少。当未完成任务的计数降至零时,join() 解除阻塞。

杂项

multiprocessing.active_children()

返回当前进程的所有活动子进程的列表。

调用此函数会产生“连接”任何已完成的进程的副作用。

multiprocessing.cpu_count()

返回系统中的 CPU 数量。

此数字不等于当前进程可以使用的 CPU 数量。可用 CPU 的数量可以通过 os.process_cpu_count()(或 len(os.sched_getaffinity(0)))获取。

当无法确定 CPU 数量时,会引发 NotImplementedError

3.13 版本中的变化: 返回值也可以使用 -X cpu_count 标志或 PYTHON_CPU_COUNT 进行覆盖,因为它只是 os CPU 计数 API 的一个包装器。

multiprocessing.current_process()

返回与当前进程对应的 Process 对象。

类似于 threading.current_thread()

multiprocessing.parent_process()

返回与 current_process() 的父进程对应的 Process 对象。对于主进程,parent_process 将为 None

在 3.8 版本加入。

multiprocessing.freeze_support()

为使用 multiprocessing 的程序在冻结以生成可执行文件时添加支持。(已使用 py2exePyInstallercx_Freeze 进行测试。)

需要在主模块的 if __name__ == '__main__' 行之后立即调用此函数。例如

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果省略 freeze_support() 行,则尝试运行冻结的可执行文件将引发 RuntimeError

当启动方法不是 spawn 时,调用 freeze_support() 没有效果。此外,如果模块是由 Python 解释器正常运行(程序未被冻结),则 freeze_support() 没有效果。

multiprocessing.get_all_start_methods()

返回支持的启动方法列表,其中第一个是默认方法。可能的启动方法是 'fork''spawn''forkserver'。并非所有平台都支持所有方法。参见 上下文和启动方法

在 3.4 版本加入。

multiprocessing.get_context(method=None)

返回一个上下文对象,该对象具有与 multiprocessing 模块相同的属性。

如果 methodNone,则返回默认上下文。请注意,如果尚未设置全局启动方法,此操作将将其设置为默认方法。否则 method 应该为 'fork''spawn''forkserver'。如果指定的启动方法不可用,则会引发 ValueError。参见 上下文和启动方法

在 3.4 版本加入。

multiprocessing.get_start_method(allow_none=False)

返回用于启动进程的启动方法的名称。

如果尚未设置全局启动方法且 allow_noneFalse,则将启动方法设置为默认方法并返回其名称。如果尚未设置启动方法且 allow_noneTrue,则返回 None

返回值可以是 'fork''spawn''forkserver'None。参见 上下文和启动方法

在 3.4 版本加入。

3.8 版本中的变化: 在 macOS 上,spawn 启动方法现在是默认方法。fork 启动方法应被视为不安全,因为它可能导致子进程崩溃。参见 bpo-33725

multiprocessing.set_executable(executable)

设置启动子进程时使用的 Python 解释器的路径。(默认使用 sys.executable)。嵌入者可能需要做一些类似以下的事情

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

才能创建子进程。

3.4 版本中的变化: 现在在 POSIX 上使用 'spawn' 启动方法时受支持。

3.11 版本中的变化: 接受 类路径对象

multiprocessing.set_forkserver_preload(module_names)

设置一个模块名称列表,供 forkserver 主进程尝试导入,以便它们已导入的状态可由分叉进程继承。在此过程中发生的任何 ImportError 都将被静默忽略。这可以作为一种性能增强,以避免在每个进程中重复工作。

为此,必须在 forkserver 进程启动之前(在创建 Pool 或启动 Process 之前)调用它。

仅在使用 'forkserver' 启动方法时有意义。参见 上下文和启动方法

在 3.4 版本加入。

multiprocessing.set_start_method(method, force=False)

设置用于启动子进程的方法。method 参数可以是 'fork''spawn''forkserver'。如果启动方法已经设置且 force 不是 True,则引发 RuntimeError。如果 methodNoneforceTrue,则启动方法设置为 None。如果 methodNoneforceFalse,则上下文设置为默认上下文。

请注意,此函数最多只能调用一次,并且应在主模块的 if __name__ == '__main__' 子句中受到保护。

参见 上下文和启动方法

在 3.4 版本加入。

连接对象

连接对象允许发送和接收可 pickle 的对象或字符串。它们可以被视为面向消息的连接套接字。

连接对象通常使用 Pipe 创建——另请参见 监听器和客户端

class multiprocessing.connection.Connection
send(obj)

向连接的另一端发送一个对象,该对象应该使用 recv() 读取。

该对象必须是可 pickle 的。非常大的 pickle(大约 32 MiB+,尽管这取决于操作系统)可能会引发 ValueError 异常。

recv()

返回使用 send() 从连接的另一端发送的对象。阻塞直到有东西可以接收。如果没有可接收的数据且另一端已关闭,则引发 EOFError

fileno()

返回连接使用的文件描述符或句柄。

close()

关闭连接。

当连接被垃圾回收时会自动调用此方法。

poll([timeout])

返回是否有任何数据可供读取。

如果未指定 timeout,则立即返回。如果 timeout 是一个数字,则指定阻塞的最大时间(秒)。如果 timeoutNone,则使用无限超时。

请注意,可以使用 multiprocessing.connection.wait() 同时轮询多个连接对象。

send_bytes(buffer[, offset[, size]])

类字节对象 发送字节数据作为完整消息。

如果给定了 offset,则从 buffer 中的该位置读取数据。如果给定了 size,则将从 buffer 中读取这么多字节。非常大的缓冲区(大约 32 MiB+,尽管这取决于操作系统)可能会引发 ValueError 异常

recv_bytes([maxlength])

返回从连接的另一端发送的完整字节数据消息作为字符串。阻塞直到有东西可以接收。如果没有可接收的数据且另一端已关闭,则引发 EOFError

如果指定了 maxlength 并且消息长度超过 maxlength,则会引发 OSError,并且连接将不再可读。

3.3 版本中的变化: 此函数以前会引发 IOError,现在它是 OSError 的别名。

recv_bytes_into(buffer[, offset])

将从连接另一端发送的完整字节数据消息读取到 buffer 中,并返回消息中的字节数。阻塞直到有数据可接收。如果没有剩余数据可接收且另一端已关闭,则引发 EOFError

buffer 必须是可写的 类字节对象。如果指定了 offset,则消息将从缓冲区的该位置写入。offset 必须是一个非负整数,小于 buffer 的长度(以字节为单位)。

如果缓冲区太短,则会引发 BufferTooShort 异常,并且完整消息可在 e.args[0] 中获取,其中 e 是异常实例。

3.3 版本中的变化: 连接对象现在可以通过 Connection.send()Connection.recv() 在进程之间传输。

连接对象现在也支持上下文管理协议——参见 上下文管理器类型__enter__() 返回连接对象,而 __exit__() 调用 close()

例如:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

由于 Connection.recv() 方法会自动解包它接收到的数据,这可能带来安全风险,除非您信任发送消息的进程。

因此,除非连接对象是使用 Pipe() 产生的,否则您应该在执行某种形式的身份验证之后才使用 recv()send() 方法。参见 认证密钥

警告

如果一个进程在尝试读写管道时被终止,则管道中的数据很可能会损坏,因为可能无法确定消息边界的位置。

同步原语

通常,同步原语在多进程程序中不像在多线程程序中那样必要。请参阅 threading 模块的文档。

请注意,也可以通过使用管理器对象来创建同步原语——参见 管理器

class multiprocessing.Barrier(parties[, action[, timeout]])

一个屏障对象:threading.Barrier 的克隆。

在 3.3 版本加入。

class multiprocessing.BoundedSemaphore([value])

一个有界信号量对象:与 threading.BoundedSemaphore 紧密相似。

与其紧密相似的类有一个唯一的区别:其 acquire 方法的第一个参数名为 block,这与 Lock.acquire() 保持一致。

备注

在 macOS 上,这与 Semaphore 没有区别,因为 sem_getvalue() 在该平台上未实现。

class multiprocessing.Condition([lock])

一个条件变量:threading.Condition 的别名。

如果指定了 lock,它应该是一个来自 multiprocessing 模块的 LockRLock 对象。

3.3 版更改: 添加了 wait_for() 方法。

class multiprocessing.Event

threading.Event 的克隆。

class multiprocessing.Lock

一个非递归锁对象:与 threading.Lock 紧密相似。一旦某个进程或线程获得了锁,任何其他进程或线程的后续获取尝试都将被阻塞,直到锁被释放;任何进程或线程都可以释放它。threading.Lock 应用于线程的概念和行为,在这里被复制到 multiprocessing.Lock 并应用于进程或线程,但另有说明的除外。

请注意,Lock 实际上是一个工厂函数,它返回一个用默认上下文初始化的 multiprocessing.synchronize.Lock 实例。

Lock 支持 上下文管理器 协议,因此可以在 with 语句中使用。

acquire(block=True, timeout=None)

获取锁,阻塞或非阻塞。

block 参数设置为 True(默认值)时,方法调用将阻塞,直到锁处于未锁定状态,然后将其设置为锁定状态并返回 True。请注意,这个第一个参数的名称与 threading.Lock.acquire() 中的不同。

block 参数设置为 False 时,方法调用不阻塞。如果锁当前处于锁定状态,则返回 False;否则将锁设置为锁定状态并返回 True

当以一个正浮点值作为 timeout 调用时,最多阻塞 timeout 指定的秒数,只要锁无法被获取。当以负值作为 timeout 调用时,等同于 timeout 为零。当以 None(默认值)作为 timeout 调用时,超时周期设置为无限。请注意,对 timeout 的负值或 None 值的处理与 threading.Lock.acquire() 中实现的行为不同。如果 block 参数设置为 False,则 timeout 参数没有实际意义,因此会被忽略。如果锁已被获取,则返回 True,如果超时周期已过,则返回 False

release()

释放锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。

行为与 threading.Lock.release() 相同,不同之处在于,当在一个未锁定(已释放)的锁上调用时,会引发 ValueError

locked()

返回一个布尔值,指示此对象当前是否已锁定。

在 3.14 版本加入。

class multiprocessing.RLock

一个可重入锁对象:与 threading.RLock 紧密相似。可重入锁必须由获取它的进程或线程释放。一旦一个进程或线程获取了一个可重入锁,同一个进程或线程可以在不阻塞的情况下再次获取它;该进程或线程必须为每次获取释放一次。

请注意,RLock 实际上是一个工厂函数,它返回一个用默认上下文初始化的 multiprocessing.synchronize.RLock 实例。

RLock 支持 上下文管理器 协议,因此可以在 with 语句中使用。

acquire(block=True, timeout=None)

获取锁,阻塞或非阻塞。

当以 block 参数设置为 True 调用时,阻塞直到锁处于未锁定状态(不被任何进程或线程拥有),除非锁已被当前进程或线程拥有。当前进程或线程然后获取锁的所有权(如果它还没有所有权),并且锁内部的递归级别增加一,导致返回值为 True。请注意,与 threading.RLock.acquire() 的实现相比,这个第一个参数的行为有几个不同之处,从参数本身的名称开始。

当以 block 参数设置为 False 调用时,不阻塞。如果锁已被其他进程或线程获取(并因此被拥有),当前进程或线程不获取所有权,并且锁内的递归级别不改变,导致返回值为 False。如果锁处于未锁定状态,当前进程或线程获取所有权,并且递归级别递增,导致返回值为 True

timeout 参数的使用和行为与 Lock.acquire() 中相同。请注意,timeout 的某些行为与 threading.RLock.acquire() 中实现的行为不同。

release()

释放锁,递减递归级别。如果递减后递归级别为零,则将锁重置为未锁定(不被任何进程或线程拥有),并且如果任何其他进程或线程正在阻塞等待锁变为未锁定,则允许其中一个进程或线程继续。如果递减后递归级别仍非零,则锁保持锁定状态并由调用进程或线程拥有。

仅当调用进程或线程拥有锁时才调用此方法。如果此方法由非所有者的进程或线程调用,或者如果锁处于未锁定(未拥有)状态,则会引发 AssertionError。请注意,在这种情况下引发的异常类型与 threading.RLock.release() 中实现的行为不同。

locked()

返回一个布尔值,指示此对象当前是否已锁定。

在 3.14 版本加入。

class multiprocessing.Semaphore([value])

一个信号量对象:与 threading.Semaphore 紧密相似。

与其紧密相似的类有一个唯一的区别:其 acquire 方法的第一个参数名为 block,这与 Lock.acquire() 保持一致。

备注

在 macOS 上,sem_timedwait 不受支持,因此使用超时调用 acquire() 将使用休眠循环来模拟该函数的行为。

备注

此软件包的某些功能需要在主机操作系统上具有功能正常的共享信号量实现。如果没有,multiprocessing.synchronize 模块将被禁用,尝试导入它将导致 ImportError。有关更多信息,请参阅 bpo-3770

共享 ctypes 对象

可以使用共享内存创建可由子进程继承的共享对象。

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回从共享内存分配的 ctypes 对象。默认情况下,返回值实际上是该对象的同步包装器。对象本身可以通过 Valuevalue 属性访问。

typecode_or_type 决定返回对象的类型:它要么是一个 ctypes 类型,要么是一个 array 模块使用的单字符类型码。*args 传递给该类型的构造函数。

如果 lockTrue(默认值),则会创建一个新的可重入锁对象来同步对值的访问。如果 lockLockRLock 对象,则将使用该对象来同步对值的访问。如果 lockFalse,则对返回对象的访问将不会自动受锁保护,因此它不一定“进程安全”。

诸如 += 等涉及读写操作的操作不是原子性的。因此,例如,如果你想原子地递增一个共享值,仅仅这样做是不够的:

counter.value += 1

假设关联的锁是递归的(默认情况下是),你可以这样做:

with counter.get_lock():
    counter.value += 1

请注意,lock 是一个仅限关键字的参数。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回从共享内存分配的 ctypes 数组。默认情况下,返回值实际上是该数组的同步包装器。

typecode_or_type 决定返回数组元素的类型:它要么是一个 ctypes 类型,要么是一个 array 模块使用的单字符类型码。如果 size_or_initializer 是一个整数,则它决定了数组的长度,并且数组将最初归零。否则,size_or_initializer 是一个用于初始化数组的序列,其长度决定了数组的长度。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对值的访问。如果 lockLockRLock 对象,则将使用该对象来同步对值的访问。如果 lockFalse,则对返回对象的访问将不会自动受锁保护,因此它不一定“进程安全”。

请注意,lock 是一个仅限关键字的参数。

请注意,ctypes.c_char 数组具有 valueraw 属性,允许使用它来存储和检索字符串。

multiprocessing.sharedctypes 模块

multiprocessing.sharedctypes 模块提供了从共享内存分配 ctypes 对象的函数,这些对象可以由子进程继承。

备注

虽然可以在共享内存中存储指针,但请记住这将引用特定进程地址空间中的位置。然而,该指针在第二个进程的上下文中很可能是无效的,并且尝试从第二个进程解引用该指针可能会导致崩溃。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回从共享内存分配的 ctypes 数组。

typecode_or_type 决定返回数组元素的类型:它要么是一个 ctypes 类型,要么是一个 array 模块使用的单字符类型码。如果 size_or_initializer 是一个整数,则它决定了数组的长度,并且数组将最初归零。否则 size_or_initializer 是一个用于初始化数组的序列,其长度决定了数组的长度。

请注意,设置和获取元素可能不是原子性的——请改用 Array() 以确保使用锁自动同步访问。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回从共享内存分配的 ctypes 对象。

typecode_or_type 决定返回对象的类型:它要么是一个 ctypes 类型,要么是一个 array 模块使用的单字符类型码。*args 传递给该类型的构造函数。

请注意,设置和获取值可能不是原子性的——请改用 Value() 以确保使用锁自动同步访问。

请注意,ctypes.c_char 数组具有 valueraw 属性,允许使用它来存储和检索字符串——请参阅 ctypes 的文档。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

RawArray() 相同,不同之处在于,根据 lock 的值,可能会返回一个进程安全的同步包装器,而不是原始的 ctypes 数组。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对值的访问。如果 lockLockRLock 对象,则将使用该对象来同步对值的访问。如果 lockFalse,则对返回对象的访问将不会自动受锁保护,因此它不一定“进程安全”。

请注意,lock 是一个仅限关键字的参数。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

RawValue() 相同,不同之处在于,根据 lock 的值,可能会返回一个进程安全的同步包装器,而不是原始的 ctypes 对象。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对值的访问。如果 lockLockRLock 对象,则将使用该对象来同步对值的访问。如果 lockFalse,则对返回对象的访问将不会自动受锁保护,因此它不一定“进程安全”。

请注意,lock 是一个仅限关键字的参数。

multiprocessing.sharedctypes.copy(obj)

返回一个从共享内存分配的 ctypes 对象,它是 ctypes 对象 obj 的副本。

multiprocessing.sharedctypes.synchronized(obj[, lock])

返回 ctypes 对象的进程安全包装器对象,该包装器使用 lock 同步访问。如果 lockNone(默认值),则会自动创建一个 multiprocessing.RLock 对象。

同步包装器除了包装对象的方法外,还有两个方法:get_obj() 返回被包装的对象,get_lock() 返回用于同步的锁对象。

请注意,通过包装器访问 ctypes 对象可能比直接访问原始 ctypes 对象慢得多。

3.5 版更改: 同步对象支持 上下文管理器 协议。

下表比较了使用共享内存创建共享 ctypes 对象的语法与普通 ctypes 语法的区别。(在表中 MyStructctypes.Structure 的某个子类。)

ctypes

使用类型时的 sharedctypes

使用类型码时的 sharedctypes

c_double(2.4)

RawValue(c_double, 2.4)

RawValue('d', 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray('h', 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray('i', (9, 2, 8))

下面是一个示例,其中子进程修改了多个 ctypes 对象

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

打印的结果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

管理器

管理器提供了一种创建可以在不同进程之间共享数据的方式,包括通过网络在不同机器上运行的进程之间共享数据。管理器对象控制一个管理 共享对象 的服务器进程。其他进程可以使用代理访问共享对象。

multiprocessing.Manager()

返回一个已启动的 SyncManager 对象,可用于在进程之间共享对象。返回的管理器对象对应于一个已派生的子进程,并具有创建共享对象和返回相应代理的方法。

管理器进程会在它们被垃圾回收或其父进程退出时立即关闭。管理器类定义在 multiprocessing.managers 模块中

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

创建一个 BaseManager 对象。

创建后,应调用 start()get_server().serve_forever(),以确保管理器对象引用已启动的管理器进程。

address 是管理器进程监听新连接的地址。如果 addressNone,则会选择一个任意地址。

authkey 是身份验证密钥,用于检查传入到服务器进程的连接的有效性。如果 authkeyNone,则使用 current_process().authkey。否则,使用 authkey,它必须是一个字节字符串。

serializer 必须是 'pickle'(使用 pickle 序列化)或 'xmlrpclib'(使用 xmlrpc.client 序列化)。

ctx 是一个上下文对象,或 None(使用当前上下文)。参见 get_context() 函数。

shutdown_timeout 是一个以秒为单位的超时时间,用于等待管理器使用的进程在 shutdown() 方法中完成。如果关闭超时,则终止进程。如果终止进程也超时,则杀死进程。

3.11 版本中的变化: 添加了 shutdown_timeout 参数。

start([initializer[, initargs]])

启动一个子进程来启动管理器。如果 initializer 不是 None,那么子进程启动时将调用 initializer(*initargs)

get_server()

返回一个 Server 对象,该对象代表管理器控制下的实际服务器。Server 对象支持 serve_forever() 方法。

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server 还有一个 address 属性。

connect()

将本地管理器对象连接到远程管理器进程。

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

停止管理器使用的进程。这仅在已使用 start() 启动服务器进程时才可用。

可以多次调用此方法。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

一个类方法,可用于向管理器类注册类型或可调用对象。

typeid 是一个“类型标识符”,用于标识特定类型的共享对象。它必须是一个字符串。

callable 是一个可调用对象,用于为此类型标识符创建对象。如果管理器实例将使用 connect() 方法连接到服务器,或者如果 create_method 参数为 False,则可以将其保留为 None

proxytypeBaseProxy 的子类,用于为此 typeid 创建共享对象的代理。如果为 None,则会自动创建一个代理类。

exposed 用于指定一组方法名称,对于此 typeid 的代理应允许使用 BaseProxy._callmethod() 访问这些方法。(如果 exposedNone,则改为使用 proxytype._exposed_(如果存在)。)如果未指定暴露列表,则共享对象的所有“公共方法”都将可访问。(此处“公共方法”指任何具有 __call__() 方法且名称不以 '_' 开头的属性。)

method_to_typeid 是一个映射,用于指定那些应该返回代理的暴露方法的返回类型。它将方法名称映射到 typeid 字符串。(如果 method_to_typeidNone,则改为使用 proxytype._method_to_typeid_(如果存在)。)如果方法名称不是此映射的键,或者如果映射为 None,则方法返回的对象将按值复制。

create_method 决定是否应该创建一个名称为 typeid 的方法,该方法可用于告诉服务器进程创建一个新的共享对象并返回其代理。默认情况下,它为 True

BaseManager 实例还具有一个只读属性

address

管理器使用的地址。

3.3 版本中的变化: 管理器对象支持上下文管理协议——参见 上下文管理器类型__enter__() 启动服务器进程(如果尚未启动),然后返回管理器对象。__exit__() 调用 shutdown()

在以前的版本中,如果管理器的服务器进程尚未启动,__enter__() 不会启动它。

class multiprocessing.managers.SyncManager

BaseManager 的子类,可用于进程同步。multiprocessing.Manager() 返回此类对象。

它的方法为一些常用的数据类型创建并返回 代理对象,这些数据类型可以在进程间同步。这尤其包括共享列表和字典。

Barrier(parties[, action[, timeout]])

创建一个共享的 threading.Barrier 对象并返回其代理。

在 3.3 版本加入。

BoundedSemaphore([value])

创建一个共享的 threading.BoundedSemaphore 对象并返回其代理。

Condition([lock])

创建一个共享的 threading.Condition 对象并返回其代理。

如果提供了 lock,则它应该是一个 threading.Lockthreading.RLock 对象的代理。

3.3 版更改: 添加了 wait_for() 方法。

Event()

创建一个共享的 threading.Event 对象并返回其代理。

Lock()

创建一个共享的 threading.Lock 对象并返回其代理。

Namespace()

创建一个共享的 Namespace 对象并返回其代理。

Queue([maxsize])

创建一个共享的 queue.Queue 对象并返回其代理。

RLock()

创建一个共享的 threading.RLock 对象并返回其代理。

Semaphore([value])

创建一个共享的 threading.Semaphore 对象并返回其代理。

Array(typecode, sequence)

创建一个数组并返回其代理。

Value(typecode, value)

创建一个具有可写 value 属性的对象并返回其代理。

dict()
dict(mapping)
dict(sequence)

创建一个共享的 dict 对象并返回其代理。

list()
list(sequence)

创建一个共享的 list 对象并返回其代理。

set()
set(sequence)
set(mapping)

创建一个共享的 set 对象并返回其代理。

3.14 版本新增: 添加了 set 支持。

3.6 版本中的变化: 共享对象可以嵌套。例如,一个共享容器对象(如共享列表)可以包含其他共享对象,这些对象都将由 SyncManager 管理和同步。

class multiprocessing.managers.Namespace

一种可以向 SyncManager 注册的类型。

命名空间对象没有公共方法,但有可写属性。它的表示显示其属性的值。

然而,在使用命名空间对象的代理时,以 '_' 开头的属性将是代理的属性,而不是引用对象的属性。

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

自定义管理器

要创建自己的管理器,可以创建一个 BaseManager 的子类,并使用 register() 类方法向管理器类注册新的类型或可调用对象。例如:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

使用远程管理器

可以在一台机器上运行管理器服务器,并让客户端从其他机器上使用它(假设相关的防火墙允许)。

运行以下命令会创建一个用于单个共享队列的服务器,远程客户端可以访问它:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个客户端可以如下访问服务器:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个客户端也可以使用它:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以访问该队列,使用上面客户端的代码来远程访问它:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

代理对象

代理是一个**引用**共享对象(假定位于不同进程中)的对象。共享对象被称为代理的**引用对象**。多个代理对象可以具有相同的引用对象。

代理对象具有调用其引用对象相应方法的方法(尽管并非引用对象的每个方法都一定能通过代理访问)。这样,代理可以像其引用对象一样使用:

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

请注意,对代理应用 str() 将返回引用对象的表示,而应用 repr() 将返回代理的表示。

代理对象的一个重要特性是它们是可序列化的,因此可以在进程之间传递。因此,引用对象可以包含 代理对象。这允许这些托管列表、字典和其他 代理对象 的嵌套。

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

同样,字典和列表代理可以相互嵌套:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

如果标准(非代理)listdict 对象包含在引用对象中,则对这些可变值的修改不会通过管理器传播,因为代理无法知道其中包含的值何时被修改。但是,将值存储在容器代理中(这会触发代理对象上的 __setitem__)会通过管理器传播,因此要有效修改此类项,可以将被修改的值重新分配给容器代理:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

对于大多数用例来说,这种方法可能不如使用嵌套的 代理对象 方便,但也展示了对同步的控制级别。

备注

multiprocessing 中的代理类型不支持按值比较。因此,例如,我们有:

>>> manager.list([1,2,3]) == [1,2,3]
False

进行比较时,应该直接使用引用对象的副本。

class multiprocessing.managers.BaseProxy

代理对象是 BaseProxy 子类的实例。

_callmethod(methodname[, args[, kwds]])

调用并返回代理引用对象方法的执行结果。

如果 proxy 是一个代理,其引用对象是 obj,那么表达式

proxy._callmethod(methodname, args, kwds)

将计算表达式

getattr(obj, methodname)(*args, **kwds)

在管理器进程中。

返回的值将是调用结果的副本,或者是新共享对象的代理——请参阅 BaseManager.register()method_to_typeid 参数文档。

如果在调用时引发异常,则 _callmethod() 将重新引发该异常。如果管理器进程中引发了其他异常,则将其转换为 RemoteError 异常,并由 _callmethod() 重新引发。

特别注意,如果 methodname 未被**暴露**,则会引发异常。

使用 _callmethod() 的示例:

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回引用对象的副本。

如果引用对象不可序列化,则会引发异常。

__repr__()

返回代理对象的表示。

__str__()

返回引用对象的表示。

清理

代理对象使用弱引用回调,这样当它被垃圾回收时,它会从拥有其引用对象的管理器中注销自己。

当不再有任何代理引用共享对象时,该对象将从管理器进程中删除。

进程池

可以使用 Pool 类创建进程池,以执行提交给它的任务。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一个进程池对象,控制着一个工作进程池,可以将任务提交给它。它支持带超时和回调的异步结果,并具有并行映射实现。

processes 是要使用的工人进程数。如果 processesNone,则使用 os.process_cpu_count() 返回的数字。

如果 initializer 不是 None,则每个工作进程启动时将调用 initializer(*initargs)

maxtasksperchild 是一个工作进程在退出并被一个新的工作进程替换之前可以完成的任务数量,以便释放未使用的资源。默认的 maxtasksperchildNone,这意味着工作进程将与池一样长寿。

context 可用于指定启动工作进程所用的上下文。通常,池是使用函数 multiprocessing.Pool() 或上下文对象的 Pool() 方法创建的。在这两种情况下,context 都被适当地设置。

请注意,池对象的方法只能由创建池的进程调用。

警告

multiprocessing.pool 对象具有需要通过将池用作上下文管理器或手动调用 close()terminate() 来正确管理(像任何其他资源一样)的内部资源。未能这样做可能导致进程在终结时挂起。

请注意,依赖垃圾回收器销毁池是不正确的,因为 CPython 不保证会调用池的终结器(更多信息请参见 object.__del__())。

3.2 版本中已更改: 添加了 maxtasksperchild 参数。

3.4 版本中已更改: 添加了 context 参数。

3.13 版本中已更改: processes 默认使用 os.process_cpu_count(),而不是 os.cpu_count()

备注

Pool 中的工作进程通常在池的工作队列的整个生命周期内存在。其他系统(如 Apache、mod_wsgi 等)中常见的释放工作进程所占用的资源模式是允许池中的工作进程在退出、清理并生成新进程替换旧进程之前仅完成一定量的工作。Poolmaxtasksperchild 参数向最终用户公开了此功能。

apply(func[, args[, kwds]])

使用参数 args 和关键字参数 kwds 调用 func。它会阻塞直到结果就绪。考虑到这种阻塞,apply_async() 更适合并行执行工作。此外,func 只在池中的一个工作进程中执行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply() 方法的一个变体,它返回一个 AsyncResult 对象。

如果指定了 callback,它应该是一个接受单个参数的可调用对象。当结果就绪时,callback 会应用于它,除非调用失败,在这种情况下会改为应用 error_callback

如果指定了 error_callback,它应该是一个接受单个参数的可调用对象。如果目标函数失败,则会使用异常实例调用 error_callback

回调函数应该立即完成,否则处理结果的线程将阻塞。

map(func, iterable[, chunksize])

内置函数 map() 的并行等效版本(它只支持一个 iterable 参数,对于多个可迭代对象请参阅 starmap())。它会阻塞直到结果就绪。

此方法将可迭代对象切分成若干块,并将其作为单独的任务提交给进程池。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。

请注意,对于非常长的可迭代对象,这可能会导致高内存使用率。为了提高效率,请考虑使用带有显式 chunksize 选项的 imap()imap_unordered()

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

map() 方法的一个变体,它返回一个 AsyncResult 对象。

如果指定了 callback,它应该是一个接受单个参数的可调用对象。当结果就绪时,callback 会应用于它,除非调用失败,在这种情况下会改为应用 error_callback

如果指定了 error_callback,它应该是一个接受单个参数的可调用对象。如果目标函数失败,则会使用异常实例调用 error_callback

回调函数应该立即完成,否则处理结果的线程将阻塞。

imap(func, iterable[, chunksize])

map() 的一个更懒惰的版本。

chunksize 参数与 map() 方法使用的参数相同。对于非常长的可迭代对象,使用较大的 chunksize 值可以使作业完成速度比使用默认值 1 快得多

此外,如果 chunksize1,则 imap() 方法返回的迭代器的 next() 方法有一个可选的 timeout 参数:如果结果不能在 timeout 秒内返回,next(timeout) 将引发 multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

imap() 相同,只是返回的迭代器中结果的顺序应被视为任意。(只有当只有一个工作进程时,顺序才保证是“正确”的。)

starmap(func, iterable[, chunksize])

类似于 map(),只是 iterable 的元素预期是作为参数解包的可迭代对象。

因此,一个 [(1,2), (3, 4)]iterable 会得到 [func(1,2), func(3,4)]

在 3.3 版本加入。

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

starmap()map_async() 的组合,它遍历可迭代对象中的可迭代对象,并使用解包后的可迭代对象调用 func。返回一个结果对象。

在 3.3 版本加入。

close()

阻止向池提交更多任务。一旦所有任务完成,工作进程将退出。

terminate()

立即停止工作进程,不完成未完成的工作。当池对象被垃圾回收时,terminate() 将立即被调用。

join()

等待工作进程退出。在使用 join() 之前必须调用 close()terminate()

3.3 版本中已更改: 池对象现在支持上下文管理协议 – 参见 上下文管理器类型__enter__() 返回池对象,__exit__() 调用 terminate()

class multiprocessing.pool.AsyncResult

Pool.apply_async()Pool.map_async() 返回的结果的类。

get([timeout])

结果到达时返回结果。如果 timeout 不是 None 并且结果未在 timeout 秒内到达,则会引发 multiprocessing.TimeoutError。如果远程调用引发异常,则 get() 将重新引发该异常。

wait([timeout])

等待直到结果可用或直到 timeout 秒过去。

ready()

返回调用是否已完成。

successful()

返回调用是否成功完成而未引发异常。如果结果尚未就绪,则会引发 ValueError

3.7 版本中已更改: 如果结果尚未就绪,则引发 ValueError 而不是 AssertionError

以下示例演示了池的使用

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

监听器和客户端

通常,进程间的消息传递是通过队列或使用 Pipe() 返回的 Connection 对象完成的。

然而,multiprocessing.connection 模块提供了一些额外的灵活性。它基本上为处理套接字或 Windows 命名管道提供了高级的面向消息的 API。它还支持使用 hmac 模块进行摘要认证,并支持同时轮询多个连接。

multiprocessing.connection.deliver_challenge(connection, authkey)

向连接的另一端发送一个随机生成的消息并等待回复。

如果回复与使用 authkey 作为密钥的消息摘要匹配,则向连接的另一端发送欢迎消息。否则会引发 AuthenticationError

multiprocessing.connection.answer_challenge(connection, authkey)

接收消息,使用 authkey 作为密钥计算消息的摘要,然后将摘要发送回去。

如果未收到欢迎消息,则会引发 AuthenticationError

multiprocessing.connection.Client(address[, family[, authkey]])

尝试建立与使用地址 address 的监听器的连接,返回一个 Connection

连接的类型由 family 参数决定,但通常可以省略,因为它通常可以从 address 的格式中推断出来。(参见 地址格式

如果给出了 authkey 且不为 None,它应该是一个字节字符串,并将用作基于 HMAC 认证挑战的密钥。如果 authkeyNone,则不进行认证。如果认证失败,则会引发 AuthenticationError。参见 认证密钥

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

一个已绑定套接字或 Windows 命名管道的包装器,它“监听”连接。

address 是监听器对象绑定的套接字或命名管道要使用的地址。

备注

如果使用地址“0.0.0.0”,则该地址在 Windows 上将不是可连接的端点。如果需要可连接的端点,应使用“127.0.0.1”。

family 是要使用的套接字(或命名管道)的类型。它可以是字符串 'AF_INET' (用于 TCP 套接字),'AF_UNIX' (用于 Unix 域套接字) 或 'AF_PIPE' (用于 Windows 命名管道)。其中只有第一个保证可用。如果 familyNone,则从 address 的格式推断 family。如果 address 也是 None,则选择一个默认值。此默认值被认为是可用的最快的 family。请参阅 地址格式。请注意,如果 family'AF_UNIX' 并且 address 是 None,则套接字将使用 tempfile.mkstemp() 创建的私有临时目录中创建。

如果监听器对象使用套接字,则 backlog (默认为 1) 在套接字绑定后传递给套接字的 listen() 方法。

如果给出了 authkey 且不为 None,它应该是一个字节字符串,并将用作基于 HMAC 认证挑战的密钥。如果 authkeyNone,则不进行认证。如果认证失败,则会引发 AuthenticationError。参见 认证密钥

accept()

接受监听器对象的绑定套接字或命名管道上的连接,并返回一个 Connection 对象。如果尝试认证失败,则会引发 AuthenticationError

close()

关闭监听器对象的绑定套接字或命名管道。当监听器被垃圾回收时,会自动调用此方法。然而,建议显式调用它。

监听器对象具有以下只读属性

address

监听器对象正在使用的地址。

last_accepted

上次接受连接的地址。如果不可用,则为 None

3.3 版本中已更改: 监听器对象现在支持上下文管理协议 – 参见 上下文管理器类型__enter__() 返回监听器对象,__exit__() 调用 close()

multiprocessing.connection.wait(object_list, timeout=None)

等待 object_list 中的某个对象就绪。返回 object_list 中已就绪的那些对象的列表。如果 timeout 是浮点数,则调用最多阻塞该秒数。如果 timeoutNone,则会无限期阻塞。负数超时等效于零超时。

对于 POSIX 和 Windows,如果对象是以下类型,它就可以出现在 object_list 中:

当有数据可从中读取,或者另一端已关闭时,连接或套接字对象就绪。

POSIX: wait(object_list, timeout) 几乎等同于 select.select(object_list, [], [], timeout)。区别在于,如果 select.select() 被信号中断,它可能会引发错误号为 EINTROSError,而 wait() 则不会。

Windows: object_list 中的项必须是可等待的整数句柄(根据 Win32 函数 WaitForMultipleObjects() 文档中使用的定义)或者可以是具有 fileno() 方法的对象,该方法返回套接字句柄或管道句柄。(请注意,管道句柄和套接字句柄不是可等待的句柄。)

在 3.3 版本加入。

示例:

以下服务器代码创建一个使用 'secret password' 作为认证密钥的监听器。然后它等待连接并向客户端发送一些数据

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

以下代码连接到服务器并从服务器接收一些数据

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

以下代码使用 wait() 同时等待来自多个进程的消息

from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

地址格式

  • 一个 'AF_INET' 地址是一个 (hostname, port) 形式的元组,其中 hostname 是一个字符串,而 port 是一个整数。

  • 一个 'AF_UNIX' 地址是一个表示文件系统上文件名的字符串。

  • 一个 'AF_PIPE' 地址是 r'\\.\pipe\PipeName' 形式的字符串。要使用 Client() 连接到远程计算机 ServerName 上的命名管道,应该使用 r'\\ServerName\pipe\PipeName' 形式的地址。

请注意,任何以两个反斜杠开头的字符串默认都被认为是 'AF_PIPE' 地址,而不是 'AF_UNIX' 地址。

认证密钥

当使用 Connection.recv 时,接收到的数据会自动解封装。不幸的是,从不受信任的来源解封装数据存在安全风险。因此,ListenerClient() 使用 hmac 模块提供摘要认证。

认证密钥是一个字节字符串,可以看作是一个密码:一旦建立连接,两端都将要求对方提供知道认证密钥的证据。(证明两端使用相同的密钥 涉及在连接上传输密钥。)

如果请求认证但未指定认证密钥,则使用 current_process().authkey 的返回值(参见 Process)。当前进程创建的任何 Process 对象都将自动继承此值。这意味着(默认情况下)多进程程序的所有进程将共享一个认证密钥,该密钥可用于在它们之间建立连接。

也可以通过使用 os.urandom() 生成合适的认证密钥。

日志记录

提供了一些日志记录支持。但是,请注意 logging 包不使用进程共享锁,因此(取决于处理程序类型)不同进程的消息可能会混淆。

multiprocessing.get_logger()

返回 multiprocessing 使用的日志记录器。如果需要,将创建一个新的日志记录器。

首次创建时,日志记录器的级别为 logging.NOTSET,没有默认处理程序。发送到此日志记录器的消息默认不会传播到根日志记录器。

请注意,在 Windows 上,子进程只会继承父进程日志记录器的级别——日志记录器的任何其他自定义都不会被继承。

multiprocessing.log_to_stderr(level=None)

此函数调用 get_logger(),但除了返回由 get_logger 创建的日志记录器之外,它还添加了一个处理程序,使用格式 '[%(levelname)s/%(processName)s] %(message)s' 将输出发送到 sys.stderr。您可以通过传递 level 参数来修改日志记录器的 levelname

下面是一个开启日志记录的示例会话

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

有关日志记录级别的完整表格,请参阅 logging 模块。

multiprocessing.dummy 模块

multiprocessing.dummy 复制了 multiprocessing 的 API,但它仅仅是 threading 模块的一个包装器。

特别是,由 multiprocessing.dummy 提供的 Pool 函数返回 ThreadPool 的一个实例,它是 Pool 的一个子类,支持所有相同的方法调用,但使用的是工作线程池而不是工作进程池。

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

一个线程池对象,它控制一个工作线程池,可以将作业提交给这些工作线程。 ThreadPool 实例与 Pool 实例完全接口兼容,并且它们的资源也必须得到妥善管理,要么通过将池用作上下文管理器,要么手动调用 close()terminate()

processes 是要使用的工作线程数。如果 processesNone,则使用 os.process_cpu_count() 返回的数字。

如果 initializer 不是 None,则每个工作进程启动时将调用 initializer(*initargs)

Pool 不同,不能提供 maxtasksperchildcontext

备注

ThreadPool 共享与 Pool 相同的接口,该接口是围绕进程池设计的,并且早于 concurrent.futures 模块的引入。因此,它继承了一些对由线程支持的池没有意义的操作,并且它有自己的类型来表示异步作业的状态,即 AsyncResult,该类型不被任何其他库所理解。

用户通常应优先使用 concurrent.futures.ThreadPoolExecutor,它具有更简单的接口,从一开始就是围绕线程设计的,并且返回 concurrent.futures.Future 实例,这些实例与许多其他库兼容,包括 asyncio

编程指南

在使用 multiprocessing 时,应遵循某些指南和惯例。

所有启动方法

以下适用于所有启动方法。

避免共享状态

应尽可能避免在进程之间传输大量数据。

最好坚持使用队列或管道进行进程间通信,而不是使用较低级别的同步原语。

可封装性

确保代理方法的所有参数都是可封装的。

代理的线程安全性

除非您使用锁保护代理对象,否则不要从多个线程使用它。

(不同的进程使用 同一个 代理永远不会有问题。)

连接僵尸进程

在 POSIX 上,当一个进程完成但尚未被连接时,它会成为僵尸进程。僵尸进程的数量不应该太多,因为每次启动新进程(或调用 active_children())时,所有已完成但尚未连接的进程都将被连接。此外,调用已完成进程的 Process.is_alive 也将连接该进程。即便如此,显式连接您启动的所有进程可能仍是良好的实践。

继承优于封装/解封装

当使用 spawnforkserver 启动方法时,multiprocessing 中的许多类型需要可封装,以便子进程可以使用它们。但是,通常应避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承它。

避免终止进程

使用 Process.terminate 方法停止进程可能会导致该进程当前使用的任何共享资源(例如锁、信号量、管道和队列)损坏或对其他进程不可用。

因此,最好只考虑对从不使用任何共享资源的进程使用 Process.terminate

连接使用队列的进程

请记住,已将项放入队列的进程在终止之前会等待,直到“馈送器”线程将所有缓冲项馈送到底层管道。(子进程可以调用队列的 Queue.cancel_join_thread 方法来避免此行为。)

这意味着无论何时使用队列,您都需要确保在连接进程之前,所有已放入队列的项最终都会被移除。否则,您无法确定已将项放入队列的进程是否会终止。还要记住,非守护进程会自动连接。

以下是一个会死锁的示例

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

这里的解决方法是交换最后两行(或简单地删除 p.join() 行)。

显式地将资源传递给子进程

在 POSIX 上,使用 fork 启动方法时,子进程可以使用在父进程中创建的共享资源(通过全局资源)。然而,最好将对象作为参数传递给子进程的构造函数。

除了使代码(可能)与 Windows 和其他启动方法兼容之外,这还确保只要子进程仍然存活,对象就不会在父进程中被垃圾回收。如果父进程中对象被垃圾回收时释放了一些资源,这可能很重要。

例如

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

应该改写为

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

小心不要用“类文件对象”替换 sys.stdin

multiprocessing 最初无条件地调用

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法中——这导致了进程内进程的问题。这已更改为

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

这解决了进程相互冲突导致坏文件描述符错误的基本问题,但给将 sys.stdin() 替换为带有输出缓冲的“类文件对象”的应用程序带来了潜在危险。这种危险是,如果多个进程在此类文件对象上调用 close(),则可能导致相同的数据多次刷新到对象中,从而导致损坏。

如果您编写一个类文件对象并实现自己的缓存,您可以通过在每次向缓存追加时存储进程 ID,并在进程 ID 更改时丢弃缓存来使其成为 fork-safe。例如

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有关更多信息,请参阅 bpo-5155bpo-5313bpo-5331

spawnforkserver 启动方法

有一些额外的限制不适用于 fork 启动方法。

更强的可封装性

确保 Process 的所有参数都是可封装的。此外,如果您子类化 Process.__init__,您必须确保在调用 Process.start 方法时实例是可封装的。

全局变量

请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有)可能与在调用 Process.start 时父进程中的值不同。

然而,仅仅是模块级别常量的全局变量不会引起任何问题。

主模块的安全导入

确保新的 Python 解释器可以安全地导入主模块,而不会导致意外的副作用(例如启动新进程)。

例如,使用 spawnforkserver 启动方法运行以下模块将因 RuntimeError 而失败

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

相反,应该通过使用 if __name__ == '__main__': 来保护程序的“入口点”,如下所示

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(如果程序正常运行而不是冻结,则可以省略 freeze_support() 行。)

这允许新生成的 Python 解释器安全地导入模块,然后运行模块的 foo() 函数。

如果在主模块中创建了池或管理器,也适用类似的限制。

示例

演示如何创建和使用自定义管理器和代理

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

使用 Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

一个示例,展示如何使用队列将任务馈送给一组工作进程并收集结果

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()