multiprocessing — 基于进程的并行

源代码: Lib/multiprocessing/


可用性: 不支持 Android, 不支持 iOS, 不支持 WASI。

此模块不支持 移动平台WebAssembly 平台

简介

multiprocessing 是一个支持使用类似于 threading 模块的 API 来生成进程的包。multiprocessing 包提供本地和远程并发,通过使用子进程而不是线程来有效地绕过 全局解释器锁。因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它可以在 POSIX 和 Windows 上运行。

multiprocessing 模块还引入了在 threading 模块中没有对应 API 的接口。一个典型的例子是 Pool 对象,它提供了一种方便的方式来并行化一个函数在多个输入值上的执行,并将输入数据分发到各个进程(数据并行)。下面的示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个使用 Pool 的数据并行基本示例,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

将会打印到标准输出

[1, 4, 9]

另请参阅

concurrent.futures.ProcessPoolExecutor 提供了一个更高级的接口,可以将任务推送到后台进程,而不会阻塞调用进程的执行。与直接使用 Pool 接口相比,concurrent.futures API 更容易将工作提交到底层进程池与等待结果的操作分离。

Process

multiprocessing 中,进程是通过创建 Process 对象,然后调用其 start() 方法来生成的。Process 遵循 threading.Thread 的 API。一个简单的多进程程序示例是

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

为了展示所涉及的各个进程 ID,这里有一个扩展的示例

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

关于为什么需要 if __name__ == '__main__' 部分的解释,请参阅 编程指南

上下文和启动方法

根据平台的不同,multiprocessing 支持三种启动进程的方法。这些启动方法

spawn

父进程启动一个新的 Python 解释器进程。子进程只会继承运行进程对象的 run() 方法所必需的资源。特别是,父进程中不必要的文件描述符和句柄不会被继承。与使用 forkforkserver 相比,使用此方法启动进程速度较慢。

在 POSIX 和 Windows 平台上可用。在 Windows 和 macOS 上默认为此方法。

fork

父进程使用 os.fork() 来 fork Python 解释器。当子进程开始时,它实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地 fork 多线程进程是有问题的。

在 POSIX 系统上可用。目前在 POSIX 上(macOS 除外)默认为此方法。

注意

在 Python 3.14 中,默认启动方法将不再是 fork。需要 fork 的代码应通过 get_context()set_start_method() 显式指定。

在 3.12 版本中变更: 如果 Python 检测到你的进程有多个线程,此启动方法内部调用的 os.fork() 函数将引发一个 DeprecationWarning。请使用不同的启动方法。有关详细解释,请参阅 os.fork() 文档。

forkserver

当程序启动并选择 *forkserver* 启动方法时,会生成一个服务器进程。从那时起,每当需要一个新进程时,父进程都会连接到服务器并请求它 fork 一个新进程。除非系统库或预加载的导入以副作用的方式生成线程,否则 fork 服务器进程是单线程的,因此通常可以安全地使用 os.fork()。不会继承不必要的资源。

在支持通过 Unix 管道传递文件描述符的 POSIX 平台上可用,例如 Linux。

在 3.4 版本中变更: 在所有 POSIX 平台上添加了 *spawn*,在某些 POSIX 平台上添加了 *forkserver*。子进程不再继承 Windows 上父进程的所有可继承句柄。

在 3.8 版本中变更: 在 macOS 上,*spawn* 启动方法现在是默认方法。*fork* 启动方法应被认为是不安全的,因为它可能导致子进程崩溃,因为 macOS 系统库可能会启动线程。请参阅 bpo-33725

在 POSIX 上使用 *spawn* 或 *forkserver* 启动方法还会启动一个*资源跟踪器*进程,该进程跟踪程序进程创建的未链接的命名系统资源(例如命名信号量或 SharedMemory 对象)。当所有进程都退出后,资源跟踪器会取消链接任何剩余的跟踪对象。通常应该没有剩余对象,但如果进程被信号杀死,可能会有一些“泄漏”的资源。(泄漏的信号量和共享内存段都不会自动取消链接,直到下次重启。这对于这两个对象都是有问题的,因为系统只允许有限数量的命名信号量,并且共享内存段占用主内存中的一些空间。)

要选择启动方法,请在主模块的 if __name__ == '__main__' 子句中使用 set_start_method()。例如

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() 在程序中不应使用多次。

或者,你可以使用 get_context() 来获取上下文对象。上下文对象与 multiprocessing 模块具有相同的 API,并允许在同一程序中使用多个启动方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用 *fork* 上下文创建的锁不能传递给使用 *spawn* 或 *forkserver* 启动方法启动的进程。

想要使用特定启动方法的库可能应该使用 get_context(),以避免干扰库用户的选择。

警告

在 POSIX 系统上,'spawn''forkserver' 启动方法通常不能与“冻结”的可执行文件(即,由 PyInstallercx_Freeze 等包生成的二进制文件)一起使用。如果代码不使用线程,'fork' 启动方法可能会起作用。

进程间交换对象

multiprocessing 支持进程之间的两种类型的通信通道

队列

Queue 类几乎是 queue.Queue 的克隆。例如

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

队列是线程和进程安全的。放入 multiprocessing 队列中的任何对象都将被序列化。

管道

Pipe() 函数返回一对通过管道连接的连接对象,默认情况下是双向的(双向)。例如

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Pipe() 返回的两个连接对象代表管道的两端。每个连接对象都有 send()recv() 方法(以及其他方法)。请注意,如果两个进程(或线程)尝试同时从管道的 *同一* 端读取或写入数据,则管道中的数据可能会损坏。当然,如果进程同时使用管道的不同端,则不存在损坏的风险。

send() 方法序列化对象,recv() 方法重新创建对象。

进程之间的同步

multiprocessing 包含来自 threading 的所有同步原语的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用锁,来自不同进程的输出很可能会混在一起。

进程之间共享状态

如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。当使用多个进程时,这一点尤其如此。

但是,如果你确实需要使用一些共享数据,那么 multiprocessing 提供了几种方法来实现。

共享内存

可以使用 ValueArray 将数据存储在共享内存映射中。例如,以下代码

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

在创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型代码:'d' 表示双精度浮点数,'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意 ctypes 对象。

服务器进程

Manager() 返回的管理器对象控制一个服务器进程,该进程持有 Python 对象并允许其他进程使用代理来操作它们。

Manager() 返回的管理器将支持 listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray 等类型。例如,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

将打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以被设计为支持任意对象类型。此外,单个管理器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存慢。

使用工作进程池

Pool 类表示一个工作进程池。它具有一些方法,允许以几种不同的方式将任务卸载到工作进程。

例如

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

请注意,池的方法应仅由创建它的进程使用。

注意

此包中的功能要求子进程可以导入 __main__ 模块。这在 编程指南 中有介绍,但在此处指出也很有必要。这意味着某些示例(例如 multiprocessing.pool.Pool 示例)在交互式解释器中将无法正常工作。例如:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(如果您尝试这样做,实际上会以半随机的方式输出三个完整的追溯信息,然后您可能需要以某种方式停止父进程。)

参考

multiprocessing 包主要复制了 threading 模块的 API。

Process 和异常

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process 对象表示在单独进程中运行的活动。Process 类具有 threading.Thread 的所有方法的等价物。

构造函数应始终使用关键字参数调用。group 应该始终为 None;它存在仅仅是为了与 threading.Thread 兼容。target 是要由 run() 方法调用的可调用对象。它默认为 None,这意味着不调用任何内容。name 是进程名称(有关更多详细信息,请参阅 name)。args 是目标调用的参数元组。kwargs 是目标调用的关键字参数字典。如果提供,则仅关键字 daemon 参数会将进程 daemon 标志设置为 TrueFalse。如果为 None (默认值),则此标志将从创建进程继承。

默认情况下,不向 target 传递任何参数。args 参数默认为 (),可用于指定要传递给 target 的参数列表或元组。

如果子类覆盖了构造函数,则必须确保在对进程执行任何其他操作之前调用基类构造函数 (Process.__init__())。

在 3.3 版本中更改: 添加了 daemon 参数。

run()

表示进程活动的方法。

您可以在子类中重写此方法。标准的 run() 方法使用从 argskwargs 参数中获取的顺序参数和关键字参数,调用传递给对象构造函数作为目标参数的可调用对象(如果有)。

使用列表或元组作为传递给 Processargs 参数可以达到相同的效果。

示例

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

开始进程的活动。

每个进程对象最多只能调用一次。它安排在单独的进程中调用对象的 run() 方法。

join([timeout])

如果可选参数 timeoutNone (默认值),则该方法会阻塞,直到调用其 join() 方法的进程终止。如果 timeout 是正数,则最多阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回 None。检查进程的 exitcode 以确定它是否已终止。

一个进程可以被多次连接。

进程不能加入自身,因为这会导致死锁。尝试在进程启动之前加入进程是错误的。

name

进程的名称。该名称是一个字符串,仅用于标识目的。它没有语义。可以为多个进程指定相同的名称。

初始名称由构造函数设置。如果未向构造函数提供显式名称,则会构造一个形如“Process-N1:N2:…:Nk”的名称,其中每个 Nk 是其父级的第 N 个子级。

is_alive()

返回进程是否处于活动状态。

粗略地说,从 start() 方法返回到子进程终止时,进程对象都处于活动状态。

daemon

进程的守护程序标志,一个布尔值。必须在调用 start() 之前设置此值。

初始值从创建进程继承。

当进程退出时,它会尝试终止其所有守护进程子进程。

请注意,不允许守护进程创建子进程。否则,如果守护进程在其父进程退出时被终止,则会留下其子进程孤立。此外,这些不是 Unix 守护程序或服务,它们是普通的进程,如果非守护进程已退出,则会被终止(而不是被连接)。

除了 threading.Thread API 之外,Process 对象还支持以下属性和方法:

pid

返回进程 ID。在进程生成之前,此值将为 None

exitcode

子进程的退出代码。如果进程尚未终止,则此值为 None

如果子进程的 run() 方法正常返回,则退出代码将为 0。如果它通过带有整数参数 Nsys.exit() 终止,则退出代码将为 N

如果子进程由于未在 run() 中捕获的异常而终止,则退出代码将为 1。如果它被信号 N 终止,则退出代码将为负值 -N

authkey

进程的身份验证密钥(字节字符串)。

multiprocessing 初始化时,主进程会被分配一个使用 os.urandom() 生成的随机字符串。

当创建 Process 对象时,它将继承其父进程的身份验证密钥,尽管可以通过将 authkey 设置为另一个字节字符串来更改此密钥。

请参阅 身份验证密钥

sentinel

一个系统对象的数字句柄,当进程结束时它将变为“就绪”。

如果您想使用 multiprocessing.connection.wait() 一次等待多个事件,则可以使用此值。否则,调用 join() 更简单。

在 Windows 上,这是一个可与 WaitForSingleObjectWaitForMultipleObjects 系列 API 调用一起使用的 OS 句柄。在 POSIX 上,这是一个可与 select 模块中的原语一起使用的文件描述符。

3.3 版本中新增。

terminate()

终止进程。在 POSIX 上,这是使用 SIGTERM 信号完成的;在 Windows 上,使用 TerminateProcess()。请注意,不会执行退出处理程序和 finally 子句等。

请注意,进程的后代进程将不会被终止 - 它们将简单地变为孤立进程。

警告

如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并且可能无法被其他进程使用。同样,如果该进程已获取锁或信号量等,则终止它可能会导致其他进程死锁。

kill()

terminate() 相同,但在 POSIX 上使用 SIGKILL 信号。

3.7 版本中新增。

close()

关闭 Process 对象,释放与其关联的所有资源。如果底层进程仍在运行,则会引发 ValueError。一旦 close() 成功返回,Process 对象的大多数其他方法和属性将引发 ValueError

3.7 版本中新增。

请注意,start()join()is_alive()terminate()exitcode 方法应仅由创建进程对象的进程调用。

一些 Process 方法的示例用法

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

所有 multiprocessing 异常的基类。

exception multiprocessing.BufferTooShort

当提供的缓冲区对象太小,无法读取消息时,Connection.recv_bytes_into() 引发的异常。

如果 eBufferTooShort 的实例,则 e.args[0] 将以字节字符串的形式给出消息。

exception multiprocessing.AuthenticationError

当出现身份验证错误时引发。

exception multiprocessing.TimeoutError

当超时时间到期时,具有超时的方法会引发此异常。

管道和队列

当使用多个进程时,通常使用消息传递来实现进程之间的通信,并避免使用任何同步原语(如锁)。

对于传递消息,可以使用 Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。

QueueSimpleQueueJoinableQueue 类型是多生产者、多消费者 FIFO 队列,建模于标准库中的 queue.Queue 类。它们的区别在于,Queue 缺少 Python 2.5 的 queue.Queue 类中引入的 task_done()join() 方法。

如果使用 JoinableQueue,则**必须**对从队列中删除的每个任务调用 JoinableQueue.task_done(),否则用于计算未完成任务数量的信号量可能会最终溢出,从而引发异常。

与其他 Python 队列实现的一个区别是,multiprocessing 队列使用 pickle 序列化放入其中的所有对象。get 方法返回的对象是重新创建的对象,它不与原始对象共享内存。

请注意,还可以使用管理器对象创建共享队列 - 请参阅 管理器

注意

multiprocessing 使用通常的 queue.Emptyqueue.Full 异常来指示超时。它们在 multiprocessing 命名空间中不可用,因此需要从 queue 导入它们。

注意

当一个对象被放入队列时,该对象会被序列化(pickled),然后后台线程会将序列化后的数据刷新到下层的管道中。这会产生一些有点出乎意料的后果,但应该不会造成任何实际困难——如果这些问题真的困扰你,那么你可以改用使用管理器创建的队列。

  1. 在将一个对象放入空队列后,可能会有一个极小的延迟,之后队列的empty()方法返回False,并且get_nowait()可以返回而不引发queue.Empty异常。

  2. 如果多个进程正在将对象入队,则对象可能以乱序的方式在另一端被接收。但是,同一个进程入队的多个对象之间始终会保持预期的顺序。

警告

如果一个进程在使用Queue时被使用Process.terminate()os.kill()终止,那么队列中的数据很可能会损坏。这可能会导致任何其他进程在稍后尝试使用该队列时出现异常。

警告

如上所述,如果一个子进程已将项目放入队列(并且它没有使用JoinableQueue.cancel_join_thread),那么该进程将不会终止,直到所有缓冲的项目都被刷新到管道中。

这意味着,除非你确定所有放入队列的项目都已被消耗,否则如果你尝试加入该进程,可能会出现死锁。类似地,如果子进程是非守护进程,那么父进程在退出时尝试加入其所有非守护子进程时可能会挂起。

请注意,使用管理器创建的队列没有这个问题。请参阅编程指南

有关使用队列进行进程间通信的示例,请参阅示例

multiprocessing.Pipe([duplex])

返回一对 (conn1, conn2) Connection 对象,表示管道的两端。

如果 duplexTrue (默认值),则管道是双向的。如果 duplexFalse,则管道是单向的:conn1 只能用于接收消息,而 conn2 只能用于发送消息。

send() 方法使用 pickle 序列化对象,而 recv() 则重新创建对象。

class multiprocessing.Queue([maxsize])

返回一个使用管道和一些锁/信号量实现的进程共享队列。当进程首次将一个项目放入队列时,会启动一个馈送线程,该线程将对象从缓冲区传输到管道中。

标准的 queue.Emptyqueue.Full 异常(来自标准库的 queue 模块)会被引发以指示超时。

Queue 实现了 queue.Queue 的所有方法,除了 task_done()join()

qsize()

返回队列的近似大小。由于多线程/多进程语义,此数字不可靠。

请注意,在像 macOS 这样未实现 sem_getvalue() 的平台上,这可能会引发 NotImplementedError 异常。

empty()

如果队列为空,则返回 True,否则返回 False。由于多线程/多进程语义,这是不可靠的。

可能会在关闭的队列上引发 OSError。(不保证)

full()

如果队列已满,则返回 True,否则返回 False。由于多线程/多进程语义,这是不可靠的。

put(obj[, block[, timeout]])

将 obj 放入队列。如果可选参数 blockTrue (默认值)并且 timeoutNone (默认值),则在有可用空闲槽位之前阻塞。如果 timeout 是一个正数,它会最多阻塞 timeout 秒,并且如果在该时间内没有可用的空闲槽位,则会引发 queue.Full 异常。否则(blockFalse),如果有可用的空闲槽位,则将项目放入队列,否则引发 queue.Full 异常(在这种情况下,timeout 将被忽略)。

在 3.8 版本中更改: 如果队列已关闭,则会引发 ValueError 异常,而不是 AssertionError 异常。

put_nowait(obj)

等效于 put(obj, False)

get([block[, timeout]])

从队列中移除并返回一个项目。如果可选参数 blockTrue (默认值)并且 timeoutNone (默认值),则在有可用项目之前阻塞。如果 timeout 是一个正数,它会最多阻塞 timeout 秒,并且如果在该时间内没有可用的项目,则会引发 queue.Empty 异常。否则(block 为 False),如果有可用的项目,则返回一个项目,否则引发 queue.Empty 异常(在这种情况下,timeout 将被忽略)。

在 3.8 版本中更改: 如果队列已关闭,则会引发 ValueError 异常,而不是 OSError 异常。

get_nowait()

等效于 get(False)

multiprocessing.Queue 具有一些 queue.Queue 中没有的额外方法。这些方法对于大多数代码通常是不必要的。

close()

表示当前进程将不再向此队列放入数据。一旦后台线程将所有缓冲数据刷新到管道,它就会退出。当队列被垃圾回收时,会自动调用此方法。

join_thread()

加入后台线程。此方法只能在 close() 被调用后使用。它会阻塞直到后台线程退出,确保缓冲区中的所有数据都被刷新到管道。

默认情况下,如果进程不是队列的创建者,则在退出时,它将尝试加入队列的后台线程。进程可以调用 cancel_join_thread() 来使 join_thread() 不执行任何操作。

cancel_join_thread()

防止 join_thread() 阻塞。特别是,这会阻止后台线程在进程退出时自动加入 —— 请参阅 join_thread()

这个方法更好的名称可能是 allow_exit_without_flush()。 它很可能会导致排队的数据丢失,而且你几乎肯定不需要使用它。 只有当您需要当前进程立即退出,而不等待将排队的数据刷新到底层管道,并且您不在乎丢失数据时,它才真正有用。

注意

此类的功能需要在主机操作系统上具有正常工作的共享信号量实现。如果没有,则此类的功能将被禁用,并且尝试实例化 Queue 将导致 ImportError。有关其他信息,请参阅 bpo-3770。对于下面列出的任何特殊队列类型,情况也是如此。

class multiprocessing.SimpleQueue

它是一个简化的 Queue 类型,非常接近锁定的 Pipe

close()

关闭队列:释放内部资源。

队列关闭后不得再使用。 例如,不得再调用 get(), put()empty() 方法。

在 3.9 版本中新增。

empty()

如果队列为空,则返回 True,否则返回 False

如果 SimpleQueue 已关闭,则始终引发 OSError

get()

从队列中移除并返回一个项目。

put(item)

item放入队列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue 的子类,它还具有 task_done()join() 方法。

task_done()

表示先前入队的任务已完成。由队列消费者使用。对于每个用于获取任务的 get(),随后调用 task_done() 会告诉队列该任务的处理已完成。

如果 join() 当前正在阻塞,则当所有项目都已处理完毕时,它将恢复(这意味着对于放入队列的每个项目,都收到了 task_done() 调用)。

如果调用的次数多于放入队列的项目数,则会引发 ValueError

join()

阻塞直到队列中的所有项目都已被获取和处理。

每当向队列添加一个项目时,未完成任务的计数就会增加。每当消费者调用 task_done() 以指示该项目已被检索并且所有工作都已完成时,计数就会减少。当未完成任务的计数降至零时,join() 将解除阻塞。

杂项

multiprocessing.active_children()

返回当前进程的所有活动子进程的列表。

调用此函数会产生“加入”任何已完成的进程的副作用。

multiprocessing.cpu_count()

返回系统中的 CPU 数量。

此数字不等同于当前进程可以使用的 CPU 数量。 可用 CPU 的数量可以使用 os.process_cpu_count() (或 len(os.sched_getaffinity(0)))获取。

当无法确定 CPU 数量时,会引发 NotImplementedError

在 3.13 版本中变更: 返回值也可以使用 -X cpu_count 标志或 PYTHON_CPU_COUNT 覆盖,因为这仅仅是对 os cpu count API 的封装。

multiprocessing.current_process()

返回与当前进程对应的 Process 对象。

类似于 threading.current_thread()

multiprocessing.parent_process()

返回与 current_process() 的父进程对应的 Process 对象。对于主进程,parent_process 将为 None

3.8 版本中新增。

multiprocessing.freeze_support()

添加对使用 multiprocessing 的程序被冻结以生成 Windows 可执行文件时的支持。(已使用 py2exePyInstallercx_Freeze 进行测试。)

需要在主模块的 if __name__ == '__main__' 行之后立即调用此函数。例如:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果省略了 freeze_support() 行,则尝试运行冻结的可执行文件将引发 RuntimeError

在 Windows 以外的任何操作系统上调用 freeze_support() 均无效。此外,如果该模块在 Windows 上由 Python 解释器正常运行(程序未被冻结),则 freeze_support() 也无效。

multiprocessing.get_all_start_methods()

返回支持的启动方法列表,其中第一个是默认方法。可能的启动方法有 'fork''spawn''forkserver'。并非所有平台都支持所有方法。请参阅 上下文和启动方法

3.4 版本中新增。

multiprocessing.get_context(method=None)

返回一个上下文对象,该对象具有与 multiprocessing 模块相同的属性。

如果 methodNone,则返回默认上下文。否则,method 应为 'fork''spawn''forkserver'。如果指定的启动方法不可用,则引发 ValueError。请参阅 上下文和启动方法

3.4 版本中新增。

multiprocessing.get_start_method(allow_none=False)

返回用于启动进程的启动方法的名称。

如果启动方法尚未固定且 allow_none 为 false,则启动方法固定为默认值并返回名称。如果启动方法尚未固定且 allow_none 为 true,则返回 None

返回值可以是 'fork''spawn''forkserver'None。请参阅 上下文和启动方法

3.4 版本中新增。

在 3.8 版本中更改: 在 macOS 上,*spawn* 启动方法现在是默认方法。*fork* 启动方法应被视为不安全,因为它可能导致子进程崩溃。请参阅 bpo-33725

multiprocessing.set_executable(executable)

设置启动子进程时要使用的 Python 解释器的路径。(默认情况下使用 sys.executable)。嵌入器可能需要执行以下操作

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

然后才能创建子进程。

在 3.4 版本中更改: 当使用 'spawn' 启动方法时,现在在 POSIX 上受支持。

在 3.11 版本中更改: 接受 path-like object

multiprocessing.set_forkserver_preload(module_names)

设置 forkserver 主进程尝试导入的模块名称列表,以便 forked 进程继承其已导入的状态。执行此操作时,任何 ImportError 都将被静默忽略。这可以用作性能增强,以避免在每个进程中重复工作。

为了使其正常工作,必须在 forkserver 进程启动之前(在创建 Pool 或启动 Process 之前)调用它。

仅在使用 'forkserver' 启动方法时才有意义。请参阅 上下文和启动方法

3.4 版本中新增。

multiprocessing.set_start_method(method, force=False)

设置应用于启动子进程的方法。method 参数可以是 'fork''spawn''forkserver'。如果已设置启动方法且 force 不是 True,则会引发 RuntimeError。如果 methodNoneforceTrue,则启动方法设置为 None。如果 methodNoneforceFalse,则上下文设置为默认上下文。

请注意,此函数最多应调用一次,并且应在主模块的 if __name__ == '__main__' 子句中进行保护。

请参阅 上下文和启动方法

3.4 版本中新增。

连接对象

连接对象允许发送和接收可 pickle 的对象或字符串。可以将它们视为面向消息的已连接套接字。

连接对象通常使用 Pipe 创建 – 另请参阅 侦听器和客户端

class multiprocessing.connection.Connection
send(obj)

将一个对象发送到连接的另一端,该对象应使用 recv() 读取。

该对象必须是可序列化的。非常大的 pickle (大约 32 MiB+,虽然取决于操作系统) 可能会引发 ValueError 异常。

recv()

返回使用 send() 从连接另一端发送的对象。会阻塞直到有数据接收。如果没有任何内容可接收并且另一端已关闭,则引发 EOFError

fileno()

返回连接使用的文件描述符或句柄。

close()

关闭连接。

当连接被垃圾回收时,会自动调用此方法。

poll([timeout])

返回是否有任何数据可供读取。

如果未指定 timeout,则会立即返回。如果 timeout 是一个数字,则指定阻塞的最大时间(以秒为单位)。如果 timeoutNone,则使用无限超时。

请注意,可以使用 multiprocessing.connection.wait() 一次轮询多个连接对象。

send_bytes(buffer[, offset[, size]])

类字节对象 发送字节数据作为完整消息。

如果给出了 offset,则从 buffer 中的该位置读取数据。如果给出了 size,则将从 buffer 中读取该字节数。非常大的缓冲区(大约 32 MiB+,虽然取决于操作系统)可能会引发 ValueError 异常。

recv_bytes([maxlength])

返回从连接另一端发送的字节数据的完整消息,并以字符串形式返回。会阻塞直到有数据接收。如果没有任何内容可接收并且另一端已关闭,则引发 EOFError

如果指定了 maxlength 并且消息长度大于 maxlength,则会引发 OSError,并且连接将不再可读。

在 3.3 版本中更改: 此函数过去引发 IOError,现在是 OSError 的别名。

recv_bytes_into(buffer[, offset])

将从连接另一端发送的字节数据的完整消息读取到 buffer 中,并返回消息中的字节数。会阻塞直到有数据接收。如果没有任何内容可接收并且另一端已关闭,则引发 EOFError

buffer 必须是可写的 类字节对象。如果给出了 offset,则将从该位置开始将消息写入缓冲区。Offset 必须是一个非负整数,并且小于 buffer 的长度(以字节为单位)。

如果缓冲区太短,则会引发 BufferTooShort 异常,并且完整消息可用作 e.args[0],其中 e 是异常实例。

在 3.3 版本中更改: 现在可以使用 Connection.send()Connection.recv() 在进程之间传输连接对象本身。

连接对象现在还支持上下文管理协议 - 请参见 上下文管理器类型__enter__() 返回连接对象,__exit__() 调用 close()

例如

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv() 方法会自动反序列化它接收的数据,除非您可以信任发送消息的进程,否则这可能存在安全风险。

因此,除非使用 Pipe() 生成了连接对象,否则您应仅在执行某种身份验证后才使用 recv()send() 方法。请参阅 身份验证密钥

警告

如果进程在尝试读取或写入管道时被终止,则管道中的数据很可能会损坏,因为可能无法确定消息边界在哪里。

同步原语

通常,在多进程程序中,同步原语不如在多线程程序中那么必要。请参阅 threading 模块的文档。

请注意,还可以通过使用管理器对象创建同步原语 - 请参阅 管理器

class multiprocessing.Barrier(parties[, action[, timeout]])

栅栏对象:threading.Barrier 的克隆。

3.3 版本中新增。

class multiprocessing.BoundedSemaphore([value])

有界信号量对象:与 threading.BoundedSemaphore 非常相似。

与其密切对应的对象存在一个细微的区别:它的 acquire 方法的第一个参数被命名为 block,这与 Lock.acquire() 一致。

注意

在 macOS 上,它与 Semaphore 没有区别,因为 sem_getvalue() 未在该平台上实现。

class multiprocessing.Condition([lock])

条件变量:threading.Condition 的别名。

如果指定了 lock,则它应该是来自 multiprocessingLockRLock 对象。

在 3.3 版本中更改: 添加了 wait_for() 方法。

class multiprocessing.Event

threading.Event 的一个克隆。

class multiprocessing.Lock

一个非递归锁对象:与 threading.Lock 非常相似。一旦进程或线程获取了锁,后续任何进程或线程尝试获取该锁都会被阻塞,直到该锁被释放;任何进程或线程都可以释放该锁。 threading.Lock 的概念和行为(在应用于线程时)在 multiprocessing.Lock 中被复制,应用于进程或线程,除非另有说明。

请注意,Lock 实际上是一个工厂函数,它返回一个使用默认上下文初始化的 multiprocessing.synchronize.Lock 实例。

Lock 支持 上下文管理器 协议,因此可以在 with 语句中使用。

acquire(block=True, timeout=None)

获取锁,阻塞或非阻塞。

如果 block 参数设置为 True(默认值),则方法调用将阻塞,直到锁处于未锁定状态,然后将其设置为锁定状态并返回 True。请注意,第一个参数的名称与 threading.Lock.acquire() 中的名称不同。

如果 block 参数设置为 False,则方法调用不会阻塞。如果锁当前处于锁定状态,则返回 False;否则,将锁设置为锁定状态并返回 True

当使用正浮点值 timeout 调用时,只要无法获取锁,最多会阻塞 timeout 指定的秒数。使用负值 timeout 调用等效于 timeout 为零。使用 timeout 值为 None(默认值)的调用将超时时间设置为无限。请注意,timeout 的负值或 None 值的处理方式与 threading.Lock.acquire() 中实现的有所不同。如果 block 参数设置为 False,则 timeout 参数没有实际意义,因此会被忽略。如果已获取锁,则返回 True;如果超时时间已过,则返回 False

release()

释放锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。

行为与 threading.Lock.release() 中的行为相同,但当在未锁定的锁上调用时,会引发 ValueError

class multiprocessing.RLock

一个递归锁对象:与 threading.RLock 非常相似。递归锁必须由获取它的进程或线程释放。一旦进程或线程获取了递归锁,同一进程或线程可以再次获取它而不会阻塞;该进程或线程必须在每次获取后释放一次。

请注意,RLock 实际上是一个工厂函数,它返回一个使用默认上下文初始化的 multiprocessing.synchronize.RLock 实例。

RLock 支持 上下文管理器 协议,因此可以在 with 语句中使用。

acquire(block=True, timeout=None)

获取锁,阻塞或非阻塞。

当使用 block 参数设置为 True 调用时,会阻塞,直到锁处于未锁定状态(不归任何进程或线程所有),除非锁已归当前进程或线程所有。然后,当前进程或线程获取锁的所有权(如果它尚未拥有所有权),并且锁内的递归级别加 1,导致返回值为 True。请注意,与 threading.RLock.acquire() 的实现相比,此第一个参数的行为存在一些差异,从参数本身的名称开始。

当使用 block 参数设置为 False 调用时,不会阻塞。如果锁已被另一个进程或线程获取(因此被拥有),则当前进程或线程不会获取所有权,并且锁内的递归级别不会更改,导致返回值为 False。如果锁处于未锁定状态,则当前进程或线程获取所有权,并且递归级别会递增,导致返回值为 True

timeout 参数的使用和行为与 Lock.acquire() 中的相同。请注意,timeout 的某些行为与 threading.RLock.acquire() 中实现的有所不同。

release()

释放锁,递归级别递减。如果在递减后递归级别为零,则将锁重置为未锁定状态(不归任何进程或线程所有),并且如果有任何其他进程或线程被阻塞等待锁变为未锁定状态,则允许其中一个继续进行。如果在递减后递归级别仍然不为零,则锁保持锁定状态,并由调用进程或线程拥有。

仅当调用进程或线程拥有锁时才调用此方法。如果此方法由所有者以外的进程或线程调用,或者如果锁处于未锁定(未拥有)状态,则会引发 AssertionError。请注意,在这种情况下引发的异常类型与 threading.RLock.release() 中实现的有所不同。

class multiprocessing.Semaphore([value])

一个信号量对象:与 threading.Semaphore 非常相似。

与其密切对应的对象存在一个细微的区别:它的 acquire 方法的第一个参数被命名为 block,这与 Lock.acquire() 一致。

注意

在 macOS 上,不支持 sem_timedwait,因此使用超时调用 acquire() 将使用睡眠循环来模拟该函数的行为。

注意

此软件包的某些功能需要在主机操作系统上具有正常工作的共享信号量实现。如果没有,将禁用 multiprocessing.synchronize 模块,并且尝试导入它将导致 ImportError。有关更多信息,请参阅 bpo-3770

共享 ctypes 对象

可以使用共享内存创建可由子进程继承的共享对象。

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回一个从共享内存分配的 ctypes 对象。默认情况下,返回值实际上是该对象的同步包装器。可以通过 Valuevalue 属性访问对象本身。

typecode_or_type 决定返回对象的类型:它可以是一个 ctypes 类型,也可以是 array 模块使用的那种单字符类型码。*args 会传递给该类型的构造函数。

如果 lockTrue(默认值),则会创建一个新的递归锁对象,用于同步对值的访问。如果 lock 是一个 LockRLock 对象,则将使用该对象来同步对值的访问。如果 lockFalse,则对返回对象的访问不会自动受到锁的保护,因此它不一定是“进程安全的”。

诸如 += 这类涉及读取和写入的操作不是原子的。因此,例如,如果要原子地递增一个共享值,仅仅执行以下操作是不够的:

counter.value += 1

假设关联的锁是递归的(默认情况下是递归的),你可以改为执行以下操作:

with counter.get_lock():
    counter.value += 1

请注意,lock 是一个仅限关键字的参数。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回一个从共享内存分配的 ctypes 数组。默认情况下,返回值实际上是该数组的同步包装器。

typecode_or_type 决定返回数组元素的类型:它可以是一个 ctypes 类型,也可以是 array 模块使用的那种单字符类型码。如果 size_or_initializer 是一个整数,则它决定数组的长度,并且数组最初将被置零。否则,size_or_initializer 是一个序列,该序列用于初始化数组,并且其长度决定数组的长度。

如果 lockTrue(默认值),则会创建一个新的锁对象,用于同步对值的访问。如果 lock 是一个 LockRLock 对象,则将使用该对象来同步对值的访问。如果 lockFalse,则对返回对象的访问不会自动受到锁的保护,因此它不一定是“进程安全的”。

请注意,lock 是一个仅限关键字的参数。

请注意,ctypes.c_char 数组具有 valueraw 属性,允许您使用它来存储和检索字符串。

multiprocessing.sharedctypes 模块

multiprocessing.sharedctypes 模块提供了用于从共享内存分配 ctypes 对象的函数,这些对象可以被子进程继承。

注意

虽然可以在共享内存中存储指针,但请记住,这将引用特定进程的地址空间中的位置。但是,该指针很可能在第二个进程的上下文中无效,并且尝试从第二个进程取消引用该指针可能会导致崩溃。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回一个从共享内存分配的 ctypes 数组。

typecode_or_type 决定返回数组元素的类型:它可以是一个 ctypes 类型,也可以是 array 模块使用的那种单字符类型码。如果 size_or_initializer 是一个整数,则它决定数组的长度,并且数组最初将被置零。否则,size_or_initializer 是一个序列,该序列用于初始化数组,并且其长度决定数组的长度。

请注意,设置和获取元素可能不是原子的——请改用 Array() 来确保使用锁自动同步访问。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回一个从共享内存分配的 ctypes 对象。

typecode_or_type 决定返回对象的类型:它可以是一个 ctypes 类型,也可以是 array 模块使用的那种单字符类型码。*args 会传递给该类型的构造函数。

请注意,设置和获取值可能不是原子的——请改用 Value() 来确保使用锁自动同步访问。

请注意,ctypes.c_char 数组具有 valueraw 属性,允许您使用它来存储和检索字符串——请参阅 ctypes 的文档。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

RawArray() 相同,不同之处在于,根据 lock 的值,可能会返回一个进程安全的同步包装器,而不是原始的 ctypes 数组。

如果 lockTrue(默认值),则会创建一个新的锁对象,用于同步对值的访问。如果 lock 是一个 LockRLock 对象,则将使用该对象来同步对值的访问。如果 lockFalse,则对返回对象的访问不会自动受到锁的保护,因此它不一定是“进程安全的”。

请注意,lock 是一个仅限关键字的参数。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

RawValue() 相同,不同之处在于,根据 lock 的值,可能会返回一个进程安全的同步包装器,而不是原始的 ctypes 对象。

如果 lockTrue(默认值),则会创建一个新的锁对象,用于同步对值的访问。如果 lock 是一个 LockRLock 对象,则将使用该对象来同步对值的访问。如果 lockFalse,则对返回对象的访问不会自动受到锁的保护,因此它不一定是“进程安全的”。

请注意,lock 是一个仅限关键字的参数。

multiprocessing.sharedctypes.copy(obj)

返回一个从共享内存分配的 ctypes 对象,它是 ctypes 对象 obj 的副本。

multiprocessing.sharedctypes.synchronized(obj[, lock])

返回一个 ctypes 对象的进程安全包装器对象,该对象使用 lock 来同步访问。如果 lockNone(默认值),则会自动创建一个 multiprocessing.RLock 对象。

同步包装器除了它包装的对象的方法外,还将有两个方法:get_obj() 返回包装的对象,get_lock() 返回用于同步的锁对象。

请注意,通过包装器访问 ctypes 对象可能比访问原始 ctypes 对象慢得多。

在 3.5 版本中更改: 同步对象支持 上下文管理器 协议。

下表比较了从共享内存创建共享 ctypes 对象的语法与常规 ctypes 语法。(在表中,MyStructctypes.Structure 的某个子类。)

ctypes

使用类型的 sharedctypes

使用类型码的 sharedctypes

c_double(2.4)

RawValue(c_double, 2.4)

RawValue(‘d’, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(‘h’, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(‘i’, (9, 2, 8))

下面是一个示例,其中一些 ctypes 对象被子进程修改

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

打印的结果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

管理器

管理器提供了一种创建可以在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。管理器对象控制一个服务器进程,该进程管理共享对象。其他进程可以通过使用代理来访问共享对象。

multiprocessing.Manager()

返回一个已启动的 SyncManager 对象,该对象可用于在进程之间共享对象。返回的管理器对象对应于一个已衍生的子进程,并具有可创建共享对象并返回相应代理的方法。

管理器进程会在它们被垃圾回收或其父进程退出时立即关闭。管理器类在 multiprocessing.managers 模块中定义。

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

创建一个 BaseManager 对象。

一旦创建,应该调用 start()get_server().serve_forever() 以确保管理器对象引用一个已启动的管理器进程。

address 是管理器进程侦听新连接的地址。如果 addressNone,则会选择一个任意地址。

authkey 是用于检查到服务器进程的传入连接有效性的身份验证密钥。如果 authkeyNone,则使用 current_process().authkey。否则,使用 authkey,并且它必须是一个字节字符串。

serializer 必须是 'pickle' (使用 pickle 序列化)或 'xmlrpclib' (使用 xmlrpc.client 序列化)。

ctx 是一个上下文对象,或 None (使用当前上下文)。请参阅 get_context() 函数。

shutdown_timeout 是一个以秒为单位的超时时间,用于等待管理器使用的进程在 shutdown() 方法中完成。如果关闭超时,则进程被终止。如果终止进程也超时,则进程被强制结束。

在 3.11 版本中更改: 添加了 shutdown_timeout 参数。

start([initializer[, initargs]])

启动一个子进程来启动管理器。如果 initializer 不是 None,则子进程会在启动时调用 initializer(*initargs)

get_server()

返回一个 Server 对象,该对象表示受管理器控制的实际服务器。Server 对象支持 serve_forever() 方法

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server 另外还有一个 address 属性。

connect()

将本地管理器对象连接到远程管理器进程

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

停止管理器使用的进程。仅当已使用 start() 启动服务器进程时,此方法才可用。

可以多次调用此方法。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

一个类方法,可用于向管理器类注册类型或可调用对象。

typeid 是一个“类型标识符”,用于标识特定类型的共享对象。这必须是一个字符串。

callable 是一个可调用对象,用于为该类型标识符创建对象。如果管理器实例将使用 connect() 方法连接到服务器,或者如果 create_method 参数为 False,则可以将其保留为 None

proxytypeBaseProxy 的子类,用于为具有此 typeid 的共享对象创建代理。如果为 None,则会自动创建一个代理类。

exposed 用于指定一系列方法名称,应允许此 typeid 的代理使用 BaseProxy._callmethod() 访问。(如果 exposedNone,则如果存在 proxytype._exposed_,则会改用它。)在未指定公开列表的情况下,共享对象的所有“公共方法”都将可以访问。(这里的“公共方法”是指任何具有 __call__() 方法并且其名称不以 '_' 开头的属性。)

method_to_typeid 是一个映射,用于指定应返回代理的那些公开方法的返回类型。它将方法名称映射到 typeid 字符串。(如果 method_to_typeidNone,则如果存在 proxytype._method_to_typeid_,则会改用它。)如果方法的名称不是此映射的键,或者如果映射为 None,则该方法返回的对象将按值复制。

create_method 确定是否应创建名称为 typeid 的方法,该方法可用于告知服务器进程创建新的共享对象并为其返回代理。默认情况下为 True

BaseManager 实例还具有一个只读属性

address

管理器使用的地址。

在 3.3 版本中更改: 管理器对象支持上下文管理协议 - 请参阅 上下文管理器类型__enter__() 启动服务器进程(如果尚未启动),然后返回管理器对象。__exit__() 调用 shutdown()

在以前的版本中,如果管理器的服务器进程尚未启动,则 __enter__() 不会启动该进程。

class multiprocessing.managers.SyncManager

BaseManager 的子类,可用于进程同步。此类型的对象由 multiprocessing.Manager() 返回。

它的方法会创建并返回用于跨进程同步的多种常用数据类型的 代理对象。这主要包括共享列表和字典。

Barrier(parties[, action[, timeout]])

创建一个共享的 threading.Barrier 对象并返回它的代理。

3.3 版本中新增。

BoundedSemaphore([value])

创建一个共享的 threading.BoundedSemaphore 对象并返回它的代理。

Condition([lock])

创建一个共享的 threading.Condition 对象并返回它的代理。

如果提供了 *lock*,则它应该是 threading.Lockthreading.RLock 对象的代理。

在 3.3 版本中更改: 添加了 wait_for() 方法。

Event()

创建一个共享的 threading.Event 对象并返回它的代理。

Lock()

创建一个共享的 threading.Lock 对象并返回它的代理。

Namespace()

创建一个共享的 Namespace 对象并返回它的代理。

Queue([maxsize])

创建一个共享的 queue.Queue 对象并返回它的代理。

RLock()

创建一个共享的 threading.RLock 对象并返回它的代理。

Semaphore([value])

创建一个共享的 threading.Semaphore 对象并返回它的代理。

Array(typecode, sequence)

创建一个数组并返回它的代理。

Value(typecode, value)

创建一个具有可写 value 属性的对象,并返回它的代理。

dict()
dict(mapping)
dict(sequence)

创建一个共享的 dict 对象并返回它的代理。

list()
list(sequence)

创建一个共享的 list 对象并返回它的代理。

在 3.6 版本中变更: 共享对象可以嵌套。例如,共享容器对象(如共享列表)可以包含其他共享对象,这些对象都将由 SyncManager 管理和同步。

class multiprocessing.managers.Namespace

是可以向 SyncManager 注册的类型。

命名空间对象没有公共方法,但具有可写属性。它的表示形式显示其属性的值。

但是,当使用命名空间对象的代理时,以 '_' 开头的属性将是代理的属性,而不是引用对象的属性

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

自定义管理器

要创建自己的管理器,可以创建 BaseManager 的子类,并使用 register() 类方法向管理器类注册新类型或可调用对象。例如

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

使用远程管理器

可以在一台机器上运行管理器服务器,并允许客户端从其他机器上使用它(假设所涉及的防火墙允许这样做)。

运行以下命令会为单个共享队列创建一个服务器,远程客户端可以访问该队列

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个客户端可以按如下方式访问服务器

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个客户端也可以使用它

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以访问该队列,使用上面客户端的代码远程访问它

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

代理对象

代理是一个 *引用* 共享对象的对象,该共享对象(可能)存在于不同的进程中。共享对象被称为代理的 *引用对象*。多个代理对象可能具有相同的引用对象。

代理对象具有调用其引用对象的相应方法的方法(尽管并非引用对象的每个方法都可以通过代理使用)。通过这种方式,代理的使用方式与引用对象相同

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

请注意,将 str() 应用于代理将返回引用对象的表示形式,而应用 repr() 将返回代理的表示形式。

代理对象的一个重要特性是它们是可序列化的,因此可以在进程之间传递。因此,引用对象可以包含 代理对象。这允许对这些托管列表、字典和其他 代理对象 进行嵌套

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

类似地,字典和列表代理可以相互嵌套

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

如果引用对象中包含标准(非代理)listdict 对象,则对这些可变值的修改不会通过管理器传播,因为代理无法知道何时修改了其中包含的值。但是,在容器代理中存储值(这会在代理对象上触发 __setitem__)会通过管理器传播,因此要有效地修改此类项目,可以将修改后的值重新分配给容器代理

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

这种方法可能不如对大多数用例使用嵌套的 代理对象 方便,但它也展示了对同步的控制程度。

注意

multiprocessing 中的代理类型不支持按值比较。例如,我们有

>>> manager.list([1,2,3]) == [1,2,3]
False

进行比较时,应改为使用引用对象的副本。

class multiprocessing.managers.BaseProxy

代理对象是 BaseProxy 子类的实例。

_callmethod(methodname[, args[, kwds]])

调用并返回代理对象的引用对象的方法的结果。

如果 proxy 是一个代理,其引用对象是 obj,那么表达式

proxy._callmethod(methodname, args, kwds)

将会计算表达式

getattr(obj, methodname)(*args, **kwds)

在管理器进程中。

返回的值将是调用结果的副本或新共享对象的代理 - 请参阅 BaseManager.register()method_to_typeid 参数的文档。

如果调用引发异常,则 _callmethod() 将重新引发该异常。如果在管理器进程中引发其他异常,则将其转换为 RemoteError 异常,并由 _callmethod() 引发。

请特别注意,如果 methodname 没有被 *公开*,则会引发异常。

以下是 _callmethod() 的使用示例

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回引用对象的副本。

如果引用对象不可 pickle,则会引发异常。

__repr__()

返回代理对象的表示。

__str__()

返回引用对象的表示。

清理

代理对象使用弱引用回调,以便在被垃圾回收时,它会从拥有其引用对象的管理器中注销自身。

当不再有任何代理引用共享对象时,该共享对象将从管理器进程中删除。

进程池

可以使用 Pool 类创建一个进程池,该进程池将执行提交给它的任务。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一个进程池对象,它控制一个工作进程池,可以向该进程池提交作业。它支持带有超时和回调的异步结果,并具有并行映射实现。

processes 是要使用的工作进程的数量。如果 processesNone,则使用 os.process_cpu_count() 返回的数字。

如果 initializer 不为 None,则每个工作进程在启动时都会调用 initializer(*initargs)

maxtasksperchild 是一个工作进程在退出并被新的工作进程替换之前可以完成的任务数,以便释放未使用的资源。默认的 maxtasksperchildNone,这意味着工作进程将与池的生命周期一样长。

context 可用于指定用于启动工作进程的上下文。通常,使用函数 multiprocessing.Pool() 或上下文对象的 Pool() 方法创建池。在这两种情况下,都会适当地设置 context

请注意,池对象的方法只能由创建该池的进程调用。

警告

multiprocessing.pool 对象具有内部资源,需要通过将池用作上下文管理器或手动调用 close()terminate() 来进行适当的管理(像任何其他资源一样)。否则可能会导致进程在最终化时挂起。

请注意,不正确的是依赖垃圾回收器来销毁池,因为 CPython 不保证会调用池的 finalizer(有关更多信息,请参见 object.__del__())。

在 3.2 版本中更改: 添加了 maxtasksperchild 参数。

在 3.4 版本中更改: 添加了 context 参数。

在 3.13 版本中更改: processes 默认使用 os.process_cpu_count(),而不是 os.cpu_count()

注意

Pool 中的工作进程通常在其工作队列的整个持续时间内存活。在其他系统中(如 Apache、mod_wsgi 等)中,为了释放工作进程持有的资源,常见的一种模式是允许池中的工作进程仅完成一定量的工作,然后退出,进行清理,并生成一个新的进程来替换旧的进程。Poolmaxtasksperchild 参数向最终用户公开了此功能。

apply(func[, args[, kwds]])

使用参数 args 和关键字参数 kwds 调用 func。它会阻塞,直到结果准备就绪。鉴于此会阻塞,apply_async() 更适合并行执行工作。此外,func 仅在池的一个工作进程中执行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply() 方法的变体,返回一个 AsyncResult 对象。

如果指定了 callback,则它应该是一个接受单个参数的可调用对象。当结果准备就绪时,callback 将应用于该结果,除非调用失败,在这种情况下,将改为应用 error_callback

如果指定了 error_callback,则它应该是一个接受单个参数的可调用对象。如果目标函数失败,则将使用异常实例调用 error_callback

回调应立即完成,否则处理结果的线程将被阻塞。

map(func, iterable[, chunksize])

是内置函数 map() 的并行等效函数(但它只支持一个可迭代参数,对于多个可迭代参数,请参见 starmap())。它会阻塞直到结果准备就绪。

此方法将可迭代对象分割成若干块,然后将其作为单独的任务提交给进程池。可以通过将 chunksize 设置为正整数来指定这些块的(大致)大小。

请注意,对于非常长的可迭代对象,这可能会导致较高的内存使用率。为了获得更好的效率,请考虑使用带有显式 chunksize 选项的 imap()imap_unordered()

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

map() 方法的一个变体,它返回一个 AsyncResult 对象。

如果指定了 callback,则它应该是一个接受单个参数的可调用对象。当结果准备就绪时,callback 将应用于该结果,除非调用失败,在这种情况下,将改为应用 error_callback

如果指定了 error_callback,则它应该是一个接受单个参数的可调用对象。如果目标函数失败,则将使用异常实例调用 error_callback

回调应立即完成,否则处理结果的线程将被阻塞。

imap(func, iterable[, chunksize])

map() 的惰性版本。

chunksize 参数与 map() 方法使用的参数相同。对于非常长的可迭代对象,使用较大的 chunksize 值可以使作业完成速度快得多,而不是使用默认值 1

此外,如果 chunksize1,则 imap() 方法返回的迭代器的 next() 方法有一个可选的 timeout 参数:如果结果无法在 timeout 秒内返回,next(timeout) 将引发 multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

imap() 相同,但返回的迭代器的结果顺序应被认为是任意的。(只有当只有一个工作进程时,才能保证顺序是“正确”的。)

starmap(func, iterable[, chunksize])

map() 类似,只是 iterable 的元素预计是可迭代的,并且会被解包为参数。

因此,[(1,2), (3, 4)]iterable 会产生 [func(1,2), func(3,4)]

3.3 版本中新增。

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

starmap()map_async() 的组合,它迭代 iterable 的可迭代对象,并使用解包后的可迭代对象调用 func。返回一个结果对象。

3.3 版本中新增。

close()

阻止向池提交任何更多任务。一旦所有任务都已完成,工作进程将退出。

terminate()

立即停止工作进程,而不完成未完成的工作。当池对象被垃圾回收时,将立即调用 terminate()

join()

等待工作进程退出。必须在使用 join() 之前调用 close()terminate()

在 3.3 版本中变更: 池对象现在支持上下文管理协议 – 请参见 上下文管理器类型__enter__() 返回池对象,__exit__() 调用 terminate()

class multiprocessing.pool.AsyncResult

是由 Pool.apply_async()Pool.map_async() 返回的结果的类。

get([timeout])

当结果到达时返回结果。如果 timeout 不是 None 并且结果没有在 timeout 秒内到达,则会引发 multiprocessing.TimeoutError。如果远程调用引发了异常,则该异常将由 get() 重新引发。

wait([timeout])

等待直到结果可用或直到 timeout 秒过去。

ready()

返回调用是否已完成。

successful()

返回调用是否在未引发异常的情况下完成。如果结果尚未就绪,则会引发 ValueError 异常。

在 3.7 版本中变更: 如果结果尚未就绪,则会引发 ValueError 异常,而不是 AssertionError 异常。

以下示例演示了如何使用进程池

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

监听器和客户端

通常,进程之间的消息传递是使用队列或使用由 Pipe() 返回的 Connection 对象完成的。

但是,multiprocessing.connection 模块提供了一些额外的灵活性。它基本上为处理套接字或 Windows 命名管道提供了一个高级的面向消息的 API。它还支持使用 hmac 模块进行摘要认证,并支持同时轮询多个连接。

multiprocessing.connection.deliver_challenge(connection, authkey)

向连接的另一端发送一条随机生成的消息,并等待回复。

如果回复与使用 authkey 作为密钥的消息摘要匹配,则会向连接的另一端发送欢迎消息。否则,会引发 AuthenticationError 异常。

multiprocessing.connection.answer_challenge(connection, authkey)

接收消息,使用 authkey 作为密钥计算消息的摘要,然后将摘要发送回去。

如果没有收到欢迎消息,则会引发 AuthenticationError 异常。

multiprocessing.connection.Client(address[, family[, authkey]])

尝试建立与使用地址 address 的监听器的连接,返回一个 Connection

连接的类型由 family 参数确定,但通常可以省略此参数,因为它通常可以从 address 的格式推断出来。(请参阅 地址格式

如果给定了 authkey 且不为 None,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证挑战的密钥。如果 authkeyNone,则不进行身份验证。如果身份验证失败,则会引发 AuthenticationError 异常。请参阅 身份验证密钥

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

绑定套接字或 Windows 命名管道的包装器,它“监听”连接。

address 是监听器对象的绑定套接字或命名管道使用的地址。

注意

如果使用 '0.0.0.0' 的地址,则该地址在 Windows 上将不是可连接的端点。 如果需要可连接的端点,则应使用 '127.0.0.1'。

family 是要使用的套接字(或命名管道)的类型。它可以是字符串 'AF_INET' (用于 TCP 套接字)、'AF_UNIX' (用于 Unix 域套接字) 或 'AF_PIPE' (用于 Windows 命名管道) 中的一个。 在这些中,只有第一个保证可用。 如果 familyNone,则会从 address 的格式推断出 family。 如果 address 也为 None,则会选择默认值。 此默认值是假定为最快的可用 family。 请参阅 地址格式。请注意,如果 family'AF_UNIX' 且地址为 None,则将在使用 tempfile.mkstemp() 创建的私有临时目录中创建套接字。

如果监听器对象使用套接字,则 backlog (默认为 1) 会在绑定后传递给套接字的 listen() 方法。

如果给定了 authkey 且不为 None,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证挑战的密钥。如果 authkeyNone,则不进行身份验证。如果身份验证失败,则会引发 AuthenticationError 异常。请参阅 身份验证密钥

accept()

接受监听器对象的绑定套接字或命名管道上的连接,并返回一个 Connection 对象。如果尝试身份验证失败,则会引发 AuthenticationError 异常。

close()

关闭监听器对象的绑定套接字或命名管道。当监听器被垃圾回收时,会自动调用此方法。但是,建议显式调用它。

监听器对象具有以下只读属性

address

监听器对象正在使用的地址。

last_accepted

上次接受的连接的来源地址。如果此地址不可用,则为 None

在 3.3 版本中变更: 监听器对象现在支持上下文管理协议 – 请参阅 上下文管理器类型__enter__() 返回监听器对象,而 __exit__() 调用 close()

multiprocessing.connection.wait(object_list, timeout=None)

等待直到 object_list 中的对象准备就绪。返回 object_list 中准备就绪的对象的列表。如果 timeout 为浮点数,则调用最多会阻塞那么多秒。如果 timeoutNone,则将无限期阻塞。负超时等效于零超时。

对于 POSIX 和 Windows,如果对象满足以下条件,则可以出现在 object_list 中:

当有数据可从连接或套接字对象读取时,或者另一端已关闭时,该连接或套接字对象就绪。

POSIX: wait(object_list, timeout) 几乎等同于 select.select(object_list, [], [], timeout)。区别在于,如果 select.select() 被信号中断,它可能会引发错误号为 EINTROSError,而 wait() 不会。

Windows: object_list 中的项必须是一个可等待的整数句柄(根据 Win32 函数 WaitForMultipleObjects() 的文档定义),或者是一个具有 fileno() 方法的对象,该方法返回套接字句柄或管道句柄。(请注意,管道句柄和套接字句柄不是可等待的句柄。)

3.3 版本中新增。

示例

以下服务器代码创建一个监听器,它使用 'secret password' 作为身份验证密钥。然后它等待连接,并向客户端发送一些数据

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

以下代码连接到服务器,并接收来自服务器的一些数据

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

以下代码使用 wait() 来等待来自多个进程的消息

from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

地址格式

  • 'AF_INET' 地址是一个 (hostname, port) 形式的元组,其中 hostname 是一个字符串,port 是一个整数。

  • 'AF_UNIX' 地址是一个表示文件系统中文件名的字符串。

  • 'AF_PIPE' 地址是 r'\\.\pipe\PipeName' 形式的字符串。要使用 Client() 连接到名为 ServerName 的远程计算机上的命名管道,应使用 r'\\ServerName\pipe\PipeName' 形式的地址代替。

请注意,默认情况下,任何以两个反斜杠开头的字符串都被假定为 'AF_PIPE' 地址,而不是 'AF_UNIX' 地址。

身份验证密钥

当使用 Connection.recv 时,接收到的数据会自动反序列化。不幸的是,从不受信任的来源反序列化数据存在安全风险。因此,ListenerClient() 使用 hmac 模块提供摘要身份验证。

身份验证密钥是一个字节字符串,可以将其视为密码:一旦建立连接,两端都会要求证明另一端知道身份验证密钥。(证明两端都使用相同的密钥涉及通过连接发送密钥。)

如果请求身份验证但未指定身份验证密钥,则使用 current_process().authkey 的返回值(请参阅 Process)。此值将自动被当前进程创建的任何 Process 对象继承。这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,该密钥可用于在它们之间建立连接。

也可以使用 os.urandom() 生成合适的身份验证密钥。

日志记录

提供了一些日志记录支持。但是请注意,logging 包不使用进程共享锁,因此来自不同进程的消息可能会混淆(取决于处理程序类型)。

multiprocessing.get_logger()

返回 multiprocessing 使用的记录器。如有必要,将创建一个新的记录器。

首次创建时,记录器的级别为 logging.NOTSET,并且没有默认的处理程序。默认情况下,发送到此记录器的消息不会传播到根记录器。

请注意,在 Windows 上,子进程只会继承父进程记录器的级别 - 记录器的任何其他自定义设置都不会被继承。

multiprocessing.log_to_stderr(level=None)

此函数调用 get_logger(),除了返回由 get_logger 创建的记录器之外,它还添加一个处理程序,该处理程序使用格式 '[%(levelname)s/%(processName)s] %(message)s' 将输出发送到 sys.stderr。可以通过传递 level 参数来修改记录器的 levelname

以下是启用日志记录的示例会话

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

有关日志记录级别的完整表格,请参阅 logging 模块。

multiprocessing.dummy 模块

multiprocessing.dummy 复制了 multiprocessing 的 API,但它只不过是 threading 模块的包装器。

特别是,multiprocessing.dummy 提供的 Pool 函数返回 ThreadPool 的实例,它是 Pool 的子类,支持所有相同的方法调用,但使用工作线程池而不是工作进程。

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

线程池对象,用于控制可向其提交作业的工作线程池。ThreadPool 实例与 Pool 实例完全接口兼容,并且它们的资源也必须得到妥善管理,可以通过将池用作上下文管理器,或者手动调用 close()terminate() 来实现。

processes 是要使用的工作线程数。如果 processesNone,则使用 os.process_cpu_count() 返回的数字。

如果 initializer 不为 None,则每个工作进程在启动时都会调用 initializer(*initargs)

Pool 不同,maxtasksperchildcontext 不能被提供。

注意

ThreadPoolPool 共享相同的接口,后者是围绕进程池设计的,并且早于 concurrent.futures 模块的引入。因此,它继承了一些对于由线程支持的池没有意义的操作,并且它拥有自己的类型来表示异步作业的状态,AsyncResult,这种类型不被任何其他库理解。

用户通常应首选使用 concurrent.futures.ThreadPoolExecutor,它具有从一开始就围绕线程设计的更简单的接口,并且返回与包括 asyncio 在内的许多其他库兼容的 concurrent.futures.Future 实例。

编程指南

使用 multiprocessing 时,应遵守某些指南和习惯用法。

所有启动方法

以下内容适用于所有启动方法。

避免共享状态

应尽可能避免在进程之间移动大量数据。

最好坚持使用队列或管道进行进程间通信,而不是使用较低级别的同步原语。

可 Pickling 性

确保代理方法的参数是可 pickling 的。

代理的线程安全性

除非使用锁保护,否则不要从多个线程使用代理对象。

(不同进程使用*相同*代理永远不会有问题。)

加入僵尸进程

在 POSIX 上,当进程完成但未被加入时,它会变成僵尸。不应该有很多,因为每次启动新进程(或调用 active_children() 时),所有尚未加入的已完成进程都将被加入。另外,调用已完成进程的 Process.is_alive 将会加入该进程。即使如此,显式加入你启动的所有进程可能也是一个好习惯。

最好继承而不是 pickle/unpickle

当使用 spawnforkserver 启动方法时,multiprocessing 中的许多类型需要是可 pickling 的,以便子进程可以使用它们。但是,通常应避免使用管道或队列将共享对象发送到其他进程。相反,你应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承它。

避免终止进程

使用 Process.terminate 方法停止进程可能会导致进程当前正在使用的任何共享资源(例如锁、信号量、管道和队列)损坏或对其他进程不可用。

因此,最好只考虑在从不使用任何共享资源的进程上使用 Process.terminate

加入使用队列的进程

请记住,已将项目放入队列的进程在终止之前会等待,直到“馈送器”线程将所有缓冲的项目馈送到基础管道。(子进程可以调用队列的 Queue.cancel_join_thread 方法来避免此行为。)

这意味着,无论何时使用队列,都需要确保在进程加入之前,最终会删除已放入队列中的所有项目。否则,你无法确定将项目放入队列的进程是否会终止。还要记住,非守护进程将自动加入。

一个会发生死锁的例子如下

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

这里的解决方法是交换最后两行(或直接删除 p.join() 行)。

显式将资源传递给子进程

在 POSIX 上,使用 fork 启动方法,子进程可以使用全局资源利用在父进程中创建的共享资源。但是,最好将该对象作为参数传递给子进程的构造函数。

除了使代码(可能)与 Windows 和其他启动方法兼容之外,这还可以确保只要子进程仍然处于活动状态,该对象就不会在父进程中被垃圾回收。如果某些资源在父进程中被垃圾回收时被释放,这可能很重要。

因此例如

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

应该重写为

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

注意将 sys.stdin 替换为“类似文件的对象”

multiprocessing 最初无条件调用

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法中——这导致了进程内进程的问题。这已更改为

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

这解决了进程相互冲突导致文件描述符错误的根本问题,但是给将 sys.stdin() 替换为带有输出缓冲的“类文件对象”的应用程序带来了潜在的危险。这种危险是,如果多个进程在此类文件对象上调用 close(),则可能导致相同的数据多次刷新到该对象,从而导致损坏。

如果你编写了一个类文件对象并实现了自己的缓存,则可以通过在每次添加到缓存时存储 pid,并在 pid 更改时丢弃缓存来使其对 fork 安全。例如

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有关更多信息,请参见 bpo-5155bpo-5313bpo-5331

spawnforkserver 启动方法

有一些额外的限制不适用于 fork 启动方法。

更多的可 Pickling 性

确保 Process.__init__() 的所有参数都是可 pickling 的。另外,如果你子类化 Process,请确保在调用 Process.start 方法时,实例将是可 pickling 的。

全局变量

请记住,如果在子进程中运行的代码尝试访问全局变量,则它看到的值(如果有)可能与调用 Process.start 时父进程中的值不同。

但是,只是模块级常量的全局变量不会引起问题。

安全导入主模块

确保主模块可以被新的 Python 解释器安全地导入,而不会引起意外的副作用(例如启动新进程)。

例如,使用 spawnforkserver 启动方法运行以下模块将失败,并出现 RuntimeError

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

而是应该使用 if __name__ == '__main__': 来保护程序的“入口点”,如下所示:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(如果程序是正常运行而不是被冻结,则可以省略 freeze_support() 行。)

这允许新生成的 Python 解释器安全地导入模块,然后运行模块的 foo() 函数。

如果在主模块中创建了池(pool)或管理器(manager),则适用类似的限制。

示例

演示如何创建和使用自定义的管理器和代理

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

使用 Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

一个示例,展示如何使用队列来向一组工作进程提供任务并收集结果

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()