Building Python function-based components
Building your own lightweight pipeline components using Python
A Kubeflow Pipelines component is a self-contained set of code that performs one step in your ML workflow. A pipeline component is composed of:
The component code, which implements the logic needed to perform a step in your ML workflow.
A component specification, which defines the following:
- The component’s metadata, such as its name and description.
- The component’s interface, which includes the component’s inputs and outputs.
- The component’s implementation: the Docker container image to run, how to pass inputs to your component code, and how to get the component’s outputs.
Python function-based components make it easier to iterate quickly by letting you build your component code as a Python function and generating the component specification for you. This document describes how to build Python function-based components and use them in your pipeline.
Before you begin
- Run the following command to install the Kubeflow Pipelines SDK. If you run this command in a Jupyter notebook, restart the kernel after installing the SDK.
$ pip3 install kfp --upgrade
- Import the kfp and kfp.components packages.
import kfp
import kfp.components as comp
- Create an instance of the kfp.Client class. To find your Kubeflow Pipelines cluster’s hostname, open the Kubeflow Pipelines user interface in your browser. The URL of the Kubeflow Pipelines user interface is something like https://my-cluster.my-organization.com/pipelines. In this case, the hostname is my-cluster.my-organization.com.
# If you run this command on a Jupyter notebook running on Kubeflow, you can
# exclude the host parameter.
# client = kfp.Client()
client = kfp.Client(host='<your-kubeflow-pipelines-host-name>')
For more information about the Kubeflow Pipelines SDK, see the SDK reference guide.
Getting started with Python function-based components
This section demonstrates how to get started building Python function-based components by walking through the process of creating a simple component.
- Define your component’s code as a standalone Python function. In this example, the function adds two floats and returns the sum of the two arguments.
def add(a: float, b: float) -> float:
    '''Calculates the sum of two arguments.'''
    return a + b
- Use kfp.components.create_component_from_func to generate the component specification YAML and return a factory function that you can use to create kfp.dsl.ContainerOp class instances for your pipeline. The component specification YAML is a reusable and shareable definition of your component.
add_op = comp.create_component_from_func(
    add, output_component_file='add_component.yaml')
- Create and run your pipeline. Learn more about creating and running pipelines.
import kfp.dsl as dsl
@dsl.pipeline(
    name='Addition pipeline',
    description='An example pipeline that performs addition calculations.'
)
def add_pipeline(
    a='1',
    b='7',
):
    # Passes a pipeline parameter and a constant value to the `add_op` factory
    # function.
    first_add_task = add_op(a, 4)
    # Passes an output reference from `first_add_task` and a pipeline parameter
    # to the `add_op` factory function. For operations with a single return
    # value, the output reference can be accessed as `task.output` or
    # `task.outputs['output_name']`.
    second_add_task = add_op(first_add_task.output, b)
# Specify argument values for your pipeline run.
arguments = {'a': '7', 'b': '8'}
# Create a pipeline run, using the client you initialized in a prior step.
client.create_run_from_pipeline_func(add_pipeline, arguments=arguments)
Building Python function-based components
Use the following instructions to build a Python function-based component:
Define a standalone Python function. This function must meet the following requirements:
- It should not use any code declared outside of the function definition.
- Import statements must be added inside the function. Learn more about using and installing Python packages in your component.
- Helper functions must be defined inside this function, as shown in the sketch after this list.
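For example, the following is a minimal sketch of a self-contained function (the function and its helper are hypothetical): the import statement and the helper function are both defined inside the function body.
def normalize_name(name: str) -> str:
    # Import statements must be inside the function.
    import re

    # Helper functions must also be defined inside the function.
    def collapse_spaces(text: str) -> str:
        return re.sub(r'\s+', ' ', text)

    return collapse_spaces(name).strip().title()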
Kubeflow Pipelines uses your function’s inputs and outputs to define your component’s interface. Learn more about passing data between components. Your function’s inputs and outputs must meet the following requirements:
- If the function accepts or returns large amounts of data or complex data types, you must pass that data as a file. Learn more about using large amounts of data as inputs or outputs.
- If the function accepts numeric values as parameters, the parameters must have type hints. Supported types are int and float. Otherwise, parameters are passed as strings.
- If your component returns multiple small outputs (short strings, numbers, or booleans), annotate your function with the typing.NamedTuple type hint and use the collections.namedtuple function to return your function’s outputs as a new subclass of tuple. For an example, read Passing parameters by value.
(Optional.) If your function has complex dependencies, choose or build a container image for your Python function to run in. Learn more about selecting or building your component’s container image.
Call kfp.components.create_component_from_func(func) to convert your function into a pipeline component.
- func: The Python function to convert.
- base_image: (Optional.) Specify the Docker container image to run this function in. Learn more about selecting or building a container image.
- output_component_file: (Optional.) Writes your component definition to a file. You can use this file to share the component with colleagues or reuse it in different pipelines.
- packages_to_install: (Optional.) A list of versioned Python packages to install before running your function.
Using and installing Python packages
When Kubeflow Pipelines runs your pipeline, each component runs within a Docker container image on a Kubernetes Pod. To load the packages that your Python function depends on, one of the following must be true:
- The package must be installed on the container image.
- The package must be defined using the packages_to_install parameter of the kfp.components.create_component_from_func(func) function, as shown in the sketch after this list.
- Your function must install the package. For example, your function can use the subprocess module to run a command like pip install that installs a package.
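For example, the following is a minimal sketch that uses the packages_to_install parameter to install a dependency before the function runs (the get_row_count component and its pandas dependency are hypothetical):
def get_row_count(csv_url: str) -> int:
    # `pandas` is installed by `packages_to_install` before this function
    # runs, so the import still happens inside the function.
    import pandas as pd
    return len(pd.read_csv(csv_url))

get_row_count_op = comp.create_component_from_func(
    get_row_count, packages_to_install=['pandas==1.1.5'])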
Selecting or building a container image
Currently, if you do not specify a container image, your Python function-based component uses the python:3.7 container image. If your function has complex dependencies, you may benefit from using a container image that has your dependencies preinstalled, or from building a custom container image. Preinstalling your dependencies reduces your component’s runtime, since your component does not need to download and install packages each time it runs.
Many frameworks, such as TensorFlow and PyTorch, and cloud service providers offer prebuilt container images that have common dependencies installed.
If a prebuilt container is not available, you can build a custom container image with your Python function’s dependencies. For more information about building a custom container, read the Dockerfile reference guide in the Docker documentation.
If you build or select a container image instead of using the default container image, the image must use Python 3.5 or later.
Understanding how data is passed between components
When Kubeflow Pipelines runs your component, a container image is started in a Kubernetes Pod and your component’s inputs are passed in as command-line arguments. When your component has finished, the component’s outputs are returned as files.
Python function-based components make it easier to build pipeline components by building the component specification for you. Python function-based components also handle the complexity of passing inputs into your component and passing your function’s outputs back to your pipeline.
The following sections describe how to pass parameters by value and by file.
- Parameters that are passed by value include numbers, booleans, and short strings. Kubeflow Pipelines passes parameters to your component by value, by passing the values as command-line arguments.
- Parameters that are passed by file include CSV, images, and complex types. These files are stored in a location that is accessible to your component running on Kubernetes, such as a persistent volume claim or a cloud storage service. Kubeflow Pipelines passes parameters to your component by file, by passing their paths as a command-line argument.
Input and output parameter names
When you use the Kubeflow Pipelines SDK to convert your Python function to a pipeline component, the Kubeflow Pipelines SDK uses the function’s interface to define the interface of your component in the following ways.
- Some arguments define input parameters.
- Some arguments define output parameters.
- The function’s return value is used as an output parameter. If the return value is a collections.namedtuple, the named tuple is used to return several small values.
Since you can pass parameters between components as a value or as a path, the Kubeflow Pipelines SDK removes common parameter suffixes that leak the component’s expected implementation. For example, a Python function-based component that ingests data and outputs CSV data may have an output argument that is defined as csv_path: comp.OutputPath(str). In this case, the output is the CSV data, not the path. So, the Kubeflow Pipelines SDK simplifies the output name to csv.
The Kubeflow Pipelines SDK uses the following rules to define the input and output parameter names in your component’s interface:
- If the argument name ends with _path and the argument is annotated as a kfp.components.InputPath or kfp.components.OutputPath, the parameter name is the argument name with the trailing _path removed.
- If the argument name ends with _file, the parameter name is the argument name with the trailing _file removed.
- If you return a single small value from your component using the return statement, the output parameter is named output.
- If you return several small values from your component by returning a collections.namedtuple, the Kubeflow Pipelines SDK uses the tuple’s field names as the output parameter names.

Otherwise, the Kubeflow Pipelines SDK uses the argument name as the parameter name. These rules are illustrated in the sketch below.
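For example, in the following hypothetical component, the numbers_path argument is annotated as kfp.components.OutputPath, so its parameter name becomes numbers, while count keeps its argument name:
def write_numbers(
    numbers_path: comp.OutputPath(str), count: int = 10):
    '''Writes a sequence of numbers, one per line.'''
    # `numbers_path` is annotated with `OutputPath` and ends with `_path`,
    # so the output parameter is named `numbers`. `count` keeps its name.
    with open(numbers_path, 'w') as writer:
        for i in range(count):
            writer.write(str(i) + '\n')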
Passing parameters by value
Python function-based components make it easier to pass parameters between components by value (such as numbers, booleans, and short strings), by letting you define your component’s interface by annotating your Python function. The supported types are int, float, bool, and str. You can also pass list or dict instances by value, if they contain small values, such as int, float, bool, or str values. If you do not annotate your function, these input parameters are passed as strings.
If your component returns multiple outputs by value, annotate your function with the typing.NamedTuple type hint and use the collections.namedtuple function to return your function’s outputs as a new subclass of tuple.
You can also return metadata and metrics from your function.
- Metadata helps you visualize pipeline results. Learn more about visualizing pipeline metadata.
- Metrics help you compare pipeline runs. Learn more about using pipeline metrics.
The following example demonstrates how to return multiple outputs by value, including component metadata and metrics.
from typing import NamedTuple
def multiple_return_values_example(a: float, b: float) -> NamedTuple(
    'ExampleOutputs',
    [
        ('sum', float),
        ('product', float),
        ('mlpipeline_ui_metadata', 'UI_metadata'),
        ('mlpipeline_metrics', 'Metrics')
    ]):
    """Example function that demonstrates how to return multiple values."""
    sum_value = a + b
    product_value = a * b

    # Export a sample tensorboard
    metadata = {
        'outputs' : [{
            'type': 'tensorboard',
            'source': 'gs://ml-pipeline-dataset/tensorboard-train',
        }]
    }

    # Export two metrics
    metrics = {
        'metrics': [
            {
                'name': 'sum',
                'numberValue': float(sum_value),
            },{
                'name': 'product',
                'numberValue': float(product_value),
            }
        ]
    }

    from collections import namedtuple
    example_output = namedtuple(
        'ExampleOutputs',
        ['sum', 'product', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return example_output(sum_value, product_value, metadata, metrics)
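As a sketch of how such a component might be used (assuming the add_op factory function from the earlier example; the pipeline itself is hypothetical), the named tuple’s field names become the output names you reference on the task:
example_op = comp.create_component_from_func(multiple_return_values_example)

import kfp.dsl as dsl
@dsl.pipeline(name='Named outputs example pipeline')
def named_outputs_pipeline(a='3', b='5'):
    example_task = example_op(a, b)
    # Outputs declared in the named tuple are accessed as
    # `task.outputs['output_name']`.
    sum_plus_product = add_op(example_task.outputs['sum'],
                              example_task.outputs['product'])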
Passing parameters by file
Python function-based components make it easier to pass files to your component, or to return files from your component, by letting you annotate your Python function’s parameters to specify which parameters refer to a file. Your Python function’s parameters can refer to either input or output files. If your parameter is an output file, Kubeflow Pipelines passes your function a path or stream that you can use to store your output file.
The following example accepts a file as an input and returns two files as outputs.
def split_text_lines(
    source_path: comp.InputPath(str),
    odd_lines_path: comp.OutputPath(str),
    even_lines_path: comp.OutputPath(str)):
    """Splits a text file into two files, with even lines going to one file
    and odd lines to the other."""
    with open(source_path, 'r') as reader:
        with open(odd_lines_path, 'w') as odd_writer:
            with open(even_lines_path, 'w') as even_writer:
                while True:
                    line = reader.readline()
                    if line == "":
                        break
                    odd_writer.write(line)
                    line = reader.readline()
                    if line == "":
                        break
                    even_writer.write(line)
In this example, the inputs and outputs are defined as parameters of the split_text_lines function. This lets Kubeflow Pipelines pass the path to the source data file and the paths to the output data files into the function.
To accept a file as an input parameter, use one of the following type annotations:

- kfp.components.InputBinaryFile: Use this annotation to specify that your function expects a parameter to be an io.BytesIO instance that this function can read.
- kfp.components.InputPath: Use this annotation to specify that your function expects a parameter to be the path to the input file, as a string.
- kfp.components.InputTextFile: Use this annotation to specify that your function expects a parameter to be an io.TextIOWrapper instance that this function can read.

To return a file as an output, use one of the following type annotations:

- kfp.components.OutputBinaryFile: Use this annotation to specify that your function expects a parameter to be an io.BytesIO instance that this function can write to.
- kfp.components.OutputPath: Use this annotation to specify that your function expects a parameter to be the path to store the output file at, as a string.
- kfp.components.OutputTextFile: Use this annotation to specify that your function expects a parameter to be an io.TextIOWrapper instance that this function can write to.
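As a sketch of how the split_text_lines function above might be wired into a pipeline (the print_text component and the sample text are hypothetical), note that you never construct the file paths yourself; Kubeflow Pipelines generates them and passes them to your function:
split_text_op = comp.create_component_from_func(split_text_lines)

def print_text(text_path: comp.InputPath(str)):
    '''Prints the contents of a text file.'''
    with open(text_path, 'r') as reader:
        print(reader.read())

print_text_op = comp.create_component_from_func(print_text)

import kfp.dsl as dsl
@dsl.pipeline(name='Split text lines pipeline')
def split_text_pipeline():
    # A constant string passed to an `InputPath` parameter is written to a
    # file for you, and that file's path is passed to the function.
    split_task = split_text_op('one\ntwo\nthree\nfour\n')
    # File outputs are referenced by name, with the trailing `_path`
    # suffix removed from the argument names.
    print_text_op(split_task.outputs['odd_lines'])
    print_text_op(split_task.outputs['even_lines'])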
Example Python function-based component
This section demonstrates how to build a Python function-based component that uses imports, helper functions, and produces multiple outputs.
- Define your function. This example function uses the numpy package to calculate the quotient and remainder for a given dividend and divisor in a helper function. In addition to the quotient and remainder, the function also returns metadata for visualization and two metrics.
from typing import NamedTuple
def my_divmod(
    dividend: float,
    divisor: float) -> NamedTuple(
        'MyDivmodOutput',
        [
            ('quotient', float),
            ('remainder', float),
            ('mlpipeline_ui_metadata', 'UI_metadata'),
            ('mlpipeline_metrics', 'Metrics')
        ]):
    '''Divides two numbers and calculates the quotient and remainder.'''
    # Import the numpy and json packages inside the component function
    import numpy as np
    import json

    # Define a helper function
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    # Export a sample tensorboard
    metadata = {
        'outputs' : [{
            'type': 'tensorboard',
            'source': 'gs://ml-pipeline-dataset/tensorboard-train',
        }]
    }

    # Export two metrics
    metrics = {
        'metrics': [{
            'name': 'quotient',
            'numberValue': float(quotient),
        },{
            'name': 'remainder',
            'numberValue': float(remainder),
        }]}

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput',
        ['quotient', 'remainder', 'mlpipeline_ui_metadata',
         'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata),
                         json.dumps(metrics))
- Test your function by running it directly, or with unit tests.
my_divmod(100, 7)
This should return a result like the following:
MyDivmodOutput(quotient=14, remainder=2, mlpipeline_ui_metadata='{"outputs": [{"type": "tensorboard", "source": "gs://ml-pipeline-dataset/tensorboard-train"}]}', mlpipeline_metrics='{"metrics": [{"name": "quotient", "numberValue": 14.0}, {"name": "remainder", "numberValue": 2.0}]}')
- Use kfp.components.create_component_from_func to return a factory function that you can use to create kfp.dsl.ContainerOp class instances for your pipeline. This example also specifies the base container image to run this function in.
divmod_op = comp.create_component_from_func(
    my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')
- Define your pipeline. This example uses the divmod_op factory function and the add_op factory function from an earlier example.
import kfp.dsl as dsl
@dsl.pipeline(
    name='Calculation pipeline',
    description='An example pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
    a='1',
    b='7',
    c='17',
):
    # Passes a pipeline parameter and a constant value as operation arguments.
    # The add_op factory function returns a dsl.ContainerOp class instance.
    add_task = add_op(a, 4)
    # Passes the output of the add_task and a pipeline parameter as operation
    # arguments. For an operation with a single return value, the output
    # reference is accessed using `task.output` or
    # `task.outputs['output_name']`.
    divmod_task = divmod_op(add_task.output, b)
    # For an operation with multiple return values, output references are
    # accessed as `task.outputs['output_name']`.
    result_task = add_op(divmod_task.outputs['quotient'], c)
- Create and run your pipeline. Learn more about creating and running pipelines.
# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}
# Submit a pipeline run
client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)