Skip to content

Instantly share code, notes, and snippets.

@kireal
Last active August 24, 2021 21:48
Show Gist options
  • Select an option
  • Save kireal/75dfc5b73cd094a3cada83c1dd22f77e to your computer and use it in GitHub Desktop.
Scale up AWS calls with coroutines
# import goes here
# client that runs ecs task
def call_ecs_task(ecs_client, payload):
    """Run one ECS task via *ecs_client* and return the API response.

    Args:
        ecs_client: a boto3 ECS client (anything exposing ``run_task()``).
        payload: kept for signature symmetry with ``call_lambda``; the
            original gist never forwards it to ``run_task()`` — TODO confirm
            whether it should become ``run_task(**payload)``.

    Returns:
        Whatever ``ecs_client.run_task()`` returns (the original discarded
        this and implicitly returned ``None`` — that was the bug fixed here).
    """
    results = ecs_client.run_task()
    return results
# client that runs lambda task
def call_lambda(lambda_client, payload):
    """Invoke one Lambda function via *lambda_client* and return the response.

    Args:
        lambda_client: a boto3 Lambda client (anything exposing ``invoke()``).
        payload: kept for signature symmetry with ``call_ecs_task``; the
            original gist never forwards it to ``invoke()`` — TODO confirm
            whether it should become ``invoke(Payload=payload)``.

    Returns:
        Whatever ``lambda_client.invoke()`` returns (the original discarded
        this and implicitly returned ``None`` — that was the bug fixed here).
    """
    results = lambda_client.invoke()
    return results
# select ecs or lambda to run, add to execution loop
async def call_routine_async(executor, routine_payloads, use_lambda=True):
    """Fan blocking AWS calls out onto *executor* and await them all.

    For each payload, schedules either ``call_lambda`` or ``call_ecs_task``
    on the (thread pool) executor via ``run_in_executor``, then awaits the
    whole batch and collects the results.

    Args:
        executor: a ``concurrent.futures.Executor`` to run blocking calls on.
        routine_payloads: iterable of payloads, one task scheduled per item.
        use_lambda: True → invoke Lambda, False → run an ECS task.
            (The original tested an undefined name ``Lamda`` — a NameError
            at runtime; this flag replaces it, defaulting to the Lambda path.)

    Returns:
        List of per-task results, in completion order (``asyncio.wait`` gives
        no ordering guarantee), or ``[]`` when there are no payloads —
        ``asyncio.wait([])`` would raise ValueError.

    NOTE(review): ``lambda_client`` / ``ecs_client`` are module-level globals
    assumed to be created by the (elided) import/setup section of the gist.
    """
    loop = asyncio.get_event_loop()
    if use_lambda:
        worker, client = call_lambda, lambda_client
    else:
        worker, client = call_ecs_task, ecs_client
    blocking_tasks = [
        loop.run_in_executor(executor, worker, client, payload)
        for payload in routine_payloads
    ]
    if not blocking_tasks:  # asyncio.wait rejects an empty task set
        return []
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]  # collect yield from lambdas
    return results
# creates pool of workers and event loop
def do_serverless_tasks(tasks, parallelism):
    """Run *tasks* concurrently on a thread pool of size *parallelism*.

    Creates the ``ThreadPoolExecutor``, drives ``call_routine_async`` to
    completion on the event loop, and returns its results.

    Args:
        tasks: iterable of payloads handed to ``call_routine_async``.
        parallelism: max number of worker threads (i.e. concurrent AWS calls).

    Returns:
        The list of results from ``call_routine_async``.
    """
    # `with` ensures the executor is shut down and its threads joined even
    # on error — the original leaked the pool (no shutdown at all).
    with concurrent.futures.ThreadPoolExecutor(max_workers=parallelism) as executor:
        event_loop = asyncio.get_event_loop()
        routine_results = event_loop.run_until_complete(
            call_routine_async(executor, tasks))
    return routine_results
# prepares list of jobs to pass into corutine
def prepare_and_run_tasks(input_request, parallelism):
    """Turn *input_request* into a task list and execute it concurrently.

    Args:
        input_request: the incoming request to derive task payloads from.
        parallelism: worker-thread count forwarded to ``do_serverless_tasks``.

    Returns:
        The results from ``do_serverless_tasks`` (the original discarded them).
    """
    # process input request and prepare list of tasks
    # TODO: replace this placeholder with real payload construction —
    # the original gist had `tasks = # prepare tasks ...`, a syntax error.
    tasks = []
    return do_serverless_tasks(tasks, parallelism)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment