heapq
— 堆队列算法¶
源代码: Lib/heapq.py
此模块实现了堆队列算法,也称为优先队列算法。
堆是一种二叉树,其中每个父节点的值都小于或等于其任何子节点的值。我们将此条件称为堆不变量。
此实现使用数组,其中对于所有 k(从零开始计数元素),heap[k] <= heap[2*k+1]
且 heap[k] <= heap[2*k+2]
。为了进行比较,不存在的元素被认为是无限大的。堆的一个有趣特性是它的最小元素始终是根节点,即 heap[0]
。
以下 API 与教科书中的堆算法有两个方面的不同:(a) 我们使用从零开始的索引。这使得节点索引与其子节点索引之间的关系稍微不那么明显,但更适合 Python,因为 Python 使用从零开始的索引。(b) 我们的 pop 方法返回最小的项,而不是最大的项(在教科书中称为“最小堆”;“最大堆”在文本中更常见,因为它适用于原地排序)。
这两个方面使得可以将堆视为一个常规的 Python 列表,而不会感到意外:heap[0]
是最小的项,并且 heap.sort()
维护堆不变量!
要创建堆,请使用初始化为 []
的列表,或者可以通过函数 heapify()
将填充的列表转换为堆。
提供了以下函数
- heapq.heappush(heap, item)¶
将值 item 推送到 heap 上,并维护堆不变量。
- heapq.heappop(heap)¶
弹出并返回 heap 中最小的项,并维护堆不变量。如果堆为空,则引发
IndexError
。要访问最小的项而不弹出它,请使用heap[0]
。
- heapq.heappushpop(heap, item)¶
将 item 推送到堆上,然后弹出并返回堆中最小的项。组合操作比先调用
heappush()
然后单独调用heappop()
更有效率。
- heapq.heapify(x)¶
将列表 x 原地转换为堆,时间复杂度为线性时间。
- heapq.heapreplace(heap, item)¶
弹出并返回 heap 中最小的项,并推送新的 item。堆的大小不变。如果堆为空,则引发
IndexError
。此一步操作比先执行
heappop()
然后执行heappush()
更有效率,并且在使用固定大小的堆时可能更合适。弹出/推送组合始终从堆中返回一个元素,并将其替换为 item。返回值可能大于添加的 item。如果不希望这样,请考虑改用
heappushpop()
。它的推送/弹出组合返回两个值中较小的一个,并将较大的值保留在堆上。
该模块还提供了三个基于堆的通用函数。
- heapq.merge(*iterables, key=None, reverse=False)¶
将多个已排序的输入合并为一个已排序的输出(例如,合并来自多个日志文件的带时间戳的条目)。返回已排序值的 迭代器。
类似于
sorted(itertools.chain(*iterables))
,但返回一个迭代器,不会一次性将所有数据拉入内存,并假设每个输入流都已排序(从小到大)。有两个可选参数,必须作为关键字参数指定。
key 指定一个 键函数,该函数接受一个参数,用于从每个输入元素中提取比较键。默认值为
None
(直接比较元素)。reverse 是一个布尔值。如果设置为
True
,则合并输入元素时,就好像每次比较都被反转一样。为了实现类似于sorted(itertools.chain(*iterables), reverse=True)
的行为,所有迭代器都必须从大到小排序。在 3.5 版更改: 添加了可选的 key 和 reverse 参数。
- heapq.nlargest(n, iterable, key=None)¶
返回由 iterable 定义的数据集中 n 个最大元素的列表。如果提供 key,则指定一个参数的函数,该函数用于从 iterable 中的每个元素中提取比较键(例如,
key=str.lower
)。等效于:sorted(iterable, key=key, reverse=True)[:n]
。
- heapq.nsmallest(n, iterable, key=None)¶
返回由 iterable 定义的数据集中 n 个最小元素的列表。如果提供 key,则指定一个参数的函数,该函数用于从 iterable 中的每个元素中提取比较键(例如,
key=str.lower
)。等效于:sorted(iterable, key=key)[:n]
。
后两个函数在 n 值较小时性能最佳。对于较大的值,使用 sorted()
函数效率更高。此外,当 n==1
时,使用内置的 min()
和 max()
函数效率更高。如果需要重复使用这些函数,请考虑将可迭代对象转换为实际的堆。
基本示例¶
可以通过将所有值推送到堆上,然后一次弹出最小的值来实现 堆排序
>>> def heapsort(iterable):
... h = []
... for value in iterable:
... heappush(h, value)
... return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
这类似于 sorted(iterable)
,但与 sorted()
不同,此实现不稳定。
堆元素可以是元组。这对于在跟踪主要记录的同时分配比较值(例如任务优先级)非常有用
>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')
优先级队列实现说明¶
优先级队列 是堆的常见用途,它提出了一些实现挑战
排序稳定性:如何按最初添加的顺序返回具有相同优先级的两个任务?
如果优先级相同且任务没有默认比较顺序,则元组比较对于(优先级,任务)对会中断。
如果任务的优先级发生变化,如何将其移动到堆中的新位置?
或者,如果需要删除挂起的任务,如何找到它并将其从队列中删除?
解决前两个挑战的方法是将条目存储为包含优先级、条目计数和任务的 3 元素列表。条目计数用作决胜局,以便按添加顺序返回具有相同优先级的两个任务。并且由于没有两个条目计数相同,因此元组比较永远不会尝试直接比较两个任务。
解决不可比较任务问题的另一种方法是创建一个包装类,该类忽略任务项并且仅比较优先级字段
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class PrioritizedItem:
priority: int
item: Any=field(compare=False)
剩下的挑战围绕着找到挂起的任务并更改其优先级或将其完全删除。可以使用指向队列中条目的字典来查找任务。
删除条目或更改其优先级比较困难,因为它会破坏堆结构不变性。因此,一种可能的解决方案是将条目标记为已删除,并添加一个具有修改后优先级的新条目
pq = [] # list of entries arranged in a heap
entry_finder = {} # mapping of tasks to entries
REMOVED = '<removed-task>' # placeholder for a removed task
counter = itertools.count() # unique sequence count
def add_task(task, priority=0):
'Add a new task or update the priority of an existing task'
if task in entry_finder:
remove_task(task)
count = next(counter)
entry = [priority, count, task]
entry_finder[task] = entry
heappush(pq, entry)
def remove_task(task):
'Mark an existing task as REMOVED. Raise KeyError if not found.'
entry = entry_finder.pop(task)
entry[-1] = REMOVED
def pop_task():
'Remove and return the lowest priority task. Raise KeyError if empty.'
while pq:
priority, count, task = heappop(pq)
if task is not REMOVED:
del entry_finder[task]
return task
raise KeyError('pop from an empty priority queue')
理论¶
堆是满足以下条件的数组:对于所有 k,a[k] <= a[2*k+1]
且 a[k] <= a[2*k+2]
,从 0 开始计数元素。为了进行比较,不存在的元素被认为是无限的。堆的一个有趣特性是 a[0]
始终是其最小元素。
上面奇怪的不变性旨在成为锦标赛的有效内存表示形式。下面的数字是 k,而不是 a[k]
0
1 2
3 4 5 6
7 8 9 10 11 12 13 14
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
在上面的树中,每个单元格 k 都在 2*k+1
和 2*k+2
之上。在我们通常在体育运动中看到的二元锦标赛中,每个单元格都是其顶部两个单元格的获胜者,我们可以沿着树向下追踪获胜者以查看他/她遇到的所有对手。然而,在这种锦标赛的许多计算机应用中,我们不需要追踪获胜者的历史。为了提高内存效率,当一个获胜者被提升时,我们尝试用较低级别的其他东西替换它,规则变成一个单元格及其顶部的两个单元格包含三个不同的项目,但顶部单元格“获胜”超过了两个顶部单元格。
如果始终保护此堆不变性,则索引 0 显然是总体获胜者。删除它并找到“下一个”获胜者的最简单算法方法是将某个失败者(例如上图中的单元格 30)移动到 0 位置,然后沿着树向下渗透这个新的 0,交换值,直到重新建立不变性。这显然与树中项目的总数成对数关系。通过迭代所有项目,您可以获得 O(n log n) 排序。
这种排序的一个很好的特性是,您可以在排序进行时有效地插入新项目,前提是插入的项目不“优于”您提取的最后一个第 0 个元素。这在模拟上下文中特别有用,其中树保存所有传入事件,“获胜”条件意味着最小的计划时间。当一个事件安排其他事件执行时,它们被安排到未来,因此它们可以很容易地进入堆中。因此,堆是实现调度程序的良好结构(这就是我用于 MIDI 音序器的结构 :-)。
已经对用于实现调度程序的各种结构进行了广泛的研究,堆非常适合于此,因为它们相当快,速度几乎恒定,并且最坏情况与平均情况没有太大区别。然而,还有其他表示总体上更有效,但最坏情况可能很糟糕。
堆在大型磁盘排序中也非常有用。您很可能都知道,大型排序意味着生成“运行”(预先排序的序列,其大小通常与 CPU 内存量有关),然后对这些运行进行合并传递,这种合并通常组织得非常巧妙 [1]。初始排序产生尽可能长的运行非常重要。锦标赛是实现这一目标的好方法。如果使用所有可用内存来保存锦标赛,您替换并渗透恰好适合当前运行的项目,您将生成大小是随机输入内存两倍的运行,并且对于模糊排序的输入要好得多。
此外,如果您将第 0 个项目输出到磁盘并获得一个可能不适合当前锦标赛的输入(因为该值“胜过”最后一个输出值),则它不适合堆,因此堆的大小会减小。释放的内存可以立即巧妙地重复使用,以逐步构建第二个堆,该堆以与第一个堆融化完全相同的速率增长。当第一个堆完全消失时,您切换堆并开始新的运行。聪明且非常有效!
总之,堆是有用的内存结构。我在一些应用程序中使用它们,我认为保留一个“堆”模块是很好的。 :-)
脚注