Last active
September 4, 2020 21:12
-
-
Save shinwachi/ec38fda01edb4bbb9717540c5aadc9e3 to your computer and use it in GitHub Desktop.
Example Dagster run config: filesystem intermediate storage with the basic multiprocess executor.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dagster run config: persist intermediates to the local filesystem and
# execute solids with the multiprocess executor.
storage:
  filesystem:
    config:
      base_dir: "/tmp/"
execution:
  multiprocess:
    config:
      max_concurrent: 0  # 0 = no explicit limit; dagster uses the CPU count
solids:
  read_and_split_solid:
    inputs:
      dataset_path:
        value: "/home/jovyan/some_data.dat"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from dagster import ( | |
| Bool, | |
| Field, | |
| Int, | |
| String, | |
| execute_pipeline, | |
| pipeline, | |
| solid, | |
| OutputDefinition, | |
| Output, | |
| ) | |
| import os | |
| import glob | |
| import importlib.util | |
| import read_and_split_czi | |
| import chunk_some_analysis | |
| from dagster_dask import dask_executor | |
| from dagster import ModeDefinition, default_executors, multiprocess_executor, pipeline, solid | |
| import itertools | |
# Sample names to process and the directory holding the raw .czi files.
SAMPLES = ["somesample"]
SOURCEDIR = "/home/jovyan/some_data"

# Absolute path of the first sample's dataset file.
dataset_path = os.path.join(SOURCEDIR, f"{SAMPLES[0]}.czi")

# Pipeline run configuration: feed the dataset path into the read/split solid.
run_config = {
    "solids": {
        "read_and_split_czi_solid": {
            "inputs": {"dataset_path": {"value": dataset_path}},
        },
    },
}
@solid(
    config_schema={
        "block_size_x": Field(
            Int,
            default_value=2000,
            is_required=False,
            description="image chunk block size: width",
        ),
        "block_size_y": Field(
            Int,
            default_value=2000,
            is_required=False,
            description="image chunk block size: height",
        ),
    }
)
def read_and_split_czi_solid(context, dataset_path: str):
    """Read a .czi image file and split it into chunks.

    Delegates to ``read_and_split_czi.main(inpath, block_size_x, block_size_y)``;
    chunk dimensions come from the solid config (default 2000x2000).

    Yields:
        Output: the value returned by ``read_and_split_czi.main`` —
        presumably the list of chunk paths; confirm against that module.
    """
    # NOTE(review): plain string (the original used an f-string with no
    # placeholders) and removed the commented-out ``return ans``.
    context.log.info("starting read_and_split_czi_solid")
    context.log.info(f"found {dataset_path}")
    ans = read_and_split_czi.main(
        dataset_path,
        block_size_x=context.solid_config["block_size_x"],
        block_size_y=context.solid_config["block_size_y"],
    )
    context.log.info(f"got {ans}")
    yield Output(ans)
@solid(
    output_defs=[
        OutputDefinition(name='fold_one', is_required=True),
        OutputDefinition(name='fold_two', is_required=True),
    ],
)
def split_into_two_folds(_, rows):
    """Distribute ``rows`` round-robin into two folds and yield each fold.

    Bug fix: the original ``zip(rows, [list1, list2])`` stopped after the
    first two rows (``zip`` ends at the shorter iterable), silently
    dropping everything else.  Cycling the fold lists — exactly as
    ``split_into_n_folds`` does — alternates every row between the folds.

    https://stackoverflow.com/questions/60970993/cross-validation-using-dagster
    """
    folds = {'fold_one': [], 'fold_two': []}
    for row, fold in zip(rows, itertools.cycle(folds.values())):
        fold.append(row)
    for name, fold in folds.items():
        yield Output(fold, name)
# Number of folds produced by split_into_n_folds (and of aliased analysis
# solids in the pipeline).  The commented values record local benchmark runs.
env_n=14 # time: 9:17 2k block
#env_n=28 # time: 7:55 2k block, 5 process failures
# env_n=23 # time: 8:45 2k block, 3 process failures
# One dagster output definition per fold, named "list0" .. "list{env_n-1}".
ods = [OutputDefinition(name=f"list{i}", is_required=True) for i in range(env_n)]
@solid(
    output_defs=ods,
)
def split_into_n_folds(context, rows):
    """Distribute ``rows`` round-robin into ``env_n`` folds.

    Row ``i`` lands in fold ``i % env_n``.  Yields one ``Output`` per fold,
    named "list0" .. "list{env_n - 1}" to match the module-level ``ods``
    output definitions.
    """
    n = env_n
    folds = [[] for _ in range(n)]
    # itertools.cycle repeats the fold lists forever, so zip terminates
    # when ``rows`` is exhausted.  (The original used a list comprehension
    # purely for its side effects; a plain loop is the idiomatic form.)
    for row, fold in zip(rows, itertools.cycle(folds)):
        fold.append(row)
    for i, fold in enumerate(folds):
        yield Output(fold, f"list{i}")
@solid
def chunk_some_analysis_solid(context, img_paths):
    """Run ``chunk_some_analysis.main`` on every image path, in order,
    and return the list of results."""
    results = []
    for path in img_paths:
        context.log.info(f"img_path {path}")
        results.append(chunk_some_analysis.main(path))
    return results
@solid
def display_n_results(context, results):
    """Log every entry of ``results`` at INFO level."""
    for result in results:
        context.log.info("result: {}".format(result))
@pipeline(
    mode_defs=[
        ModeDefinition(
            executor_defs=default_executors,
            name="dev",
            resource_defs={}
        )
    ],
    description="Analyze czi composite image file"
)
def chunk_some_analysis_pipeline():
    """Fan-out pipeline: split the .czi chunks into env_n folds, run one
    aliased analysis solid per fold (distinct aliases let dagster execute
    them as separate graph nodes), then display all fold results."""
    folds = split_into_n_folds(read_and_split_czi_solid())
    # One aliased copy of the analysis solid per fold.  (The original
    # iterated ``enumerate(folds)`` but never used the fold element.)
    analysis_solids = [
        chunk_some_analysis_solid.alias(f"chunk_some_analysis_solid_{i}")
        for i in range(len(folds))
    ]
    responses = [
        fold_solid(fold) for fold_solid, fold in zip(analysis_solids, folds)
    ]
    display_n_results(responses)
if __name__ == "__main__":
    # Echo the run config, then execute the pipeline in-process.
    # (Dropped the redundant f-string wrapper: print(f"{x}") == print(x).)
    print(run_config)
    result = execute_pipeline(chunk_some_analysis_pipeline, run_config=run_config)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment