Skip to content

Instantly share code, notes, and snippets.

@daryl314
Last active May 3, 2019 19:34
Show Gist options
  • Select an option

  • Save daryl314/3cbea4401700def00637d860c5a81a69 to your computer and use it in GitHub Desktop.

Select an option

Save daryl314/3cbea4401700def00637d860c5a81a69 to your computer and use it in GitHub Desktop.
Python: Parallel map/foreach over an input array without Pool pickling restrictions
from multiprocessing import Process, BoundedSemaphore, RawArray, Pipe, cpu_count, RawValue
from threading import Thread
import resource, time, sys, os, signal
# psutil is an optional dependency: SuicideWorker checks sys.modules for it
# and refuses to accept a timeout when it is missing.
try:
    import psutil
except ImportError:
    # Only a failed import is tolerated here; anything else (including
    # KeyboardInterrupt / SystemExit) must propagate.
    print("psutil import failure -- ParallelJob timeouts disabled")
################################################################################
class Worker(object):
    """Base class for a parallel processing worker.

    A worker wraps a callable and runs it on the concurrency primitive named
    by the ``WORKER`` class attribute.  This base class does not define
    ``WORKER``; subclasses supply a class that accepts ``target``/``args``/
    ``kwargs`` and offers ``start()`` and ``join()``.

    Args:
        fn: callable executed by the worker.
        semaphore: optional semaphore acquired in ``start()`` and released
            when ``fn`` finishes, used to cap the number of running workers.
        postRelease: optional zero-argument callable invoked at the end of
            ``release()``.

    NOTE(review): ``toc()`` runs wherever ``process()`` runs.  With a
    process-backed ``WORKER`` that is presumably the child process, so the
    ``time``/``usage`` attributes would not appear on the parent's object --
    confirm before relying on them.
    """

    def __init__(self, fn, semaphore=None, postRelease=None):
        self.fn = fn
        self.semaphore = semaphore
        self.postRelease = postRelease

    def start(self, *args, **kwargs):
        """Launch ``fn(*args, **kwargs)`` and return ``self`` for chaining.

        Blocks on the semaphore (when one was given) until a slot is free.
        """
        self.worker = self.WORKER(target=self.process, args=args, kwargs=kwargs)
        if self.semaphore is not None:
            self.semaphore.acquire()
        self.tic()
        self.worker.start()
        return self

    def process(self, *args, **kwargs):
        """Run the wrapped callable, then release the worker's resources.

        The release happens in a ``finally`` clause so that a failing ``fn``
        cannot leak the semaphore slot taken in ``start()``; the exception
        itself still propagates.
        """
        try:
            return self.fn(*args, **kwargs)
        finally:
            self.release()

    def join(self):
        """Wait for the underlying thread/process to finish."""
        self.worker.join()

    def release(self):
        """Free the semaphore slot, record timings, run ``postRelease``."""
        if self.semaphore is not None:
            self.semaphore.release()
        self.toc()
        if self.postRelease is not None:
            self.postRelease()

    def tic(self):
        """Record the starting children-rusage snapshot and wall-clock time."""
        self.start_time = (resource.getrusage(resource.RUSAGE_CHILDREN), time.time())

    def toc(self):
        """Store elapsed user CPU time (``usage``) and wall time (``time``).

        Both are measured relative to the snapshot taken by ``tic()``.
        """
        usage_end = resource.getrusage(resource.RUSAGE_CHILDREN)
        time_end = time.time()
        self.usage = usage_end.ru_utime - self.start_time[0].ru_utime
        self.time = time_end - self.start_time[1]
################################################################################
class ThreadWorker(Worker):
    """Worker whose task is executed on a ``threading.Thread``."""

    # Concurrency primitive instantiated by Worker.start().
    WORKER = Thread
################################################################################
class ProcessWorker(Worker):
    """Worker whose task is executed in a ``multiprocessing.Process``."""

    # Concurrency primitive instantiated by Worker.start().
    WORKER = Process
################################################################################
class SuicideWorker(ThreadWorker):
    """Parallel processing worker with timeout.

    A supervising thread (this is a ``ThreadWorker``) spawns a child process
    for the wrapped callable and waits for it for at most ``timeout``
    seconds.  A child that is still alive after that has its descendants
    killed and is then terminated.

    Args:
        fn: callable run in the child process.
        timeout: seconds to wait for the child, or ``None`` to wait forever.
        **kwargs: forwarded to the base worker (``semaphore``,
            ``postRelease``).

    Raises:
        RuntimeError: a timeout was requested but psutil is not loaded.
    """

    def __init__(self, fn, timeout=None, **kwargs):
        super(SuicideWorker, self).__init__(fn, **kwargs)
        # Enforcing a timeout needs psutil to enumerate the child's
        # descendants (see killChildren).
        if timeout is not None and 'psutil' not in sys.modules:
            raise RuntimeError("psutil module is required for timeouts")
        self.timeout = timeout
        # 1 once the child had to be terminated, 0 otherwise.
        self.terminated = RawValue('b')

    def process(self, *args, **kwargs):
        """Run ``fn`` in a child process, killing it on timeout.

        Returns:
            The ``terminated`` flag: truthy when the child was still alive
            after ``timeout`` seconds and had to be terminated.
        """
        try:
            proc = Process(target=self.fn, args=args, kwargs=kwargs)
            proc.start()
            proc.join(self.timeout)
            self.terminated.value = proc.is_alive()
            if self.terminated.value == 1:
                # Snapshot taken before the kill; release() below calls
                # toc() again and overwrites these values.
                self.toc()
                self.killChildren(proc.pid)
                proc.terminate()
        finally:
            # Always hand the semaphore slot back, even when spawning or
            # killing the child raised.
            self.release()
        return self.terminated.value

    @staticmethod
    def killChildren(pid):
        """Send SIGKILL to every descendant of process ``pid``.

        Does nothing when the process no longer exists.
        """
        try:
            # Both the lookup and the enumeration can race with the process
            # exiting, so both are guarded.
            descendants = psutil.Process(pid).children(recursive=True)
        except psutil.NoSuchProcess:
            return
        for child in descendants:
            try:
                os.kill(child.pid, signal.SIGKILL)
            except OSError:
                # Best effort: the descendant may already be gone.
                pass
################################################################################
class PipeWorker(SuicideWorker):
    """Parallel processing worker with pipe communication.

    The wrapped callable runs in a child process and its return value is
    sent back to the parent through a pipe, where ``join()`` returns it.

    Args:
        fn: callable whose return value should be collected.
        timeout_result: value reported when no result was received (the
            child timed out, or exited without sending anything).
        **kwargs: forwarded to ``SuicideWorker`` (``timeout``,
            ``semaphore``, ``postRelease``).

    NOTE(review): the parent only reads from the pipe after the child has
    been joined, so a result larger than the OS pipe buffer presumably
    blocks the child in ``send()`` until the timeout expires (or forever
    without one) -- confirm against the multiprocessing guidelines.
    """

    def __init__(self, fn, timeout_result=None, **kwargs):
        self.LPipe, self.RPipe = Pipe()
        wrappedFn = self.wrapFn(fn, self.LPipe)
        super(PipeWorker, self).__init__(wrappedFn, **kwargs)
        self.value = None
        self.timeout_result = timeout_result

    def process(self, timeout, *args, **kwargs):
        """Run the child and collect its result into ``self.value``.

        Despite its name, ``timeout`` is simply the first positional
        argument forwarded to the wrapped callable; the actual time limit is
        ``self.timeout``.
        """
        try:
            was_timeout = super(PipeWorker, self).process(timeout, *args, **kwargs)
            # The parent keeps its own handle on the sending end, so recv()
            # would never see EOF: only read when data is really waiting,
            # otherwise a child that died without sending would hang us.
            if not was_timeout and self.RPipe.poll():
                self.value = self.RPipe.recv()
            else:
                self.value = self.timeout_result
        finally:
            self.LPipe.close()
            self.RPipe.close()

    def join(self):
        """Wait for the supervising thread and return the collected value."""
        self.worker.join()
        return self.value

    @staticmethod
    def wrapFn(fn, pipe):
        """Return a callable that sends ``fn``'s return value down ``pipe``."""
        def sendToPipe(*args, **kwargs):
            pipe.send(fn(*args, **kwargs))
        return sendToPipe
################################################################################
class Job(object):
    """A batch of workers, one per input element, sharing one semaphore.

    Every worker is created and started inside the constructor; the
    semaphore (initial value ``workers``) is handed to each of them so they
    can limit how many run at once.  Extra keyword arguments are passed to
    ``WorkerClass``.
    """

    def __init__(self, WorkerClass, fn, arr, workers, **kwargs):
        self.semaphore = BoundedSemaphore(value=workers)
        launched = []
        for item in arr:
            worker = WorkerClass(fn, semaphore=self.semaphore, **kwargs)
            launched.append(worker.start(item))
        self.workers = launched

    def join(self):
        """Join every worker in input order and return the join() results."""
        results = []
        for worker in self.workers:
            results.append(worker.join())
        return results

    def workerTime(self):
        """Return each worker's ``time`` attribute, in input order."""
        times = []
        for worker in self.workers:
            times.append(worker.time)
        return times

    def workerUsage(self):
        """Return each worker's ``usage`` attribute, in input order."""
        usages = []
        for worker in self.workers:
            usages.append(worker.usage)
        return usages
################################################################################
def parallelForEach(fn,
                    arr,
                    workers=cpu_count(),
                    withProcess=True,
                    timeout=None,
                    ):
    """Call ``fn`` on every element of ``arr`` in parallel.

    With a ``timeout`` each call is run by a ``SuicideWorker``; otherwise
    ``withProcess`` picks between ``ProcessWorker`` and ``ThreadWorker``.
    At most ``workers`` calls run at once.  The finished ``Job`` is
    returned after all workers have been joined.
    """
    if timeout is None:
        worker_class = ProcessWorker if withProcess else ThreadWorker
        job = Job(worker_class, fn, arr, workers)
    else:
        job = Job(SuicideWorker, fn, arr, workers, timeout=timeout)
    job.join()
    return job
################################################################################
def parallelMap(fn,
                arr,
                workers=cpu_count(),
                dtype=None,
                withProcess=True,
                timeout=None,
                timeout_result=None,
                return_job=False,
                ):
    """Map ``fn`` over ``arr`` in parallel and collect the results.

    Args:
        fn: callable applied to each element.
        arr: indexable input sequence (``len()`` and ``[]`` are used).
        workers: maximum number of simultaneously running workers.
        dtype: when given, results are stored in a ``RawArray`` of this
            type instead of being sent back through pipes.
        withProcess: run in processes (True) or threads (False).
        timeout: per-element time limit in seconds (processes only).
        timeout_result: value stored for elements that timed out.
        return_job: return the ``Job`` instead of the results.  Only the
            pipe-based branch (``withProcess`` and no ``dtype``) honours
            this flag; the other branch always returns the output sequence.

    Returns:
        The list of results, a ``RawArray`` when ``dtype`` was given, or
        the ``Job`` as described under ``return_job``.
    """
    assert withProcess or timeout is None, "Cannot have timeout with threads"
    # need pipes for generic types running in a separate process
    if withProcess and dtype is None:
        job_args = {'timeout': timeout, 'timeout_result': timeout_result}
        job = Job(PipeWorker, fn, arr, workers, **job_args)
        results = job.join()
        return job if return_job else results
    # otherwise can use a shared array or threads to avoid need for pipes
    out = [None] * len(arr) if dtype is None else RawArray(dtype, len(arr))

    def runAndStore(i):
        out[i] = fn(arr[i])

    # Forward the timeout: it used to be dropped here, so a shared-array map
    # silently ran without any time limit.
    job = parallelForEach(runAndStore, range(len(arr)), workers, withProcess, timeout)
    if timeout is not None and timeout_result is not None:
        # Timed-out slots would otherwise keep their initial value.
        for i, worker in enumerate(job.workers):
            if worker.terminated.value:
                out[i] = timeout_result
    return out
################################################################################
class WorkerPool(object):
    """Parallel processing pool of persistent workers.

    The pool hands out one of its ``workers`` objects to each task and
    marks it busy until the task's ``postRelease`` hook gives it back.

    Args:
        workers: sequence of worker objects passed to the executed callables.
        withProcess: run tasks in a process (True) or a thread (False).
        verbose: enable logging.
        logger: object with a ``write`` method used for verbose output.
        colors: stored on the instance; not used by the pool itself.
    """

    def __init__(self, workers, withProcess=False, verbose=False, logger=sys.stdout, colors=None):
        self.withProcess = withProcess  # use a process (True) or thread (False)
        self.verbose = verbose          # are we logging?
        self.colors = colors            # colors to use for logging
        self.logger = logger            # logger for verbose output
        self.workers = workers          # workers
        self.busy = RawArray('b', len(workers))                # busy/free status for workers
        self.semaphore = BoundedSemaphore(value=len(workers))  # signal for an available worker
        self.lock = BoundedSemaphore(value=1)                  # atomic acquire/release

    def execute(self, fn, timeout=None):
        """Execute fn(worker) with worker from the pool; return a joinable Worker.

        Blocks until a pool worker is free.  If launching the task fails,
        the pool worker is handed back before the exception propagates so
        the slot is not lost.
        """
        self.log('WorkerPool.execute acquiring a worker')
        worker, idx = self.acquireWorker()
        freeWorker = lambda: self.releaseWorker(idx)
        try:
            if timeout is not None:
                self.log("WorkerPool.execute launching SuicideWorker with worker[%d]" % idx, worker)
                return SuicideWorker(fn, postRelease=freeWorker, timeout=timeout).start(worker)
            if self.withProcess:
                self.log("WorkerPool.execute launching ProcessWorker with worker[%d]" % idx, worker)
                return ProcessWorker(fn, postRelease=freeWorker).start(worker)
            self.log("WorkerPool.execute launching ThreadWorker with worker[%d]" % idx, worker)
            return ThreadWorker(fn, postRelease=freeWorker).start(worker)
        except Exception:
            # The task never started, so its postRelease hook will not run.
            self.releaseWorker(idx)
            raise

    def acquireWorker(self):
        """Acquire a worker when one is available.

        Returns:
            ``(worker, index)`` for the first worker not marked busy.
        """
        self.semaphore.acquire()  # wait for at least one available worker
        # ``with`` guarantees the lock is released even if logging raises.
        with self.lock:
            idx = list(self.busy).index(0)  # find first available worker
            self.busy[idx] = 1              # worker is now busy
            self.log("WorkerPool.acquireWorker Acquired worker %d" % idx, self.workers[idx])
        return self.workers[idx], idx

    def releaseWorker(self, idx):
        """Release worker at specified index from service."""
        with self.lock:
            self.busy[idx] = 0  # worker is now available
            self.log("WorkerPool.releaseWorker Released worker %d" % idx, self.workers[idx])
        self.semaphore.release()  # signal that a worker is available

    def log(self, txt, worker=None):
        """Log text to logger if running in verbose mode.

        A worker exposing its own ``log`` method receives the message
        instead of the pool's logger.
        """
        if not self.verbose:
            return
        if worker is not None and hasattr(worker, 'log'):
            worker.log(txt)
        else:
            self.logger.write(txt + '\n')
################################################################################
if __name__ == '__main__':
    # Demo: map a sleepy task over 0..14 in processes with a 1 second limit.
    def sleepyWorker(x):
        """Sleep for x tenths of a second, then return twice x."""
        print("Starting worker %d ..." % x)
        delay = x * 0.1
        time.sleep(delay)
        print("Worker %d complete!" % x)
        return x*2

    timing = []
    job = parallelMap(sleepyWorker, range(15), withProcess=True, timeout=1, return_job=True)
    print("Result:", job.join())

    # Drop into an interactive shell to inspect the finished job.
    import IPython
    IPython.embed()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment