
Monte Carlo's Apache Airflow Provider

Project description

airflow-mcd

Monte Carlo's beta Airflow provider.

Installation

Requires Python 3.7 or greater and is compatible with Airflow 1.10.14 or greater.

You can install and update using pip. For instance:

pip install -U airflow-mcd

This package can be added to Airflow like any other Python dependency (e.g. via requirements.txt).

Basic usage

Callbacks

Sends a webhook back to Monte Carlo upon an event in Airflow. Detailed examples and documentation can be found here: https://docs.getmontecarlo.com/edit/airflow-v2. Callbacks are supported at the DAG and task levels.

To import: from airflow_mcd.callbacks import mcd_callbacks

Broad Callbacks

If you don't have existing callbacks, these provide all of the relevant callbacks in one dictionary:

dag_callbacks

task_callbacks

Examples:

dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)

Explicit Callbacks

| Callback Type | Description | DAG | Task |
| --- | --- | --- | --- |
| on_success_callback | Invoked when the DAG/task succeeds | mcd_dag_success_callback | mcd_task_success_callback |
| on_failure_callback | Invoked when the DAG/task fails | mcd_dag_failure_callback | mcd_task_failure_callback |
| sla_miss_callback | Invoked when task(s) in a DAG miss their defined SLA | mcd_sla_miss_callback | N/A |
| on_retry_callback | Invoked when the task is up for retry | N/A | mcd_task_retry_callback |
| on_execute_callback | Invoked right before the task begins executing | N/A | mcd_task_execute_callback |

Examples:

dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)

Hooks:

  • SessionHook

    Creates a pycarlo compatible session. This is useful for creating your own operator built on top of our Python SDK.

    This hook expects an Airflow HTTP connection with the Monte Carlo API id as the "login" and the API token as the "password".

    Alternatively, you could define both the Monte Carlo API id and token in "extra" with the following format:

    {
        "mcd_id": "<ID>",
        "mcd_token": "<TOKEN>"
    }
    

    See here for details on how to generate a token.
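Either connection style can be created with the standard Airflow CLI. A hedged sketch (the connection id `mcd_default_session` is just an example name; use one of the two styles, not both):

```shell
# Style 1: API id/token as the connection's login/password.
airflow connections add mcd_default_session \
    --conn-type http \
    --conn-login '<ID>' \
    --conn-password '<TOKEN>'

# Style 2: API id/token in the connection's "extra" JSON.
airflow connections add mcd_default_session \
    --conn-type http \
    --conn-extra '{"mcd_id": "<ID>", "mcd_token": "<TOKEN>"}'
```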

Operators:

  • BaseMcdOperator

This operator can be extended to build your own operator using our SDK or any other dependencies. This is useful if you want to implement your own custom logic (e.g. creating custom lineage after a task completes).
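The extension pattern is ordinary operator subclassing. A self-contained sketch using a stand-in base class (the real import would be from airflow_mcd.operators, and the constructor signature here is an assumption for illustration, not taken from the package):

```python
class BaseMcdOperator:
    """Stand-in for airflow_mcd.operators.BaseMcdOperator (signature assumed)."""
    def __init__(self, mcd_session_conn_id, **kwargs):
        self.mcd_session_conn_id = mcd_session_conn_id

class CustomLineageOperator(BaseMcdOperator):
    """Example: push custom lineage after a task completes."""
    def execute(self, context):
        # In a real operator you would build a pycarlo session from
        # self.mcd_session_conn_id (e.g. via SessionHook) and call
        # the Monte Carlo API here.
        return f'lineage recorded via {self.mcd_session_conn_id}'

op = CustomLineageOperator(mcd_session_conn_id='mcd_default_session')
result = op.execute(context={})
```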

  • SimpleCircuitBreakerOperator

This operator can be used to execute a circuit breaker compatible rule (custom SQL monitor) to run integrity tests before allowing any downstream tasks to execute. If the rule condition is in breach, the operator raises an AirflowFailException on Airflow versions newer than 1.10.11 (preferred, as such tasks fail without retrying) and an AirflowException on older versions. For instance:

    from datetime import datetime, timedelta
    
    from airflow import DAG
    
    try:
      from airflow.operators.bash import BashOperator
    except ImportError:
  # For Airflow versions older than 2.0.0; the bash_operator module was deprecated in 2.0.0.
      from airflow.operators.bash_operator import BashOperator
    
    from airflow_mcd.operators import SimpleCircuitBreakerOperator
    
    mcd_connection_id = 'mcd_default_session'
    
    with DAG('sample-dag', start_date=datetime(2022, 2, 8), catchup=False, schedule_interval=timedelta(1)) as dag:
        task1 = BashOperator(
            task_id='example_elt_job_1',
            bash_command='echo I am transforming a very important table!',
        )
        breaker = SimpleCircuitBreakerOperator(
            task_id='example_circuit_breaker',
            mcd_session_conn_id=mcd_connection_id,
            rule_uuid='<RULE_UUID>'
        )
        task2 = BashOperator(
            task_id='example_elt_job_2',
            bash_command='echo I am building a very important dashboard from the table created in task1!',
            trigger_rule='none_failed'
        )
    
        task1 >> breaker >> task2
    

    This operator expects the following parameters:

    • mcd_session_conn_id: A SessionHook compatible connection.
    • rule_uuid: UUID of the rule (custom SQL monitor) to execute.

    The following parameters can also be passed:

    • timeout_in_minutes [default=5]: Polling timeout in minutes. Note that the Data Collector Lambda has a max timeout of 15 minutes when executing a query. Queries that take longer to execute are not supported, so we recommend filtering down the query output to improve performance (e.g. with a LIMIT or WHERE clause). If you expect a query to take the full 15 minutes, we recommend padding the timeout to 20 minutes.
    • fail_open [default=True]: Prevents errors or timeouts during rule execution from stopping your pipeline. When set to True, an AirflowSkipException is raised if any issues are encountered. In this case we recommend setting the trigger_rule param on downstream tasks to none_failed.
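The interplay between the breach check and fail_open described above can be sketched as plain control flow. This is a stdlib-only illustration, not the real implementation: the exception classes are local stand-ins named after Airflow's, and execute_rule is a hypothetical callable returning whether the rule is in breach:

```python
class AirflowFailException(Exception):
    """Stand-in: fails the task without retries."""

class AirflowSkipException(Exception):
    """Stand-in: skips the task."""

def run_circuit_breaker(execute_rule, fail_open=True):
    # Sketch of the fail-open decision logic.
    try:
        in_breach = execute_rule()
    except Exception:
        if fail_open:
            # Errors/timeouts skip this task instead of failing the pipeline;
            # downstream tasks with trigger_rule='none_failed' still run.
            raise AirflowSkipException('rule execution failed; failing open')
        raise
    if in_breach:
        raise AirflowFailException('rule condition in breach')
    return True  # downstream tasks may proceed
```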

Tests and releases

Locally, make test will run all tests. See README-dev.md for additional details on development. When ready for a review, create a PR against main.

When ready to release, create a new GitHub release with a tag using semantic versioning (e.g. v0.42.0) and CircleCI will test and publish to PyPI. Note that an existing version will not be deployed.

License

Apache 2.0 - See the LICENSE for more information.
