multiprocessing.shared_memory — 用于跨进程直接访问的共享内存

源代码: Lib/multiprocessing/shared_memory.py

在 3.8 版本加入。


此模块提供了一个类 SharedMemory,用于分配和管理多核或对称多处理器 (SMP) 机器上一个或多个进程可访问的共享内存。为了协助共享内存的生命周期管理,尤其是在不同进程之间,BaseManager 的一个子类 SharedMemoryManager 也包含在 multiprocessing.managers 模块中。

在此模块中,共享内存指的是“POSIX 风格”的共享内存块(尽管不一定明确地如此实现),而不是指“分布式共享内存”。这种风格的共享内存允许不同的进程可能读取和写入易失内存的公共(或共享)区域。进程通常仅限于访问其自己的进程内存空间,但共享内存允许在进程之间共享数据,从而避免了在进程之间发送包含该数据的消息的需要。与通过磁盘、套接字或其他需要数据序列化/反序列化和复制的通信方式共享数据相比,通过内存直接共享数据可以提供显著的性能优势。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0, *, track=True)

创建 SharedMemory 类的实例,用于创建新的共享内存块或附加到现有的共享内存块。每个共享内存块都分配有一个唯一的名称。通过这种方式,一个进程可以使用特定名称创建一个共享内存块,而另一个进程可以使用相同的名称附加到该共享内存块。

作为跨进程共享数据的资源,共享内存块的生命周期可能长于创建它们的原始进程。当一个进程不再需要访问某个共享内存块(但其他进程可能仍需要)时,应调用 close() 方法。当任何进程都不再需要某个共享内存块时,应调用 unlink() 方法以确保正确清理。

参数:
  • name (str | None) – 所请求共享内存的唯一名称,指定为字符串。当创建新的共享内存块时,如果为名称提供 None (默认值),将生成一个新名称。

  • create (bool) – 控制是创建新的共享内存块 (True) 还是附加到现有的共享内存块 (False)。

  • size (int) – 创建新的共享内存块时请求的字节数。由于某些平台选择根据其内存页大小分配内存块,因此共享内存块的实际大小可能大于或等于请求的大小。当附加到现有共享内存块时,将忽略 size 参数。

  • track (bool) – 当为 True 时,在操作系统不自动执行此操作的平台上,将共享内存块注册到资源跟踪器进程。资源跟踪器确保即使所有其他有权访问内存的进程在未执行此操作的情况下退出,也能正确清理共享内存。使用 multiprocessing 设施从共同祖先创建的 Python 进程共享一个资源跟踪器进程,并且共享内存段的生命周期在这些进程之间自动处理。以任何其他方式创建的 Python 进程在启用 track 访问共享内存时将获得自己的资源跟踪器。这将导致共享内存被第一个终止的进程的资源跟踪器删除。为避免此问题,当已存在另一个进程执行簿记时,subprocess 或独立 Python 进程的用户应将 track 设置为 Falsetrack 在 Windows 上被忽略,Windows 有自己的跟踪机制,并在所有句柄都关闭时自动删除共享内存。

3.13 版本中的变化: 添加了 track 参数。

close()

关闭此实例到共享内存的文件描述符/句柄。一旦不再需要从此实例访问共享内存块,就应调用 close()。根据操作系统,即使所有句柄都已关闭,底层内存也可能不会被释放。为确保正确清理,请使用 unlink() 方法。

删除底层共享内存块。每个共享内存块只能调用一次此方法,无论有多少句柄指向它,即使在其他进程中也是如此。unlink()close() 可以按任意顺序调用,但在 unlink() 之后尝试访问共享内存块中的数据可能会导致内存访问错误,具体取决于平台。

此方法在 Windows 上无效,在 Windows 上,删除共享内存块的唯一方法是关闭所有句柄。

buf

共享内存块内容的 memoryview。

name

对共享内存块唯一名称的只读访问。

size

对共享内存块大小(以字节为单位)的只读访问。

以下示例演示了 SharedMemory 实例的低级使用

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

以下示例演示了 SharedMemory 类与 NumPy 数组 的实际应用,从两个不同的 Python shell 访问相同的 numpy.ndarray

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])

multiprocessing.managers.BaseManager 的子类,可用于管理跨进程的共享内存块。

SharedMemoryManager 实例调用 start() 会启动一个新进程。此新进程的唯一目的是管理通过它创建的所有共享内存块的生命周期。要触发释放由该进程管理的所有共享内存块,请在该实例上调用 shutdown()。这会触发对由该进程管理的所有 SharedMemory 对象调用 unlink(),然后停止该进程本身。通过 SharedMemoryManager 创建 SharedMemory 实例,我们避免了手动跟踪和触发释放共享内存资源的需要。

此类提供了创建和返回 SharedMemory 实例以及创建由共享内存支持的类列表对象 (ShareableList) 的方法。

请参阅 BaseManager 以了解继承的 addressauthkey 可选输入参数的描述以及它们如何用于从其他进程连接到现有 SharedMemoryManager 服务。

SharedMemory(size)

创建一个新的 SharedMemory 对象并返回它,其大小为指定的 size 字节。

ShareableList(sequence)

创建一个新的 ShareableList 对象并返回它,该对象由输入 sequence 中的值初始化。

以下示例演示了 SharedMemoryManager 的基本机制

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

以下示例描述了一种使用 SharedMemoryManager 对象更方便的模式,通过 with 语句确保所有共享内存块在不再需要时都被释放

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

当在 with 语句中使用 SharedMemoryManager 时,使用该管理器创建的所有共享内存块都会在 with 语句的代码块执行完毕时被释放。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

提供一个可变类列表对象,其中存储的所有值都存储在共享内存块中。这限制了可存储值为以下内置数据类型

  • int(有符号 64 位)

  • 浮点数

  • bool

  • str(编码为 UTF-8 时每个小于 10M 字节)

  • bytes(每个小于 10M 字节)

  • None

它还与内置的 list 类型显著不同,这些列表不能改变其总长度(例如不支持 append()insert() 等),并且不支持通过切片动态创建新的 ShareableList 实例。

sequence 用于填充新的 ShareableList 值。设置为 None 则通过其唯一的共享内存名称附加到已存在的 ShareableList

name 是请求共享内存的唯一名称,如 SharedMemory 的定义中所述。当附加到现有 ShareableList 时,请指定其共享内存块的唯一名称,同时将 sequence 设置为 None

备注

对于 bytesstr 值存在一个已知问题。如果它们以 \x00 空字节或字符结尾,当通过索引从 ShareableList 获取它们时,这些字节或字符可能会被 静默删除。这种 .rstrip(b'\x00') 行为被认为是一个 bug,将来可能会消失。请参阅 gh-106939

对于尾部空值截断会成为问题的应用程序,可以通过在存储此类值时始终无条件地在其末尾附加一个额外的非 0 字节并在获取时无条件地将其移除来解决此问题

>>> from multiprocessing import shared_memory
>>> nul_bug_demo = shared_memory.ShareableList(['?\x00', b'\x03\x02\x01\x00\x00\x00'])
>>> nul_bug_demo[0]
'?'
>>> nul_bug_demo[1]
b'\x03\x02\x01'
>>> nul_bug_demo.shm.unlink()
>>> padded = shared_memory.ShareableList(['?\x00\x07', b'\x03\x02\x01\x00\x00\x00\x07'])
>>> padded[0][:-1]
'?\x00'
>>> padded[1][:-1]
b'\x03\x02\x01\x00\x00\x00'
>>> padded.shm.unlink()
count(value)

返回 value 出现的次数。

index(value)

返回 value 的第一个索引位置。如果 value 不存在,则引发 ValueError

format

只读属性,包含所有当前存储值使用的 struct 打包格式。

shm

存储值的 SharedMemory 实例。

以下示例演示了 ShareableList 实例的基本用法

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

以下示例描述了通过提供其背后共享内存块的名称,一个、两个或多个进程如何访问相同的 ShareableList

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

以下示例演示了 ShareableList(以及底层 SharedMemory)对象在需要时可以进行 pickle 和 unpickle。请注意,它仍将是相同的共享对象。之所以会这样,是因为反序列化的对象具有相同的唯一名称,并且只是附加到具有相同名称的现有对象(如果该对象仍然存在)

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()