Last active
May 3, 2019 19:34
-
-
Save daryl314/3cbea4401700def00637d860c5a81a69 to your computer and use it in GitHub Desktop.
Python: Parallel map/foreach over an input array without Pool pickling restrictions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from multiprocessing import Process, BoundedSemaphore, RawArray, Pipe, cpu_count, RawValue | |
| from threading import Thread | |
| import resource, time, sys, os, signal | |
try:
    import psutil
except ImportError:
    # psutil is optional here: SuicideWorker checks for it in sys.modules and
    # only refuses to run when a timeout is actually requested.  Catch just
    # ImportError so Ctrl-C / SystemExit during import are not swallowed.
    print("psutil import failure -- ParallelJob timeouts disabled")
| ################################################################################ | |
class Worker(object):
    """Base class for a parallel processing worker.

    Subclasses provide a ``WORKER`` class attribute: a callable invoked as
    ``WORKER(target=..., args=..., kwargs=...)`` whose result exposes
    ``start()`` and ``join()``.

    Parameters:
        fn: callable executed by the worker.
        semaphore: optional semaphore used to throttle how many workers run
            at once; acquired in ``start`` and released in ``release``.
        postRelease: optional zero-argument callback invoked at the end of
            ``release``.

    After ``release`` has run, ``usage`` holds the change in child user CPU
    time and ``time`` the elapsed wall-clock time since ``start``.

    NOTE(review): when ``WORKER`` creates a separate process, ``process`` (and
    therefore ``toc``) presumably runs in the child, so ``usage``/``time``
    would not be set on the parent's instance -- confirm before relying on it.
    """

    def __init__(self, fn, semaphore=None, postRelease=None):
        self.fn = fn
        self.semaphore = semaphore
        self.postRelease = postRelease

    def start(self, *args, **kwargs):
        """Launch ``fn(*args, **kwargs)`` in the background; return ``self``.

        Blocks on the semaphore (if any) until a slot is free.
        """
        self.worker = self.WORKER(target=self.process, args=args, kwargs=kwargs)
        if self.semaphore is not None:
            self.semaphore.acquire()
        self.tic()
        self.worker.start()
        return self

    def process(self, *args, **kwargs):
        """Run ``fn`` and return its result, always releasing afterwards."""
        try:
            return self.fn(*args, **kwargs)
        finally:
            # Release even when fn raises; otherwise the semaphore slot is
            # lost for good and postRelease never fires.
            self.release()

    def join(self):
        """Wait for the underlying worker to finish."""
        self.worker.join()

    def release(self):
        """Free the semaphore slot, record timings, then run postRelease."""
        if self.semaphore is not None:
            self.semaphore.release()
        self.toc()
        if self.postRelease is not None:
            self.postRelease()

    def tic(self):
        """Record the starting child resource usage and wall-clock time."""
        self.start_time = (resource.getrusage(resource.RUSAGE_CHILDREN), time.time())

    def toc(self):
        """Store user CPU time (``usage``) and wall time (``time``) since tic."""
        usage_end = resource.getrusage(resource.RUSAGE_CHILDREN)
        time_end = time.time()
        self.usage = usage_end.ru_utime - self.start_time[0].ru_utime
        self.time = time_end - self.start_time[1]
| ################################################################################ | |
class ThreadWorker(Worker):
    """Worker variant whose job is executed by a ``threading.Thread``."""

    # Factory that Worker.start() uses to build the execution unit.
    WORKER = Thread
| ################################################################################ | |
class ProcessWorker(Worker):
    """Worker variant whose job is executed by a ``multiprocessing.Process``."""

    # Factory that Worker.start() uses to build the execution unit.
    WORKER = Process
| ################################################################################ | |
class SuicideWorker(ThreadWorker):
    """Parallel processing worker with timeout.

    The worker's thread acts as a supervisor: it runs ``fn`` in a child
    process, waits up to ``timeout`` seconds, and kills the child (and its
    descendants) if it is still alive.

    Parameters:
        fn: callable executed in the child process.
        timeout: seconds to wait for the child, or None to wait forever.
        **kwargs: forwarded to ``Worker.__init__`` (semaphore, postRelease).

    Raises:
        RuntimeError: a timeout was requested but psutil is not loaded.
    """

    # Seconds to wait for a SIGTERM'd child to exit before sending SIGKILL.
    REAP_TIMEOUT = 1.0

    def __init__(self, fn, timeout=None, **kwargs):
        super(SuicideWorker, self).__init__(fn, **kwargs)
        if timeout is not None and 'psutil' not in sys.modules:
            raise RuntimeError("psutil module is required for timeouts")
        self.timeout = timeout
        # 1 if the child had to be killed on timeout, 0 otherwise.
        self.terminated = RawValue('b')

    def process(self, *args, **kwargs):
        """Run ``fn`` in a child process; return 1 if it timed out, else 0."""
        proc = Process(target=self.fn, args=args, kwargs=kwargs)
        try:
            proc.start()
            proc.join(self.timeout)
            self.terminated.value = proc.is_alive()
            if self.terminated.value == 1:
                # Kill descendants first: terminate() only signals the child.
                self.killChildren(proc.pid)
                proc.terminate()
                # Reap the child so it does not linger as a zombie.  Escalate
                # to SIGKILL if SIGTERM was ignored, so the join cannot hang.
                proc.join(self.REAP_TIMEOUT)
                if proc.is_alive():
                    try:
                        os.kill(proc.pid, signal.SIGKILL)
                    except OSError:
                        pass  # exited between the check and the kill
                    proc.join()
        finally:
            # Always free the semaphore slot and record timings (release()
            # calls toc()), even if starting or killing the child failed.
            self.release()
        return self.terminated.value

    @staticmethod
    def killChildren(pid):
        """SIGKILL every descendant of process ``pid`` (best effort)."""
        try:
            children = psutil.Process(pid).children(recursive=True)
        except psutil.NoSuchProcess:
            # Parent vanished before or while we were listing its children.
            return
        for child in children:
            try:
                os.kill(child.pid, signal.SIGKILL)
            except OSError:
                pass  # child already exited
| ################################################################################ | |
class PipeWorker(SuicideWorker):
    """Parallel processing worker with pipe communication.

    Wraps ``fn`` so that its return value is sent through a pipe from the
    child process; the supervising thread reads it into ``self.value``, which
    ``join`` returns.

    Parameters:
        fn: callable executed in the child process.
        timeout_result: value stored as the result when the child times out.
        **kwargs: forwarded to ``SuicideWorker.__init__`` (timeout, semaphore,
            postRelease).

    NOTE(review): the result is read only after the child has exited, so a
    result too large for the pipe's buffer presumably blocks the child in
    ``send`` until the timeout fires (or forever with ``timeout=None``).
    """

    def __init__(self, fn, timeout_result=None, **kwargs):
        self.LPipe, self.RPipe = Pipe()
        wrappedFn = self.wrapFn(fn, self.LPipe)
        super(PipeWorker, self).__init__(wrappedFn, **kwargs)
        self.value = None
        self.timeout_result = timeout_result

    def process(self, *args, **kwargs):
        """Run the wrapped ``fn`` and capture its result in ``self.value``.

        ``self.value`` becomes ``timeout_result`` on timeout, and stays None
        when the child exited without sending anything (e.g. ``fn`` raised).
        """
        try:
            was_timeout = super(PipeWorker, self).process(*args, **kwargs)
            if was_timeout:
                self.value = self.timeout_result
            elif self.RPipe.poll():
                self.value = self.RPipe.recv()
            # else: nothing was sent.  An unconditional recv() would block
            # forever here because this process still holds the write end.
        finally:
            self.LPipe.close()
            self.RPipe.close()

    def join(self):
        """Wait for the supervising thread; return the captured result."""
        self.worker.join()
        return self.value

    @staticmethod
    def wrapFn(fn, pipe):
        """Return a function that calls ``fn`` and sends its result to ``pipe``."""
        def sendToPipe(*args, **kwargs):
            pipe.send(fn(*args, **kwargs))
        return sendToPipe
| ################################################################################ | |
class Job(object):
    """Group of workers, one per input item, throttled by a shared semaphore.

    Parameters:
        WorkerClass: worker type, built as
            ``WorkerClass(fn, semaphore=..., **kwargs)`` and launched with
            ``.start(item)``.
        fn: callable handed to every worker.
        arr: iterable of inputs; one worker is started per element.
        workers: initial value of the bounded semaphore shared by the workers.
        **kwargs: extra keyword arguments forwarded to ``WorkerClass``.
    """

    def __init__(self, WorkerClass, fn, arr, workers, **kwargs):
        self.semaphore = BoundedSemaphore(value=workers)
        launched = []
        for item in arr:
            unit = WorkerClass(fn, semaphore=self.semaphore, **kwargs)
            launched.append(unit.start(item))
        self.workers = launched

    def join(self):
        """Join every worker in order; return the list of join() results."""
        results = []
        for unit in self.workers:
            results.append(unit.join())
        return results

    def workerTime(self):
        """Return each worker's ``time`` attribute, in input order."""
        elapsed = []
        for unit in self.workers:
            elapsed.append(unit.time)
        return elapsed

    def workerUsage(self):
        """Return each worker's ``usage`` attribute, in input order."""
        consumed = []
        for unit in self.workers:
            consumed.append(unit.usage)
        return consumed
| ################################################################################ | |
def parallelForEach(fn,
                    arr,
                    workers=cpu_count(),
                    withProcess=True,
                    timeout=None,
                    ):
    """Apply ``fn`` to every element of ``arr`` in parallel and wait.

    Parameters:
        fn: callable invoked once per element.
        arr: iterable of inputs.
        workers: maximum number of simultaneously running workers.
        withProcess: use processes (True) or threads (False) when no timeout
            is given.
        timeout: per-item timeout; when not None a SuicideWorker is used
            regardless of ``withProcess``.

    Returns:
        The joined Job.
    """
    if timeout is not None:
        job = Job(SuicideWorker, fn, arr, workers, timeout=timeout)
    else:
        WorkerClass = ProcessWorker if withProcess else ThreadWorker
        job = Job(WorkerClass, fn, arr, workers)
    job.join()
    return job
| ################################################################################ | |
def parallelMap(fn,
                arr,
                workers=cpu_count(),
                dtype=None,
                withProcess=True,
                timeout=None,
                timeout_result=None,
                return_job=False,
                ):
    """Map over an input array in parallel.

    Parameters:
        fn: callable applied to each element.
        arr: indexable, sized input sequence.
        workers: maximum number of simultaneously running workers.
        dtype: if given, results are stored in a ``RawArray`` of this type
            code instead of being sent back through pipes.
        withProcess: use processes (True) or threads (False).
        timeout: per-item timeout; only valid with ``withProcess=True``.
        timeout_result: value stored for items that timed out.
        return_job: on the pipe path (processes, no dtype) return the joined
            Job instead of the result list; ignored on the other path.

    Returns:
        A list of results, a ``RawArray`` when ``dtype`` is given, or the Job
        when ``return_job`` is set on the pipe path.
    """
    assert withProcess or timeout is None, "Cannot have timeout with threads"

    # need pipes for generic types running in a separate process
    if withProcess and dtype is None:
        job_args = {'timeout': timeout, 'timeout_result': timeout_result}
        job = Job(PipeWorker, fn, arr, workers, **job_args)
        results = job.join()
        return job if return_job else results

    # otherwise can use a shared array or threads to avoid need for pipes
    out = [None] * len(arr) if dtype is None else RawArray(dtype, len(arr))

    def runAndStore(i):
        out[i] = fn(arr[i])

    # Forward the timeout: it used to be dropped silently on this path.
    job = parallelForEach(runAndStore, range(len(arr)), workers, withProcess, timeout)
    if timeout is not None and timeout_result is not None:
        # With a timeout every worker is a SuicideWorker, whose `terminated`
        # flag marks the items that were killed; mirror the pipe path by
        # storing timeout_result for them.
        for i, worker in enumerate(job.workers):
            if worker.terminated.value:
                out[i] = timeout_result
    return out
| ################################################################################ | |
class WorkerPool(object):
    """Parallel processing pool of persistent workers.

    Parameters:
        workers: sequence of worker objects handed to the executed functions.
        withProcess: run jobs in a process (True) or a thread (False).
        verbose: emit log messages when True.
        logger: object with ``write`` used for verbose output.
        colors: stored as ``self.colors``; not otherwise used by this class.
    """

    def __init__(self, workers, withProcess=False, verbose=False, logger=sys.stdout, colors=None):
        self.withProcess = withProcess                         # use a process (True) or thread (False)
        self.verbose = verbose                                 # are we logging?
        self.colors = colors                                   # colors to use for logging
        self.logger = logger                                   # logger for verbose output
        self.workers = workers                                 # workers
        self.busy = RawArray('b', len(workers))                # busy/free status for workers
        self.semaphore = BoundedSemaphore(value=len(workers))  # signal for an available worker
        self.lock = BoundedSemaphore(value=1)                  # atomic acquire/release

    def execute(self, fn, timeout=None):
        """Execute fn(worker) with worker from the pool; return a joinable Worker"""
        self.log('WorkerPool.execute acquiring a worker')
        worker, idx = self.acquireWorker()
        freeWorker = lambda: self.releaseWorker(idx)
        try:
            if timeout is None and not self.withProcess:
                self.log("WorkerPool.execute launching ThreadWorker with worker[%d]" % idx, worker)
                job = ThreadWorker(fn, postRelease=freeWorker)
            elif timeout is None and self.withProcess:
                self.log("WorkerPool.execute launching ProcessWorker with worker[%d]" % idx, worker)
                job = ProcessWorker(fn, postRelease=freeWorker)
            else:
                self.log("WorkerPool.execute launching SuicideWorker with worker[%d]" % idx, worker)
                job = SuicideWorker(fn, postRelease=freeWorker, timeout=timeout)
            return job.start(worker)
        except Exception:
            # The job never ran, so its postRelease will not fire; give the
            # pool worker back instead of leaving it marked busy forever.
            self.releaseWorker(idx)
            raise

    def acquireWorker(self):
        """Acquire a worker when one is available; return (worker, index)."""
        self.semaphore.acquire()                # wait for at least one available worker
        try:
            with self.lock:                     # atomic update, released even on error
                idx = list(self.busy).index(0)  # find first available worker
                self.busy[idx] = 1              # worker is now busy
                self.log("WorkerPool.acquireWorker Acquired worker %d" % idx, self.workers[idx])
        except Exception:
            self.semaphore.release()            # nothing was acquired; restore the count
            raise
        return self.workers[idx], idx           # return worker and its associated index

    def releaseWorker(self, idx):
        """Release worker at specified index from service"""
        with self.lock:                         # atomic update, released even on error
            self.busy[idx] = 0                  # worker is now available
            self.log("WorkerPool.releaseWorker Released worker %d" % idx, self.workers[idx])
        self.semaphore.release()                # signal that a worker is available

    def log(self, txt, worker=None):
        """Log text to logger if running in verbose mode.

        A worker that has its own ``log`` method receives the message instead
        of the pool's logger.
        """
        if self.verbose:
            if worker is not None and hasattr(worker, 'log'):
                worker.log(txt)
            else:
                self.logger.write(txt + '\n')
| ################################################################################ | |
if __name__ == '__main__':
    # Demo: map a sleepy function over 0..14 with a 1 second per-item timeout.

    def sleepyWorker(x):
        """Sleep for x/10 seconds, then return twice the input."""
        print("Starting worker %d ..." % x)
        time.sleep(x * 0.1)
        print("Worker %d complete!" % x)
        return x*2

    job = parallelMap(sleepyWorker, range(15), withProcess=True, timeout=1, return_job=True)
    print("Result:", job.join())

    # Drop into an interactive shell for inspection when IPython is
    # available; the demo should not crash just because it is not installed.
    try:
        import IPython
    except ImportError:
        pass
    else:
        IPython.embed()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment