pipeline admin namespace
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: map-reduce-
  namespace: admin
  annotations:
    workflows.argoproj.io/description: |
      This workflow demonstrates map-reduce using "key-only" artifacts.
      The first task "split" produces a number of parts, each in the form of a JSON document, saving it to a bucket.
      Each "map" task then reads those documents, performs a map operation, and writes them out to a new bucket.
      Finally, "reduce" merges all the mapped documents into a final document.
    workflows.argoproj.io/version: '>= 3.0.0'
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: numParts
        value: "4"
  templates:
    - name: main
      dag:
        tasks:
          - name: split
            template: split
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
              artifacts:
                - name: part
                  s3:
                    key: "{{workflow.name}}/parts/{{item}}.json"
            depends: "split"
            withParam: '{{tasks.split.outputs.result}}'
          - name: reduce
            template: reduce
            depends: "map"
    # The `split` task creates a number of "parts". Each part has a unique ID (index).
    # It writes one "part file" per piece of work into a single directory,
    # which is then saved as an output artifact.
    # Finally, it dumps the list of part IDs to stdout.
    - name: split
      inputs:
        parameters:
          - name: numParts
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          os.mkdir("/mnt/out")
          partIds = list(map(lambda x: str(x), range({{inputs.parameters.numParts}})))
          for i, partId in enumerate(partIds, start=1):
              with open("/mnt/out/" + partId + ".json", "w") as f:
                  json.dump({"foo": i}, f)
          json.dump(partIds, sys.stdout)
      outputs:
        artifacts:
          - name: parts
            path: /mnt/out
            archive:
              none: { }
            s3:
              key: "{{workflow.name}}/parts"
    # One `map` task is started per part ID. Each one finds its own "part file" under `/mnt/in/part.json`
    # and saves its output artifact under a unique, per-part key in a common "results directory".
    - name: map
      inputs:
        parameters:
          - name: partId
        artifacts:
          - name: part
            path: /mnt/in/part.json
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          os.mkdir("/mnt/out")
          with open("/mnt/in/part.json") as f:
              part = json.load(f)
          with open("/mnt/out/part.json", "w") as f:
              json.dump({"bar": part["foo"] * 2}, f)
      outputs:
        artifacts:
          - name: part
            path: /mnt/out/part.json
            archive:
              none: { }
            s3:
              key: "{{workflow.name}}/results/{{inputs.parameters.partId}}.json"
    # The `reduce` task takes the "results directory" and returns a single result.
    - name: reduce
      inputs:
        artifacts:
          - name: results
            path: /mnt/in
            s3:
              key: "{{workflow.name}}/results"
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          total = 0
          os.mkdir("/mnt/out")
          for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):
              result = json.load(f)
              total = total + result["bar"]
          with open("/mnt/out/total.json", "w") as f:
              json.dump({"total": total}, f)
      outputs:
        artifacts:
          - name: total
            path: /mnt/out/total.json
            archive:
              none: { }
            s3:
              key: "{{workflow.name}}/total.json"