heapq — 堆队列算法

源代码: Lib/heapq.py


此模块提供了堆队列算法的一种实现,也称为优先队列算法。

最小堆(Min-heap)是二叉树,其中每个父节点的值都小于或等于其任何子节点的值。我们将此条件称为堆不变量。

对于最小堆,此实现使用列表,其中对于所有存在的比较元素*k*,heap[k] <= heap[2*k+1] 并且 heap[k] <= heap[2*k+2]。元素从零开始计数。最小堆的一个有趣特性是,其最小的元素始终是根,即 heap[0]

最大堆(Max-heap)满足相反的不变量:每个父节点的值都*大于*其任何子节点的值。它们被实现为列表,其中对于所有存在的比较元素*k*,maxheap[2*k+1] <= maxheap[k] 并且 maxheap[2*k+2] <= maxheap[k]。根,即 maxheap[0],包含*最大*的元素;heap.sort(reverse=True) 维护了最大堆的不变量。

heapq API 在两个方面与教科书中的堆算法不同:(a) 我们使用基于零的索引。这使得节点索引与其子节点索引之间的关系不那么明显,但更适合 Python,因为它使用基于零的索引。(b) 教科书通常关注最大堆,因为它们适合原地排序。我们的实现偏向于最小堆,因为它们更好地对应 Python 的 列表

这两个方面使得可以将堆视为一个普通的 Python 列表,而不会有任何意外:heap[0] 是最小的项,并且 heap.sort() 维护了堆不变量!

list.sort() 类似,此实现仅使用 < 运算符进行比较,无论是最小堆还是最大堆。

在下面的 API 和本文档中,未限定的术语*堆*通常指最小堆。最大堆的 API 以 _max 后缀命名。

要创建一个堆,可以使用初始化为 [] 的列表,或者使用 heapify()heapify_max() 函数将现有列表转换为最小堆或最大堆。

为最小堆提供了以下函数

heapq.heappush(heap, item)

将值 *item* 推入*堆*中,并保持最小堆不变量。

heapq.heappop(heap)

从*堆*中弹出并返回最小的项,并保持最小堆不变量。如果堆为空,则引发 IndexError。要访问最小的项而不弹出它,请使用 heap[0]

heapq.heappushpop(heap, item)

将 *item* 推入堆中,然后从*堆*中弹出并返回最小的项。这个组合操作比先调用 heappush() 再单独调用 heappop() 更高效。

heapq.heapify(x)

以线性时间将列表 *x* 原地转换为最小堆。

heapq.heapreplace(heap, item)

从*堆*中弹出并返回最小的项,同时推入新的*item*。堆的大小不变。如果堆为空,则引发 IndexError

这个一步到位的操作比先 heappop()heappush() 更高效,并且在使用固定大小的堆时可能更合适。弹出/推入的组合总是从堆中返回一个元素并用 *item* 替换它。

返回的值可能比添加的 *item* 更大。如果不希望这样,请考虑改用 heappushpop()。它的推入/弹出组合会返回两个值中较小的一个,将较大的值留在堆上。

对于最大堆,提供了以下函数

heapq.heapify_max(x)

以线性时间将列表 *x* 原地转换为最大堆。

在 3.14 版本加入。

heapq.heappush_max(heap, item)

将值 *item* 推入最大堆 *heap* 中,并保持最大堆不变量。

在 3.14 版本加入。

heapq.heappop_max(heap)

从最大堆 *heap* 中弹出并返回最大的项,并保持最大堆不变量。如果最大堆为空,则引发 IndexError。要访问最大的项而不弹出它,请使用 maxheap[0]

在 3.14 版本加入。

heapq.heappushpop_max(heap, item)

将 *item* 推入最大堆 *heap* 中,然后从 *heap* 中弹出并返回最大的项。这个组合操作比先调用 heappush_max() 再单独调用 heappop_max() 更高效。

在 3.14 版本加入。

heapq.heapreplace_max(heap, item)

从最大堆 *heap* 中弹出并返回最大的项,同时推入新的 *item*。最大堆的大小不变。如果最大堆为空,则引发 IndexError

返回的值可能比添加的 *item* 更小。有关详细用法说明,请参考类似的函数 heapreplace()

在 3.14 版本加入。

该模块还提供了三个基于堆的通用函数。

heapq.merge(*iterables, key=None, reverse=False)

将多个已排序的输入合并为单个已排序的输出(例如,合并来自多个日志文件的时间戳条目)。返回一个包含已排序值的迭代器

类似于 sorted(itertools.chain(*iterables)),但它返回一个可迭代对象,不会一次性将所有数据加载到内存中,并假定每个输入流都已经排序(从小到大)。

有两个可选参数,必须以关键字参数的形式指定。

key 指定一个接受一个参数的键函数,用于从每个输入元素中提取比较键。默认值为 None(直接比较元素)。

reverse 是一个布尔值。如果设置为 True,则输入元素会按每次比较都反向的方式合并。为了达到与 sorted(itertools.chain(*iterables), reverse=True) 类似的行为,所有可迭代对象必须从大到小排序。

在 3.5 版本发生变更:添加了可选的 keyreverse 参数。

heapq.nlargest(n, iterable, key=None)

从 *iterable* 定义的数据集中返回一个包含 *n* 个最大元素的列表。如果提供了 *key*,它指定了一个接受一个参数的函数,用于从 *iterable* 中的每个元素提取比较键(例如,key=str.lower)。等价于:sorted(iterable, key=key, reverse=True)[:n]

heapq.nsmallest(n, iterable, key=None)

从 *iterable* 定义的数据集中返回一个包含 *n* 个最小元素的列表。如果提供了 *key*,它指定了一个接受一个参数的函数,用于从 *iterable* 中的每个元素提取比较键(例如,key=str.lower)。等价于:sorted(iterable, key=key)[:n]

后两个函数在 *n* 值较小时性能最佳。对于较大的值,使用 sorted() 函数更高效。此外,当 n==1 时,使用内置的 min()max() 函数更高效。如果需要重复使用这些函数,可以考虑将可迭代对象转换为一个真正的堆。

基本示例

可以通过将所有值推入堆中,然后一次一个地弹出最小值来实现堆排序

>>> def heapsort(iterable):
...     h = []
...     for value in iterable:
...         heappush(h, value)
...     return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

这类似于 sorted(iterable),但与 sorted() 不同,此实现是不稳定的。

堆元素可以是元组。这对于在跟踪主记录的同时分配比较值(例如任务优先级)非常有用

>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')

优先队列实现说明

优先队列是堆的常见用途,它带来了几个实现上的挑战

  • 排序稳定性:如何让两个具有相同优先级的任务按照它们最初添加的顺序返回?

  • 如果优先级相等且任务没有默认的比较顺序,元组比较对于 (priority, task) 对会失败。

  • 如果任务的优先级发生变化,如何将其移动到堆中的新位置?

  • 或者如果需要删除一个待处理的任务,如何找到并将其从队列中移除?

前两个挑战的一个解决方案是将条目存储为包含优先级、条目计数和任务的 3 元素列表。条目计数作为一个决胜局,这样具有相同优先级的两个任务会按它们添加的顺序返回。并且由于没有两个条目计数是相同的,元组比较永远不会尝试直接比较两个任务。

对于不可比较任务问题的另一个解决方案是创建一个包装类,它忽略任务项,只比较优先级字段

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)

剩下的挑战围绕着找到一个待处理的任务并更改其优先级或完全移除它。可以通过一个指向队列中条目的字典来找到任务。

移除条目或更改其优先级更困难,因为它会破坏堆结构的不变量。因此,一个可能的解决方案是将该条目标记为已移除,并添加一个具有修改后优先级的新条目

pq = []                         # list of entries arranged in a heap
entry_finder = {}               # mapping of tasks to entries
REMOVED = '<removed-task>'      # placeholder for a removed task
counter = itertools.count()     # unique sequence count

def add_task(task, priority=0):
    'Add a new task or update the priority of an existing task'
    if task in entry_finder:
        remove_task(task)
    count = next(counter)
    entry = [priority, count, task]
    entry_finder[task] = entry
    heappush(pq, entry)

def remove_task(task):
    'Mark an existing task as REMOVED.  Raise KeyError if not found.'
    entry = entry_finder.pop(task)
    entry[-1] = REMOVED

def pop_task():
    'Remove and return the lowest priority task. Raise KeyError if empty.'
    while pq:
        priority, count, task = heappop(pq)
        if task is not REMOVED:
            del entry_finder[task]
            return task
    raise KeyError('pop from an empty priority queue')

理论

堆是数组,对于所有的 *k*,都满足 a[k] <= a[2*k+1]a[k] <= a[2*k+2],元素从 0 开始计数。为了比较,不存在的元素被认为是无限大。堆的一个有趣特性是 a[0] 永远是其最小的元素。

上面这个奇怪的不变量旨在成为一种高效的锦标赛内存表示。下面的数字是 *k*,而不是 a[k]

                               0

              1                                 2

      3               4                5               6

  7       8       9       10      11      12      13      14

15 16   17 18   19 20   21 22   23 24   25 26   27 28   29 30

在上面的树中,每个单元格 *k* 都位于 2*k+12*k+2 之上。在体育比赛中常见的二元锦标赛中,每个单元格是它上面两个单元格的胜者,我们可以沿着树追溯胜者,看到他/她所有的对手。然而,在许多此类锦标赛的计算机应用中,我们不需要追溯胜者的历史。为了更节省内存,当一个胜者被晋升时,我们尝试用一个较低级别的其他东西来替换它,规则就变成了:一个单元格和它上面的两个单元格包含三个不同的项,但顶部的单元格“胜过”下面的两个单元格。

如果这个堆不变量始终保持,那么索引 0 显然是总冠军。移除它并找到“下一个”冠军的最简单的算法方法是,将某个失败者(比如上图中的单元格 30)移动到位置 0,然后将这个新的 0 向下渗透到树中,交换值,直到不变量重新建立。这显然在树中项目总数上是对数级的。通过遍历所有项目,你会得到一个 *O*(*n* log *n*) 的排序。

这种排序的一个很好的特性是,你可以在排序进行中高效地插入新项目,前提是插入的项目不比你提取的最后一个第 0 个元素“更好”。这在模拟场景中特别有用,其中树持有所有传入的事件,而“获胜”条件意味着最小的调度时间。当一个事件调度其他事件执行时,它们被安排在未来,所以它们可以轻松地进入堆。因此,堆是实现调度器的好结构(我就是用它来做我的 MIDI 音序器 :-))。

实现调度器的各种结构已经被广泛研究,而堆在这方面很不错,因为它们速度相当快,速度几乎是恒定的,并且最坏情况与平均情况相差不大。然而,还有其他表示方式在整体上更高效,但其最坏情况可能会非常糟糕。

堆在大型磁盘排序中也非常有用。你们很可能都知道,大型排序意味着要产生“归并段”(它们是预排序的序列,其大小通常与 CPU 内存量有关),然后是对这些归并段进行合并遍(merge pass),这种合并通常被组织得非常巧妙[1]。非常重要的一点是,初始排序要产生尽可能长的归并段。锦标赛是实现这一目标的好方法。如果使用所有可用内存来举行一场锦标赛,你替换并渗透那些恰好适合当前归并段的项目,你将产生对于随机输入而言是内存大小两倍的归并段,对于模糊有序的输入则更好。

此外,如果你将第 0 项输出到磁盘,并得到一个可能不适合当前锦标赛的输入(因为该值“胜过”最后一个输出值),它就不能放入堆中,所以堆的大小会减小。释放的内存可以立即被巧妙地重用,逐步构建第二个堆,它以与第一个堆融化完全相同的速度增长。当第一个堆完全消失时,你切换堆并开始一个新的归并段。巧妙且相当有效!

总之,堆是值得了解的有用的内存结构。我在一些应用程序中使用了它们,我认为手边有一个“heap”模块是件好事。:-)

脚注