@saber-solooki
Created September 5, 2025 08:57
Sync/Async performance comparison
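This gist benchmarks structlog's synchronous and asynchronous logging APIs under heavy concurrency. It spawns n processes, each running m asyncio coroutine workers, and every worker emits a fixed number of log messages. The sync variant calls the blocking log.info() from inside the coroutines, while the async variant awaits log.ainfo(), which structlog runs on a thread-pool executor so the event loop stays responsive. The total wall-clock time of each variant is printed at the end for comparison.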
import structlog
import time
import asyncio
import multiprocessing

# Project-specific module, not included in this gist; presumably it
# configures structlog (processors, renderer, ...) as an import side effect.
from config import settings

log = structlog.get_logger()
async def sync_worker_coroutine(worker_id: int, process_id: int, iterations: int = 100):
    """
    A coroutine worker that performs synchronous logging operations.

    Args:
        worker_id: ID of the worker within the process
        process_id: ID of the process
        iterations: Number of log iterations to perform
    """
    for i in range(iterations):
        # Blocking call: the full processor chain runs on the event-loop thread.
        log.info("sync_log", iteration=i, worker_id=worker_id, process_id=process_id)
async def async_worker_coroutine(worker_id: int, process_id: int, iterations: int = 100):
    """
    A coroutine worker that performs asynchronous logging operations.

    Args:
        worker_id: ID of the worker within the process
        process_id: ID of the process
        iterations: Number of log iterations to perform
    """
    for i in range(iterations):
        # ainfo() hands the processor chain to a thread-pool executor,
        # keeping the event loop free while the entry is processed.
        await log.ainfo("async_log", iteration=i, worker_id=worker_id, process_id=process_id)
async def process_task(process_id: int, m_workers: int = 5, iterations: int = 100, use_async: bool = False):
    """
    Task that runs in each process, creating m coroutine workers.

    Args:
        process_id: ID of the current process
        m_workers: Number of coroutine workers to create in this process
        iterations: Number of log iterations per worker
        use_async: If True, use async logging; if False, use sync logging
    """
    log_type = "async" if use_async else "sync"
    print(f"Process {process_id} starting with {m_workers} workers, {iterations} iterations each ({log_type} logging)")
    start_time = time.time()

    # Create m coroutine workers
    tasks = []
    worker_func = async_worker_coroutine if use_async else sync_worker_coroutine
    for worker_id in range(m_workers):
        task = asyncio.create_task(
            worker_func(worker_id, process_id, iterations)
        )
        tasks.append(task)

    # Wait for all workers to complete
    await asyncio.gather(*tasks)

    end_time = time.time()
    print(f"Process {process_id} completed in {end_time - start_time:.3f}s ({log_type} logging)")
def run_process(process_id: int, m_workers: int, iterations: int, use_async: bool):
    """
    Function to run in each separate process.

    Args:
        process_id: ID of the current process
        m_workers: Number of coroutine workers per process
        iterations: Number of log iterations per worker
        use_async: If True, use async logging; if False, use sync logging
    """
    asyncio.run(process_task(process_id, m_workers, iterations, use_async))
def run_concurrent_logging_test(n_processes: int = 3, m_workers: int = 5, iterations: int = 100, use_async: bool = False):
    """
    Run concurrent logging test with n processes, each having m coroutine workers.

    Args:
        n_processes: Number of processes to run concurrently
        m_workers: Number of coroutine workers per process
        iterations: Number of log iterations per worker
        use_async: If True, use async logging; if False, use sync logging
    """
    log_type = "async" if use_async else "sync"
    print(f"Starting concurrent {log_type} logging test:")
    print(f"- {n_processes} processes")
    print(f"- {m_workers} workers per process")
    print(f"- {iterations} iterations per worker")
    print(f"- Total expected log messages: {n_processes * m_workers * iterations}")
    print("-" * 50)
    start_time = time.time()

    # Create and start n processes
    processes = []
    for process_id in range(n_processes):
        process = multiprocessing.Process(
            target=run_process,
            args=(process_id, m_workers, iterations, use_async),
        )
        processes.append(process)
        process.start()

    # Wait for all processes to complete
    for process in processes:
        process.join()

    end_time = time.time()
    print("-" * 50)
    print(f"All {log_type} logging processes completed in {end_time - start_time:.3f}s")
    return end_time - start_time
if __name__ == "__main__":
    # Test parameters - adjust as needed
    n_processes = 10     # Number of processes
    m_workers = 10       # Number of coroutine workers per process
    iterations = 100000  # Number of log iterations per worker

    print("=" * 60)
    print("CONCURRENT LOGGING PERFORMANCE COMPARISON")
    print("=" * 60)

    # Run sync logging test
    print("\n🔄 RUNNING SYNC LOGGING TEST:")
    sync_time = run_concurrent_logging_test(n_processes, m_workers, iterations, use_async=False)
    print("\n" + "=" * 60)

    # Run async logging test
    print("\n⚡ RUNNING ASYNC LOGGING TEST:")
    async_time = run_concurrent_logging_test(n_processes, m_workers, iterations, use_async=True)

    # Performance comparison
    print("\n" + "=" * 60)
    print("📊 PERFORMANCE COMPARISON:")
    print(f"Sync logging total time: {sync_time:.3f}s")
    print(f"Async logging total time: {async_time:.3f}s")
    if sync_time > async_time:
        speedup = sync_time / async_time
        print(f"🚀 Async logging is {speedup:.2f}x faster")
    else:
        slowdown = async_time / sync_time
        print(f"⚠️ Sync logging is {slowdown:.2f}x faster")
    print("=" * 60)