Last active
September 5, 2023 10:57
-
-
Save costrouc/8126f6a8eef8be29de780bd964bc4ab0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from rq import Queue | |
| from redis import Redis | |
| import tasks | |
| import time | |
| import contextlib | |
@contextlib.contextmanager
def timer(message: str):
    """Time the enclosed ``with`` block and print the elapsed seconds.

    Args:
        message: Label placed in front of the timing report.

    Yields:
        None. Control passes to the body of the ``with`` statement.

    The report is printed from a ``finally`` clause, so it is emitted even
    when the body raises; the exception then propagates unchanged.
    """
    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        print(f'{message} took {time.perf_counter() - start_time:.2f} [s]')
# Open the RQ queue on a Redis connection created with default settings.
redis_conn = Redis()
queue = Queue(connection=redis_conn)

with timer('submitting tasks'):
    # Defined locally on purpose: the function is shipped to the worker by
    # value through tasks.task_enqueue rather than imported there by name.
    def special_function(a: int, b: int, c: str):
        return str(a + b) + c

    job = tasks.task_enqueue(queue, special_function, 1, 2, 'a')

# result = job.latest_result(timeout=60)
# Poll until the job reaches a terminal state. Only waiting for 'finished'
# would spin forever if the job failed, was stopped, or was canceled.
while True:
    status = job.get_status()
    if status == 'finished':
        break
    if status in ('failed', 'stopped', 'canceled'):
        raise RuntimeError(f'job {job.id} ended with status {status!r}')
    time.sleep(0.1)
print('result', job.result)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| from rq import Queue | |
| from redis import Redis | |
| from dask import delayed | |
| import tasks | |
@delayed
def add(a, b):
    """Print the operands, then return ``a + b``."""
    print('add', a, b)
    total = a + b
    return total
@delayed
def multiply(a, b):
    """Print the operands, pause for one second, then return ``a * b``."""
    # NOTE(review): presumably imported locally so the function stays
    # self-contained when it is pickled and run elsewhere -- confirm.
    import time

    print('multiply', a, b)
    time.sleep(1)
    product = a * b
    return product
# Build the lazy expression (1 * 2) + (3 + (4 * 5)).
f = add(multiply(1, 2), add(3, multiply(4, 5)))

redis_conn = Redis()
queue = Queue(connection=redis_conn)
job = tasks.convert_dask_delayed_to_rq(f, queue)

# Poll until the final job reaches a terminal state. Only waiting for
# 'finished' would spin forever if the job failed, was stopped, or was
# canceled.
while True:
    status = job.get_status()
    if status == 'finished':
        break
    if status in ('failed', 'stopped', 'canceled'):
        raise RuntimeError(f'job {job.id} ended with status {status!r}')
    time.sleep(0.1)
print('result', job.result)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# One Redis 7 service, published on host port 6379.
version: "3"
services:
  redis:
    image: redis:7
    command: ["redis-server"]
    ports:
      - "6379:6379"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: rqcloud
channels:
  - conda-forge
dependencies:
  - rq
  - redis-py
  - cloudpickle
  # The delayed-graph example does `from dask import delayed`.
  - dask
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import cloudpickle | |
| from rq import get_current_job | |
def _references_job(arg, jobs):
    """Return True when ``arg`` is the id of an already-enqueued job."""
    try:
        return arg in jobs
    except TypeError:
        # Unhashable literals (lists, dicts, ...) can never be a job id.
        return False


def convert_dask_delayed_to_rq(f, queue):
    """Enqueue every task in ``f.dask`` as an RQ job and return the last one.

    Tasks are visited in the order produced by ``f.dask._toposort_layers()``,
    so a task's dependencies are always enqueued before the task itself. Each
    task is enqueued through ``task_enqueue`` with ``job_id`` set to the task
    id and ``depends_on`` set to the jobs of its dependencies. An argument
    that names an earlier task is replaced by the placeholder string
    ``'task-<id>'``; every other argument is passed through unchanged.

    Args:
        f: Object exposing a ``dask`` graph.
            NOTE(review): presumably a ``dask.delayed`` result -- confirm.
        queue: Queue forwarded to ``task_enqueue``.

    Returns:
        The job enqueued last, or ``None`` when the graph has no tasks.
    """
    graph = f.dask
    jobs = {}
    last_job = None
    # NOTE(review): _toposort_layers is a private dask method and may change
    # between dask releases.
    for task_id in graph._toposort_layers():
        depends_on = [jobs[dependency] for dependency in graph.dependencies[task_id]]

        task = graph[task_id]
        # The guarded membership test keeps unhashable literal arguments from
        # raising TypeError; they are forwarded as plain values.
        _args = [
            f'task-{arg}' if _references_job(arg, jobs) else arg
            for arg in task[1:]
        ]

        job = task_enqueue(
            queue,
            task[0],
            *_args,
            depends_on=depends_on,
            job_id=task_id,
        )
        jobs[task_id] = job
        last_job = job
    return last_job
def run_task(pickled_function: bytes, *args, **kwargs):
    """Unpickle a function and call it, resolving ``'task-<id>'`` placeholders.

    Each positional string argument of the form ``'task-<id>'`` whose ``<id>``
    matches a dependency of the current job is replaced by that dependency's
    ``result``. Every other argument is passed through unchanged, including
    strings that start with ``'task-'`` but name no dependency.

    Args:
        pickled_function: The callable, serialized with cloudpickle.
        *args: Positional arguments for the callable, possibly placeholders.
        **kwargs: Keyword arguments forwarded to the callable as-is.

    Returns:
        Whatever the unpickled callable returns.
    """
    current_job = get_current_job()
    dependencies = {}
    # Without a current job there are no dependencies to resolve.
    if current_job is not None:
        for dependency in current_job.fetch_dependencies():
            dependencies[dependency.id] = dependency
    print(dependencies)

    prefix = 'task-'
    _args = []
    for arg in args:
        is_placeholder = (
            isinstance(arg, str)
            and arg.startswith(prefix)
            and arg[len(prefix):] in dependencies
        )
        if is_placeholder:
            _args.append(dependencies[arg[len(prefix):]].result)
        else:
            # Includes literal strings that merely look like placeholders.
            _args.append(arg)

    # SECURITY: unpickling executes arbitrary code. Only run this against a
    # queue whose producers are fully trusted.
    f = cloudpickle.loads(pickled_function)
    return f(*_args, **kwargs)
def task_enqueue(queue, f, *args, **kwargs):
    """Serialize ``f`` with cloudpickle and enqueue ``run_task`` to call it.

    The pickled function is passed to ``queue.enqueue`` as the first argument
    for ``run_task``; ``*args`` and ``**kwargs`` are handed to
    ``queue.enqueue`` unchanged. Returns the job that ``queue.enqueue``
    returns.
    """
    return queue.enqueue(run_task, cloudpickle.dumps(f), *args, **kwargs)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from rq import Connection, Queue, Worker | |
if __name__ == '__main__':
    # Start a worker on a default-constructed Queue inside the Connection
    # context.
    with Connection():
        worker = Worker(Queue())
        worker.work()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Let's make this a little more honest, since this is for demonstration purposes
`run_task` here, internally `rq` just extracts the "path", i.e. `"tasks.run_task"`, and puts that into the queue. The `run_task` function needs to be available on the worker, while `task_enqueue` needs to be available on the node that handles the queue. Unless we operate on a single node, it is unlikely that both have access to the same local file.