⚗️ Daglib - Lightweight DAG composition framework

Daglib is a lightweight, embeddable library for parallel task execution that turns pure Python functions into executable task graphs.

Installation

Core

pip install daglib

With visualizations enabled

pip install 'daglib[graphviz]'  # static visualizations
# or
pip install 'daglib[ipycytoscape]'  # interactive visualizations

Create your first DAG

import daglib

dag = daglib.Dag()


@dag.task()
def task_1a():
    return "Hello"


@dag.task()
def task_1b():
    return "world!"


@dag.task()
def task_2(task_1a, task_1b):
    return f"{task_1a}, {task_1b}"


dag.run()
'Hello, world!'
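
The example above suggests that a task's parameter names (task_1a, task_1b) are matched against the names of other registered tasks to determine its dependencies. The following is a minimal sketch of that idea using only the standard library. MiniDag and its methods are hypothetical names invented for illustration; this is not Daglib's implementation, it runs tasks sequentially, and it takes the name of the task to evaluate as an explicit argument.

```python
import inspect


class MiniDag:
    """Hypothetical stand-in for illustration; not Daglib's implementation."""

    def __init__(self):
        self.tasks = {}

    def task(self):
        # Register the decorated function under its own name.
        def register(fn):
            self.tasks[fn.__name__] = fn
            return fn

        return register

    def dependencies(self, name):
        # Assumption: parameter names are read as the names of upstream tasks.
        return list(inspect.signature(self.tasks[name]).parameters)

    def run(self, name, cache=None):
        # Evaluate upstream tasks first, memoizing so each task runs only once.
        cache = {} if cache is None else cache
        if name not in cache:
            args = [self.run(dep, cache) for dep in self.dependencies(name)]
            cache[name] = self.tasks[name](*args)
        return cache[name]


mini = MiniDag()


@mini.task()
def task_1a():
    return "Hello"


@mini.task()
def task_1b():
    return "world!"


@mini.task()
def task_2(task_1a, task_1b):
    return f"{task_1a}, {task_1b}"


result = mini.run("task_2")
print(mini.dependencies("task_2"))  # ['task_1a', 'task_1b']
print(result)  # Hello, world!
```

Resolving dependencies from parameter names keeps task functions free of framework-specific arguments, so each one can still be called and tested as an ordinary Python function.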

Beyond the "Hello, world!" example

For a more involved example, we will build a small pipeline that combines four source tables into a single reporting table. The data is driver-level information from the 2022 Formula 1 season, current at the time of writing. The output is an aggregated table of team-level metrics.

Source Tables

  1. Team - The team each driver races for
  2. Points - Current total Drivers' World Championship points for each driver for the season
  3. Wins - Current number of wins for each driver for the season
  4. Podiums - Current number of times each driver finished in the top 3 for the season

import pandas as pd
import daglib

# Ignore. Used to render the DataFrame correctly in the README
pd.set_option("display.notebook_repr_html", False)

dag = daglib.Dag()


@dag.task()
def team():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        team=["Red Bull", "Ferrari", "Mercedes", "Red Bull", "Ferrari", "Mercedes"],
    )).set_index("driver")


@dag.task()
def points():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        points=[258, 178, 146, 173, 156, 158]
    )).set_index("driver")


@dag.task()
def wins():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        wins=[8, 3, 0, 1, 1, 0]
    )).set_index("driver")


@dag.task()
def podiums():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        podiums=[10, 5, 6, 6, 6, 5]
    )).set_index("driver")


@dag.task()
def driver_metrics(team, points, wins, podiums):
    return team.join(points).join(wins).join(podiums)


@dag.task()
def team_metrics(driver_metrics):
    return driver_metrics.groupby("team").sum().sort_values("points", ascending=False)


dag.run()
          points  wins  podiums
team
Red Bull     431     9       16
Ferrari      334     4       11
Mercedes     304     0       11
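
The team totals above can be cross-checked by hand. The sketch below repeats the join and group-by-sum using only the standard library, with the rows copied from the four source tables. It is meant as a sanity check on the arithmetic and does not reflect how Daglib or pandas compute the result; the variable names are illustrative.

```python
from collections import defaultdict

# Rows copied from the source tables: (driver, team, points, wins, podiums).
rows = [
    ("Max", "Red Bull", 258, 8, 10),
    ("Charles", "Ferrari", 178, 3, 5),
    ("Lewis", "Mercedes", 146, 0, 6),
    ("Sergio", "Red Bull", 173, 1, 6),
    ("Carlos", "Ferrari", 156, 1, 6),
    ("George", "Mercedes", 158, 0, 5),
]

# Sum each metric per team, mirroring groupby("team").sum().
totals = defaultdict(lambda: [0, 0, 0])
for _driver, team_name, pts, won, pods in rows:
    totals[team_name][0] += pts
    totals[team_name][1] += won
    totals[team_name][2] += pods

# Sort by total points, highest first, mirroring sort_values("points", ascending=False).
ranked = sorted(totals.items(), key=lambda item: item[1][0], reverse=True)

for team_name, (pts, won, pods) in ranked:
    print(f"{team_name:<10}{pts:>6}{won:>6}{pods:>9}")
```

For example, Red Bull's 431 points are Max's 258 plus Sergio's 173.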

Task Graph Visualization

The DAG defined above produces the following task graph:

[Figure: task graph]
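
The shape of this graph can also be described in text. The sketch below transcribes the dependencies from the task signatures in the pipeline and uses the standard library's graphlib module (Python 3.9+) to group tasks into layers, where every task in a layer depends only on earlier layers and could therefore run in parallel. This illustrates the graph structure only; it makes no claim about how Daglib schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Edges transcribed from the task signatures: task -> tasks it depends on.
graph = {
    "team": set(),
    "points": set(),
    "wins": set(),
    "podiums": set(),
    "driver_metrics": {"team", "points", "wins", "podiums"},
    "team_metrics": {"driver_metrics"},
}

sorter = TopologicalSorter(graph)
sorter.prepare()

layers = []
while sorter.is_active():
    # get_ready() returns every task whose dependencies have all completed.
    ready = sorted(sorter.get_ready())
    layers.append(ready)
    sorter.done(*ready)

for depth, layer in enumerate(layers):
    print(depth, layer)
# 0 ['podiums', 'points', 'team', 'wins']
# 1 ['driver_metrics']
# 2 ['team_metrics']
```

The four source tables have no dependencies, so they form the first layer; driver_metrics waits for all four, and team_metrics runs last.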
