Gist: AseiSugiyama/5246f446e038bcd90651da979930e1da (last active March 18, 2024 07:10)
fan-in-fan-out-pipeline.ipynb
| { | |
| "nbformat": 4, | |
| "nbformat_minor": 0, | |
| "metadata": { | |
| "colab": { | |
| "provenance": [], | |
| "authorship_tag": "ABX9TyNKQwE56Z/C+IewwEXm6rKy", | |
| "include_colab_link": true | |
| }, | |
| "kernelspec": { | |
| "name": "python3", | |
| "display_name": "Python 3" | |
| }, | |
| "language_info": { | |
| "name": "python" | |
| } | |
| }, | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "view-in-github", | |
| "colab_type": "text" | |
| }, | |
| "source": [ | |
| "<a href=\"https://colab.research.google.com/gist/AseiSugiyama/5246f446e038bcd90651da979930e1da/fan-in-fan-out-pipeline.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "source": [ | |
| "# Fan-in, fan-out example pipeline\n", | |
| "\n", | |
| "This notebook provides a simple example of fan-in and fan-out pipelines in KFP v2. For fan-out, it uses [`kfp.dsl.ParallelFor`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/dsl.html#kfp.dsl.ParallelFor) to run one task per item. For fan-in, it uses [`kfp.dsl.Collected`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/dsl.html#kfp.dsl.Collected) to pass the outputs of all those tasks to a single downstream task as a list.\n", | |
| "\n", | |
| "## Setup" | |
| ], | |
| "metadata": { | |
| "id": "wg1PscHFQ-1T" | |
| } | |
| }, | |
| { | |
| "cell_type": "code", | |
| "source": [ | |
| "!pip install kfp" | |
| ], | |
| "metadata": { | |
| "id": "3gDPLLSETTlH" | |
| }, | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "source": [ | |
| "# Restart the kernel so the newly installed packages take effect (Colab only).\n", | |
| "import IPython\n", | |
| "\n", | |
| "app = IPython.Application.instance()\n", | |
| "app.kernel.do_shutdown(True)" | |
| ], | |
| "metadata": { | |
| "id": "4b9f4bkmTgxh" | |
| }, | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "source": [ | |
| "## Define and compile pipeline" | |
| ], | |
| "metadata": { | |
| "id": "T1WMGFC2YnWc" | |
| } | |
| }, | |
| { | |
| "cell_type": "code", | |
| "source": [ | |
| "from kfp import dsl\n", | |
| "from kfp.dsl import Metrics" | |
| ], | |
| "metadata": { | |
| "id": "zzJtC3BSTpoB" | |
| }, | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": { | |
| "id": "bYSLcomzQ3kl" | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "@dsl.component(base_image=\"python:3.10\")\n", | |
| "def id_func(input: int) -> Metrics:\n", | |
| "    # Return the input as a Metrics artifact: the value is stored in the\n", | |
| "    # artifact metadata and also written to the artifact file.\n", | |
| "    from pathlib import Path\n", | |
| "    metric = Metrics(\n", | |
| "        uri=dsl.get_uri(),\n", | |
| "        metadata={'value': input}\n", | |
| "    )\n", | |
| "    Path(metric.path).write_text(str(input))\n", | |
| "    return metric\n", | |
| "\n", | |
| "@dsl.component(base_image=\"python:3.10\")\n", | |
| "def calc_average(models: list[Metrics]) -> float:\n", | |
| "    # Receives every Metrics artifact produced inside the ParallelFor loop.\n", | |
| "    return sum(float(model.metadata['value']) for model in models) / len(models)\n", | |
| "\n", | |
| "@dsl.pipeline\n", | |
| "def fanin_pipeline():\n", | |
| "    # Fan-out: one id_func task for each item from 0 through 9.\n", | |
| "    with dsl.ParallelFor(\n", | |
| "        items=list(range(10)),\n", | |
| "    ) as item:\n", | |
| "        id_func_task = id_func(input=item)\n", | |
| "\n", | |
| "    # Fan-in: collect the outputs of all loop iterations into one list.\n", | |
| "    calc_average(models=dsl.Collected(id_func_task.output))\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "source": [ | |
| "from kfp import compiler\n", | |
| "compiler.Compiler().compile(\n", | |
| "    pipeline_func=fanin_pipeline,\n", | |
| "    package_path=\"fanin_pipeline.yaml\"\n", | |
| ")" | |
| ], | |
| "metadata": { | |
| "id": "HACYSMyCYvII" | |
| }, | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "source": [ | |
| "## Run the pipeline\n", | |
| "\n", | |
| "You can run this pipeline on Vertex AI Pipelines. The resulting run looks like the following image.\n", | |
| "\n", | |
| "Note: To run this pipeline, you need a Google Cloud Storage (GCS) bucket to store the pipeline's artifacts.\n", | |
| "\n", | |
| "" | |
| ], | |
| "metadata": { | |
| "id": "FvwSkeEDXwgE" | |
| } | |
| } | |
| ] | |
| } |
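The notebook above fans out `id_func` over the items 0 through 9 with `dsl.ParallelFor` and fans the ten outputs back in with `dsl.Collected`, so `calc_average` receives all of them as one list. As a rough local analogy only (this is plain Python with `concurrent.futures`, not KFP, and the helper names `id_value`, `average_of`, and `fan_out_fan_in` are hypothetical), the same data flow can be sketched as one task per item followed by a single reduce step over the collected results.

```python
from concurrent.futures import ThreadPoolExecutor


def id_value(value: int) -> dict:
    """Hypothetical stand-in for id_func: wraps the input in a metrics-like dict."""
    return {"value": value}


def average_of(metrics: list[dict]) -> float:
    """Hypothetical stand-in for calc_average: averages the 'value' of each result."""
    return sum(float(m["value"]) for m in metrics) / len(metrics)


def fan_out_fan_in(items: list[int]) -> float:
    # Fan-out: submit one id_value task per item, loosely like one ParallelFor iteration each.
    with ThreadPoolExecutor() as pool:
        collected = list(pool.map(id_value, items))
    # Fan-in: hand the whole list of results to one downstream step, loosely like dsl.Collected.
    return average_of(collected)


print(fan_out_fan_in(list(range(10))))  # → 4.5
```

Because an average does not depend on the order of its inputs, the reduce step in this sketch gives the same result no matter which branch finishes first.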
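The notebook's "Run the pipeline" step can be sketched with the Vertex AI SDK (`google-cloud-aiplatform`), whose `aiplatform.PipelineJob` submits a compiled spec such as `fanin_pipeline.yaml`. This is a minimal sketch under stated assumptions, not the author's method: the project ID, region, and bucket name are placeholders you must replace, the `pipeline_root` layout under the bucket is an arbitrary choice, and the helpers `pipeline_job_kwargs` and `submit_fanin_pipeline` are hypothetical. The SDK is imported inside the function so the argument-building part works without it installed.

```python
def pipeline_job_kwargs(bucket: str, template_path: str = "fanin_pipeline.yaml") -> dict:
    """Hypothetical helper: build PipelineJob arguments, storing artifacts under gs://<bucket>/pipeline_root."""
    if not bucket:
        # The notebook notes that a GCS bucket is required for pipeline artifacts.
        raise ValueError("A GCS bucket is required to store pipeline artifacts.")
    return {
        "display_name": "fanin-pipeline",
        "template_path": template_path,
        "pipeline_root": f"gs://{bucket}/pipeline_root",
    }


def submit_fanin_pipeline(project: str, location: str, bucket: str) -> None:
    """Hypothetical helper: submit the compiled pipeline to Vertex AI Pipelines."""
    # Imported lazily so this module loads even without google-cloud-aiplatform.
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=location)
    job = aiplatform.PipelineJob(**pipeline_job_kwargs(bucket))
    # submit() returns without waiting; job.run() would block until the run finishes.
    job.submit()
```

Usage, with placeholder values: `submit_fanin_pipeline("my-project", "us-central1", "my-bucket")` after running the compile cell so that `fanin_pipeline.yaml` exists.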