Skip to content

Instantly share code, notes, and snippets.

@shinwachi
Last active September 4, 2020 21:12
Show Gist options
  • Select an option

  • Save shinwachi/ec38fda01edb4bbb9717540c5aadc9e3 to your computer and use it in GitHub Desktop.

Select an option

Save shinwachi/ec38fda01edb4bbb9717540c5aadc9e3 to your computer and use it in GitHub Desktop.
dagster config basic multiprocess
storage:
  filesystem:
    config:
      base_dir: "/tmp/"
execution:
  multiprocess:
    config:
      max_concurrent: 0 # max cpu
solids:
  read_and_split_solid:
    inputs:
      dataset_path:
        value: "/home/jovyan/some_data.dat"
from dagster import (
Bool,
Field,
Int,
String,
execute_pipeline,
pipeline,
solid,
OutputDefinition,
Output,
)
import os
import glob
import importlib.util
import read_and_split_czi
import chunk_some_analysis
from dagster_dask import dask_executor
from dagster import ModeDefinition, default_executors, multiprocess_executor, pipeline, solid
import itertools
# Sample identifiers and the directory that holds the raw .czi files.
SAMPLES = ["somesample"]
SOURCEDIR = "/home/jovyan/some_data"

# Absolute path to the first sample's .czi dataset.
dataset_path = os.path.join(SOURCEDIR, SAMPLES[0] + ".czi")

# Run configuration: wire dataset_path into the read/split solid's input.
run_config = {
    "solids": {
        "read_and_split_czi_solid": {
            "inputs": {"dataset_path": {"value": dataset_path}},
        },
    },
}
# Underlying entry point: main(inpath: str, block_size_x: int = 1000, block_size_y: int = 1000)
@solid(
    config_schema={
        "block_size_x": Field(
            Int,
            default_value=2000,
            is_required=False,
            description="image chunk block size: width",
        ),
        "block_size_y": Field(
            Int,
            default_value=2000,
            is_required=False,
            description="image chunk block size: height",
        ),
    }
)
def read_and_split_czi_solid(context, dataset_path: str):
    """Read the .czi image at ``dataset_path`` and split it into chunks.

    Delegates to ``read_and_split_czi.main`` with the configured block
    sizes and yields its return value as the solid's single output
    (presumably the chunk paths — TODO confirm against read_and_split_czi).
    """
    context.log.info("starting read_and_split_czi_solid")
    context.log.info(f"found {dataset_path}")
    # Chunk dimensions come from the solid config (defaults: 2000 x 2000).
    ans = read_and_split_czi.main(
        dataset_path,
        block_size_x=context.solid_config["block_size_x"],
        block_size_y=context.solid_config["block_size_y"],
    )
    context.log.info(f"got {ans}")
    yield Output(ans)
@solid(
    output_defs=[
        OutputDefinition(name='fold_one', is_required=True),
        OutputDefinition(name='fold_two', is_required=True),
    ],
)
def split_into_two_folds(_, rows):
    '''
    Split ``rows`` round-robin into two folds, yielding each as a named output.

    https://stackoverflow.com/questions/60970993/cross-validation-using-dagster

    Bug fix: the original zipped ``rows`` directly against the two lists,
    which assigned only the first two rows and silently dropped the rest.
    Cycling over the folds (as ``split_into_n_folds`` does) distributes
    every row alternately.
    '''
    fold_one = []
    fold_two = []
    for row, fold in zip(rows, itertools.cycle([fold_one, fold_two])):
        fold.append(row)
    yield Output(fold_one, 'fold_one')
    yield Output(fold_two, 'fold_two')
# Number of folds / parallel analysis branches.
# env_n=28: time 7:55, 2k block, 5 process failures
# env_n=23: time 8:45, 2k block, 3 process failures
env_n = 14  # time: 9:17, 2k block

# One named output definition per fold ("list0" .. "list{env_n-1}").
ods = [OutputDefinition(name=f"list{i}", is_required=True) for i in range(env_n)]


@solid(
    output_defs=ods,
)
def split_into_n_folds(context, rows):
    """Distribute ``rows`` round-robin into ``env_n`` folds and yield each
    fold as the named output ``list{i}``."""
    folds = [[] for _ in range(env_n)]
    # Assign each row to the next fold in rotation.
    for row, fold in zip(rows, itertools.cycle(folds)):
        fold.append(row)
    for idx, fold in enumerate(folds):
        yield Output(fold, f"list{idx}")
@solid
def chunk_some_analysis_solid(context, img_paths):
    """Run ``chunk_some_analysis.main`` on every path in ``img_paths``,
    logging each path, and return the list of results in input order."""
    results = []
    for img_path in img_paths:
        context.log.info(f"img_path {img_path}")
        results.append(chunk_some_analysis.main(img_path))
    return results
@solid
def display_n_results(context, results):
    """Write every collected analysis result to the run log at INFO level."""
    log = context.log
    for result in results:
        log.info(f"result: {result}")
@pipeline(
    mode_defs=[
        ModeDefinition(
            executor_defs=default_executors,
            name="dev",
            resource_defs={},
        )
    ],
    description="Analyze czi composite image file",
)
def chunk_some_analysis_pipeline():
    """Read/split the .czi file into folds, fan each fold out to an
    aliased analysis solid, and display the collected results."""
    folds = split_into_n_folds(read_and_split_czi_solid())
    # Build and invoke one aliased analysis solid per fold in a single pass
    # (the original enumerated with an unused target, then zipped again).
    responses = [
        chunk_some_analysis_solid.alias(f"chunk_some_analysis_solid_{i}")(fold)
        for i, fold in enumerate(folds)
    ]
    display_n_results(responses)
if __name__ == "__main__":
    # Echo the run configuration, then execute the pipeline with it.
    print(run_config)
    execute_pipeline(chunk_some_analysis_pipeline, run_config=run_config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment