-
-
Save dbaynard/0b5609ee25bec43b4fc2aeb4c37b6a4a to your computer and use it in GitHub Desktop.
/**
 * Builds a mapper that parks each input in `pool` and applies `f` once the
 * parked job is released.
 *
 * The returned callback resolves `x`, then pushes a job `{ i, v, r }` onto
 * `pool`, where `v` is the resolved input and `r` is the resolver of an
 * internal gate promise. Nothing further happens until some consumer calls
 * `r`; the value passed to `r` is handed to `f`, and the returned promise
 * settles with the outcome of `f`. If `x` rejects, no job is pushed and the
 * returned promise rejects.
 *
 * @param f - Function applied to the value the gate is resolved with.
 * @param pool - Array that receives one job per successfully resolved input.
 * @returns A `(x, i)` callback that returns a promise for `f`'s result.
 */
const applyAsync = (f, pool) => {
  const enqueue = (x, i) => {
    // The push happens inside a `.then` callback, so it is always asynchronous.
    const gate = Promise.resolve(x).then((v) => {
      return new Promise((r) => {
        pool.push({ i, v, r });
      });
    });
    return gate.then(f);
  };
  return enqueue;
};
/**
 * Creates a stepper that releases queued jobs one at a time from an iterator.
 *
 * Each step pulls the next job from `it`. If the iterator is exhausted the
 * step returns and the chain ends. Otherwise the job's resolver `r` is called
 * with the job's value `v` on a zero-delay timer, and the next step is
 * scheduled `getTimeout()` milliseconds later. The step does not wait for
 * whatever work is chained onto the job's promise.
 *
 * @param getTimeout - Called after each release; returns the delay in
 *   milliseconds before this worker pulls its next job.
 * @returns A `(it, i)` function; `i` is forwarded unchanged to later steps.
 */
const worker = (getTimeout) => {
  const step = (it, i) => {
    const { done: exhausted, value: job } = it.next();
    if (exhausted) {
      return;
    }
    const { v: payload, r: release } = job;
    const releaseThenReschedule = () => {
      // `getTimeout` is read in a microtask after the release, not synchronously.
      const released = Promise.resolve(release(payload));
      return released.then(() => {
        setTimeout(() => {
          step(it, i);
        }, getTimeout());
      });
    };
    setTimeout(releaseThenReschedule, 0);
  };
  return step;
};
/**
 * Starts `n` workers over the jobs currently held in `queue`.
 *
 * The work is deferred with `queueMicrotask`. When it runs, `queue` is emptied
 * with `splice(0)` and a single iterator over the removed jobs is shared by
 * all `n` workers, so each job is handed to exactly one worker.
 *
 * @param n - Number of workers to start.
 * @param queue - Job array; it is emptied when the microtask runs.
 * @param getTimeout - Passed through to `worker`.
 */
const runWorkers = (n, queue, getTimeout) => {
  queueMicrotask(() => {
    const slots = new Array(n);
    // Every slot references the same iterator object.
    const pending = queue.splice(0).values();
    slots.fill(pending);
    slots.forEach(worker(getTimeout));
  });
};
/**
 * Maps `f` over `xs`, releasing the queued calls through `n` workers.
 *
 * Every element is wrapped by `applyAsync`, which parks it in a local queue.
 * `runWorkers` is then started with a constant 300 ms timeout between the
 * releases made by each worker.
 *
 * @param n - Number of workers passed to `runWorkers`.
 * @param f - Function applied to each (resolved) element.
 * @param xs - Input array; elements may be plain values or promises.
 * @returns One promise per element of `xs`, in input order.
 */
const mapConcurrentlyN = (n, f, xs) => {
  const queue = [];
  const ys = xs.map(applyAsync(f, queue));
  // `applyAsync` only enqueues a job once its input has resolved, and
  // `runWorkers` takes a one-off snapshot of the queue. Starting the workers
  // immediately would therefore drop every input that is still pending,
  // leaving its result promise unresolved forever. Wait until all inputs have
  // settled (fulfilled or rejected) so the snapshot is complete. Trade-off:
  // no job is released until the slowest input has settled.
  Promise.allSettled(xs).then(() => runWorkers(n, queue, () => 300));
  return ys;
};
/**
 * A parked unit of work.
 *
 * - `i`: index of the element in the input array.
 * - `v`: the resolved input value.
 * - `r`: resolver of the gate promise; it is called with the input value
 *   (type `A`), which is then passed on to the mapping function.
 *
 * `B` is not used by the shape itself; it is kept so that existing
 * `Job<A, B>` references keep their arity.
 */
type Job<A, B> = { i: number; v: A; r: (_: A) => void };

/**
 * Builds a mapper that parks each input in `pool` and applies `f` once the
 * parked job is released.
 *
 * The returned callback resolves `x`, then pushes a job onto `pool`. Nothing
 * further happens until some consumer calls the job's `r`; the value passed
 * to `r` is handed to `f`, and the returned promise settles with the outcome
 * of `f`. If `x` rejects, no job is pushed and the returned promise rejects.
 *
 * @param f - Function applied to the value the gate is resolved with.
 * @param pool - Array that receives one job per successfully resolved input.
 * @returns A `(x, i)` callback that returns a promise for `f`'s result.
 */
const applyAsync =
  <A, B>(f: (_: A) => B, pool: Job<A, B>[]) =>
  (x: A, i: number): Promise<B> =>
    Promise.resolve<unknown>(x)
      .then(
        (v) =>
          // The gate is typed `Promise<A>` so the chain type-checks against `f`.
          new Promise<A>((r) => {
            // `x` is declared as `A`, so its resolved value is treated as `A`.
            pool.push({ i, v: v as A, r });
          })
      )
      .then(f);
/**
 * Creates a stepper that releases queued jobs one at a time from an iterator.
 *
 * Each step pulls the next job from `it`. If the iterator is exhausted the
 * step returns and the chain ends. Otherwise the job's resolver `r` is called
 * with the job's value `v` on a zero-delay timer, and the next step is
 * scheduled `getTimeout()` milliseconds later. The step does not wait for
 * whatever work is chained onto the job's promise.
 *
 * @param getTimeout - Called after each release; returns the delay in
 *   milliseconds before this worker pulls its next job.
 * @returns A `(it, i)` function; `i` is forwarded unchanged to later steps.
 */
const worker =
  (getTimeout: () => number) =>
  <A, B>(it: Iterator<Job<A, B>>, i: number): void => {
    // `it` must be an Iterator (not an Iterable) because `.next()` is called.
    const next = it.next();
    // Checking `done` on the result object narrows `next.value` to a job.
    if (next.done) return;
    const { v, r } = next.value;
    setTimeout(
      () =>
        Promise.resolve(r(v)).then(() => {
          // `getTimeout` is read in a microtask after the release.
          setTimeout(() => {
            worker(getTimeout)(it, i);
          }, getTimeout());
        }),
      0
    );
  };
/**
 * Starts `n` workers over the jobs currently held in `queue`.
 *
 * The work is deferred with `queueMicrotask`. When it runs, `queue` is emptied
 * with `splice(0)` and a single iterator over the removed jobs is shared by
 * all `n` workers, so each job is handed to exactly one worker.
 *
 * @param n - Number of workers to start.
 * @param queue - Job array; it is emptied when the microtask runs.
 * @param getTimeout - Passed through to `worker`.
 */
const runWorkers = <A, B>(
  n: number,
  queue: Job<A, B>[],
  getTimeout: () => number
) => {
  queueMicrotask(() => {
    const slots = new Array(n);
    // Every slot references the same iterator object.
    const pending = queue.splice(0).values();
    slots.fill(pending);
    slots.forEach(worker(getTimeout));
  });
};
/**
 * Maps `f` over `xs`, releasing the queued calls through `n` workers.
 *
 * Every element is wrapped by `applyAsync`, which parks it in a local queue.
 * `runWorkers` is then started with a constant 300 ms timeout between the
 * releases made by each worker.
 *
 * @param n - Number of workers passed to `runWorkers`.
 * @param f - Function applied to each (resolved) element.
 * @param xs - Input array.
 * @returns One promise per element of `xs`, in input order.
 */
const mapConcurrentlyN = <A, B>(
  n: number,
  f: (_: A) => B,
  xs: A[]
): Promise<B>[] => {
  // Annotated explicitly: a bare `[]` would be an implicit `any[]`.
  const queue: Job<A, B>[] = [];
  const ys = xs.map(applyAsync(f, queue));
  // `applyAsync` only enqueues a job once its input has resolved, and
  // `runWorkers` takes a one-off snapshot of the queue. Starting the workers
  // immediately would therefore drop every input that is still pending,
  // leaving its result promise unresolved forever. Wait until all inputs have
  // settled (fulfilled or rejected) so the snapshot is complete. Trade-off:
  // no job is released until the slowest input has settled.
  void Promise.allSettled(xs).then(() => runWorkers(n, queue, () => 300));
  return ys;
};
I should wrap the call establishing the worker threads in a new Promise, pass the resolve function to the worker, and then call resolve when the last task completes (there’s some subtlety around completion). I can call resolve multiple times — see the Stack Overflow question “javascript - What happens if I reject / resolve multiple times in Kriskowal's q?”.
Yeah, that didn’t work, but I used queue.splice(0) when generating the iterator instead.
I might review the api/naming, at some point. I’ve kept the getTimeout function as I want that in the worker API. applyAsync should probably be named queue 🤷
OK but just so I remember next time… I create the promises for each value in the original array in microtasks, and so I have to call queueMicrotask in runWorkers in order to run the queue at the end. Maybe I should just use setTimeout(…, 0), though 🤔
mapConcurrentlyN is (possibly) very broken if the array passed in contains non-trivial promises 🤷
Oh, and I should pass performance.now() to getTimeout, or something similar, so the timeout function is actually useful.
Better than most of these