A practical guide to concurrent access patterns for shared data structures.
When multiple threads access shared data simultaneously, you can get race conditions — bugs that depend on unpredictable timing:
# Without locking — BROKEN
class TaskQueue:
def __init__(self):
self._counter = 0
def add_task(self, task_id, priority, payload):
# Thread A reads _counter = 5
# Thread B reads _counter = 5 (same value!)
# Thread A writes _counter = 6
# Thread B writes _counter = 6 (overwrites A's increment!)
# NOTE: even `self._counter += 1` is a read-modify-write at the bytecode
# level, so it has exactly the same race — the fix is a lock, not `+=`.
current = self._counter
self._counter = current + 1 # Lost update!Symptoms of race conditions:
- Duplicate task IDs
- Tasks "disappearing" from the queue
- Incorrect status values
- Crashes that only happen "sometimes"
Consider two threads calling get_next_task() simultaneously:
Thread A Thread B
──────── ────────
heap = [task-1, task-2] heap = [task-1, task-2]
│ │
▼ ▼
pop task-1 pop task-1 ← Same task!
│ │
▼ ▼
return task-1 return task-1 ← Duplicate processing!
Both threads get the same task — disaster for a job queue.
The simplest fix — one lock protects all operations:
import heapq
import threading
from typing import Any, Optional
class ThreadSafeTaskQueue:
def __init__(self):
self._heap = []
self._tasks = {}
self._counter = 0
self._lock = threading.Lock() # One lock for everything
def add_task(self, task_id: str, priority: int, payload: Any) -> None:
with self._lock: # Acquire lock, auto-release when done
if task_id in self._tasks:
raise ValueError(f"Task {task_id} already exists")
self._tasks[task_id] = {
"id": task_id,
"priority": priority,
"payload": payload,
"status": "queued"
}
# heapq is a min-heap, so the priority is negated to pop highest-first;
# _counter breaks ties in FIFO order.
heapq.heappush(self._heap, (-priority, self._counter, task_id))
self._counter += 1
# Pops heap entries until one whose task is still "queued" is found;
# stale entries (status changed since push) are simply discarded.
def get_next_task(self) -> Optional[dict]:
with self._lock:
while self._heap:
_, _, task_id = heapq.heappop(self._heap)
task = self._tasks.get(task_id)
if task and task["status"] == "queued":
task["status"] = "processing"
return task
return None
def get_status(self, task_id: str) -> Optional[str]:
with self._lock:
task = self._tasks.get(task_id)
return task["status"] if task else None
def complete_task(self, task_id: str) -> None:
with self._lock:
task = self._tasks.get(task_id)
if task is None:
raise KeyError(f"Task {task_id} not found")
task["status"] = "completed"Pros: Simple, correct, easy to reason about Cons: Only one thread can do anything at a time (low concurrency)
lock = threading.Lock()
# Manual acquire/release (error-prone)
lock.acquire()
try:
# Critical section — only one thread at a time
do_something()
finally:
lock.release() # MUST release, even if exception
# Context manager (preferred)
with lock:
# Critical section
do_something()
# Lock automatically released herelock = threading.Lock()
# Thread A
with lock: # Acquires lock
time.sleep(10) # Holds lock for 10 seconds
# Lock released
# Thread B (runs concurrently)
with lock: # BLOCKS here until Thread A releases
print("Got it!") # Finally runs after Thread A finishesif lock.acquire(blocking=False):
# acquire(blocking=False) returns immediately: True if the lock was taken,
# False if another thread holds it — no waiting.
try:
# Got the lock, do work
pass
finally:
lock.release()
else:
# Couldn't get lock, do something else
print("Queue is busy, try again later")What if a method calls another method that also needs the lock?
class TaskQueue:
def __init__(self):
self._lock = threading.Lock()
def add_task(self, task_id, priority, payload):
with self._lock:
# ... add task logic ...
self._log_event(f"Added {task_id}") # Calls another method
# A plain Lock is NOT reentrant: a second acquire by the SAME thread
# blocks forever, so this inner `with` never proceeds.
def _log_event(self, message):
with self._lock: # DEADLOCK! Already holding the lock
print(message)Solution: Use RLock (reentrant lock)
class TaskQueue:
def __init__(self):
# RLock tracks an (owner thread, recursion count) pair; other threads
# still block until the count drops back to zero.
self._lock = threading.RLock() # Can be acquired multiple times by same thread
def add_task(self, task_id, priority, payload):
with self._lock: # Acquire count: 1
# ... add task logic ...
self._log_event(f"Added {task_id}")
def _log_event(self, message):
with self._lock: # Acquire count: 2 (same thread, OK!)
print(message)
# Acquire count back to 1
# Lock fully released when add_task's `with` block exitsRule of thumb: Use RLock if methods call other methods that need the lock.
What if reads are much more common than writes? A single lock forces readers to wait for other readers — unnecessary!
import threading
class ReadWriteLock:
    """Multiple readers OR one writer, but not both.

    Readers count themselves in and out under a Condition; a writer holds
    the underlying mutex for its whole critical section and waits until the
    reader count reaches zero.

    NOTE: readers arriving while a writer waits can starve the writer
    indefinitely (the "Cons" mentioned in the surrounding text).
    """

    def __init__(self):
        # The Condition wraps the mutex that a writer holds exclusively.
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0  # number of currently active readers

    def acquire_read(self):
        """Register as a reader; blocks only while a writer holds the mutex."""
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        """Unregister a reader; wakes waiting writers when the last one leaves.

        Raises:
            RuntimeError: if called without a matching acquire_read().
                (Bug fix: silently decrementing below zero corrupted the
                counter writers wait on, letting a writer run concurrently
                with readers.)
        """
        with self._read_ready:
            if self._readers <= 0:
                raise RuntimeError("release_read() called without acquire_read()")
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        """Take exclusive access: hold the mutex, then wait out active readers."""
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()  # releases the mutex while waiting

    def release_write(self):
        """Drop exclusive access by releasing the underlying mutex."""
        self._read_ready.release()
class TaskQueueWithRWLock:
def __init__(self):
self._heap = []
self._tasks = {}
self._counter = 0
self._rwlock = ReadWriteLock()
def get_status(self, task_id: str) -> Optional[str]:
"""Read operation — multiple threads can do this concurrently."""
self._rwlock.acquire_read()
# try/finally guarantees the read lock is released even if the lookup raises.
try:
task = self._tasks.get(task_id)
return task["status"] if task else None
finally:
self._rwlock.release_read()
def add_task(self, task_id: str, priority: int, payload: Any) -> None:
"""Write operation — exclusive access required."""
self._rwlock.acquire_write()
try:
# ... add task logic ...
pass
finally:
self._rwlock.release_write()Pros: Better concurrency for read-heavy workloads Cons: More complex, potential writer starvation
What if we want get_next_task() to wait for a task instead of returning None?
import threading
import heapq
from typing import Any
class BlockingTaskQueue:
"""A task queue where get_next_task() blocks until a task is available."""
def __init__(self):
self._heap = []
self._tasks = {}
self._counter = 0
self._lock = threading.Lock()
# Condition shares the mutex, so waiting and heap access stay consistent.
self._not_empty = threading.Condition(self._lock)
def add_task(self, task_id: str, priority: int, payload: Any) -> None:
with self._not_empty: # Acquires underlying lock
self._tasks[task_id] = {
"id": task_id,
"priority": priority,
"payload": payload,
"status": "queued"
}
heapq.heappush(self._heap, (-priority, self._counter, task_id))
self._counter += 1
self._not_empty.notify() # Wake up one waiting consumer
def get_next_task(self, timeout: "float | None" = None) -> dict:
with self._not_empty:
# Wait until there's a queued task
while True:
# Try to find a queued task
while self._heap:
_, _, task_id = heapq.heappop(self._heap)
task = self._tasks.get(task_id)
if task and task["status"] == "queued":
task["status"] = "processing"
return task
# No queued task found, wait for notification
# NOTE(review): the full `timeout` restarts on every wakeup, so repeated
# notifications (e.g. for tasks that turn out cancelled) can stretch the
# total wait well past `timeout` — track a time.monotonic() deadline to
# bound it.
if not self._not_empty.wait(timeout):
raise TimeoutError("No task available")
def complete_task(self, task_id: str) -> None:
with self._not_empty:
task = self._tasks.get(task_id)
if task:
task["status"] = "completed"Usage:
queue = BlockingTaskQueue()
# Consumer thread — blocks until task available
# (process() is application code, not defined in this article.)
def worker():
while True:
task = queue.get_next_task() # Blocks here if queue empty
process(task)
queue.complete_task(task["id"])
# Producer thread — adds tasks
def producer():
queue.add_task("task-1", priority=5, payload={})
# Consumer wakes up immediatelyPython's standard library has a thread-safe priority queue built-in:
import queue
import threading
class SimpleThreadSafeTaskQueue:
def __init__(self):
self._queue = queue.PriorityQueue()
self._tasks = {}
self._tasks_lock = threading.Lock()
self._counter = 0
def add_task(self, task_id: str, priority: int, payload: dict) -> None:
with self._tasks_lock:
self._tasks[task_id] = {
"id": task_id,
"priority": priority,
"payload": payload,
"status": "queued"
}
# PriorityQueue.put() is thread-safe
# NOTE(review): if put() and the increment below run OUTSIDE _tasks_lock
# (as the comment implies), `self._counter += 1` races between producers
# and can reuse a tie-breaker value — move both under the lock, or use
# itertools.count(). Confirm against the original indentation.
self._queue.put((-priority, self._counter, task_id))
self._counter += 1
def get_next_task(self, block: bool = True, timeout: "float | None" = None):
while True:
try:
# PriorityQueue.get() is thread-safe and can block
_, _, task_id = self._queue.get(block=block, timeout=timeout)
except queue.Empty:
return None
with self._tasks_lock:
task = self._tasks.get(task_id)
if task and task["status"] == "queued":
task["status"] = "processing"
return task
# Task was cancelled, try next one
# NOTE(review): entries skipped here never get task_done(), so
# PriorityQueue.join() accounting can hang — confirm.
def get_status(self, task_id: str) -> str:
with self._tasks_lock:
task = self._tasks.get(task_id)
return task["status"] if task else None
def complete_task(self, task_id: str) -> None:
with self._tasks_lock:
task = self._tasks.get(task_id)
if task:
task["status"] = "completed"
self._queue.task_done() # Signal task completionPros:
- Built-in, well-tested, handles edge cases
- Supports blocking with timeout
- `task_done()` / `join()` for graceful shutdown
Cons:
- Less control over internal behavior
- Still need separate lock for task metadata
# BAD — lock never released if exception occurs
lock.acquire()
do_something_risky() # If this raises, lock is held forever!
lock.release()
# GOOD — always use context manager
with lock:
do_something_risky()
# Lock released even if exception# Thread A Thread B
lock_a.acquire() lock_b.acquire()
lock_b.acquire() # WAITS lock_a.acquire() # WAITS
# DEADLOCK! Each waiting for the other
# Solution: Always acquire locks in the same order
# Or use a single lock
# Or use timeout: lock.acquire(timeout=5)# BAD — blocks other threads during slow network call
with lock:
data = fetch_from_api() # 2 seconds of blocking
process(data)
# GOOD — minimize lock scope
data = fetch_from_api() # No lock needed for read
with lock:
process(data) # Lock only for shared state# BAD — status can change between check and act
def safe_complete(self, task_id):
status = self.get_status(task_id) # Lock acquired, released
if status == "processing": # No lock held here!
self.complete_task(task_id) # Lock acquired, released
# Another thread could have changed status between these calls!
# GOOD — atomic check-and-act
def safe_complete(self, task_id):
with self._lock:
task = self._tasks.get(task_id)
if task and task["status"] == "processing":
task["status"] = "completed"| Scenario | Solution |
|---|---|
| Simple shared state | threading.Lock() |
| Methods calling other locked methods | threading.RLock() |
| Wait for condition (producer-consumer) | threading.Condition() |
| Many readers, few writers | Read-Write Lock pattern |
| Just need a thread-safe queue | queue.Queue or queue.PriorityQueue |
| Limit concurrent access to N | threading.Semaphore(N) |
import heapq
import threading
import time
from enum import Enum
from typing import Any, Optional
from dataclasses import dataclass


class TaskStatus(Enum):
    """Lifecycle states a task moves through."""
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    CANCELLED = "cancelled"


@dataclass
class Task:
    """A unit of work tracked by the queue."""
    task_id: str
    priority: int
    payload: Any
    status: TaskStatus = TaskStatus.QUEUED


class ThreadSafeTaskQueue:
    """
    A thread-safe priority task queue.

    Thread-safety guarantees:
    - Multiple threads can safely call any method concurrently
    - get_next_task() blocks until a task is available (or timeout)
    - All operations are atomic
    """

    def __init__(self):
        # Heap entries are (-priority, insertion_counter, task_id): negating
        # the priority turns heapq's min-heap into highest-priority-first,
        # and the counter breaks ties in FIFO order.
        self._heap: list[tuple[int, int, str]] = []
        self._tasks: dict[str, Task] = {}
        self._counter: int = 0
        self._lock = threading.RLock()
        self._not_empty = threading.Condition(self._lock)
        self._shutdown = False

    def add_task(self, task_id: str, priority: int, payload: Any) -> None:
        """Add a task to the queue. Thread-safe.

        Raises:
            RuntimeError: if the queue has been shut down.
            ValueError: if task_id is already present.
        """
        with self._not_empty:
            if self._shutdown:
                raise RuntimeError("Queue is shut down")
            if task_id in self._tasks:
                raise ValueError(f"Task {task_id} already exists")
            task = Task(task_id=task_id, priority=priority, payload=payload)
            self._tasks[task_id] = task
            heapq.heappush(self._heap, (-priority, self._counter, task_id))
            self._counter += 1
            self._not_empty.notify()  # wake one waiting consumer

    def get_next_task(self, timeout: Optional[float] = None) -> Optional[Task]:
        """
        Get the highest priority task. Blocks if queue is empty.

        Args:
            timeout: Max seconds to wait. None = wait forever.

        Returns:
            Task object, or None if timeout/shutdown.
        """
        # Bug fix: track an absolute deadline so repeated wakeups (spurious
        # ones, or notifications for tasks that turn out to be cancelled)
        # cannot extend the total wait beyond `timeout`. The old code passed
        # the full `timeout` to every wait() call.
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._not_empty:
            while True:
                if self._shutdown:
                    return None
                # Drain stale heap entries until a still-QUEUED task is found.
                while self._heap:
                    _, _, task_id = heapq.heappop(self._heap)
                    task = self._tasks.get(task_id)
                    if task and task.status == TaskStatus.QUEUED:
                        task.status = TaskStatus.PROCESSING
                        return task
                # No task available: wait only for the remaining time.
                if deadline is None:
                    self._not_empty.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0 or not self._not_empty.wait(remaining):
                        return None  # Timeout

    def get_status(self, task_id: str) -> Optional[str]:
        """Return the task's status string, or None if unknown. Thread-safe."""
        with self._lock:
            task = self._tasks.get(task_id)
            return task.status.value if task else None

    def complete_task(self, task_id: str) -> None:
        """Mark task as completed. Thread-safe.

        Raises:
            KeyError: if the task does not exist.
            ValueError: if the task is not currently PROCESSING.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                raise KeyError(f"Task {task_id} not found")
            if task.status != TaskStatus.PROCESSING:
                raise ValueError(f"Task {task_id} is not processing")
            task.status = TaskStatus.COMPLETED

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a queued task. Returns True if cancelled. Thread-safe.

        The stale heap entry is left in place; get_next_task() discards it.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task and task.status == TaskStatus.QUEUED:
                task.status = TaskStatus.CANCELLED
                return True
            return False

    def shutdown(self, wait: bool = True) -> None:
        """Stop accepting new tasks and wake up blocked consumers.

        Args:
            wait: accepted for API compatibility but currently ignored —
                shutdown does not wait for in-flight tasks to finish.
        """
        with self._not_empty:
            self._shutdown = True
            self._not_empty.notify_all()

    def __len__(self) -> int:
        """Number of queued tasks (approximate in concurrent use)."""
        with self._lock:
            return sum(1 for t in self._tasks.values()
                       if t.status == TaskStatus.QUEUED)

import threading
import time
def worker(queue: ThreadSafeTaskQueue, worker_id: int):
    """Worker loop: pull tasks until the queue times out or shuts down."""
    print(f"Worker {worker_id} started")
    # get_next_task() returns None on timeout or shutdown — either way, stop.
    while (task := queue.get_next_task(timeout=5)) is not None:
        print(f"Worker {worker_id} processing {task.task_id}")
        time.sleep(0.1)  # Simulate work
        queue.complete_task(task.task_id)
        print(f"Worker {worker_id} completed {task.task_id}")
    print(f"Worker {worker_id} shutting down")
# Create queue and workers
queue = ThreadSafeTaskQueue()
workers = [
threading.Thread(target=worker, args=(queue, i))
for i in range(3)
]
# Start workers
for w in workers:
w.start()
# Add tasks
for i in range(10):
queue.add_task(f"task-{i}", priority=i, payload={"value": i})
# Wait for all tasks to complete, then shutdown
# NOTE(review): sleep(2) is a timing-based guess, not a guarantee that the
# 10 tasks finished — a task_done()/join()-style counter would be
# deterministic. Fine for a demo.
time.sleep(2)
queue.shutdown()
# Wait for workers to finish
for w in workers:
w.join()
print("All done!")- Python
`threading` documentation
- Python `queue` module — thread-safe queues
- Real Python: Threading
- Raymond Hettinger: Concurrency — excellent PyCon talk