This is a starter kit for executing scheduled dbt runs (Bolt schedules) within Apache Airflow. For ease of deployment, this project uses Astronomer.io to manage Apache Airflow. If you want to deploy your project locally, see the section below titled "Deploy Your Project Locally Using Astronomer."
Before using this starter kit, ensure you have the following prerequisites set up:
- An existing Airflow project
- An Integrated Development Environment (IDE) like VSCode
- Docker, Podman, or a similar tool
- Python
This demo doesn't cover the initial setup of an Airflow project. If you need guidance on setting up an Airflow project, sign up for a free trial of astronomer.io and follow their instructions.
Follow these steps to set up and run your scheduled dbt runs with Paradime and Apache Airflow:
Generate a new API key in Paradime by clicking "Generate API Key" in the Paradime account settings.
Add the following variables to your Airflow deployment:

- `X-API-KEY`
- `X-API-SECRET`
- `URL`

The values of these variables should be the values you generated in Paradime (step 1).

Note:
- If you are running Airflow locally (e.g., through Docker or Podman), your Airflow deployment URL might look something like: `http://localhost:8080`
- If you are running Airflow in the cloud (e.g., through Astronomer), your Airflow deployment URL might look something like: `sadflsdf000101getpnvrhv8.astronomer.run/d4rf5zb8`
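If you manage your deployment from the command line, the three variables above can be set with the Airflow CLI. This is a sketch: the variable names follow this guide, and the quoted values are placeholders for the key, secret, and API URL you copied from Paradime.

```shell
# Store the Paradime credentials as Airflow variables (placeholder values).
airflow variables set X-API-KEY "<your-paradime-api-key>"
airflow variables set X-API-SECRET "<your-paradime-api-secret>"
airflow variables set URL "<your-paradime-api-url>"

# Confirm the variables were stored.
airflow variables list
```

Cloud-managed deployments such as Astronomer also let you set these through the deployment's settings UI instead of the CLI.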
Add the following Python files to your Airflow `dags` folder:

- `dags.py`
- `paradime_schedules.py`
In the `dags.py` file, update the following variables:

- `DAG_ID`: A unique identifier for the DAG (e.g., `DAG_ID = "0_bolt_airflow"`).
- `DAG_INTERVAL`: A cron schedule for when the DAG should run (e.g., `DAG_INTERVAL = "@daily"`).
- `SCHEDULE_NAME`: Should match the name of the schedule in `paradime_schedules.yml` in the dbt folder (e.g., `SCHEDULE_NAME = "my_schedule_name_in_paradime"`).

Save the updated `dags.py` file.
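Put together, the top of `dags.py` might then look like this sketch, using the example values above (adjust each to your own setup):

```python
# Example values for the three globals described above (placeholders, not
# values from a real deployment).
DAG_ID = "0_bolt_airflow"  # unique identifier for the DAG in Airflow
DAG_INTERVAL = "@daily"    # cron expression or Airflow preset for the run cadence
SCHEDULE_NAME = "my_schedule_name_in_paradime"  # must match paradime_schedules.yml
```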
Test and deploy your new scheduled dbt runs. Depending on your Airflow setup, you can test your new DAGs. For example, if you're using Astronomer, you can use the following command:
- Astro example: `astro deploy --dags`
This Python file, `dags.py`, provides an example setup for running Paradime's Bolt within an Apache Airflow Directed Acyclic Graph (DAG). It demonstrates how to configure Airflow to interact with Paradime's API for scheduling and monitoring tasks. To use this DAG, Airflow needs specific variables such as `X-API-KEY`, `X-API-SECRET`, and `URL`. Additionally, you can customize global variables within the file, including `DAG_ID`, `DAG_INTERVAL`, and `SCHEDULE_NAME`.
The main components and actions performed in this DAG include:
- Importing required modules and functions from Apache Airflow and Paradime.
- Defining global variables like `DAG_ID`, `DAG_INTERVAL`, and `SCHEDULE_NAME`.
- Creating an Airflow DAG object with the specified configuration.
- Defining Airflow tasks, including start and end tasks (represented by DummyOperator), upstream and downstream tasks (using BashOperator and PythonOperator), and a Python sensor (PythonSensor) to monitor the status of a Bolt run.
- Configuring the task dependencies, where tasks are executed sequentially.
This example serves as a template for incorporating Paradime's Bolt into custom Airflow workflows for scheduling and automating tasks. Users can modify this DAG to fit their specific requirements and integrate it into their Airflow environment.
Remember to replace the variable values and schedule name according to your Paradime setup and use case before deploying this DAG.
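As a rough illustration of the structure described above (not the shipped file), the task layout could be sketched like this. The `run_schedule` and `get_run_status` helpers come from `paradime_schedules.py`; the task ids and `poke_interval` are illustrative choices:

```python
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor

# Helpers provided by paradime_schedules.py in this starter kit.
from paradime_schedules import get_run_status, run_schedule

DAG_ID = "0_bolt_airflow"
DAG_INTERVAL = "@daily"
SCHEDULE_NAME = "my_schedule_name_in_paradime"

with DAG(dag_id=DAG_ID, schedule_interval=DAG_INTERVAL, catchup=False) as dag:
    start = DummyOperator(task_id="start")

    # Trigger the Bolt run; run_schedule pushes the run_id to XCom.
    # Airflow injects task_instance from the context automatically.
    trigger_bolt_run = PythonOperator(
        task_id="trigger_bolt_run",
        python_callable=run_schedule,
        op_kwargs={"schedule_name": SCHEDULE_NAME},
    )

    # Poll Paradime until the Bolt run leaves the "RUNNING" state.
    wait_for_bolt_run = PythonSensor(
        task_id="wait_for_bolt_run",
        python_callable=get_run_status,
        poke_interval=60,
    )

    end = DummyOperator(task_id="end")

    start >> trigger_bolt_run >> wait_for_bolt_run >> end
```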
The `paradime_schedules.py` Python script provides essential functions for interacting with Paradime's scheduling API within an Apache Airflow environment. It facilitates the execution and monitoring of scheduled tasks through the following functions:
- `run_schedule(schedule_name: str, task_instance: TaskInstance) -> None`: This function triggers the execution of a Paradime Bolt run using a GraphQL mutation query. It accepts the `schedule_name` as a parameter and communicates with the Paradime API by sending a POST request. The resulting `run_id` is extracted from the API response and stored in the Airflow task instance's XCom data for tracking.
- `get_run_status(task_instance: TaskInstance) -> bool`: This function retrieves the status of a previously triggered Paradime Bolt run. It queries the Paradime API for the run's status using the `run_id` stored in the task instance's XCom data. The function raises exceptions if the run has failed or encountered errors, and returns `True` if the run is not in a "RUNNING" state.
- `_extract_gql_response(request: requests.Response, query_name: str, field: str) -> str`: A utility function used internally to extract relevant data from the GraphQL API response. It handles parsing and error checking.
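The extraction helper's behavior can be sketched in plain Python. Note the query and field names below (`triggerBoltRun`, `runId`) are made-up placeholders, not Paradime's actual GraphQL schema:

```python
import json


def extract_gql_field(body: str, query_name: str, field: str) -> str:
    """Sketch of _extract_gql_response-style parsing: pull
    data[query_name][field] out of a GraphQL JSON body, raising on errors."""
    payload = json.loads(body)
    if payload.get("errors"):
        raise RuntimeError(f"GraphQL errors: {payload['errors']}")
    try:
        return str(payload["data"][query_name][field])
    except (KeyError, TypeError) as exc:
        raise RuntimeError(f"missing {query_name}.{field} in response") from exc


# Hypothetical response body from a run-trigger mutation:
body = '{"data": {"triggerBoltRun": {"runId": 4242}}}'
print(extract_gql_field(body, "triggerBoltRun", "runId"))  # prints 4242
```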
To use these functions effectively, users must configure their Apache Airflow environment with the necessary variables such as `URL`, `X-API-KEY`, and `X-API-SECRET`. This script enables the integration of Paradime's scheduling capabilities into custom Airflow workflows for automated task execution and monitoring.