
OpenLineage Airflow Integration

A library that integrates Airflow DAGs with OpenLineage for automatic metadata collection.

Features

Metadata

  • Task lifecycle
  • Task parameters
  • Task runs linked to versioned code
  • Task inputs / outputs

Lineage

  • Track inter-DAG dependencies

Built-in

  • SQL parser
  • Link to code builder (ex: GitHub)
  • Metadata extractors

Requirements

Installation

$ pip3 install openlineage-airflow

Note: You can also add openlineage-airflow to your requirements.txt for Airflow.

To install from source, run:

$ python3 setup.py install

Configuration

HTTP Backend Environment Variables

openlineage-airflow uses the OpenLineage client to push data to an OpenLineage backend.

The OpenLineage client depends on the following environment variables:

  • OPENLINEAGE_URL - points to the service that will consume OpenLineage events
  • OPENLINEAGE_API_KEY - set if the consumer of OpenLineage events requires a Bearer authentication key
  • OPENLINEAGE_NAMESPACE - set if you are using something other than the default job namespace
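
For example (the values below are placeholders):

OPENLINEAGE_URL=http://localhost:5000
OPENLINEAGE_API_KEY=your_api_key
OPENLINEAGE_NAMESPACE=my_namespace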

For backwards compatibility, openlineage-airflow also supports configuration via the MARQUEZ_URL, MARQUEZ_NAMESPACE and MARQUEZ_API_KEY variables:

MARQUEZ_URL=http://my_hosted_marquez.example.com:5000
MARQUEZ_NAMESPACE=my_special_ns

Extractors: Sending the correct data from your DAGs

If you do nothing, the OpenLineage backend will receive the Job and the Run from your DAGs, but sources and datasets will not be sent.

openlineage-airflow allows you to do more than that by building "Extractors". Extractors are in the process of changing right now, but they essentially take a task and extract (see the sketch after this list):

  1. Name: the name of the task
  2. Location: the location of the code for the task
  3. Inputs: a list of input datasets
  4. Outputs: a list of output datasets
  5. Context: the Airflow context for the task
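
As a sketch, a custom extractor might look like the following. This is a hypothetical example: the BaseExtractor and StepMetadata names and the operator_class attribute are assumptions about this version's extractor interface, so check the openlineage.airflow.extractors source in your installed release.

from openlineage.airflow.extractors.base import BaseExtractor, StepMetadata

class MyOperatorExtractor(BaseExtractor):
    # Assumption: extractors declare which operator class they handle
    operator_class = 'MyOperator'

    def extract(self) -> StepMetadata:
        # self.operator is the Airflow task being extracted
        return StepMetadata(
            name=f"{self.operator.dag_id}.{self.operator.task_id}",
            location=None,  # e.g. a link to the task's source code
            inputs=[],      # input datasets read by the task
            outputs=[],     # output datasets written by the task
            context={},     # extra Airflow context to attach
        )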

Great Expectations

The great_expectations extractor requires more care. For technical reasons, you need to manually provide the dataset name and namespace for the dataset passed to the Great Expectations operator by calling openlineage.airflow.extractors.great_expectations_extractor.set_dataset_info.
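
A minimal sketch of that call, assuming set_dataset_info accepts the dataset name and namespace (the exact signature is not documented here, so verify it against your installed version):

from openlineage.airflow.extractors.great_expectations_extractor import set_dataset_info

# Assumption: set_dataset_info takes the name and namespace of the dataset
# passed to the Great Expectations operator; verify the real signature.
set_dataset_info(name='my_dataset', namespace='my_namespace')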

Usage

To begin collecting Airflow DAG metadata with OpenLineage, use:

- from airflow import DAG
+ from openlineage.airflow import DAG
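
For example, a minimal DAG that swaps in the OpenLineage-aware DAG class might look like this (the BashOperator task and schedule are illustrative placeholders):

from datetime import datetime

from openlineage.airflow import DAG  # instead of `from airflow import DAG`
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='example_dag',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1))

t1 = BashOperator(
    task_id='say_hello',
    bash_command='echo hello',
    dag=dag)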

When enabled, the library will:

  1. On DAG start, collect metadata for each task using an Extractor (falling back to a default extractor when no specific one matches)
  2. Collect task input / output metadata (source, schema, etc)
  3. Collect task run-level metadata (execution time, state, parameters, etc)
  4. On DAG complete, also mark the task as complete in OpenLineage

Triggering Child Jobs

Commonly, Airflow DAGs will trigger processes on remote systems, such as an Apache Spark or Apache Beam job. Those systems may have their own OpenLineage integration and report their own job runs and dataset inputs/outputs. To propagate the job hierarchy, tasks must send their own run id so that the downstream process can report the ParentRunFacet with the proper run id.

The lineage_run_id macro exists to inject the run id of a given task into the arguments sent to a remote processing job's Airflow operator. The macro takes the DAG run_id and the task in order to access the run id generated for that task. For example, a Spark job can be triggered using the DataProcPySparkOperator with the correct parent run id using the following configuration:

t1 = DataProcPySparkOperator(
    task_id=job_name,
    # required pyspark configuration
    job_name=job_name,
    dataproc_pyspark_properties={
        'spark.driver.extraJavaOptions':
            f"-javaagent:{jar}={os.environ.get('OPENLINEAGE_URL')}/api/v1/namespaces/{os.getenv('OPENLINEAGE_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{lineage_run_id(run_id, task)}}}}?api_key={os.environ.get('OPENLINEAGE_API_KEY')}"
    },
    dag=dag)

Development

To install all dependencies for local development:

# Bash
$ pip3 install -e .[dev]
# escape the brackets in zsh
$ pip3 install -e .\[dev\]

To run the entire test suite, you'll first want to initialize the Airflow database:

$ airflow initdb

Then, run the test suite with:

$ pytest
