⚗️ Daglib - Lightweight DAG composition framework

Daglib is a lightweight alternative to Airflow and other orchestration engines. It is designed to run on a single machine and includes features such as task I/O, dynamic task generation, and simple testing and deployment out of the box.

It can run as a standalone application or be embedded in another application to enable more complex use cases like event-driven workflows, conditional workflows, and more.

See documentation at https://mharrisb1.github.io/daglib/

Installation

pip install daglib

Create your first DAG

import daglib

dag = daglib.Dag()


@dag.task()
def task_1a():
    return "Hello"


@dag.task()
def task_1b():
    return "world!"


@dag.task(final=True)
def task_2(task_1a, task_1b):
    print(f"{task_1a}, {task_1b}")


dag.run()

Output:

Hello, world!
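
In the example above, the final task receives the results of the upstream tasks through parameters named after them. As a rough illustration of that idea only, the sketch below resolves dependencies from parameter names using the standard library. The `MiniDag` class and the task names `greeting`, `subject`, and `sentence` are hypothetical; this is not daglib's implementation, and it omits cycle detection and error handling.

```python
import inspect


class MiniDag:
    """Toy stand-in for a DAG container (hypothetical, not daglib's code)."""

    def __init__(self):
        self.tasks = {}    # task name -> function
        self.final = None  # name of the task whose result run() returns

    def task(self, final=False):
        def register(fn):
            # Register the function under its own name.
            self.tasks[fn.__name__] = fn
            if final:
                self.final = fn.__name__
            return fn
        return register

    def run(self):
        results = {}

        def resolve(name):
            # Compute each task once, after resolving the tasks
            # named by its parameters.
            if name not in results:
                fn = self.tasks[name]
                params = inspect.signature(fn).parameters
                results[name] = fn(*(resolve(p) for p in params))
            return results[name]

        return resolve(self.final)


mini = MiniDag()


@mini.task()
def greeting():
    return "Hello"


@mini.task()
def subject():
    return "world!"


@mini.task(final=True)
def sentence(greeting, subject):
    return f"{greeting}, {subject}"


message = mini.run()
print(message)  # Hello, world!
```

Matching on parameter names keeps the wiring implicit: renaming an upstream task means renaming the parameter that consumes it.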

Beyond the "Hello, world!" example

For a more involved example, we will create a small ETL pipeline that takes data from four source tables and creates a single reporting table. The data is driver-level information from the current 2022 Formula 1 season. The output will be a pivot table for team-level metrics.

Source Tables

  1. Team - The team each driver belongs to for the season
  2. Points - Each driver's current World Drivers' Championship points total for the season
  3. Wins - Current number of wins for each driver for the season
  4. Podiums - Current number of top-3 finishes for each driver for the season

import pandas as pd
import daglib

# Ignore. Used to render the DataFrame correctly in the README
pd.set_option("display.notebook_repr_html", False)

dag = daglib.Dag()


@dag.task()
def team():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        team=["Red Bull", "Ferrari", "Mercedes", "Red Bull", "Ferrari", "Mercedes"],
    )).set_index("driver")


@dag.task()
def points():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        points=[175, 126, 77, 129, 102, 111]
    )).set_index("driver")


@dag.task()
def wins():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        wins=[6, 2, 0, 1, 0, 0]
    )).set_index("driver")


@dag.task()
def podiums():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        podiums=[7, 4, 2, 5, 5, 3]
    )).set_index("driver")


@dag.task()
def driver_metrics(team, points, wins, podiums):
    return team.join(points).join(wins).join(podiums)


@dag.task(final=True)
def team_metrics(driver_metrics):
    return driver_metrics.groupby("team").sum().sort_values("points", ascending=False)


dag.run()

Output:

          points  wins  podiums
team
Red Bull     304     7       12
Ferrari      228     2        9
Mercedes     188     0        5

Task Graph Visualization

The DAG defined above produces a task graph in which the four source tasks (team, points, wins, podiums) feed into driver_metrics, which in turn feeds into team_metrics.

[Image: task graph]
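
The dependency structure of the ETL example can also be written out explicitly and ordered with Python's standard-library `graphlib` module (Python 3.9+). This is a minimal sketch for reasoning about execution order, with the edges read off the task parameter lists. It does not use daglib and makes no claim about how daglib schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of upstream tasks it depends on,
# taken from the parameter lists in the ETL example.
dependencies = {
    "team": set(),
    "points": set(),
    "wins": set(),
    "podiums": set(),
    "driver_metrics": {"team", "points", "wins", "podiums"},
    "team_metrics": {"driver_metrics"},
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The four source tasks appear first (in no guaranteed relative order), followed by `driver_metrics` and then `team_metrics`.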
