@lantzbuilds
Last active January 27, 2026 21:57
Making Python Data Structures Thread-Safe with threading — Educational Reference

Making the Task Queue Thread-Safe with Python's threading

A practical guide to concurrent access patterns for shared data structures.


Why Thread Safety Matters

When multiple threads access shared data simultaneously, you can get race conditions — bugs that depend on unpredictable timing:

# Without locking — BROKEN
class TaskQueue:
    def __init__(self):
        self._counter = 0

    def add_task(self, task_id, priority, payload):
        # Thread A reads _counter = 5
        # Thread B reads _counter = 5 (same value!)
        # Thread A writes _counter = 6
        # Thread B writes _counter = 6 (overwrites A's increment!)
        current = self._counter
        self._counter = current + 1  # Lost update!

Symptoms of race conditions:

  • Duplicate task IDs
  • Tasks "disappearing" from the queue
  • Incorrect status values
  • Crashes that only happen "sometimes"
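The lost-update bug above can be reproduced and fixed in a small, self-contained demo. The `Counter` class and iteration counts here are illustrative choices, not part of the task queue:

```python
import threading
import time


class Counter:
    """Toy counter showing the read-modify-write race."""

    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def unsafe_increment(self):
        current = self.value
        time.sleep(0)            # invite a thread switch between read and write
        self.value = current + 1

    def safe_increment(self):
        with self.lock:          # the read-modify-write is now atomic
            self.value += 1


def run(increment, n_threads=4, n_iters=5000):
    c = Counter()
    threads = [
        threading.Thread(target=lambda: [increment(c) for _ in range(n_iters)])
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return c.value


print(run(Counter.unsafe_increment))  # often less than 20000: lost updates
print(run(Counter.safe_increment))    # always 20000
```

The unsafe run is nondeterministic: it may occasionally produce the correct total, which is exactly what makes race conditions hard to debug.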

The Race Condition in Our Task Queue

Consider two threads calling get_next_task() simultaneously:

Thread A                          Thread B
────────                          ────────
heap = [task-1, task-2]           heap = [task-1, task-2]
        │                                 │
        ▼                                 ▼
pop task-1                        pop task-1  ← Same task!
        │                                 │
        ▼                                 ▼
return task-1                     return task-1  ← Duplicate processing!

Both threads get the same task — disaster for a job queue.


Solution 1: Global Lock (Simple)

The simplest fix — one lock protects all operations:

import heapq
import threading
from typing import Any, Optional


class ThreadSafeTaskQueue:
    def __init__(self):
        self._heap = []
        self._tasks = {}
        self._counter = 0
        self._lock = threading.Lock()  # One lock for everything

    def add_task(self, task_id: str, priority: int, payload: Any) -> None:
        with self._lock:  # Acquire lock, auto-release when done
            if task_id in self._tasks:
                raise ValueError(f"Task {task_id} already exists")

            self._tasks[task_id] = {
                "id": task_id,
                "priority": priority,
                "payload": payload,
                "status": "queued"
            }
            heapq.heappush(self._heap, (-priority, self._counter, task_id))
            self._counter += 1

    def get_next_task(self) -> Optional[dict]:
        with self._lock:
            while self._heap:
                _, _, task_id = heapq.heappop(self._heap)
                task = self._tasks.get(task_id)

                if task and task["status"] == "queued":
                    task["status"] = "processing"
                    return task

            return None

    def get_status(self, task_id: str) -> Optional[str]:
        with self._lock:
            task = self._tasks.get(task_id)
            return task["status"] if task else None

    def complete_task(self, task_id: str) -> None:
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                raise KeyError(f"Task {task_id} not found")
            task["status"] = "completed"

Pros: Simple, correct, easy to reason about.
Cons: Only one thread can do anything at a time (low concurrency).
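A quick way to check the guarantee is a stress test: several threads add tasks concurrently, then everything is drained and checked for duplicates. The snippet re-includes a trimmed copy of the class (named `MiniTaskQueue` here) so it runs on its own:

```python
import heapq
import threading


class MiniTaskQueue:
    """Trimmed copy of ThreadSafeTaskQueue, enough to run on its own."""

    def __init__(self):
        self._heap = []
        self._tasks = {}
        self._counter = 0
        self._lock = threading.Lock()

    def add_task(self, task_id, priority, payload):
        with self._lock:
            if task_id in self._tasks:
                raise ValueError(f"Task {task_id} already exists")
            self._tasks[task_id] = {"id": task_id, "status": "queued"}
            heapq.heappush(self._heap, (-priority, self._counter, task_id))
            self._counter += 1

    def get_next_task(self):
        with self._lock:
            while self._heap:
                _, _, task_id = heapq.heappop(self._heap)
                task = self._tasks.get(task_id)
                if task and task["status"] == "queued":
                    task["status"] = "processing"
                    return task
            return None


q = MiniTaskQueue()
threads = [
    threading.Thread(
        target=lambda i=i: [q.add_task(f"t{i}-{j}", j, None) for j in range(100)]
    )
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

seen = set()
while (task := q.get_next_task()) is not None:
    seen.add(task["id"])
print(len(seen))  # 400: every task dequeued exactly once, none lost or duplicated
```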


Understanding threading.Lock

Basic Usage

lock = threading.Lock()

# Manual acquire/release (error-prone)
lock.acquire()
try:
    # Critical section — only one thread at a time
    do_something()
finally:
    lock.release()  # MUST release, even if exception

# Context manager (preferred)
with lock:
    # Critical section
    do_something()
# Lock automatically released here

Lock Behavior

lock = threading.Lock()

# Thread A
with lock:           # Acquires lock
    time.sleep(10)   # Holds lock for 10 seconds
                     # Lock released

# Thread B (runs concurrently)
with lock:           # BLOCKS here until Thread A releases
    print("Got it!") # Finally runs after Thread A finishes

Non-Blocking Acquire

if lock.acquire(blocking=False):
    try:
        # Got the lock, do work
        pass
    finally:
        lock.release()
else:
    # Couldn't get lock, do something else
    print("Queue is busy, try again later")
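`acquire()` also accepts a `timeout` in seconds, a middle ground between blocking forever and giving up immediately:

```python
import threading

lock = threading.Lock()

# Wait up to 2 seconds for the lock instead of forever or not at all
if lock.acquire(timeout=2.0):
    try:
        print("acquired within the timeout")
    finally:
        lock.release()
else:
    print("gave up after 2 seconds")
```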

Solution 2: RLock for Reentrant Locking

What if a method calls another method that also needs the lock?

class TaskQueue:
    def __init__(self):
        self._lock = threading.Lock()

    def add_task(self, task_id, priority, payload):
        with self._lock:
            # ... add task logic ...
            self._log_event(f"Added {task_id}")  # Calls another method

    def _log_event(self, message):
        with self._lock:  # DEADLOCK! Already holding the lock
            print(message)

Solution: Use RLock (reentrant lock)

class TaskQueue:
    def __init__(self):
        self._lock = threading.RLock()  # Can be acquired multiple times by same thread

    def add_task(self, task_id, priority, payload):
        with self._lock:  # Acquire count: 1
            # ... add task logic ...
            self._log_event(f"Added {task_id}")

    def _log_event(self, message):
        with self._lock:  # Acquire count: 2 (same thread, OK!)
            print(message)
        # Acquire count back to 1

    # Lock fully released when add_task's `with` block exits

Rule of thumb: Use RLock if methods call other methods that need the lock.
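The difference is easy to observe directly: a plain Lock refuses a second acquire from the thread that already holds it, while an RLock allows it:

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
print(plain.acquire(blocking=False))   # False: a Lock is not reentrant
plain.release()

with reentrant:
    with reentrant:                    # same thread may re-acquire an RLock
        print("re-acquired OK")
```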


Solution 3: Read-Write Lock Pattern

What if reads are much more common than writes? A single lock forces readers to wait for other readers — unnecessary!

import threading
from typing import Any, Optional


class ReadWriteLock:
    """Multiple readers OR one writer, but not both."""

    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    def acquire_read(self):
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        self._read_ready.release()


class TaskQueueWithRWLock:
    def __init__(self):
        self._heap = []
        self._tasks = {}
        self._counter = 0
        self._rwlock = ReadWriteLock()

    def get_status(self, task_id: str) -> Optional[str]:
        """Read operation — multiple threads can do this concurrently."""
        self._rwlock.acquire_read()
        try:
            task = self._tasks.get(task_id)
            return task["status"] if task else None
        finally:
            self._rwlock.release_read()

    def add_task(self, task_id: str, priority: int, payload: Any) -> None:
        """Write operation — exclusive access required."""
        self._rwlock.acquire_write()
        try:
            # ... add task logic ...
            pass
        finally:
            self._rwlock.release_write()

Pros: Better concurrency for read-heavy workloads.
Cons: More complex, potential writer starvation.
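That readers really do overlap can be verified with a `threading.Barrier`: the barrier only opens when two threads reach it at the same time, which is only possible if both hold the read lock concurrently. The snippet includes a copy of the `ReadWriteLock` class so it runs on its own:

```python
import threading


class ReadWriteLock:
    """Copy of the ReadWriteLock above, so this snippet is self-contained."""

    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    def acquire_read(self):
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        self._read_ready.release()


rw = ReadWriteLock()
barrier = threading.Barrier(2, timeout=2)  # opens only if 2 threads arrive
results = []


def reader():
    rw.acquire_read()
    try:
        barrier.wait()        # both readers must be inside at the same time
        results.append("read")
    finally:
        rw.release_read()


threads = [threading.Thread(target=reader) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)   # two entries: the readers overlapped
```

With a single exclusive lock instead, the second reader could not reach the barrier while the first one waited at it, and the barrier would time out.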


Solution 4: Condition Variables (Producer-Consumer)

What if we want get_next_task() to wait for a task instead of returning None?

import threading
import heapq
from typing import Any, Optional


class BlockingTaskQueue:
    """A task queue where get_next_task() blocks until a task is available."""

    def __init__(self):
        self._heap = []
        self._tasks = {}
        self._counter = 0
        self._lock = threading.Lock()
        self._not_empty = threading.Condition(self._lock)

    def add_task(self, task_id: str, priority: int, payload: Any) -> None:
        with self._not_empty:  # Acquires underlying lock
            self._tasks[task_id] = {
                "id": task_id,
                "priority": priority,
                "payload": payload,
                "status": "queued"
            }
            heapq.heappush(self._heap, (-priority, self._counter, task_id))
            self._counter += 1

            self._not_empty.notify()  # Wake up one waiting consumer

    def get_next_task(self, timeout: Optional[float] = None) -> dict:
        with self._not_empty:
            # Wait until there's a queued task
            while True:
                # Try to find a queued task
                while self._heap:
                    _, _, task_id = heapq.heappop(self._heap)
                    task = self._tasks.get(task_id)
                    if task and task["status"] == "queued":
                        task["status"] = "processing"
                        return task

                # No queued task found, wait for notification
                if not self._not_empty.wait(timeout):
                    raise TimeoutError("No task available")

    def complete_task(self, task_id: str) -> None:
        with self._not_empty:
            task = self._tasks.get(task_id)
            if task:
                task["status"] = "completed"

Usage:

queue = BlockingTaskQueue()

# Consumer thread — blocks until task available
def worker():
    while True:
        task = queue.get_next_task()  # Blocks here if queue empty
        process(task)
        queue.complete_task(task["id"])

# Producer thread — adds tasks
def producer():
    queue.add_task("task-1", priority=5, payload={})
    # Consumer wakes up immediately
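The notify/wait handshake can also be seen in isolation, stripped of the queue (the names here are illustrative):

```python
import threading

cond = threading.Condition()
items = []


def consumer():
    with cond:
        while not items:      # re-check the predicate: wakeups can be spurious
            cond.wait()
        print("got", items.pop())


def producer():
    with cond:
        items.append("task-1")
        cond.notify()         # wake one waiting consumer


c = threading.Thread(target=consumer)
c.start()
producer()
c.join()
```

Note the `while` loop around `wait()`: a woken thread must re-check the condition, because another consumer may have taken the item first, and spurious wakeups are allowed.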

Solution 5: Just Use queue.PriorityQueue

Python's standard library has a thread-safe priority queue built-in:

import queue
import threading
from typing import Optional


class SimpleThreadSafeTaskQueue:
    def __init__(self):
        self._queue = queue.PriorityQueue()
        self._tasks = {}
        self._tasks_lock = threading.Lock()
        self._counter = 0

    def add_task(self, task_id: str, priority: int, payload: dict) -> None:
        with self._tasks_lock:
            self._tasks[task_id] = {
                "id": task_id,
                "priority": priority,
                "payload": payload,
                "status": "queued"
            }
            # PriorityQueue.put() is thread-safe
            self._queue.put((-priority, self._counter, task_id))
            self._counter += 1

    def get_next_task(self, block: bool = True,
                      timeout: Optional[float] = None) -> Optional[dict]:
        while True:
            try:
                # PriorityQueue.get() is thread-safe and can block
                _, _, task_id = self._queue.get(block=block, timeout=timeout)
            except queue.Empty:
                return None

            with self._tasks_lock:
                task = self._tasks.get(task_id)
                if task and task["status"] == "queued":
                    task["status"] = "processing"
                    return task
                # Task was cancelled; balance the earlier get() and try the next one
                self._queue.task_done()

    def get_status(self, task_id: str) -> Optional[str]:
        with self._tasks_lock:
            task = self._tasks.get(task_id)
            return task["status"] if task else None

    def complete_task(self, task_id: str) -> None:
        with self._tasks_lock:
            task = self._tasks.get(task_id)
            if task:
                task["status"] = "completed"
                self._queue.task_done()  # Balance the get() that claimed this task

Pros:

  • Built-in, well-tested, handles edge cases
  • Supports blocking with timeout
  • task_done() / join() for graceful shutdown

Cons:

  • Less control over internal behavior
  • Still need separate lock for task metadata
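PriorityQueue pops the smallest item first, which is why the class above stores `-priority`. A minimal ordering check:

```python
import queue

q = queue.PriorityQueue()
for entry in [(2, "medium"), (3, "low"), (1, "high")]:
    q.put(entry)

print(q.get())  # (1, 'high'): smallest tuple first, hence the -priority trick
print(q.get())  # (2, 'medium')
```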

Common Pitfalls

1. Forgetting to Release the Lock

# BAD — lock never released if exception occurs
lock.acquire()
do_something_risky()  # If this raises, lock is held forever!
lock.release()

# GOOD — always use context manager
with lock:
    do_something_risky()
# Lock released even if exception

2. Lock Ordering Deadlock

# Thread A                    Thread B
lock_a.acquire()              lock_b.acquire()
lock_b.acquire()  # WAITS     lock_a.acquire()  # WAITS
# DEADLOCK! Each waiting for the other

# Solution: Always acquire locks in the same order
# Or use a single lock
# Or use timeout: lock.acquire(timeout=5)
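A fixed-order acquisition can be enforced with a small helper. `with_both` is a hypothetical helper, not a library function; it orders the locks by `id()` so that two callers passing the locks in opposite order can never deadlock:

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()


def with_both(x, y, fn):
    # Always acquire in one global order (here, by id) to rule out deadlock
    first, second = (x, y) if id(x) < id(y) else (y, x)
    with first:
        with second:
            fn()


counter = [0]


def bump():
    counter[0] += 1


t1 = threading.Thread(
    target=lambda: [with_both(lock_a, lock_b, bump) for _ in range(1000)])
t2 = threading.Thread(
    target=lambda: [with_both(lock_b, lock_a, bump) for _ in range(1000)])
t1.start(); t2.start()
t1.join(); t2.join()
print(counter[0])  # 2000: both threads finished, no deadlock
```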

3. Holding Lock During I/O

# BAD — blocks other threads during slow network call
with lock:
    data = fetch_from_api()  # 2 seconds of blocking
    process(data)

# GOOD — minimize lock scope
data = fetch_from_api()  # No lock needed for read
with lock:
    process(data)  # Lock only for shared state

4. Check-Then-Act Race Condition

# BAD — status can change between check and act
def safe_complete(self, task_id):
    status = self.get_status(task_id)      # Lock acquired, released
    if status == "processing":             # No lock held here!
        self.complete_task(task_id)        # Lock acquired, released
    # Another thread could have changed status between these calls!

# GOOD — atomic check-and-act
def safe_complete(self, task_id):
    with self._lock:
        task = self._tasks.get(task_id)
        if task and task["status"] == "processing":
            task["status"] = "completed"

Quick Reference: Which Lock to Use?

Scenario                                    Solution
────────────────────────────────────────    ──────────────────────────────────
Simple shared state                         threading.Lock()
Methods calling other locked methods        threading.RLock()
Wait for condition (producer-consumer)      threading.Condition()
Many readers, few writers                   Read-Write Lock pattern
Just need a thread-safe queue               queue.Queue or queue.PriorityQueue
Limit concurrent access to N                threading.Semaphore(N)
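The one primitive in the table not shown elsewhere in this guide is the semaphore. A `Semaphore(N)` allows up to N concurrent holders; the (N+1)th acquire blocks (or fails, when non-blocking):

```python
import threading

sem = threading.Semaphore(2)        # at most 2 threads inside at once

print(sem.acquire(blocking=False))  # True
print(sem.acquire(blocking=False))  # True
print(sem.acquire(blocking=False))  # False: both slots taken
sem.release()
sem.release()
```

Typical use is limiting concurrency to a fixed pool size, e.g. `with sem:` around a call to a rate-limited API.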

Complete Thread-Safe Implementation

import heapq
import threading
from enum import Enum
from typing import Any, Optional
from dataclasses import dataclass


class TaskStatus(Enum):
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    CANCELLED = "cancelled"


@dataclass
class Task:
    task_id: str
    priority: int
    payload: Any
    status: TaskStatus = TaskStatus.QUEUED


class ThreadSafeTaskQueue:
    """
    A thread-safe priority task queue.

    Thread-safety guarantees:
    - Multiple threads can safely call any method concurrently
    - get_next_task() blocks until a task is available (or timeout)
    - All operations are atomic
    """

    def __init__(self):
        self._heap: list = []
        self._tasks: dict[str, Task] = {}
        self._counter: int = 0
        self._lock = threading.RLock()
        self._not_empty = threading.Condition(self._lock)
        self._shutdown = False

    def add_task(self, task_id: str, priority: int, payload: Any) -> None:
        """Add a task to the queue. Thread-safe."""
        with self._not_empty:
            if self._shutdown:
                raise RuntimeError("Queue is shut down")
            if task_id in self._tasks:
                raise ValueError(f"Task {task_id} already exists")

            task = Task(task_id=task_id, priority=priority, payload=payload)
            self._tasks[task_id] = task
            heapq.heappush(self._heap, (-priority, self._counter, task_id))
            self._counter += 1
            self._not_empty.notify()

    def get_next_task(self, timeout: Optional[float] = None) -> Optional[Task]:
        """
        Get the highest priority task. Blocks if queue is empty.

        Args:
            timeout: Max seconds to wait. None = wait forever.

        Returns:
            Task object, or None if timeout/shutdown.
        """
        with self._not_empty:
            while True:
                if self._shutdown:
                    return None

                # Try to find a queued task
                while self._heap:
                    _, _, task_id = heapq.heappop(self._heap)
                    task = self._tasks.get(task_id)
                    if task and task.status == TaskStatus.QUEUED:
                        task.status = TaskStatus.PROCESSING
                        return task

                # No task available, wait
                if not self._not_empty.wait(timeout):
                    return None  # Timeout

    def get_status(self, task_id: str) -> Optional[str]:
        """Get task status. Thread-safe."""
        with self._lock:
            task = self._tasks.get(task_id)
            return task.status.value if task else None

    def complete_task(self, task_id: str) -> None:
        """Mark task as completed. Thread-safe."""
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                raise KeyError(f"Task {task_id} not found")
            if task.status != TaskStatus.PROCESSING:
                raise ValueError(f"Task {task_id} is not processing")
            task.status = TaskStatus.COMPLETED

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a queued task. Returns True if cancelled. Thread-safe."""
        with self._lock:
            task = self._tasks.get(task_id)
            if task and task.status == TaskStatus.QUEUED:
                task.status = TaskStatus.CANCELLED
                return True
            return False

    def shutdown(self) -> None:
        """Stop accepting new tasks and wake up blocked consumers."""
        with self._not_empty:
            self._shutdown = True
            self._not_empty.notify_all()

    def __len__(self) -> int:
        """Number of queued tasks (approximate in concurrent use)."""
        with self._lock:
            return sum(1 for t in self._tasks.values()
                      if t.status == TaskStatus.QUEUED)

Usage Example: Worker Pool

import threading
import time


def worker(queue: ThreadSafeTaskQueue, worker_id: int):
    """Worker thread that processes tasks."""
    print(f"Worker {worker_id} started")
    while True:
        task = queue.get_next_task(timeout=5)
        if task is None:
            print(f"Worker {worker_id} shutting down")
            break

        print(f"Worker {worker_id} processing {task.task_id}")
        time.sleep(0.1)  # Simulate work
        queue.complete_task(task.task_id)
        print(f"Worker {worker_id} completed {task.task_id}")


# Create queue and workers
queue = ThreadSafeTaskQueue()
workers = [
    threading.Thread(target=worker, args=(queue, i))
    for i in range(3)
]

# Start workers
for w in workers:
    w.start()

# Add tasks
for i in range(10):
    queue.add_task(f"task-{i}", priority=i, payload={"value": i})

# Wait for all tasks to complete, then shutdown
time.sleep(2)
queue.shutdown()

# Wait for workers to finish
for w in workers:
    w.join()

print("All done!")
