
A Python library for defining, managing, and executing function pipelines.


PipeFunc: Structure, Automate, and Simplify Your Computational Workflows 🕸

Stop micromanaging execution. Focus on the science. Capture your workflow's essence with function pipelines, represent computations as DAGs, and automate parallel sweeps.



:thinking: What is this?


pipefunc is a Python library designed for creating and executing function pipelines. By simply annotating functions and specifying their outputs, it builds a pipeline that automatically manages the execution order based on dependencies. Visualize the pipeline as a directed graph, execute the pipeline for all (or specific) outputs, add multidimensional sweeps, automatically parallelize the pipeline, and get nicely structured data back.

[!NOTE] A pipeline is a sequence of interconnected functions, structured as a Directed Acyclic Graph (DAG), where outputs from one or more functions serve as inputs to subsequent ones. pipefunc streamlines the creation and management of these pipelines, offering powerful tools to efficiently execute them.
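To make the DAG idea concrete, here is a minimal standard-library sketch. It is not pipefunc's implementation. It infers each function's dependencies from its parameter names, derives an execution order with `graphlib.TopologicalSorter`, and then runs the functions in that order. The `producers` dict is a hypothetical stand-in for a pipeline, and the functions are undecorated copies of the ones in the example further down.

```python
import inspect
from graphlib import TopologicalSorter

# Plain (undecorated) copies of the functions used in the README example.
def f_c(a, b):
    return a + b

def f_d(b, c):
    return b * c

def f_e(c, d, x=1):
    return c * d * x

# Hypothetical registry standing in for a pipeline: output name -> producing function.
producers = {"c": f_c, "d": f_d, "e": f_e}

# An output depends on every parameter that another function produces; the remaining
# parameters (a, b, x) are root inputs supplied by the caller (or covered by defaults).
graph = {
    output: {p for p in inspect.signature(func).parameters if p in producers}
    for output, func in producers.items()
}
order = list(TopologicalSorter(graph).static_order())  # raises CycleError if not a DAG

# Execute in dependency order, feeding each result to the functions that need it.
values = {"a": 2, "b": 3}
for name in order:
    params = inspect.signature(producers[name]).parameters
    values[name] = producers[name](**{p: values[p] for p in params if p in values})

print(order)                     # ['c', 'd', 'e']
print(values["d"], values["e"])  # 15 75
```

Parameters that are neither supplied nor produced (here `x`) are simply left out of the call, so the function's default applies.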

Whether you're working with data processing, scientific computations, machine learning (AI) workflows, or any other scenario involving interdependent functions, pipefunc helps you focus on the logic of your code while it handles the intricacies of function dependencies and execution order.

:rocket: Key Features

  1. 🚀 Function Composition and Pipelining: Create pipelines by using the @pipefunc decorator; execution order is automatically handled.
  2. 📊 Pipeline Visualization: Generate visual graphs of your pipelines to better understand the flow of data.
  3. 👥 Multiple Outputs: Handle functions that return multiple results, allowing each result to be used as input to other functions.
  4. 🔁 Map-Reduce Support: Perform "map" operations to apply functions over data and "reduce" operations to aggregate results, allowing n-dimensional mappings.
  5. 👮 Type Annotations Validation: Validates the type annotations between functions to ensure type consistency.
  6. 🎛️ Resource Usage Profiling: Get reports on CPU usage, memory consumption, and execution time to identify bottlenecks and optimize your code.
  7. 🔄 Automatic parallelization: Automatically runs pipelines in parallel (local or remote) with shared memory and disk caching options.
  8. Ultra-Fast Performance: Minimal overhead of about 15 µs per function in the graph, ensuring blazingly fast execution.
  9. 🔍 Parameter Sweep Utilities: Generate parameter combinations for parameter sweeps and optimize the sweeps with result caching.
  10. 💡 Flexible Function Arguments: Call functions with different argument combinations, letting pipefunc determine which other functions to call based on the provided arguments.
  11. 🏗️ Leverages giants: Builds on top of NetworkX for graph algorithms, NumPy for multi-dimensional arrays, and optionally Xarray for labeled multi-dimensional arrays, Zarr to store results in memory/disk/cloud or any key-value store, and Adaptive for parallel sweeps.
  12. 🤓 Nerd stats: >1000 tests with 100% test coverage, fully typed, only 3 required dependencies, all Ruff Rules, all public API documented.
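Feature 10 (choosing which functions to call from the arguments you provide) can be illustrated with a small recursive resolver. This is a hypothetical standard-library sketch, not pipefunc's API. `resolve` and `PRODUCERS` are illustrative names. A value that is already present, whether supplied by the caller or computed earlier, short-circuits the function that would otherwise produce it.

```python
import inspect

# Plain copies of the README example functions.
def f_c(a, b):
    return a + b

def f_d(b, c):
    return b * c

def f_e(c, d, x=1):
    return c * d * x

# Hypothetical mapping from output name to the function that produces it.
PRODUCERS = {"c": f_c, "d": f_d, "e": f_e}


def resolve(name, values, calls):
    """Return `name`, computing only what is missing from `values` (mutated in place)."""
    if name in values:  # supplied by the caller or already computed
        return values[name]
    if name not in PRODUCERS:
        raise KeyError(f"missing root input: {name!r}")
    func = PRODUCERS[name]
    kwargs = {}
    for param, spec in inspect.signature(func).parameters.items():
        if param in values or param in PRODUCERS:
            kwargs[param] = resolve(param, values, calls)
        elif spec.default is inspect.Parameter.empty:
            raise KeyError(f"missing root input: {param!r}")
        # otherwise the parameter keeps its default value
    calls.append(func.__name__)  # record which functions actually ran
    values[name] = func(**kwargs)
    return values[name]


calls = []
result = resolve("e", {"a": 2, "b": 3}, calls)  # runs f_c, f_d, f_e

calls_2 = []
result_2 = resolve("e", {"c": 5, "b": 3}, calls_2)  # c is given, so f_c is skipped
```

Both calls return 75, but the second runs only `f_d` and `f_e`, because `c` was provided directly.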

:test_tube: How does it work?

pipefunc provides a Pipeline class that you use to define your function pipeline. You decorate each function with @pipefunc, which specifies the name of the function's output, and then pass the decorated functions to Pipeline. Once your pipeline is defined, you can execute it for specific outputs, simplify it by combining function nodes, visualize it as a directed graph, and profile the resource usage of its functions. For more detailed usage instructions and examples, see the usage example provided in the package.

Here is a simple example usage of pipefunc to illustrate its primary features:

from pipefunc import pipefunc, Pipeline

# Define three functions that will be a part of the pipeline
@pipefunc(output_name="c")
def f_c(a, b):
    return a + b

@pipefunc(output_name="d")
def f_d(b, c):
    return b * c

@pipefunc(output_name="e")
def f_e(c, d, x=1):
    return c * d * x

# Create a pipeline with these functions
pipeline = Pipeline([f_c, f_d, f_e], profile=True)  # `profile=True` enables resource profiling

# Call the pipeline directly for different outputs:
assert pipeline("d", a=2, b=3) == 15
assert pipeline("e", a=2, b=3) == 75

# Visualize the pipeline
pipeline.visualize()

# Show resource reporting (only works if profile=True)
pipeline.print_profiling_stats()

This example defines a pipeline from the functions f_c, f_d, and f_e, executes it for specific outputs, visualizes the pipeline graph, and reports on resource usage. It should give you an idea of how to use pipefunc to construct and manage function pipelines.
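With `profile=True`, pipefunc collects the resource statistics for you. To illustrate only the timing part of that idea, here is a hypothetical standard-library sketch that wraps functions in a decorator recording call counts and wall-clock time. `timed` and `STATS` are illustrative names. They are not pipefunc's profiler, which also reports CPU and memory usage.

```python
import time
from collections import defaultdict
from functools import wraps

# Hypothetical per-function stats: function name -> [number of calls, total seconds].
STATS = defaultdict(lambda: [0, 0.0])


def timed(func):
    """Record how often `func` is called and how much wall-clock time it takes."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:  # record the timing even if the call raises
            entry = STATS[func.__name__]
            entry[0] += 1
            entry[1] += time.perf_counter() - start
    return wrapper


@timed
def f_c(a, b):
    return a + b


@timed
def f_d(b, c):
    return b * c


d = f_d(3, f_c(2, 3))
for name, (ncalls, seconds) in STATS.items():
    print(f"{name}: {ncalls} call(s), {seconds * 1e6:.1f} µs")
```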

The following example demonstrates how to perform a map-reduce operation using pipefunc:

from pipefunc import pipefunc, Pipeline
from pipefunc.map import load_outputs
import numpy as np

@pipefunc(output_name="c", mapspec="a[i], b[j] -> c[i, j]")  # the mapspec is used to specify the mapping
def f(a: int, b: int):
    return a + b

@pipefunc(output_name="mean")  # there is no mapspec, so this function takes the full 2D array
def g(c: np.ndarray):
    return np.mean(c)

pipeline = Pipeline([f, g])
inputs = {"a": [1, 2, 3], "b": [4, 5, 6]}
pipeline.map(inputs, run_folder="my_run_folder", parallel=True)
result = load_outputs("mean", run_folder="my_run_folder")
print(result)  # prints 7.0

Here, the mapspec argument specifies how the inputs of f map to its output. Because a and b use different indices (i and j), f is evaluated for every pair in the Cartesian product of the two input lists, producing a 2D array c of pairwise sums. Since g has no mapspec, it receives the full 2D array and computes its mean. The map method executes the pipeline for the given inputs and stores the results in the run folder, from which load_outputs loads the output of g.
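The semantics of `"a[i], b[j] -> c[i, j]"` can be reproduced in plain Python to check the printed mean. This sketch uses only the standard library and a nested list in place of a NumPy array. It mirrors what the map-reduce example computes, not how pipefunc executes it.

```python
from itertools import product
from statistics import fmean

a = [1, 2, 3]
b = [4, 5, 6]

# "a[i], b[j] -> c[i, j]": every (i, j) index pair gets its own call of f,
# so c is a len(a) x len(b) grid.
c = [[0] * len(b) for _ in a]
for i, j in product(range(len(a)), range(len(b))):
    c[i][j] = a[i] + b[j]

# g has no mapspec, so it receives the whole 2D result at once (the "reduce" step).
mean = fmean(value for row in c for value in row)
print(c)     # [[5, 6, 7], [6, 7, 8], [7, 8, 9]]
print(mean)  # 7.0
```

By contrast, a shared index such as `a[i], b[i] -> c[i]` pairs the inputs position by position (like `zip`) and yields a 1D result.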

:notebook: Jupyter Notebook Example

See the detailed usage example and more in our example.ipynb.

[!TIP] Have uv installed? Run uvx --with "pipefunc[docs]" -p 3.13 opennb pipefunc/pipefunc/example.ipynb to open the example notebook in your browser without having to set anything up!

:computer: Installation

Install the latest stable version from conda (recommended):

conda install pipefunc

or from PyPI:

pip install "pipefunc[all]"

or install the latest development version from the main branch:

pip install -U https://github.com/pipefunc/pipefunc/archive/main.zip

or clone the repository and do a dev install (recommended for dev):

git clone git@github.com:pipefunc/pipefunc.git
cd pipefunc
pip install -e ".[dev]"

:hammer_and_wrench: Development

We use pre-commit to manage pre-commit hooks, which helps us ensure that our code is always clean and compliant with our coding standards. To set it up, install pre-commit with pip and then run the install command:

pip install pre-commit
pre-commit install


Download files

Download the file for your platform.

Source Distribution

pipefunc-0.90.3.tar.gz (453.2 kB)

Uploaded Source

Built Distribution


pipefunc-0.90.3-py3-none-any.whl (230.5 kB)

Uploaded Python 3

File details

Details for the file pipefunc-0.90.3.tar.gz.

File metadata

  • Download URL: pipefunc-0.90.3.tar.gz
  • Upload date:
  • Size: 453.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pipefunc-0.90.3.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4cbe4f4a12b7d417899b680c232060eee9b80a35798a277deb98271582816f3e |
| MD5 | c4dd9707ab57318c11dbc621e66d2fe3 |
| BLAKE2b-256 | 0c7c91c3cfcd8048e937f40d9147fc75716843d24f09972f16e6f560220a1127 |
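A downloaded archive can be checked against the SHA256 digest listed for it. Here is a minimal sketch using Python's `hashlib`. It assumes the source tarball has been saved under its listed file name in the current directory. `sha256_of` and `verify` are hypothetical helper names.

```python
import hashlib

# SHA256 digest listed for pipefunc-0.90.3.tar.gz.
EXPECTED_SHA256 = "4cbe4f4a12b7d417899b680c232060eee9b80a35798a277deb98271582816f3e"


def sha256_of(path, chunk_size=1 << 20):
    """Hash a file in 1 MiB chunks so large downloads need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path, expected=EXPECTED_SHA256):
    """Return True if the file's SHA256 digest matches `expected` (case-insensitive)."""
    return sha256_of(path) == expected.lower()


# Usage (assumes the file was downloaded to the working directory):
# verify("pipefunc-0.90.3.tar.gz")
```

pip can enforce the same check during installation through its hash-checking mode (`--require-hashes` with `--hash=sha256:...` entries in a requirements file).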


Provenance

The following attestation bundles were made for pipefunc-0.90.3.tar.gz:

Publisher: pythonpublish.yml on pipefunc/pipefunc

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pipefunc-0.90.3-py3-none-any.whl.

File metadata

  • Download URL: pipefunc-0.90.3-py3-none-any.whl
  • Upload date:
  • Size: 230.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pipefunc-0.90.3-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3c27424ee2471acd0927faffd9f3fce1cdd373c1800532a40b11c1a00d805481 |
| MD5 | a13a6642069aa5307749aafb62a0e5ee |
| BLAKE2b-256 | e8f15d1767a6409d0fb2e68fc4b626d7436dccae822ed1fbef90589672e0a81e |


Provenance

The following attestation bundles were made for pipefunc-0.90.3-py3-none-any.whl:

Publisher: pythonpublish.yml on pipefunc/pipefunc

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
