Build Reusable Components
A detailed tutorial on creating components that you can use in various pipelines
This page describes how to author a reusable component that you can load and run in Kubeflow Pipelines. A reusable component is a pre-implemented standalone component that is easy to add as a step in any pipeline.
If you’re new to pipelines, see the conceptual guides to pipelines and components.
Summary
Below is a summary of the steps involved in creating and using a component:
- Write the program that contains your component’s logic. The program must use specific methods to pass data to and from the component.
- Containerize the program.
- Write a component specification in YAML format that describes the component for the Kubeflow Pipelines system.
- Use the Kubeflow Pipelines SDK to load and run the component in your pipeline.
The rest of this page explains how components handle input and output data, followed by detailed descriptions of the above steps.
Passing the data to and from the containerized program
When planning to write a component, you need to think about how the component communicates with upstream and downstream components. That is, how it consumes input data and produces output data.
Summary
For small pieces of data (smaller than 512 kibibytes (KiB)):
- Inputs: Read the value from a command-line argument.
- Outputs: Write the value to a local file, using a path provided as a command-line argument.
For bigger pieces of data (larger than 512 KiB) or for a storage-specific component:
- Inputs: Read the data URI from a file provided as a command-line argument. Then read the data from that URI.
- Outputs: Upload the data to the URI provided as a command-line argument. Then write that URI to a local file, using a path provided as a command-line argument.
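The small-data pattern summarized above can be sketched as follows. This is only an illustration: the flag names `--param` and `--result-path`, the function name `run`, and the doubling step are all hypothetical stand-ins, not part of Kubeflow Pipelines.

```python
import argparse
from pathlib import Path


def run(argv=None):
    """Read a small input from the command line and write a small output to a file."""
    parser = argparse.ArgumentParser(description='Small-data sketch')
    # Small input: the value itself arrives as a command-line argument.
    parser.add_argument('--param', type=int, required=True)
    # Small output: the path is supplied on the command line, never hardcoded.
    parser.add_argument('--result-path', type=str, required=True)
    args = parser.parse_args(argv)

    result = args.param * 2  # Stand-in for the real work (hypothetical)

    out = Path(args.result_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(str(result))
    return result
```

Calling `run(['--param', '100', '--result-path', '/tmp/outputs/result/data'])` would write the result to that file; passing no list makes argparse read the real command line.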
More about input data
There are several ways to make input data available to a program running inside a container:
- Small pieces of data (smaller than 512 kibibytes (KiB)): Pass the data content as a command-line argument:
program.py --param 100
- Bigger data (larger than 512 KiB): Kubeflow Pipelines doesn’t provide a way of transferring larger pieces of data to the container running the program. Instead, the program (or the wrapper script) should receive data URIs rather than the data itself, and then access the data from the URIs. For example:
program.py --train-uri https://server.edu/datasets/1/train.tsv \
  --eval-uri https://server.edu/datasets/1/eval.tsv
program.py --train-gcs-uri gs://bucket/datasets/1/train.tsv
program.py --big-query-table my_table
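A program that receives a URI has to fetch the data itself. The sketch below shows one possible way to do that with only the Python standard library. The function name `read_text_from_uri` is hypothetical, and the sketch deliberately stops at schemes such as `gs://`, which need a storage-specific client or a wrapper script.

```python
from urllib.parse import urlparse
from urllib.request import urlopen


def read_text_from_uri(uri, encoding='utf-8'):
    """Fetch the data that a URI points to and return it as text."""
    scheme = urlparse(uri).scheme
    if scheme in ('http', 'https', 'file'):
        # urllib handles these schemes out of the box.
        with urlopen(uri) as response:
            return response.read().decode(encoding)
    if scheme == '':
        # No scheme: treat the value as a local file path.
        with open(uri, encoding=encoding) as f:
            return f.read()
    # Schemes such as gs:// are outside the scope of this sketch.
    raise ValueError('Unsupported URI scheme: ' + scheme)
```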
More about output data
The program must write the output data to some location and inform the system about that location so that the system can pass the data between steps. You should provide the paths to your output data as command-line arguments. That is, you should not hardcode the paths.
You can choose a suitable storage solution for your output data. Options include the following:
- Google Cloud Storage is the recommended default storage solution for writing output.
- For structured data you can use BigQuery. You must provide the specific URI/path or table name to which to write the results.
The program should do the following:
- Upload the data to your chosen storage system.
- Pass out a URI pointing to the data, by writing that URI to a file and instructing the system to pick it up and treat it as the value of a particular component output.
Note that the example below accepts both a URI to upload the data to and a file path to write that URI to.
program.py --out-model-uri gs://bucket/163/output_model \
--out-model-uri-file /outputs/output_model_uri/data
Why should the program output the URI it has just received as an input argument? The reason is that the URIs specified in the pipeline are usually not the real URIs, but rather URI templates containing UIDs. The system resolves the URIs at runtime when the containerized program starts. Only the containerized program sees the fully-resolved URI.
Below is an example of such a URI:
gs://my-bucket/{{workflow.uid}}/{{pod.name}}/data
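To make the idea of a URI template concrete, the sketch below mimics the substitution with a small helper. It only illustrates the concept and is not how the system resolves templates internally. The function `resolve_template` and the sample values are made up.

```python
import re


def resolve_template(template, values):
    """Replace each {{name}} in the template with values[name]."""
    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError('No value for placeholder: ' + name)
        return values[name]

    return re.sub(r'\{\{\s*([\w.]+)\s*\}\}', substitute, template)


# Made-up runtime values, for illustration only.
uri = resolve_template(
    'gs://my-bucket/{{workflow.uid}}/{{pod.name}}/data',
    {'workflow.uid': 'abc123', 'pod.name': 'my-pipeline-1234'},
)
print(uri)
```

Because the real values exist only at runtime, the pipeline author never sees the resolved string. This is why the program has to report it back.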
In cases where the program cannot control the URI/ID of the created object (for example, where the URI is generated by the outside system), the program should just accept the file path to write the resulting URI/ID:
program.py --out-model-uri-file /outputs/output_model_uri/data
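The two output steps (upload the data, then write the URI to a local file) might look like the sketch below. The helper name `publish_output` and the `upload` callback are hypothetical. The callback stands in for whatever storage client or command the program actually uses.

```python
from pathlib import Path


def publish_output(local_path, out_uri, out_uri_file, upload):
    """Upload a local file to out_uri, then record out_uri for the system."""
    # `upload` is a caller-supplied function (hypothetical), for example a
    # thin wrapper around a storage client or the gsutil command.
    upload(local_path, out_uri)

    # Write the URI to the local file whose path came from the command line.
    uri_file = Path(out_uri_file)
    uri_file.parent.mkdir(parents=True, exist_ok=True)
    uri_file.write_text(out_uri)
```

When an outside system generates the URI, the same final step applies: the program writes whatever URI or ID it received to the given file path.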
Future-proofing your code
The following guidelines help you avoid the need to modify the program code in the near future or have different versions for different storage systems.
If the program has access to the TensorFlow package, you can use tf.gfile to read and write files. The tf.gfile module supports both local and Cloud Storage paths.
If you cannot use tf.gfile, a solution is to read inputs from and write outputs to local files, then add a storage-specific wrapper that downloads and uploads the data from/to a specific storage solution such as Cloud Storage or Amazon S3. For example, create a wrapper script that uses the gsutil cp command to download the input data before running the main program and to upload the output data after the program finishes.
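A wrapper of this kind could be sketched in Python as shown below. The helper names, the local paths under `/tmp`, and the exact flags passed to the main program are assumptions made for illustration. Only `gsutil cp <source> <destination>` is taken from the text above.

```python
import subprocess


def build_wrapper_commands(input_uri, output_uri, program_args,
                           local_input='/tmp/input1_data',
                           local_output='/tmp/output1_data'):
    """Return the three commands the wrapper runs, in order."""
    return [
        # 1. Download the input data to a local file.
        ['gsutil', 'cp', input_uri, local_input],
        # 2. Run the main program against local files only.
        ['python3', 'program.py',
         '--input1-path', local_input,
         '--output1-path', local_output] + list(program_args),
        # 3. Upload the output data once the program has finished.
        ['gsutil', 'cp', local_output, output_uri],
    ]


def run_wrapper(commands, runner=subprocess.check_call):
    """Run each command in order; check_call raises on the first failure."""
    for command in commands:
        runner(command)
```

Keeping the command construction separate from execution lets you inspect or test the commands without gsutil being installed.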
Writing the program code
This section describes an example program that has two inputs (for small and large pieces of data) and one output. The programming language in this example is Python 3.
program.py
#!/usr/bin/env python3
import argparse
import os
from pathlib import Path
from tensorflow import gfile # Supports both local paths and Cloud Storage (GCS) or S3

# Function doing the actual work: copies at most param1 lines from input to output
def do_work(input1_file, output1_file, param1):
    for x in range(param1):
        line = input1_file.readline()  # Returns an empty string at end of file
        if not line:
            break
        _ = output1_file.write(line)

# Defining and parsing the command-line arguments
parser = argparse.ArgumentParser(description='My program description')
parser.add_argument('--input1-path', type=str, help='Path of the local file or GCS blob containing the Input 1 data.')
parser.add_argument('--param1', type=int, default=100, help='Parameter 1.')
parser.add_argument('--output1-path', type=str, help='Path of the local file or GCS blob where the Output 1 data should be written.')
parser.add_argument('--output1-path-file', type=str, help='Path of the local file where the Output 1 URI data should be written.')
args = parser.parse_args()

gfile.MakeDirs(os.path.dirname(args.output1_path))

# Opening the input/output files and performing the actual work
with gfile.Open(args.input1_path, 'r') as input1_file, gfile.Open(args.output1_path, 'w') as output1_file:
    do_work(input1_file, output1_file, args.param1)

# Writing args.output1_path to a file so that it will be passed to downstream tasks
Path(args.output1_path_file).parent.mkdir(parents=True, exist_ok=True)
Path(args.output1_path_file).write_text(args.output1_path)
The command line invocation of this program is:
python3 program.py --input1-path <URI to Input 1 data> \
--param1 <value of Param1 input> --output1-path <URI for Output 1 data> \
--output1-path-file <local file path for the Output 1 URI>
You need to pass the URI for Output 1 data forward so that the downstream steps can access the URI. The program writes the URI to a local file and tells the system to grab it and expose it as an output. You should avoid hard-coding any paths, so the program receives the path to the local file through the --output1-path-file command-line argument.
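The copying logic can be checked without any storage system by passing in-memory files. The sketch below restates the work function in a self-contained form, using `readline()` (which returns an empty string at end of file), and runs it on `io.StringIO` objects. The sample text is made up.

```python
import io


def do_work(input1_file, output1_file, param1):
    """Copy at most param1 lines from input1_file to output1_file."""
    for _ in range(param1):
        line = input1_file.readline()
        if not line:  # An empty string means end of file
            break
        output1_file.write(line)


source = io.StringIO('first\nsecond\nthird\n')
target = io.StringIO()
do_work(source, target, 2)
print(target.getvalue(), end='')  # Prints the first two lines
```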
Writing a Dockerfile to containerize your application
You need a Docker container image that packages your program.
The instructions on creating container images are not specific to Kubeflow Pipelines. To make things easier for you, this section provides some guidelines on standard container creation. You can use any procedure of your choice to create the Docker containers.
Your Dockerfile must contain all program code, including the wrapper, and the dependencies (operating system packages, Python packages etc).
Ensure you have write access to a container registry where you can push the container image. Examples include Google Container Registry and Docker Hub.
Think of a name for your container image. This guide uses the name gcr.io/my-org/my-image.
Example Dockerfile
ARG BASE_IMAGE_TAG=1.12.0-py3
FROM tensorflow/tensorflow:$BASE_IMAGE_TAG
RUN python3 -m pip install keras
COPY ./src /pipelines/component/src
Create a build_image.sh script (see example below) to build the container image based on the Dockerfile and push the container image to some container repository.
Run the build_image.sh script to build the container image based on the Dockerfile and push it to your chosen container repository.
Best practice: After pushing the image, get the strict image name with digest, and use the strict image name for reproducibility.
Example build_image.sh:
#!/bin/bash -e
image_name=gcr.io/my-org/my-image # Specify the image name here
image_tag=latest
full_image_name=${image_name}:${image_tag}
base_image_tag=1.12.0-py3
cd "$(dirname "$0")"
docker build --build-arg BASE_IMAGE_TAG=${base_image_tag} -t "${full_image_name}" .
docker push "$full_image_name"
# Output the strict image name (which contains the sha256 image digest)
docker inspect --format="{{index .RepoDigests 0}}" "${full_image_name}"
Make your script executable:
chmod +x build_image.sh
Writing your component definition file
You need a component specification in YAML format that describes the component for the Kubeflow Pipelines system.
For the complete definition of a Kubeflow Pipelines component, see the component specification. However, for this tutorial you don’t need to know the full schema of the component specification. The tutorial provides enough information for the relevant components.
Start writing the component definition (component.yaml) by specifying your container image in the component’s implementation section:
implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f # Name of a container image that you've pushed to a container repo.
Complete the component’s implementation section based on your (wrapper) program:
implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    # command is a list of strings (command-line arguments).
    # The YAML language has two syntaxes for lists and you can use either of them.
    # Here we use the "flow syntax" - comma-separated strings inside square brackets.
    command: [
      python3, /pipelines/component/src/program.py, # Path of the program inside the container
      --input1-path, <URI to Input 1 data>,
      --param1, <value of Param1 input>,
      --output1-path, <URI template for Output 1 data>,
      --output1-path-file, <local file path for the Output 1 URI>,
    ]
The command section still contains some dummy placeholders (in angle brackets). Let’s replace them with real placeholders. A placeholder represents a command-line argument that is replaced with some value or path before the program is executed. In component.yaml, you specify the placeholders using YAML’s mapping syntax to distinguish them from the verbatim strings. This tutorial uses two placeholders:
- {inputValue: Some input name}: This placeholder is replaced by the value of the argument to the specified input. This is useful for small pieces of input data.
- {outputPath: Some output name}: This placeholder is replaced by the auto-generated local path where the program should write its output data. This instructs the system to read the content of the file and store it as the value of the specified output.
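The substitution can be pictured with a small Python model. This is a conceptual sketch only, not the Kubeflow Pipelines implementation: the function `resolve_command` is hypothetical, and the generated output path format (`/outputs/<name>/data`) is an assumption modeled on the example paths earlier on this page.

```python
def resolve_command(command, arguments, output_dir='/outputs'):
    """Turn a command list containing placeholders into plain strings."""
    resolved = []
    for item in command:
        if isinstance(item, dict) and 'inputValue' in item:
            # Replaced by the argument value passed for that input.
            resolved.append(str(arguments[item['inputValue']]))
        elif isinstance(item, dict) and 'outputPath' in item:
            # Replaced by a generated local path (naming scheme is made up).
            name = item['outputPath'].lower().replace(' ', '_')
            resolved.append(output_dir + '/' + name + '/data')
        else:
            resolved.append(str(item))  # Verbatim string, kept as-is
    return resolved


command = ['python3', 'program.py',
           '--param1', {'inputValue': 'Parameter 1'},
           '--output1-path-file', {'outputPath': 'Output 1 URI'}]
print(resolve_command(command, {'Parameter 1': 100}))
```

In YAML flow syntax, `{inputValue: Parameter 1}` parses as a mapping, which is why the model above represents placeholders as dictionaries and verbatim arguments as strings.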
As well as putting real placeholders in the command line, you need to add corresponding input and output specifications to the inputs and outputs sections. The input/output specification contains the input name, type, description and default value. Only the name is required. The input and output names are free-form strings, but be careful with the YAML syntax and use quotes if necessary. The input/output names do not need to be the same as the command-line flags, which are usually quite short.
Replace the placeholders as follows:
- Replace <URI to Input 1 data> with {inputValue: Input 1 URI} and add Input 1 URI to the inputs section. URLs are small, so we’re passing them in as command-line arguments.
- Replace <value of Param1 input> with {inputValue: Parameter 1} and add Parameter 1 to the inputs section. Integers are small, so we’re passing them in as command-line arguments.
- Replace <URI template for Output 1 data> with {inputValue: Output 1 URI template} and add Output 1 URI template to the inputs section. This looks confusing: you’re adding an output URI to the inputs section. The reason is that currently you must pass in URIs manually, so this is an input, not an output.
- Replace <local file path for the Output 1 URI> with {outputPath: Output 1 URI} and add Output 1 URI to the outputs section. Again, this looks confusing: the same URI now appears both as an input (Output 1 URI template) and as an output (Output 1 URI). (Note that you can use different names.) The reason is that the URI is a pass-through: it’s passed to the task as input and is then output from the task, so that downstream tasks have access to it.
After replacing the placeholders and adding inputs/outputs, your component.yaml looks like this:
inputs: #List of input specs. Each input spec is a map.
- {name: Input 1 URI}
- {name: Parameter 1}
- {name: Output 1 URI template}
outputs:
- {name: Output 1 URI}
implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    command: [
      python3, /pipelines/component/src/program.py,
      --input1-path,
      {inputValue: Input 1 URI}, # Refers to the "Input 1 URI" input
      --param1,
      {inputValue: Parameter 1}, # Refers to the "Parameter 1" input
      --output1-path,
      {inputValue: Output 1 URI template}, # Refers to "Output 1 URI template" input
      --output1-path-file,
      {outputPath: Output 1 URI}, # Refers to the "Output 1 URI" output
    ]
The above component specification is sufficient, but you should add more metadata to make it more useful. The example below includes the following additions:
- Component name and description.
- For each input and output: description, default value, and type.
Final version of component.yaml:
name: Do dummy work
description: Performs some dummy work.
inputs:
- {name: Input 1 URI, type: GCSPath, description: 'GCS path to Input 1'}
- {name: Parameter 1, type: Integer, default: '100', description: 'Parameter 1 description'} # The default values must be specified as YAML strings.
- {name: Output 1 URI template, type: GCSPath, description: 'GCS path template for Output 1'}
outputs:
- {name: Output 1 URI, type: GCSPath, description: 'GCS path for Output 1'}
implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    command: [
      python3, /pipelines/component/src/program.py,
      --input1-path, {inputValue: Input 1 URI},
      --param1, {inputValue: Parameter 1},
      --output1-path, {inputValue: Output 1 URI template},
      --output1-path-file, {outputPath: Output 1 URI},
    ]
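Because every placeholder must refer to a declared input or output, a quick consistency check can catch typos before you load the component. The sketch below works on a plain Python dictionary that mirrors the structure of component.yaml. The function `check_placeholders` is a hypothetical helper, not part of the Kubeflow Pipelines SDK.

```python
def check_placeholders(spec):
    """Return a list of problems found in a component spec given as a dict."""
    inputs = {i['name'] for i in spec.get('inputs', [])}
    outputs = {o['name'] for o in spec.get('outputs', [])}
    problems = []
    for item in spec['implementation']['container']['command']:
        if not isinstance(item, dict):
            continue  # Verbatim string, nothing to check
        if 'inputValue' in item and item['inputValue'] not in inputs:
            problems.append('Unknown input: ' + item['inputValue'])
        if 'outputPath' in item and item['outputPath'] not in outputs:
            problems.append('Unknown output: ' + item['outputPath'])
    return problems


# Dictionary form of the component definition shown above.
spec = {
    'inputs': [{'name': 'Input 1 URI'}, {'name': 'Parameter 1'},
               {'name': 'Output 1 URI template'}],
    'outputs': [{'name': 'Output 1 URI'}],
    'implementation': {'container': {
        'image': 'gcr.io/my-org/my-image@sha256:a172..752f',
        'command': [
            'python3', '/pipelines/component/src/program.py',
            '--input1-path', {'inputValue': 'Input 1 URI'},
            '--param1', {'inputValue': 'Parameter 1'},
            '--output1-path', {'inputValue': 'Output 1 URI template'},
            '--output1-path-file', {'outputPath': 'Output 1 URI'},
        ],
    }},
}
print(check_placeholders(spec))  # An empty list means no problems were found
```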
Build your component into a pipeline with the Kubeflow Pipelines SDK
Here is a sample pipeline that shows how to load a component and use it to compose a pipeline:
import os

import kfp
import kfp.gcp  # Provides use_gcp_secret

# Directory that contains component.yaml. This value is only an example; adjust it to your layout.
component_root = '.'

# Load the component by calling load_component_from_file or load_component_from_url
# To load the component, the pipeline author only needs to have access to the component.yaml file.
# The Kubernetes cluster executing the pipeline needs access to the container image specified in the component.
dummy_op = kfp.components.load_component_from_file(os.path.join(component_root, 'component.yaml'))
# dummy_op = kfp.components.load_component_from_url('http://....../component.yaml')

# dummy_op is now a "factory function" that accepts the arguments for the component's inputs
# and produces a task object (e.g. ContainerOp instance).
# Inspect the dummy_op function in Jupyter Notebook by typing "dummy_op(" and pressing Shift+Tab
# You can also get help by writing help(dummy_op) or dummy_op? or dummy_op??
# The signature of the dummy_op function corresponds to the inputs section of the component.
# Some tweaks are performed to make the signature valid and pythonic:
# 1) All inputs with default values will come after the inputs without default values
# 2) The input names are converted to pythonic names (spaces and symbols replaced
#    with underscores and letters lowercased).

# Define a pipeline and create a task from a component:
@kfp.dsl.pipeline(name='My pipeline', description='')
def my_pipeline():
    dummy1_task = dummy_op(
        # Input name "Input 1 URI" is converted to pythonic parameter name "input_1_uri"
        input_1_uri='gs://my-bucket/datasets/train.tsv',
        parameter_1='100',
        # You must use Argo placeholders ("{{workflow.uid}}" and "{{pod.name}}")
        # to guarantee that the outputs from different pipeline runs and tasks write
        # to unique locations and do not overwrite each other.
        # Input name "Output 1 URI template" becomes "output_1_uri_template".
        output_1_uri_template='gs://my-bucket/{{workflow.uid}}/{{pod.name}}/output_1/data',
    ).apply(kfp.gcp.use_gcp_secret('user-gcp-sa'))
    # To access GCS, you must configure the container to have access to a
    # GCS secret that grants required access to the bucket.

    # The outputs of the dummy1_task can be referenced using the
    # dummy1_task.outputs dictionary.
    # ! The output names are converted to lowercased dashed names.

    # Pass the outputs of the dummy1_task to some other component
    dummy2_task = dummy_op(
        input_1_uri=dummy1_task.outputs['output-1-uri'],
        parameter_1='200',
        output_1_uri_template='gs://my-bucket/{{workflow.uid}}/{{pod.name}}/output_1/data',
    ).apply(kfp.gcp.use_gcp_secret('user-gcp-sa'))
    # To access GCS, you must configure the container to have access to a
    # GCS secret that grants required access to the bucket.

# This pipeline can be compiled, uploaded and submitted for execution.
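The two naming rules mentioned in the comments above (pythonic parameter names for inputs, lowercased dashed names for outputs) can be approximated as follows. These helpers are hypothetical and only mirror the rules as described on this page. The SDK's own conversion may handle edge cases differently.

```python
import re


def to_parameter_name(input_name):
    """Lowercase the name and replace spaces and symbols with underscores."""
    return re.sub(r'[^a-z0-9]+', '_', input_name.lower()).strip('_')


def to_output_key(output_name):
    """Lowercase the name and replace spaces and symbols with dashes."""
    return re.sub(r'[^a-z0-9]+', '-', output_name.lower()).strip('-')


print(to_parameter_name('Input 1 URI'))  # input_1_uri
print(to_output_key('Output 1 URI'))     # output-1-uri
```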
Organizing the component files
This section provides a recommended way to organize the component files. There is no requirement that you must organize the files in this way. However, using the standard organization makes it possible to reuse the same scripts for testing, image building and component versioning. See this sample component for a real-life component example.
components/<component group>/<component name>/
    src/*            #Component source code files
    tests/*          #Unit tests
    run_tests.sh     #Small script that runs the tests
    README.md        #Documentation. Move to docs/ if multiple files needed
    Dockerfile       #Dockerfile to build the component container image
    build_image.sh   #Small script that runs docker build and docker push
    component.yaml   #Component definition in YAML format
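If you create components often, a small script can set up this layout for you. The sketch below is one possible approach. The function `scaffold_component` is hypothetical, and it creates empty files that you then fill in.

```python
from pathlib import Path


def scaffold_component(root, group, name):
    """Create the recommended directory layout with empty placeholder files."""
    base = Path(root) / 'components' / group / name
    for directory in ('src', 'tests'):
        (base / directory).mkdir(parents=True, exist_ok=True)
    for filename in ('run_tests.sh', 'README.md', 'Dockerfile',
                     'build_image.sh', 'component.yaml'):
        (base / filename).touch()  # Leaves an existing file's content untouched
    return base
```

For example, `scaffold_component('.', 'my-group', 'my-component')` would create `components/my-group/my-component/` under the current directory.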
Next steps
- Consolidate what you’ve learned by reading the best practices for designing and writing components.
- See how to export metrics from your pipeline.
- Visualize the output of your component by adding metadata for an output viewer.
- Explore the reusable components and other shared resources.