multiprocessing.shared_memory — 进程间直接访问的共享内存

源代码: Lib/multiprocessing/shared_memory.py

3.8 版新增。


此模块提供了一个类 SharedMemory,用于分配和管理要在多核或对称多处理器 (SMP) 机器上的一个或多个进程之间访问的共享内存。为了帮助管理共享内存的生命周期,尤其是在不同进程之间,BaseManager 的子类 SharedMemoryManager 也在 multiprocessing.managers 模块中提供。

在此模块中,共享内存指的是“POSIX 风格”的共享内存块(尽管不一定明确地实现为这样),而不是指“分布式共享内存”。这种风格的共享内存允许不同的进程潜在地读写易失性内存的公共(或共享)区域。通常情况下,进程只能访问自己的进程内存空间,但共享内存允许在进程之间共享数据,从而避免了在包含数据的进程之间发送消息的需要。与通过磁盘、套接字或其他需要序列化/反序列化和复制数据的通信方式共享数据相比,直接通过内存共享数据可以显著提高性能。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)

创建 SharedMemory 类的实例,用于创建新的共享内存块或附加到现有的共享内存块。每个共享内存块都被分配了一个唯一的名称。这样,一个进程可以使用特定的名称创建一个共享内存块,而另一个进程可以使用相同的名称附加到同一个共享内存块。

作为跨进程共享数据的资源,共享内存块的生存期可能超过创建它们的原始进程。当一个进程不再需要访问其他进程可能仍然需要的共享内存块时,应该调用 close() 方法。当任何进程都不再需要共享内存块时,应该调用 unlink() 方法以确保正确清理。

参数:
  • name (str | None) – 请求的共享内存的唯一名称,指定为字符串。创建新的共享内存块时,如果为名称提供 None(默认值),则将生成一个新的名称。

  • create (bool) – 控制是创建新的共享内存块 (True) 还是附加到现有的共享内存块 (False)。

  • size (int) – 创建新的共享内存块时请求的字节数。由于某些平台选择根据平台的内存页面大小分配内存块,因此共享内存块的确切大小可能大于或等于请求的大小。附加到现有的共享内存块时,将忽略 size 参数。

close()

从此实例关闭对共享内存的访问。为了确保正确清理资源,一旦不再需要该实例,所有实例都应该调用 close()。请注意,调用 close() 不会导致共享内存块本身被销毁。

请求销毁底层的共享内存块。为了确保正确清理资源,应该在所有需要共享内存块的进程中调用一次(且仅调用一次)unlink()。在请求销毁后,共享内存块可能会也可能不会立即被销毁,并且此行为在不同平台上可能有所不同。在调用 unlink() 后尝试访问共享内存块中的数据可能会导致内存访问错误。注意:放弃对共享内存块的最后一个进程可以按任意顺序调用 unlink()close()

buf

共享内存块内容的 memoryview。

name

对共享内存块的唯一名称的只读访问。

size

对共享内存块大小(以字节为单位)的只读访问。

以下示例演示了 SharedMemory 实例的低级用法

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

以下示例演示了 SharedMemory 类与 NumPy 数组 的实际用法,从两个不同的 Python shell 访问同一个 numpy.ndarray

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])

multiprocessing.managers.BaseManager 的子类,可用于管理跨进程的共享内存块。

SharedMemoryManager 实例上调用 start() 会启动一个新进程。这个新进程的唯一目的是管理通过它创建的所有共享内存块的生命周期。要触发释放该进程管理的所有共享内存块,请在实例上调用 shutdown()。这将对该进程管理的所有 SharedMemory 对象触发 unlink() 调用,然后停止进程本身。通过 SharedMemoryManager 创建 SharedMemory 实例,我们避免了手动跟踪和触发释放共享内存资源的需要。

此类提供了创建和返回 SharedMemory 实例以及创建由共享内存支持的类似列表的对象(ShareableList)的方法。

有关继承的可选输入参数 *address* 和 *authkey* 的描述,以及如何使用它们从其他进程连接到现有的 SharedMemoryManager 服务,请参阅 BaseManager

SharedMemory(size)

创建并返回一个新的 SharedMemory 对象,其指定 *size* 以字节为单位。

ShareableList(sequence)

创建并返回一个新的 ShareableList 对象,由输入 *sequence* 中的值初始化。

以下示例演示了 SharedMemoryManager 的基本机制

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

以下示例描述了一种可能更方便的使用 SharedMemoryManager 对象的模式,即通过 with 语句来确保所有共享内存块在不再需要时都被释放

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

with 语句中使用 SharedMemoryManager 时,使用该管理器创建的共享内存块将在 with 语句的代码块完成执行后全部释放。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

提供一个可变的类似列表的对象,其中存储的所有值都存储在一个共享内存块中。这将可存储的值限制为以下内置数据类型

  • int(有符号 64 位)

  • float

  • bool

  • str(以 UTF-8 编码时,每个字符串小于 10MB)

  • bytes(每个小于 10MB)

  • None

它与内置的 list 类型也有显著的不同,因为这些列表不能改变它们的总长度(即没有 append()insert() 等),并且不支持通过切片动态创建新的 ShareableList 实例。

*sequence* 用于填充一个新的、充满值的 ShareableList。设置为 None 则可以通过其唯一的共享内存名称附加到已存在的 ShareableList

*name* 是请求的共享内存的唯一名称,如 SharedMemory 的定义中所述。当附加到现有的 ShareableList 时,请指定其共享内存块的唯一名称,同时将 *sequence* 设置为 None

注意

bytesstr 值存在一个已知问题。如果它们以 \x00 空字节或字符结尾,则从 ShareableList 中按索引获取它们时,这些字节或字符可能会被 *静默地删除*。这种 .rstrip(b'\x00') 行为被认为是一个错误,将来可能会消失。请参阅 gh-106939

对于删除尾随空值会导致问题的应用程序,可以通过在存储此类值时始终无条件地在末尾追加一个非 0 字节,并在获取时无条件地将其删除来解决此问题

>>> from multiprocessing import shared_memory
>>> nul_bug_demo = shared_memory.ShareableList(['?\x00', b'\x03\x02\x01\x00\x00\x00'])
>>> nul_bug_demo[0]
'?'
>>> nul_bug_demo[1]
b'\x03\x02\x01'
>>> nul_bug_demo.shm.unlink()
>>> padded = shared_memory.ShareableList(['?\x00\x07', b'\x03\x02\x01\x00\x00\x00\x07'])
>>> padded[0][:-1]
'?\x00'
>>> padded[1][:-1]
b'\x03\x02\x01\x00\x00\x00'
>>> padded.shm.unlink()
count(value)

返回 *value* 的出现次数。

index(value)

返回 value 的第一个索引位置。如果 value 不存在,则引发 ValueError

format

只读属性,包含当前存储的所有值使用的 struct 打包格式。

shm

存储值的 SharedMemory 实例。

以下示例演示了 ShareableList 实例的基本用法

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

以下示例描述了一个、两个或多个进程如何通过提供其背后的共享内存块的名称来访问同一个 ShareableList

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

以下示例演示了如果需要,可以对 ShareableList(以及底层的 SharedMemory)对象进行序列化和反序列化。请注意,它仍然是同一个共享对象。发生这种情况是因为反序列化的对象具有相同的唯一名称,并且只是附加到具有相同名称的现有对象(如果该对象仍然存在)。

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()