A practical guide using the Task Queue example.
We need to efficiently get the highest priority task from a collection. Two approaches:
| Approach | add_task |
get_next_task |
|---|---|---|
| Sorted list | O(n log n) | O(1) |
| Heap | O(log n) | O(log n) |
For a queue with frequent adds and removes, the heap wins. For 1 million tasks, that's ~20 operations vs ~20 million.
A heap is a complete binary tree where every parent is smaller (min-heap) or larger (max-heap) than its children:
1 Min-heap: parent ≤ children
/ \
3 2 Finding minimum: O(1) — it's always the root
/ \ /
7 4 5 Insert/remove: O(log n) — tree height
Python's heapq implements a min-heap using a regular list, where:
- Index
0is the root (smallest element) - For element at index
i: left child is at2i+1, right child is at2i+2
import heapq
# Create a heap from a list
numbers = [5, 3, 8, 1, 9]
heapq.heapify(numbers)
print(numbers) # [1, 3, 8, 5, 9] — 1 is at root
# Push an element
heapq.heappush(numbers, 2)
print(numbers) # [1, 2, 8, 5, 9, 3] — maintains heap property
# Pop the smallest element
smallest = heapq.heappop(numbers)
print(smallest) # 1
print(numbers) # [2, 3, 8, 5, 9] — rebalanced automatically
# Peek without removing (just index 0)
print(numbers[0]) # 2Since heapq is a min-heap (smallest first), but we want highest priority first, we negate the priority:
# We want: priority 10 comes before priority 5
# heapq gives us: smallest first
# Solution: store -priority
heapq.heappush(heap, (-priority, task_id))
# Now:
# priority=10 → stored as -10
# priority=5 → stored as -5
# -10 < -5, so -10 pops first ✓Visual example:
tasks = [
("task-A", priority=5),
("task-B", priority=10),
("task-C", priority=1),
]
# Stored in heap as:
heap = [
(-10, "task-B"), # Root — smallest tuple, so pops first
(-5, "task-A"),
(-1, "task-C"),
]
heapq.heappop(heap) # Returns (-10, "task-B") — highest priority!Python compares tuples lexicographically (element by element):
(1, "apple") < (1, "banana") # True — first elements equal, compare second
(1, "zebra") < (2, "apple") # True — first elements decide itThis lets us create composite sort keys:
# Sort by priority (descending), then by insertion order (ascending)
heap_entry = (-priority, insertion_counter, task_id)What if two tasks have the same priority? We want FIFO (first-in, first-out) ordering.
class TaskQueue:
def __init__(self):
self._heap = []
self._counter = 0 # Monotonically increasing
def add_task(self, task_id, priority, payload):
# Tuple: (-priority, counter, task_id)
heapq.heappush(self._heap, (-priority, self._counter, task_id))
self._counter += 1Now tasks with equal priority are ordered by insertion time:
# Add two priority-5 tasks
add_task("task-A", priority=5, ...) # entry: (-5, 0, "task-A")
add_task("task-B", priority=5, ...) # entry: (-5, 1, "task-B")
# Comparison:
(-5, 0, "task-A") < (-5, 1, "task-B") # True — 0 < 1
# task-A pops first ✓Why not just use task_id as tiebreaker?
# This gives wrong order:
(-5, "task-B") < (-5, "task-A") # True — "task-A" < "task-B" alphabetically!
# But task-B might have been added first...You might wonder: why not store the Task object directly?
# This could fail:
heapq.heappush(heap, (-priority, counter, task_object))If priorities AND counters are somehow equal, Python tries to compare Task objects:
Task(...) < Task(...) # TypeError: '<' not supported between instances of 'Task'Solutions:
-
Store just the ID (recommended) — look up the object separately
heapq.heappush(heap, (-priority, counter, task_id)) task = self._tasks[task_id] # Separate dict lookup
-
Define
__lt__on your class@dataclass(order=True) class Task: sort_index: tuple = field(init=False, repr=False) task_id: str priority: int def __post_init__(self): self.sort_index = (-self.priority, self.task_id)
-
Use a wrapper that's always unequal
# Prevent comparison from ever reaching the task object heapq.heappush(heap, (-priority, counter, id(task), task)) # id() is unique, so comparison never reaches task
Removing an arbitrary element from a heap is O(n). Instead, we use lazy deletion:
def cancel_task(self, task_id):
# Don't remove from heap — just mark as cancelled
self._tasks[task_id].status = TaskStatus.CANCELLED
def get_next_task(self):
while self._heap:
neg_priority, _, task_id = heapq.heappop(self._heap)
task = self._tasks.get(task_id)
# Skip cancelled/completed tasks still in heap
if task and task.status == TaskStatus.QUEUED:
task.status = TaskStatus.PROCESSING
return task
return None # Empty queueTrade-off: The heap may contain "garbage" entries for cancelled tasks. For most use cases, this is fine. If cancellations are frequent, periodically rebuild the heap.
queue = TaskQueue()
# 1. Add task-1 (priority 5)
queue.add_task("task-1", priority=5, payload={...})
# _heap = [(-5, 0, "task-1")]
# 2. Add task-2 (priority 10) — higher priority
queue.add_task("task-2", priority=10, payload={...})
# heapq reorders to maintain heap property:
# _heap = [(-10, 1, "task-2"), (-5, 0, "task-1")]
# ↑ root (smallest tuple = highest priority)
# 3. Add task-3 (priority 1) — lowest priority
queue.add_task("task-3", priority=1, payload={...})
# _heap = [(-10, 1, "task-2"), (-5, 0, "task-1"), (-1, 2, "task-3")]
# Heap structure:
# (-10, 1, "task-2") ← root
# / \
# (-5, 0, "task-1") (-1, 2, "task-3")
# 4. Get next task
task = queue.get_next_task()
# heappop removes root: (-10, 1, "task-2")
# Heap rebalances:
# _heap = [(-5, 0, "task-1"), (-1, 2, "task-3")]
# ↑ new root
# Returns task-2 ✓import heapq
from enum import Enum
from typing import Any, Optional
class TaskStatus(Enum):
QUEUED = "queued"
PROCESSING = "processing"
COMPLETED = "completed"
CANCELLED = "cancelled"
class Task:
def __init__(self, task_id: str, priority: int, payload: Any):
self.task_id = task_id
self.priority = priority
self.payload = payload
self.status = TaskStatus.QUEUED
class TaskQueue:
def __init__(self):
self._heap = [] # Priority queue: (-priority, counter, task_id)
self._tasks = {} # task_id → Task object
self._counter = 0 # Tiebreaker for equal priorities
def add_task(self, task_id: str, priority: int, payload: Any) -> None:
if task_id in self._tasks:
raise ValueError(f"Task {task_id} already exists")
task = Task(task_id, priority, payload)
self._tasks[task_id] = task
heapq.heappush(self._heap, (-priority, self._counter, task_id))
self._counter += 1
def get_next_task(self) -> Optional[Task]:
while self._heap:
_, _, task_id = heapq.heappop(self._heap)
task = self._tasks.get(task_id)
if task and task.status == TaskStatus.QUEUED:
task.status = TaskStatus.PROCESSING
return task
return None
def get_status(self, task_id: str) -> Optional[str]:
task = self._tasks.get(task_id)
return task.status.value if task else None
def complete_task(self, task_id: str) -> None:
task = self._tasks.get(task_id)
if task is None:
raise KeyError(f"Task {task_id} not found")
task.status = TaskStatus.COMPLETED
def cancel_task(self, task_id: str) -> None:
task = self._tasks.get(task_id)
if task is None:
raise KeyError(f"Task {task_id} not found")
task.status = TaskStatus.CANCELLED # Lazy deletion| Operation | Code | Time Complexity |
|---|---|---|
| Create heap | heapq.heapify(list) |
O(n) |
| Push | heapq.heappush(heap, item) |
O(log n) |
| Pop smallest | heapq.heappop(heap) |
O(log n) |
| Peek smallest | heap[0] |
O(1) |
| Push + pop | heapq.heappushpop(heap, item) |
O(log n) |
| Pop + push | heapq.heapreplace(heap, item) |
O(log n) |
| N smallest | heapq.nsmallest(n, iterable) |
O(n log k) |
| N largest | heapq.nlargest(n, iterable) |
O(n log k) |
heapq.heappush(heap, -value) # Negate on push
value = -heapq.heappop(heap) # Negate on pop# Use a dict to track latest entry per key
entry_finder = {}
REMOVED = "<removed>"
def add_task(task_id, priority):
if task_id in entry_finder:
remove_task(task_id)
entry = [-priority, task_id]
entry_finder[task_id] = entry
heapq.heappush(heap, entry)
def remove_task(task_id):
entry = entry_finder.pop(task_id)
entry[-1] = REMOVED # Mark as removed
def pop_task():
while heap:
priority, task_id = heapq.heappop(heap)
if task_id is not REMOVED:
del entry_finder[task_id]
return task_id
raise KeyError("Empty queue")def merge_sorted(*iterables):
return heapq.merge(*iterables) # Built-in!
list(heapq.merge([1, 3, 5], [2, 4, 6])) # [1, 2, 3, 4, 5, 6]