Build a Pipeline

A tutorial on using Pipelines SDK v2 to orchestrate your ML workflow as a pipeline


A Kubeflow pipeline is a portable and scalable definition of a machine learning (ML) workflow. Each step in your ML workflow, such as preparing data or training a model, is an instance of a pipeline component. This document provides an overview of pipeline concepts and best practices, and instructions describing how to build an ML pipeline.

Note: This guide demonstrates how to build pipelines using the Pipelines SDK v2. Currently, Kubeflow Pipelines v2 is in development. You can use this guide to start building and running pipelines that are compatible with the Pipelines SDK v2.

Learn more about Pipelines SDK v2.

Before you begin

  1. Run the following command to install the Kubeflow Pipelines SDK v1.6.2 or higher. If you run this command in a Jupyter notebook, restart the kernel after installing the SDK.

     $ pip install --upgrade kfp

  2. Import the kfp packages.

     import kfp
     from kfp.v2 import dsl
     from kfp.v2.dsl import component
     from kfp.v2.dsl import (
         Input,
         Output,
         Artifact,
         Dataset,
     )
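
A quick way to confirm that the installed SDK meets the version requirement (a minimal sketch; the kfp package exposes its version as kfp.__version__):

    import kfp

    # Print the installed SDK version; it should be 1.6.2 or higher.
    print(kfp.__version__)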

Understanding pipelines

A Kubeflow pipeline is a portable and scalable definition of an ML workflow, based on containers. A pipeline is composed of a set of input parameters and a list of the steps in this workflow. Each step in a pipeline is an instance of a component, which is represented as an instance of ContainerOp.

You can use pipelines to:

  • Orchestrate repeatable ML workflows.
  • Accelerate experimentation by running a workflow with different sets of hyperparameters.

Understanding pipeline components

A pipeline component is a containerized application that performs one step in a pipeline’s workflow. Pipeline components are defined in component specifications, which define the following:

  • The component’s interface, its inputs and outputs.
  • The component’s implementation, the container image and the command to execute.
  • The component’s metadata, such as the name and description of the component.

You can build components by defining a component specification for a containerized application, or you can use the Kubeflow Pipelines SDK to generate a component specification for a Python function. You can also reuse prebuilt components in your pipeline.
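
To make the three parts of a component specification concrete, the following is a minimal, hypothetical example. The component name, container image, and message input are illustrative only, and the specification is loaded from an inline string purely for demonstration:

    import kfp.components

    # A hypothetical component specification: metadata (name, description),
    # interface (a single input), and implementation (container image and command).
    echo_op = kfp.components.load_component_from_text("""
    name: Echo message
    description: Prints a message to the container log.
    inputs:
    - {name: message, type: String}
    implementation:
      container:
        image: alpine
        command: [echo, {inputValue: message}]
    """)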

Understanding the pipeline graph

Each step in your pipeline’s workflow is an instance of a component. When you define your pipeline, you specify the source of each step’s inputs. A step’s inputs can be set from the pipeline’s input arguments, from constants, or from the outputs of other steps in the pipeline. Kubeflow Pipelines uses these dependencies to define your pipeline’s workflow as a graph.

For example, consider a pipeline with the following steps: ingest data, generate statistics, preprocess data, and train a model. The following describes the data dependencies between each step.

  • Ingest data: This step loads data from an external source which is specified using a pipeline argument, and it outputs a dataset. Since this step does not depend on the output of any other steps, this step can run first.
  • Generate statistics: This step uses the ingested dataset to generate and output a set of statistics. Since this step depends on the dataset produced by the ingest data step, it must run after the ingest data step.
  • Preprocess data: This step preprocesses the ingested dataset and transforms the data into a preprocessed dataset. Since this step depends on the dataset produced by the ingest data step, it must run after the ingest data step.
  • Train a model: This step trains a model using the preprocessed dataset, the generated statistics, and pipeline parameters, such as the learning rate. Since this step depends on the preprocessed data and the generated statistics, it must run after both the preprocess data and generate statistics steps are complete.

Since the generate statistics and preprocess data steps both depend on the ingested data, the generate statistics and preprocess data steps can run in parallel. All other steps are executed once their data dependencies are available.
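
Expressed with the SDK, the dependencies described above could look like the following sketch. The component factories (ingest_data_op, generate_statistics_op, preprocess_op, and train_op) and their output names are hypothetical placeholders for components you would define or load yourself:

    # A minimal sketch of the example graph; Kubeflow Pipelines infers the
    # execution order from how task outputs are wired into task inputs.
    @dsl.pipeline(name='example-graph')
    def example_pipeline(data_url: str, learning_rate: float):
        ingest_task = ingest_data_op(url=data_url)  # no dependencies, runs first
        stats_task = generate_statistics_op(
            dataset=ingest_task.outputs['dataset'])  # runs after ingest
        preprocess_task = preprocess_op(
            dataset=ingest_task.outputs['dataset'])  # runs after ingest, parallel to stats
        train_task = train_op(
            dataset=preprocess_task.outputs['preprocessed_dataset'],
            statistics=stats_task.outputs['statistics'],
            learning_rate=learning_rate)  # runs after both upstream steps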

Designing your pipeline

When designing your pipeline, think about how to split your ML workflow into pipeline components. The process of splitting an ML workflow into pipeline components is similar to the process of splitting a monolithic script into testable functions. The following rules can help you define the components that you need to build your pipeline.

  • Components should have a single responsibility. Having a single responsibility makes it easier to test and reuse a component. For example, if you have a component that loads data, you can reuse it for similar tasks that load data. If you have a component that both loads and transforms a dataset, the component is less useful, since you can use it only when you need to load and transform that dataset.

  • Reuse components when possible. Kubeflow Pipelines provides components for common pipeline tasks and for access to cloud services.

    Note: Not all prebuilt components are compatible with Pipelines SDK v2. For example, you might need to update the type hints for component inputs and outputs.

  • Consider what you need to know to debug your pipeline and research the lineage of the models that your pipeline produces. Kubeflow Pipelines stores the inputs and outputs of each pipeline step. By interrogating the artifacts produced by a pipeline run, you can better understand the variations in model quality between runs or track down bugs in your workflow.

In general, you should design your components with composability in mind.

Pipelines are composed of component instances, also called steps. Steps can define their inputs as depending on the output of another step. The dependencies between steps define the pipeline workflow graph.

Building pipeline components

Kubeflow pipeline components are containerized applications that perform a step in your ML workflow. Here are the ways that you can define pipeline components:

  • If you have a containerized application that you want to use as a pipeline component, create a component specification to define this container image as a pipeline component.

    This option provides the flexibility to include code written in any language in your pipeline, so long as you can package the application as a container image. Learn more about building pipeline components.

  • If your component code can be expressed as a Python function, evaluate if your component can be built as a Python function-based component. The Kubeflow Pipelines SDK makes it easier to build lightweight Python function-based components by saving you the effort of creating a component specification.

Whenever possible, reuse prebuilt components to save yourself the effort of building custom components.

The example in this guide demonstrates how to build a pipeline that uses a Python function-based component and reuses a prebuilt component.

Understanding how data is passed between components

When Kubeflow Pipelines runs a component, a container image is started in a Kubernetes Pod and your component’s inputs are passed in as command-line arguments. When your component has finished, the component’s outputs are returned as files.

In your component’s specification, you define the component’s inputs and outputs, and how the inputs and output paths are passed to your program as command-line arguments.

Component inputs and outputs are classified as either parameters or artifacts, depending on their data type.

  • Parameters typically represent settings that affect the behavior of your pipeline. Parameters are passed into your component by value, and can be of any of the following types: int, double, float, or str. Since parameters are passed by value, the quantity of data passed in a parameter must be appropriate to pass as a command-line argument.

  • Artifacts represent large or complex data structures like datasets or models, and are passed into components as a reference to a file path.

    If you have large amounts of string data to pass to your component, such as a JSON file, annotate that input or output as a type of Artifact, such as Dataset, to let Kubeflow Pipelines know to pass this to your component as a file.

    In addition to the artifact’s data, you can also read and write the artifact’s metadata. For output artifacts, you can record metadata as key-value pairs, such as the accuracy of a trained model. For input artifacts, you can read the artifact’s metadata — for example, you could use metadata to decide if a model is accurate enough to deploy for predictions.

All outputs are returned as files, using the paths that Kubeflow Pipelines provides.
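
As an illustration, the following sketch shows both kinds of inputs and outputs in a Python function-based component: num_epochs is a parameter passed by value, while model is an output artifact written to a file path that Kubeflow Pipelines provides and annotated with metadata. The component name, body, and metric value are placeholders:

    from kfp.v2.dsl import Artifact, Output, component

    @component(base_image='python:3.9')
    def train_model(num_epochs: int, model: Output[Artifact]):
        # num_epochs arrives by value as a command-line argument.
        # model.path is an output file path provided by Kubeflow Pipelines.
        with open(model.path, 'w') as f:
            f.write(f'placeholder model trained for {num_epochs} epochs')
        # Record artifact metadata as key-value pairs.
        model.metadata['accuracy'] = 0.9  # placeholder value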

Python function-based components make it easier to build pipeline components by building the component specification for you. Python function-based components also handle the complexity of passing inputs into your component and passing your function’s outputs back to your pipeline.

Learn more about how Python function-based components handle inputs and outputs.

Getting started building a pipeline

The following sections demonstrate how to get started building a Kubeflow pipeline by walking through the process of converting a Python script into a pipeline.

Design your pipeline

The following steps walk through some of the design decisions you may face when designing a pipeline.

  1. Evaluate the process. In the following example, a Python function downloads a zipped tar file (.tar.gz) that contains several CSV files from a public website. The function extracts the CSV files and then merges them into a single file.

     import glob
     import pandas as pd
     import tarfile
     import urllib.request

     def download_and_merge_csv(url: str, output_csv: str):
         with urllib.request.urlopen(url) as res:
             tarfile.open(fileobj=res, mode="r|gz").extractall('data')
         df = pd.concat(
             [pd.read_csv(csv_file, header=None)
              for csv_file in glob.glob('data/*.csv')])
         df.to_csv(output_csv, index=False, header=False)

  2. Run the following Python command to test the function.

     download_and_merge_csv(
         url='https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz',
         output_csv='merged_data.csv')

  3. Run the following command to print the first few rows of the merged CSV file.

     $ head merged_data.csv

  4. Design your pipeline. For example, consider the following pipeline designs.

    • Implement the pipeline using a single step. In this case, the pipeline contains one component that works similarly to the example function. This is a straightforward function, and implementing a single-step pipeline is a reasonable approach in this case.

      The downside of this approach is that the zipped tar file would not be an artifact of your pipeline runs. Not having this artifact available could make it harder to debug this component in production.

    • Implement this as a two-step pipeline. The first step downloads a file from a website. The second step extracts the CSV files from a zipped tar file and merges them into a single file.

      This approach has a few benefits:

      • You can reuse the Web Download component to implement the first step.
      • Each step has a single responsibility, which makes the components easier to reuse.
      • The zipped tar file is an artifact of the first pipeline step. This means that you can examine this artifact when debugging pipelines that use this component.

    This example implements a two-step pipeline.

Build your pipeline components

Build your pipeline components. This example modifies the initial script to extract the contents of a zipped tar file, merge the CSV files that were contained in the zipped tar file, and return the merged CSV file.

This example builds a Python function-based component. You can also package your component’s code as a Docker container image and define the component using a ComponentSpec.

In this case, the following modifications to the original function were required.

  • The file download logic was removed. The path to the zipped tar file is passed to this function as the tar_data argument.

  • The import statements were moved inside of the function. Python function-based components require standalone Python functions. This means that any required import statements must be defined within the function, and any helper functions must be defined within the function.

  • The function’s arguments are annotated as a kfp.dsl.Input or kfp.dsl.Output artifact. These annotations let Kubeflow Pipelines know to provide the path to the zipped tar file and to create a path where your function stores the merged CSV file.

  • The function is decorated with the kfp.dsl.component annotation. You can also use this annotation to define the following:

    • The container image that your function runs in.
    • Any PyPI packages that this function depends on, that are not already installed on the container image.
    • The location to save the component specification to. You can use the component specification to share this component with your colleagues.

    This annotation converts your function into a factory function that creates pipeline steps. These pipeline steps execute the function you defined as a part of a pipeline’s workflow.

Learn more about building Python function-based components.

The following example shows the updated merge_csv function.

    @component(
        packages_to_install=['pandas==1.1.4'],
        output_component_file='component.yaml'
    )
    def merge_csv(tar_data: Input[Artifact], output_csv: Output[Dataset]):
        import glob
        import pandas as pd
        import tarfile

        tarfile.open(name=tar_data.path, mode="r|gz").extractall('data')
        df = pd.concat(
            [pd.read_csv(csv_file, header=None)
             for csv_file in glob.glob('data/*.csv')])
        df.to_csv(output_csv.path, index=False, header=False)
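
Because output_component_file='component.yaml' saves the generated component specification to disk, the component can later be shared and loaded without the original Python source. A minimal sketch, assuming the file path used above:

    # Load the saved component specification and use the returned factory
    # like any other component when defining a pipeline.
    merge_csv_op = kfp.components.load_component_from_file('component.yaml')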

Build your pipeline

  1. Use kfp.components.load_component_from_url to load the component specification YAML for any components that you are reusing in this pipeline.

     web_downloader_op = kfp.components.load_component_from_url(
         'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/web/Download/component-sdk-v2.yaml')

  2. Define your pipeline as a Python function.

    Your pipeline function’s arguments define your pipeline’s parameters. Use pipeline parameters to experiment with different hyperparameters, such as the learning rate used to train a model, or pass run-level inputs, such as the path to an input file, into a pipeline run. The data type must be specified for all pipeline parameters.

    Use the factory functions created by the kfp.dsl.component annotation and the kfp.components.load_component_from_url function to create your pipeline’s tasks. The inputs to the component factory functions can be pipeline parameters, the outputs of other tasks, or a constant value. In this case, the web_downloader_task task uses the url pipeline parameter, and the merge_csv_task uses the data output of the web_downloader_task.

    The kfp.dsl.pipeline annotation lets you specify the following:

    • name: The pipeline’s name.
    • description: (Optional.) A description of the pipeline’s workflow.
    • pipeline_root: The default path where your pipeline’s artifacts are stored. This must be a path that your pipeline can read and write to, such as a Persistent Volume Claim or a cloud service such as Google Cloud Storage.

     # Define a pipeline and create a task from a component:
     @dsl.pipeline(
         name='my-pipeline',
         # You can optionally specify your own pipeline_root
         # pipeline_root='gs://my-pipeline-root/example-pipeline',
     )
     def my_pipeline(url: str):
         web_downloader_task = web_downloader_op(url=url)
         merge_csv_task = merge_csv(tar_data=web_downloader_task.outputs['data'])
         # The outputs of the merge_csv_task can be referenced using the
         # merge_csv_task.outputs dictionary: merge_csv_task.outputs['output_csv']

Compile and run your pipeline

After defining the pipeline in Python as described in the preceding section, use one of the following options to compile the pipeline and submit it to the Kubeflow Pipelines service.

Option 1: Compile and then upload in UI

  1. Run the following to compile your pipeline and save it as pipeline.yaml.

     kfp.compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
         pipeline_func=my_pipeline,
         package_path='pipeline.yaml')

  2. Upload and run your pipeline.yaml using the Kubeflow Pipelines user interface. See the guide to getting started with the UI.

Option 2: Run the pipeline using the Kubeflow Pipelines SDK client

  1. Create an instance of the kfp.Client class by following the steps in connecting to Kubeflow Pipelines using the SDK client.

     client = kfp.Client()  # change arguments accordingly

  2. Run the pipeline using the kfp.Client instance:

     client.create_run_from_pipeline_func(
         my_pipeline,
         mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
         # You can optionally override your pipeline_root when submitting the run too:
         # pipeline_root='gs://my-pipeline-root/example-pipeline',
         arguments={
             'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
         })
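
create_run_from_pipeline_func returns a handle to the submitted run, so you can optionally block until the run finishes. A minimal sketch, with an arbitrary timeout in seconds:

    run_result = client.create_run_from_pipeline_func(
        my_pipeline,
        mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
        arguments={
            'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
        })
    # Block until the run completes or the timeout elapses.
    run_result.wait_for_run_completion(timeout=1800)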
