@fanannan
Last active September 27, 2024 08:08
FunctionExecutor: a robust, multi-threaded, non-blocking task execution utility

FunctionExecutor

FunctionExecutor is a robust, multi-threaded, non-blocking task execution utility for Python. It queues functions and runs them on a pool of background worker threads, so the calling thread can carry on while several tasks run concurrently.

Key Features

  • Persistent Availability: Once initialized, it remains active and ready to accept tasks throughout your program's lifetime.
  • Non-blocking Execution: Add tasks to the queue without waiting for them to complete.
  • Multi-threaded: Executes tasks concurrently using a pool of worker threads.
  • Automatic Thread Management: No need to manually start or stop threads.
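  • Bounded Queue (optional): Set max_queue_size to cap the number of pending tasks and limit memory use. When the queue is full, add_function() returns False instead of blocking.
  • Error Handling: Catches and logs exceptions without stopping execution of other tasks.
  • Note: Task return values are not returned to the caller; they are only written to the log at INFO level.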
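
The bounded-queue behavior comes from the standard library's queue.Queue. When maxsize > 0, put_nowait() raises queue.Full instead of blocking, and add_function() turns that exception into a False return. Below is a minimal stdlib-only sketch of that behavior. The name `try_enqueue` is a hypothetical stand-in for add_function(). It is not part of the class.

```python
import queue
from typing import Any, Callable


def try_enqueue(q: queue.Queue, func: Callable[..., Any], *args: Any) -> bool:
    """Hypothetical stand-in for add_function(): enqueue without blocking."""
    try:
        q.put_nowait((func, args, {}))
        return True
    except queue.Full:
        # The queue is at maxsize: reject the task instead of waiting for space.
        return False


# No worker drains this queue, so it is full after two tasks.
bounded: queue.Queue = queue.Queue(maxsize=2)
accepted = [try_enqueue(bounded, print, i) for i in range(4)]
print(accepted)  # [True, True, False, False]
```

A rejected task is simply dropped. Callers that cannot afford to lose tasks should check the return value.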

Installation

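Copy the FunctionExecutor code below into a file named function_executor.py in your project (the examples import it from that module). It uses only the standard library and requires Python 3.6 or later.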

Usage

Basic Usage

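import logging

from function_executor import FunctionExecutor

# Task results and errors are reported through the logging module at INFO level
logging.basicConfig(level=logging.INFO)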

def my_task(name):
    return f"Task {name} completed"

# Create a global executor with 3 worker threads and a max queue size of 10
executor = FunctionExecutor(num_workers=3, max_queue_size=10)

# Add tasks to the queue
for i in range(5):
    executor.add_function(my_task, f"Task-{i}")

# The code here runs immediately, not waiting for tasks to complete

Persistent Availability

The FunctionExecutor remains active and can accept new tasks at any time, even after previous tasks have completed:

# This can be called anytime, anywhere in your code
executor.add_function(task1, "arg1")

# Much later in your program, you can still use the same executor
executor.add_function(task2, "arg2")

Waiting for Completion

If you need to wait for all current tasks to complete:

executor.wait_for_completion(timeout=60)  # Wait up to 60 seconds
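
The standard library's queue.Queue.join() takes no timeout argument, so any timeout has to be added on top of it. One way to do this with only documented APIs is to run join() in a daemon helper thread and wait on that thread with Thread.join(timeout). The sketch below is written under that assumption. `join_with_timeout` and `drain` are hypothetical names.

```python
import queue
import threading
import time
from typing import Optional


def join_with_timeout(q: queue.Queue, timeout: Optional[float]) -> bool:
    """Wait until every item put into q is marked done, or until timeout expires.

    Returns True if all tasks finished and False if the timeout was hit.
    """
    # Queue.join() cannot time out, so run it in a helper thread. The thread is
    # a daemon so that a waiter we give up on does not keep the process alive.
    waiter = threading.Thread(target=q.join, daemon=True)
    waiter.start()
    waiter.join(timeout)
    return not waiter.is_alive()


def drain(q: queue.Queue) -> None:
    # Minimal worker: sleep for each queued delay, then mark the item done.
    while True:
        delay = q.get()
        time.sleep(delay)
        q.task_done()


tasks: queue.Queue = queue.Queue()
for delay in (0.05, 0.05, 0.05):
    tasks.put(delay)

first = join_with_timeout(tasks, timeout=0.01)  # no worker is running yet
threading.Thread(target=drain, args=(tasks,), daemon=True).start()
second = join_with_timeout(tasks, timeout=5)    # the worker drains the queue
print(first, second)  # False True
```

A waiter thread left behind by a timed-out call finishes on its own once the queue drains.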

How It Works

Task Allocation

  • All tasks are added to a single shared queue.
  • Multiple worker threads continually attempt to fetch tasks from this shared queue.
  • The first available thread that successfully retrieves a task executes it.
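  • This shared-queue (pull) model balances the workload automatically: an idle thread simply takes the next task. Strictly speaking it is not work-stealing, which uses a separate queue per worker.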

Note: The order of task execution is not guaranteed to match the order of task addition.
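
The allocation model above is a single shared queue that several threads pull from. The stripped-down, stdlib-only sketch below shows the idea. It is not the class itself, and `run_shared_queue` is an illustrative name. Each task runs exactly once, on whichever thread takes it first, and the set of threads that end up doing the work varies from run to run.

```python
import queue
import threading
from typing import List, Tuple


def run_shared_queue(num_tasks: int, num_workers: int) -> List[Tuple[int, str]]:
    """Run num_tasks placeholder tasks on num_workers threads sharing one queue.

    Returns (task_id, thread_name) pairs in the order they were recorded.
    """
    tasks: queue.Queue = queue.Queue()
    done: List[Tuple[int, str]] = []
    done_lock = threading.Lock()  # protects the shared results list

    for task_id in range(num_tasks):
        tasks.put(task_id)

    def worker() -> None:
        while True:
            try:
                # Whichever thread asks first gets the next task.
                task_id = tasks.get_nowait()
            except queue.Empty:
                return  # queue drained: in this sketch the worker just exits
            with done_lock:
                done.append((task_id, threading.current_thread().name))
            tasks.task_done()

    threads = [threading.Thread(target=worker, name=f"worker-{n}") for n in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return done


results = run_shared_queue(num_tasks=20, num_workers=3)
print(sorted(task_id for task_id, _ in results) == list(range(20)))  # True
print({name for _, name in results})  # varies between runs
```

FunctionExecutor's workers behave differently in one respect. They keep polling with get(timeout=1) until shutdown_flag is set, whereas these sketch workers stop once the queue is empty, which keeps the example finite.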

Shutdown Behavior

Calling shutdown() is optional and depends on your use case:

  • If resource management is critical: Call shutdown() explicitly when you're done.
  • If resource consumption is not a concern: You can omit calling shutdown(). The executor registers shutdown() with atexit when it starts, so it runs automatically when your program exits.
# Optional: Shutdown the executor when you're done
executor.shutdown()

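When shutdown() is called:

  • No new tasks can be added; add_function() returns False.
  • Tasks that are already running are allowed to finish.
  • Tasks still waiting in the queue are not executed, so call wait_for_completion() first if they must run.
  • Worker threads exit once their current task is done. Each join is bounded by the optional timeout argument.
  • The worker list is cleared and the atexit hook is unregistered.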
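
The shutdown steps above rely on a cooperative stop flag. A worker checks a threading.Event between tasks, so a task that is already running finishes, but tasks still waiting in the queue are never picked up. The deterministic, stdlib-only sketch below demonstrates this. The `started` and `release` events are assumptions added only to make the timing reproducible, and `stop` plays the role of shutdown_flag.

```python
import queue
import threading
from typing import List

stop = threading.Event()      # plays the role of shutdown_flag
started = threading.Event()   # set when the first task begins
release = threading.Event()   # lets the first task finish
tasks: queue.Queue = queue.Queue()
completed: List[int] = []


def task(n: int) -> None:
    if n == 0:
        started.set()
        release.wait()  # hold the first task until shutdown has been requested
    completed.append(n)


def worker_loop() -> None:
    # Same shape as _worker_loop: check the flag, then poll with a timeout.
    while not stop.is_set():
        try:
            n = tasks.get(timeout=0.1)
        except queue.Empty:
            continue
        try:
            task(n)
        finally:
            tasks.task_done()


for n in range(5):
    tasks.put(n)

worker = threading.Thread(target=worker_loop, daemon=True)
worker.start()
started.wait()   # task 0 is now running
stop.set()       # "shutdown": no further tasks will be taken
release.set()    # let the in-flight task finish
worker.join()

print(completed)      # [0]: the running task completed
print(tasks.qsize())  # 4: the queued tasks were never executed
```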

Best Practices

  1. Initialize FunctionExecutor as a global variable for persistent availability.
  2. Set an appropriate max_queue_size to prevent memory issues in long-running applications.
  3. Implement proper error handling within your tasks.
  4. Use wait_for_completion() if you need to ensure all tasks are finished before proceeding.
  5. Call shutdown() explicitly if you need the worker threads stopped at a known point, particularly in long-running applications.
  6. If resource management is not critical and your program is short-lived, you can omit calling shutdown().
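
Best practice 2 has a consequence for callers. With a bounded queue, add_function() returns False when the queue is full instead of blocking. The sketch below shows a small retry helper with exponential backoff. `add_with_retry` is hypothetical and not part of FunctionExecutor. It accepts any callable with add_function()'s (func, *args) -> bool shape, so here it is exercised against a plain queue.Queue stand-in.

```python
import queue
import threading
import time
from typing import Any, Callable


def add_with_retry(
    add: Callable[..., bool],
    func: Callable[..., Any],
    *args: Any,
    attempts: int = 5,
    initial_delay: float = 0.05,
) -> bool:
    """Call add(func, *args) until it returns True or the attempts run out."""
    delay = initial_delay
    for attempt in range(attempts):
        if add(func, *args):
            return True
        if attempt < attempts - 1:
            time.sleep(delay)  # back off before retrying
            delay *= 2
    return False


q: queue.Queue = queue.Queue(maxsize=1)


def add(func: Callable[..., Any], *args: Any) -> bool:
    # Stand-in for executor.add_function: non-blocking put, False when full.
    try:
        q.put_nowait((func, args))
        return True
    except queue.Full:
        return False


add(print, "first")                         # fills the only slot
threading.Timer(0.02, q.get_nowait).start()  # a "worker" frees the slot shortly after
second_added = add_with_retry(add, print, "second")
third_added = add_with_retry(add, print, "third", attempts=2, initial_delay=0.01)
print(second_added, third_added)  # True False
```

Retrying with backoff keeps the producer non-blocking for short bursts. Once the attempts run out, the caller still has to decide what to do with the rejected task.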

Limitations

  • Tasks run concurrently on multiple threads, so make your task implementations thread-safe. In CPython, the GIL means threads speed up I/O-bound tasks but not CPU-bound pure-Python code.
  • The executor doesn't return task results directly. Use other mechanisms to handle task outputs if necessary.
  • Task execution order is not guaranteed to match the order of task addition.
  • The executor does not provide built-in task prioritization.
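
Results are only logged, so one way to capture them is to submit a wrapper that writes each outcome into a thread-safe queue.Queue for the caller to read later. The sketch below rests on two assumptions. First, `with_result_sink` is a hypothetical helper. Second, plain threads stand in for the executor's workers so the block runs on its own. If you need futures-style results instead, the standard library's concurrent.futures.ThreadPoolExecutor returns a Future from submit().

```python
import queue
import threading
from typing import Any, Callable, Dict, Hashable, Tuple


def with_result_sink(func: Callable[..., Any], sink: queue.Queue, task_id: Hashable) -> Callable[..., None]:
    """Hypothetical helper: wrap func so each outcome is put into sink.

    Items have the form (task_id, succeeded, value_or_exception).
    """

    def wrapper(*args: Any, **kwargs: Any) -> None:
        try:
            sink.put((task_id, True, func(*args, **kwargs)))
        except Exception as exc:  # hand the failure to the caller instead of losing it
            sink.put((task_id, False, exc))

    return wrapper


def square(x: int) -> int:
    return x * x


def fail() -> None:
    raise ValueError("boom")


results: queue.Queue = queue.Queue()  # queue.Queue is safe to share between threads

# With FunctionExecutor this would be
# executor.add_function(with_result_sink(square, results, n), n).
threads = [threading.Thread(target=with_result_sink(square, results, n), args=(n,)) for n in range(3)]
threads.append(threading.Thread(target=with_result_sink(fail, results, "fail")))
for t in threads:
    t.start()
for t in threads:
    t.join()

collected: Dict[Hashable, Tuple[bool, Any]] = {}
while not results.empty():
    task_id, ok, value = results.get()
    collected[task_id] = (ok, value)
print(collected)
```

With the real executor, call wait_for_completion() before draining the results queue so that every task has had a chance to report.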

Contributing

Feel free to submit issues or pull requests if you have suggestions for improvements or find any bugs.

License

This project is open-source and available under the MIT License.

import atexit
import logging
import queue
import threading
import time
from typing import Any, Callable, List, Optional


class FunctionExecutor:
    def __init__(self, num_workers: int = 3, max_queue_size: int = 0):
        """
        Initialize the FunctionExecutor.

        :param num_workers: Number of worker threads to create
        :param max_queue_size: Maximum number of tasks in the queue (0 means unlimited)
        """
        if num_workers < 1:
            raise ValueError("Number of workers must be at least 1")
        self.queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
        self.workers: List[threading.Thread] = []
        self.num_workers: int = num_workers
        self.shutdown_flag: threading.Event = threading.Event()
        self.logger: logging.Logger = logging.getLogger(__name__)
        self._lock: threading.Lock = threading.Lock()
        self._shutdown_lock: threading.Lock = threading.Lock()
        self._atexit_registered: bool = False
        self._started: bool = False

    def __enter__(self) -> "FunctionExecutor":
        """Start the executor when entering a context manager."""
        self.start()
        return self

    def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None:
        """Shut down the executor when exiting a context manager."""
        self.shutdown()

    def start(self) -> None:
        """Start the worker threads if not already started."""
        with self._lock:
            if not self._started:
                self._start_workers()
                self._started = True
            if not self._atexit_registered:
                # Register shutdown() to be called on program exit
                atexit.register(self.shutdown)
                self._atexit_registered = True

    def add_function(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> bool:
        """
        Add a function and its arguments to the queue.

        Worker threads are started automatically on the first call.

        :param func: Function to be executed
        :param args: Positional arguments for the function
        :param kwargs: Keyword arguments for the function
        :return: True if the function was queued, False if the executor is
                 shutting down or the queue is full
        """
        if self.shutdown_flag.is_set():
            self.logger.warning("Cannot add functions after shutdown has been initiated.")
            return False
        # Without this, tasks added outside a `with` block would never run.
        self.start()
        try:
            self.queue.put_nowait((func, args, kwargs))
            return True
        except queue.Full:
            self.logger.warning("Queue is full. Function not added.")
            return False

    def _start_workers(self) -> None:
        """Start the worker threads."""
        for _ in range(self.num_workers):
            worker = threading.Thread(target=self._worker_loop, daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker_loop(self) -> None:
        """Main loop for worker threads: take tasks until shutdown is signalled."""
        while not self.shutdown_flag.is_set():
            try:
                # Poll with a timeout so the shutdown flag is re-checked regularly.
                func, args, kwargs = self.queue.get(timeout=1)
            except queue.Empty:
                continue
            # functools.partial objects and other callables may lack __name__.
            name = getattr(func, "__name__", repr(func))
            try:
                result = func(*args, **kwargs)
                self.logger.info("Function %s executed. Result: %s", name, result)
            except Exception:
                # Log and keep going so one failing task does not stop the worker.
                self.logger.exception("Error executing function %s", name)
            finally:
                self.queue.task_done()

    def is_running(self) -> bool:
        """Check if the executor has any active workers."""
        return any(worker.is_alive() for worker in self.workers)

    def shutdown(self, timeout: Optional[float] = None) -> None:
        """
        Signal workers to shut down and wait for them to finish.

        Tasks that are already running are allowed to finish; tasks still
        waiting in the queue are not executed. Call wait_for_completion()
        first if they must run.

        :param timeout: Maximum time to wait for each worker thread to finish
        """
        with self._shutdown_lock:
            if self.shutdown_flag.is_set():
                return  # Shutdown already in progress
            self.shutdown_flag.set()
        for worker in self.workers:
            worker.join(timeout)
        still_alive = [w for w in self.workers if w.is_alive()]
        if still_alive:
            self.logger.warning(
                "%d worker threads did not shut down within the specified timeout.",
                len(still_alive),
            )
        with self._lock:
            self.workers = []
            if self._atexit_registered:
                atexit.unregister(self.shutdown)
                self._atexit_registered = False

    def wait_for_completion(self, timeout: Optional[float] = None) -> bool:
        """
        Wait for all queued tasks to complete.

        queue.Queue.join() has no timeout parameter, so it runs in a daemon
        helper thread that is waited on with the given timeout. Call this
        before shutdown(): after shutdown, tasks left in the queue never complete.

        :param timeout: Maximum time to wait for task completion (None waits indefinitely)
        :return: True if all tasks completed, False if the timeout expired
        """
        waiter = threading.Thread(target=self.queue.join, daemon=True)
        waiter.start()
        waiter.join(timeout)
        if waiter.is_alive():
            self.logger.warning("Timed out waiting for queued tasks to complete.")
            return False
        return True
# Example usage
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def slow_function(name: str, duration: int) -> str:
        time.sleep(duration)
        return f"{name} completed after {duration} seconds"

    def error_function() -> None:
        raise ValueError("This is a test error")

    print("Creating executor...")
    # max_queue_size leaves room for all 11 tasks, so none are rejected as "queue full".
    with FunctionExecutor(num_workers=3, max_queue_size=20) as executor:
        print("Adding functions...")
        for i in range(10):
            executor.add_function(slow_function, f"Task {i}", i % 3 + 1)
        executor.add_function(error_function)
        print("Functions added. This line executes immediately.")

        print("Main thread continues...")
        time.sleep(2)  # Simulate some work in the main thread
        print("Main thread work complete.")

        # Wait for queued tasks inside the block: __exit__ calls shutdown(),
        # which lets running tasks finish but discards tasks still queued.
        executor.wait_for_completion(timeout=15)

    print("Exited 'with' block; the executor has been shut down.")