multiprocessing.shared_memory
— 跨进程直接访问的共享内存¶
源代码: Lib/multiprocessing/shared_memory.py
3.8 版本加入。
该模块提供了一个类 SharedMemory
,用于分配和管理共享内存,以便在多核或对称多处理器 (SMP) 机器上被一个或多个进程访问。为了帮助管理共享内存的生命周期,尤其是在不同的进程之间,还提供了 BaseManager
的子类 SharedMemoryManager
,该子类在 multiprocessing.managers
模块中。
在此模块中,共享内存是指“POSIX 风格”的共享内存块(尽管不一定明确地以此方式实现),而不是指“分布式共享内存”。这种风格的共享内存允许不同的进程潜在地读写易失性内存的公共(或共享)区域。进程通常被限制为只能访问它们自己的进程内存空间,但共享内存允许在进程之间共享数据,从而避免了在进程之间发送包含该数据的消息的需要。与通过磁盘或套接字或其他需要序列化/反序列化和复制数据的通信方式共享数据相比,通过内存直接共享数据可以提供显著的性能优势。
- class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0, *, track=True)¶
创建
SharedMemory
类的实例,用于创建新的共享内存块或附加到现有的共享内存块。每个共享内存块都分配有一个唯一的名称。通过这种方式,一个进程可以使用特定的名称创建一个共享内存块,而另一个进程可以使用相同的名称附加到该共享内存块。作为跨进程共享数据的资源,共享内存块的生存时间可能比创建它们的原始进程更长。当一个进程不再需要访问其他进程可能仍然需要的共享内存块时,应该调用
close()
方法。当任何进程都不再需要共享内存块时,应调用unlink()
方法以确保正确清理。- 参数:
name (str | None) – 请求的共享内存的唯一名称,指定为字符串。当创建新的共享内存块时,如果为名称提供
None
(默认值),则会生成一个新的名称。create (bool) – 控制是创建新的共享内存块 (
True
) 还是附加到现有的共享内存块 (False
)。size (int) – 创建新的共享内存块时请求的字节数。由于某些平台选择根据该平台的内存页面大小分配内存块,因此共享内存块的实际大小可能大于或等于请求的大小。当附加到现有的共享内存块时,将忽略 size 参数。
track (bool) – 当
True
时,在操作系统不自动执行此操作的平台上,使用资源跟踪器进程注册共享内存块。即使所有其他有权访问内存的进程在没有这样做的情况下退出,资源跟踪器也可以确保正确清理共享内存。使用multiprocessing
工具从公共祖先创建的 Python 进程共享一个资源跟踪器进程,并且这些进程之间会自动处理共享内存段的生命周期。当启用 track 时,以任何其他方式创建的 Python 进程都将收到它们自己的资源跟踪器。这将导致共享内存被第一个终止的进程的资源跟踪器删除。为了避免此问题,当已经有另一个进程负责记账时,subprocess
的用户或独立的 Python 进程应将 track 设置为False
。在 Windows 上会忽略 track,Windows 有自己的跟踪,并且当所有句柄都关闭时会自动删除共享内存。
在 3.13 版本中更改: 添加了 track 参数。
- close()¶
从此实例关闭到共享内存的文件描述符/句柄。一旦不再需要从此实例访问共享内存块,就应该调用
close()
。根据操作系统,即使所有句柄都已关闭,底层内存也可能不会被释放。为了确保正确清理,请使用unlink()
方法。
- unlink()¶
删除底层的共享内存块。无论有多少句柄(即使在其他进程中),每个共享内存块都应只调用一次此方法。
unlink()
和close()
可以以任何顺序调用,但在unlink()
之后尝试访问共享内存块中的数据可能会导致内存访问错误,具体取决于平台。此方法在 Windows 上不起作用,在 Windows 上删除共享内存块的唯一方法是关闭所有句柄。
- buf¶
共享内存块内容的内存视图。
- name¶
对共享内存块的唯一名称的只读访问。
- size¶
对共享内存块的字节大小的只读访问。
以下示例演示了 SharedMemory
实例的底层用法
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once
>>> buffer[4] = 100 # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5]) # Access via shm_a
b'howdy'
>>> shm_b.close() # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink() # Call unlink only once to release the shared memory
以下示例演示了 SharedMemory
类与 NumPy 数组 的实际用法,从两个不同的 Python shell 访问相同的 numpy.ndarray
>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8]) # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:] # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name # We did not specify a name so one was chosen for us
'psm_21467_46075'
>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([ 1, 1, 2, 3, 5, 888])
>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([ 1, 1, 2, 3, 5, 888])
>>> # Clean up from within the second Python shell
>>> del c # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()
>>> # Clean up from within the first Python shell
>>> del b # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink() # Free and release the shared memory block at the very end
- class multiprocessing.managers.SharedMemoryManager([address[, authkey]])¶
multiprocessing.managers.BaseManager
的子类,可用于跨进程管理共享内存块。调用
start()
方法会在SharedMemoryManager
实例上启动一个新进程。这个新进程的唯一目的是管理通过它创建的所有共享内存块的生命周期。要触发释放该进程管理的所有共享内存块,请调用该实例上的shutdown()
方法。这将触发对该进程管理的所有SharedMemory
对象调用unlink()
方法,然后停止进程本身。通过SharedMemoryManager
创建SharedMemory
实例,可以避免手动跟踪和触发共享内存资源的释放。此类提供了创建和返回
SharedMemory
实例以及创建由共享内存支持的类似列表的对象 (ShareableList
) 的方法。有关继承的 *address* 和 *authkey* 可选输入参数的说明以及如何使用它们从其他进程连接到现有的
SharedMemoryManager
服务,请参阅BaseManager
。- SharedMemory(size)¶
创建并返回一个新的
SharedMemory
对象,其大小为指定的 *size* (以字节为单位)。
- ShareableList(sequence)¶
创建并返回一个新的
ShareableList
对象,该对象由输入 *sequence* 中的值初始化。
以下示例演示了 SharedMemoryManager
的基本机制
>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start() # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown() # Calls unlink() on sl, raw_shm, and another_sl
以下示例描述了使用 SharedMemoryManager
对象的一种可能更方便的模式,通过 with
语句来确保所有共享内存块在不再需要后被释放
>>> with SharedMemoryManager() as smm:
... sl = smm.ShareableList(range(2000))
... # Divide the work among two processes, storing partial results in sl
... p1 = Process(target=do_work, args=(sl, 0, 1000))
... p2 = Process(target=do_work, args=(sl, 1000, 2000))
... p1.start()
... p2.start() # A multiprocessing.Pool might be more efficient
... p1.join()
... p2.join() # Wait for all work to complete in both processes
... total_result = sum(sl) # Consolidate the partial results now in sl
在 with
语句中使用 SharedMemoryManager
时,使用该管理器创建的共享内存块将在 with
语句的代码块执行完毕后全部释放。
- class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)¶
提供一个可变的类似列表的对象,其中存储的所有值都存储在共享内存块中。这会将可存储的值限制为以下内置数据类型
它与内置的
list
类型也明显不同,因为这些列表无法更改其总长度(即,没有append()
、insert()
等),并且不支持通过切片动态创建新的ShareableList
实例。sequence 用于填充包含值的新
ShareableList
。设置为None
以通过其唯一的共享内存名称附加到已存在的ShareableList
。name 是请求的共享内存的唯一名称,如
SharedMemory
的定义中所述。在附加到现有的ShareableList
时,请指定其共享内存块的唯一名称,同时将 *sequence* 设置为None
。注意
对于
bytes
和str
值存在已知问题。如果它们以\x00
空字节或字符结尾,则在通过索引从ShareableList
中获取它们时,这些字节或字符可能会被静默删除。这种.rstrip(b'\x00')
行为被认为是错误,将来可能会消失。请参阅 gh-106939。对于尾随空值的删除是一个问题的应用程序,通过始终无条件地在存储此类值时在其末尾附加一个额外的非 0 字节,并在获取时无条件地删除它来解决此问题
>>> from multiprocessing import shared_memory >>> nul_bug_demo = shared_memory.ShareableList(['?\x00', b'\x03\x02\x01\x00\x00\x00']) >>> nul_bug_demo[0] '?' >>> nul_bug_demo[1] b'\x03\x02\x01' >>> nul_bug_demo.shm.unlink() >>> padded = shared_memory.ShareableList(['?\x00\x07', b'\x03\x02\x01\x00\x00\x00\x07']) >>> padded[0][:-1] '?\x00' >>> padded[1][:-1] b'\x03\x02\x01\x00\x00\x00' >>> padded.shm.unlink()
- count(value)¶
返回 *value* 出现的次数。
- index(value)¶
返回 *value* 的第一个索引位置。如果不存在 *value*,则引发
ValueError
。
- shm¶
存储值的
SharedMemory
实例。
以下示例演示了 ShareableList
实例的基本用法
>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice' # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a # Use of a ShareableList after call to unlink() is unsupported
以下示例描述了一个、两个或多个进程如何通过提供其背后的共享内存块的名称来访问相同的 ShareableList
>>> b = shared_memory.ShareableList(range(5)) # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name) # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()
以下示例演示了如果需要,可以 pickle 和 unpickle ShareableList
(和底层 SharedMemory
)对象。 请注意,它仍然是同一个共享对象。之所以会这样,是因为反序列化的对象具有相同的唯一名称,并且只是附加到具有相同名称的现有对象(如果该对象仍然存在)
>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()