
Kubernetes platform configuration library and generated protos.


Kubeflow Pipelines SDK kfp-kubernetes API Reference

The Kubeflow Pipelines SDK kfp-kubernetes Python library (part of the Kubeflow Pipelines project) is an addon to the Kubeflow Pipelines SDK that enables authoring Kubeflow pipelines with Kubernetes-specific features and concepts, such as:

- Secrets and ConfigMaps (as environment variables or mounted volumes)
- PersistentVolumeClaims and ephemeral volumes
- Pod labels and annotations
- Kubernetes field paths as environment variables
- Timeouts, image pull policies, image pull secrets, and security contexts

Be sure to check out the full API Reference for more details.

Installation

The kfp-kubernetes package can be installed as a KFP SDK extra dependency.

pip install kfp[kubernetes]

Or installed independently:

pip install kfp-kubernetes

Getting started

The following is an example of a simple pipeline that uses the kfp-kubernetes library to mount a pre-existing secret as an environment variable available in the task's container.

Secret: As environment variable

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    import os
    print(os.environ['SECRET_VAR'])

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_env(task,
                                 secret_name='my-secret',
                                 secret_key_to_env={'password': 'SECRET_VAR'})
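
The pipeline above assumes a Secret named my-secret with a password key already exists in the namespace where the pipeline runs. As a hypothetical illustration (the name and value are placeholders), such a Secret could be created from a manifest like:

```yaml
# Hypothetical Secret providing the 'password' key referenced above.
apiVersion: v1
kind: Secret
metadata:
  name: my-secret
type: Opaque
stringData:
  password: change-me  # placeholder value exposed to the task as SECRET_VAR
```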

Other examples

Here is a non-exhaustive list of some other examples of how to use the kfp-kubernetes library. Be sure to check out the full API Reference for more details.

Secret: As mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    with open('/mnt/my_vol') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_volume(task,
                                    secret_name='my-secret',
                                    mount_path='/mnt/my_vol')

Secret: As optional source for a mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    with open('/mnt/my_vol') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_volume(task,
                                    secret_name='my-secret',
                                    mount_path='/mnt/my_vol',
                                    optional=True)

ConfigMap: As environment variable

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    import os
    print(os.environ['CM_VAR'])

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_env(task,
                                     config_map_name='my-cm',
                                     config_map_key_to_env={'foo': 'CM_VAR'})
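
As with the Secret examples, this assumes a ConfigMap named my-cm with a foo key already exists in the pipeline's namespace. A hypothetical manifest (the value is a placeholder):

```yaml
# Hypothetical ConfigMap providing the 'foo' key referenced above.
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-cm
data:
  foo: bar  # placeholder value exposed to the task as CM_VAR
```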

ConfigMap: As mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    with open('/mnt/my_vol') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_volume(task,
                                        config_map_name='my-cm',
                                        mount_path='/mnt/my_vol')

ConfigMap: As optional source for a mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    with open('/mnt/my_vol') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_volume(task,
                                        config_map_name='my-cm',
                                        mount_path='/mnt/my_vol',
                                        optional=True)

PersistentVolumeClaim: Dynamically create PVC, mount, then delete

from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.component
def read_data():
    with open('/reused_data/file.txt') as f:
        print(f.read())

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteOnce'],
        size='5Gi',
        storage_class_name='standard',
    )

    task1 = make_data()
    # normally task sequencing is handled by data exchange via component inputs/outputs
    # but since data is exchanged via volume, we need to call .after explicitly to sequence tasks
    task2 = read_data().after(task1)

    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/reused_data',
    )

    # wait to delete the PVC until after task2 completes
    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']).after(task2)
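
For reference, CreatePVC with the parameters above provisions a PersistentVolumeClaim roughly equivalent to the following manifest. This is a sketch, not the exact resource KFP creates; when pvc_name_suffix is used, the actual name is a system-generated identifier with the suffix appended:

```yaml
# Approximate PVC resulting from the CreatePVC call above.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: <generated-id>-my-pvc  # generated prefix + pvc_name_suffix
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard
```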

PersistentVolumeClaim: Create PVC on-the-fly tied to your pod's lifecycle

from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.pipeline
def my_pipeline():
    task1 = make_data()
    # note that the created PVC is automatically cleaned up once the pod is
    # deleted and cannot be shared between pods
    kubernetes.add_ephemeral_volume(
        task1,
        volume_name="my-pvc",
        mount_path="/data",
        access_modes=['ReadWriteOnce'],
        size='5Gi',
    )

Pod Metadata: Add pod labels and annotations to the container pod's definition

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
    pass


@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.add_pod_label(
        task,
        label_key='kubeflow.com/kfp',
        label_value='pipeline-node',
    )
    kubernetes.add_pod_annotation(
        task,
        annotation_key='run_id',
        annotation_value='123456',
    )

Kubernetes Field: Use a Kubernetes field path as an environment variable

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
    pass


@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.use_field_path_as_env(
        task,
        env_name='KFP_RUN_NAME',
        field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
    )

Timeout: Set a timeout in seconds (the pod spec's activeDeadlineSeconds)

from kfp import dsl
from kfp import kubernetes

@dsl.component
def comp():
    pass

@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.set_timeout(task, 20)

ImagePullPolicy: One of "Always", "Never", or "IfNotPresent"

from kfp import dsl
from kfp import kubernetes

@dsl.component
def simple_task():
    print("hello-world")

@dsl.pipeline
def pipeline():
    task = simple_task()
    kubernetes.set_image_pull_policy(task, "Always")

SecurityContext: Set security context for the task's container

from kfp import dsl
from kfp import kubernetes

@dsl.component
def my_task():
    print("running with custom security context")

@dsl.pipeline
def pipeline():
    task = my_task()
    kubernetes.set_security_context(
        task,
        run_as_user=1000,
        run_as_group=3000,
    )

Note: Cluster administrators can configure default runAsUser and runAsGroup values for all user workload containers via the pipeline-install-config ConfigMap (defaultSecurityContextRunAsUser and defaultSecurityContextRunAsGroup). When admin defaults are configured, SDK-specified values for those fields are overridden.

ImagePullSecrets: Set secrets to authenticate image pulls

from kfp import dsl
from kfp import kubernetes

@dsl.container_component
def hello():
    return dsl.ContainerSpec(
        image='some-private-image:tag',
        command=['echo', 'hello']
    )

@dsl.pipeline
def pipeline():
    task = hello()
    kubernetes.set_image_pull_secrets(task, ['secret-name'])
