heapq — 堆队列算法

源代码: Lib/heapq.py


此模块提供了堆队列算法(也称为优先级队列算法)的实现。

堆是二叉树,其中每个父节点的值都小于或等于其任何子节点的值。我们将此条件称为堆不变性。

此实现使用数组,对于所有 *k*,从零开始计数元素,满足 heap[k] <= heap[2*k+1]heap[k] <= heap[2*k+2]。 为了方便比较,不存在的元素被认为是无限大的。 堆的一个有趣的属性是其最小元素始终是根,即 heap[0]

下面的 API 在两个方面不同于教科书中的堆算法:(a) 我们使用从零开始的索引。这使得节点索引与其子节点索引之间的关系稍微不那么明显,但更适合,因为 Python 使用从零开始的索引。(b) 我们的 pop 方法返回最小的项,而不是最大的项(在教科书中称为“最小堆”;由于其适合就地排序,“最大堆”在文本中更常见)。

这两个特性使得可以将堆视为普通的 Python 列表而不会感到意外:heap[0] 是最小的项,并且 heap.sort() 保持堆不变性!

要创建一个堆,请使用初始化为 [] 的列表,或者你可以通过函数 heapify() 将填充的列表转换为堆。

提供了以下函数

heapq.heappush(heap, item)

将值 *item* 推入 *heap*,并保持堆不变性。

heapq.heappop(heap)

弹出并返回 *heap* 中最小的项,并保持堆不变性。 如果堆为空,则引发 IndexError。 要在不弹出的情况下访问最小的项,请使用 heap[0]

heapq.heappushpop(heap, item)

将 *item* 推入堆中,然后从 *heap* 中弹出并返回最小的项。 此组合操作的运行效率高于 heappush(),然后再单独调用 heappop()

heapq.heapify(x)

在线性时间内将列表 *x* 就地转换为堆。

heapq.heapreplace(heap, item)

从 *heap* 中弹出并返回最小的项,并且还推送新的 *item*。 堆的大小不会更改。 如果堆为空,则引发 IndexError

此单步操作比 heappop() 后跟 heappush() 更有效,并且在使用固定大小的堆时可能更合适。 pop/push 组合始终从堆中返回一个元素,并将其替换为 *item*。

返回的值可能大于添加的 *item*。 如果不希望这样,请考虑改用 heappushpop()。 其 push/pop 组合会返回两个值中较小的一个,而将较大的值留在堆上。

该模块还提供了三个基于堆的通用函数。

heapq.merge(*iterables, key=None, reverse=False)

将多个已排序的输入合并为单个已排序的输出(例如,合并来自多个日志文件的时间戳条目)。 返回排序值的 迭代器

类似于 sorted(itertools.chain(*iterables)),但返回一个可迭代对象,不会立即将所有数据拉入内存,并假设每个输入流都已排序(从小到大)。

具有两个必须指定为关键字参数的可选参数。

*key* 指定一个接受一个参数的 键函数,该参数用于从每个输入元素中提取比较键。 默认值为 None(直接比较元素)。

*reverse* 是一个布尔值。 如果设置为 True,则合并输入元素,就像每个比较都被反转一样。 为了实现类似于 sorted(itertools.chain(*iterables), reverse=True) 的行为,所有可迭代对象都必须从最大到最小排序。

在 3.5 版本中更改: 添加了可选的 *key* 和 *reverse* 参数。

heapq.nlargest(n, iterable, key=None)

返回一个列表,其中包含由 *iterable* 定义的数据集中最大的 *n* 个元素。 如果提供了 *key*,则它指定一个接受一个参数的函数,该函数用于从 *iterable* 中的每个元素中提取比较键(例如,key=str.lower)。 等效于:sorted(iterable, key=key, reverse=True)[:n]

heapq.nsmallest(n, iterable, key=None)

返回一个列表,其中包含由iterable定义的数据集中最小的 n 个元素。如果提供了 key,则它指定一个单参数函数,用于从 iterable 中的每个元素提取比较键(例如,key=str.lower)。等效于: sorted(iterable, key=key)[:n]

后两个函数对于较小的 n 值效果最佳。对于较大的值,使用 sorted() 函数效率更高。此外,当 n==1 时,使用内置的 min()max() 函数效率更高。如果需要重复使用这些函数,请考虑将可迭代对象转换为实际的堆。

基本示例

可以通过将所有值压入堆中,然后一次弹出最小值来实现 堆排序

>>> def heapsort(iterable):
...     h = []
...     for value in iterable:
...         heappush(h, value)
...     return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

这类似于 sorted(iterable),但与 sorted() 不同,此实现是不稳定的。

堆元素可以是元组。这对于将比较值(例如任务优先级)与正在跟踪的主记录一起分配非常有用。

>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')

优先级队列实现说明

优先级队列是堆的常见用途,它带来了一些实现挑战。

  • 排序稳定性:如何使两个具有相同优先级的任务按照它们最初添加的顺序返回?

  • 如果优先级相等且任务没有默认比较顺序,则元组比较对于(优先级,任务)对会中断。

  • 如果任务的优先级发生变化,如何将其移动到堆中的新位置?

  • 或者,如果需要删除待处理的任务,如何找到它并将其从队列中移除?

前两个挑战的解决方案是将条目存储为包含优先级、条目计数和任务的 3 元素列表。条目计数充当决胜器,以便两个具有相同优先级的任务按照添加的顺序返回。并且由于没有两个条目计数相同,因此元组比较永远不会尝试直接比较两个任务。

解决不可比较任务问题的另一个解决方案是创建一个包装类,该类忽略任务项并且仅比较优先级字段。

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)

其余的挑战围绕着查找待处理的任务以及更改其优先级或完全删除它。可以使用指向队列中条目的字典来完成查找任务。

删除条目或更改其优先级更加困难,因为它会破坏堆结构的不变量。因此,一个可能的解决方案是将条目标记为已删除,并添加一个具有修订优先级的新条目。

pq = []                         # list of entries arranged in a heap
entry_finder = {}               # mapping of tasks to entries
REMOVED = '<removed-task>'      # placeholder for a removed task
counter = itertools.count()     # unique sequence count

def add_task(task, priority=0):
    'Add a new task or update the priority of an existing task'
    if task in entry_finder:
        remove_task(task)
    count = next(counter)
    entry = [priority, count, task]
    entry_finder[task] = entry
    heappush(pq, entry)

def remove_task(task):
    'Mark an existing task as REMOVED.  Raise KeyError if not found.'
    entry = entry_finder.pop(task)
    entry[-1] = REMOVED

def pop_task():
    'Remove and return the lowest priority task. Raise KeyError if empty.'
    while pq:
        priority, count, task = heappop(pq)
        if task is not REMOVED:
            del entry_finder[task]
            return task
    raise KeyError('pop from an empty priority queue')

理论

堆是数组,对于所有 k,从 0 开始计数元素,a[k] <= a[2*k+1]a[k] <= a[2*k+2]。为了比较起见,不存在的元素被认为是无限的。堆的有趣特性是 a[0] 始终是其最小的元素。

上面的奇怪不变量旨在成为锦标赛的有效内存表示形式。下面的数字是 k,而不是 a[k]

                               0

              1                                 2

      3               4                5               6

  7       8       9       10      11      12      13      14

15 16   17 18   19 20   21 22   23 24   25 26   27 28   29 30

在上面的树中,每个单元格 k 都位于 2*k+12*k+2 之上。在我们在体育比赛中看到的常见二元锦标赛中,每个单元格都是它所处的两个单元格的获胜者,我们可以沿着树向下追溯获胜者,以查看他/她所有的对手。但是,在许多此类锦标赛的计算机应用中,我们不需要追溯获胜者的历史。为了更节省内存,当获胜者被提升时,我们尝试用较低级别的其他东西替换它,规则变成一个单元格和它所在的两个单元格包含三个不同的项目,但顶部单元格“胜出” 位于顶部的两个单元格之上。

如果始终保护此堆不变量,则索引 0 显然是总冠军。删除它并找到“下一个”获胜者的最简单算法方法是将一些失败者(比如上图中的单元格 30)移动到 0 位置,然后将这个新的 0 下渗到树中,交换值,直到重新建立不变量。这显然是对树中项目总数的对数。通过迭代所有项目,您将获得 O(n log n) 排序。

这种排序的一个优点是,只要插入的项目不“优于”您提取的最后一个 0'th 元素,您就可以在排序进行时有效地插入新项目。这在模拟环境中特别有用,在模拟环境中,树保存所有传入的事件,“获胜”条件表示最小的计划时间。当一个事件计划其他事件执行时,它们会被安排到未来,因此它们可以轻松进入堆中。因此,堆是实现调度程序的好结构(这就是我用于 MIDI 音序器的原因 :-))。

已经对实现调度程序的各种结构进行了广泛的研究,堆在这方面很不错,因为它们的速度相当快,速度几乎是恒定的,而且最坏的情况与平均情况没有太大差异。但是,还有其他更有效的表示形式,但最坏的情况可能很糟糕。

堆在大磁盘排序中也非常有用。您很可能都知道,大排序意味着产生“运行”(即预先排序的序列,其大小通常与 CPU 内存量有关),然后对这些运行进行合并传递,合并通常组织得非常巧妙 [1]。初始排序产生尽可能长的运行非常重要。锦标赛是实现这一目标的好方法。如果使用所有可用于保存锦标赛的内存,替换并渗入适合当前运行的项目,您将产生运行,对于随机输入,运行的大小是内存的两倍,而对于模糊有序的输入,运行会更好得多。

此外,如果您在磁盘上输出第 0 个项目并获得一个可能不适合当前锦标赛的输入(因为该值“胜过”最后一个输出值),它就无法放入堆中,因此堆的大小会减小。可以巧妙地立即将释放的内存用于逐步构建第二个堆,第二个堆的增长速度与第一个堆的熔化速度完全相同。当第一个堆完全消失时,您将切换堆并开始新的运行。巧妙且非常有效!

总而言之,堆是有用的内存结构,需要了解。我在一些应用程序中使用它们,我认为保留一个“堆”模块是件好事。 :-)

脚注