# Gist by @DnPlas, created March 30, 2023 10:36
# Map-reduce pipeline in the admin namespace
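# To run it (assuming the manifest is saved as map-reduce.yaml):
#   argo submit -n admin --watch map-reduce.yaml
# or, using kubectl directly:
#   kubectl create -n admin -f map-reduce.yaml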
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: map-reduce-
  namespace: admin
  annotations:
    workflows.argoproj.io/description: |
      This workflow demonstrates map-reduce using "key-only" artifacts.
      The first task "split" produces a number of parts, each in the form of a JSON document, saving it to a bucket.
      Each "map" task then reads those documents, performs a map operation, and writes them out to a new bucket.
      Finally, "reduce" merges all the mapped documents into a final document.
    workflows.argoproj.io/version: '>= 3.0.0'
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: numParts
        value: "4"
  templates:
    - name: main
      dag:
        tasks:
          - name: split
            template: split
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
              artifacts:
                - name: part
                  s3:
                    key: "{{workflow.name}}/parts/{{item}}.json"
            depends: "split"
            withParam: '{{tasks.split.outputs.result}}'
          - name: reduce
            template: reduce
            depends: "map"
    # The `split` task creates a number of "parts". Each part has a unique ID (index).
    # This task writes one "part file" for each piece of processing that needs doing, into a single
    # directory which is then saved as an output artifact.
    # Finally, it dumps the list of part IDs to stdout.
    - name: split
      inputs:
        parameters:
          - name: numParts
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          os.mkdir("/mnt/out")
          partIds = list(map(lambda x: str(x), range({{inputs.parameters.numParts}})))
          for i, partId in enumerate(partIds, start=1):
              with open("/mnt/out/" + partId + ".json", "w") as f:
                  json.dump({"foo": i}, f)
          json.dump(partIds, sys.stdout)
      outputs:
        artifacts:
          - name: parts
            path: /mnt/out
            archive:
              none: { }
            s3:
              key: "{{workflow.name}}/parts"
    # One `map` task is started per part ID. Each finds its own "part file" under `/mnt/in/part.json`
    # and saves its output artifact, named uniquely for the part, into a common "results directory".
    - name: map
      inputs:
        parameters:
          - name: partId
        artifacts:
          - name: part
            path: /mnt/in/part.json
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          os.mkdir("/mnt/out")
          with open("/mnt/in/part.json") as f:
              part = json.load(f)
          with open("/mnt/out/part.json", "w") as f:
              json.dump({"bar": part["foo"] * 2}, f)
      outputs:
        artifacts:
          - name: part
            path: /mnt/out/part.json
            archive:
              none: { }
            s3:
              key: "{{workflow.name}}/results/{{inputs.parameters.partId}}.json"
    # The `reduce` task takes the "results directory" and returns a single result.
    - name: reduce
      inputs:
        artifacts:
          - name: results
            path: /mnt/in
            s3:
              key: "{{workflow.name}}/results"
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          total = 0
          os.mkdir("/mnt/out")
          for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):
              result = json.load(f)
              total = total + result["bar"]
          with open("/mnt/out/total.json", "w") as f:
              json.dump({"total": total}, f)
      outputs:
        artifacts:
          - name: total
            path: /mnt/out/total.json
            archive:
              none: { }
            s3:
              key: "{{workflow.name}}/total.json"