Welcome! 🚀
This repository centers on DAG writing best practices. It is designed to help you learn about Airflow best practices by working through a series of exercises.
The `dags/code_syntax_examples` folder contains complete DAGs that show how to use different Airflow features like callbacks, dynamic task mapping, or dataset scheduling.
The `dags/exercises` folder contains incomplete DAGs that you can complete following the DAG writing exercises section below.
The solutions to the exercises can be found in the `solutions` folder.
This repo was part of a 2-hour workshop; a recording, including a walk-through of the exercises, is available here.
Set up your environment by following the instructions in the Setup section below. All DAGs in this repository can be run locally and on Astro without connecting to external systems. Exercises 1-4 are focused on DAG writing and can be completed in a local environment. Exercises 5-8 are focused on deploying and testing DAGs; exercises 5-7 require an Astro deployment. If you do not have an Astro account yet, you can sign up for a free trial here.
Sample solutions for DAG-writing related exercises can be found in the `dags/solutions` folder of the repo. Note that some exercises can be solved in multiple ways.
Tip: Consider using Ask Astro if you need additional guidance with any of the exercises.
The `dags/code_syntax_examples` folder contains code examples for Airflow features discussed in the workshop. The `tests` folder contains example DAG validation tests as well as an example for unit testing a custom operator and integration testing a DAG.
To set up a local Airflow environment you have two options: the Astro CLI or GitHub Codespaces.
- Make sure you have Docker installed and running on your machine.
- Install the free and open-source Astro CLI.
- Fork this repository and clone it to your local machine. Make sure you uncheck the `Copy the main branch only` option when forking.
- Run `astro dev start` in the root of the cloned repository to start the Airflow environment.
- Access the Airflow UI at `localhost:8080` in your browser. Log in using `admin` as both the username and password.
If you can't install the CLI, you can run the project from your forked repo using GitHub Codespaces.
- Fork this repository. Make sure you uncheck the `Copy the main branch only` option when forking.
- Make sure you are on the `dag-writing-best-practices` branch.
- Click on the green "Code" button and select the "Codespaces" tab.
- Click on the 3 dots and then `+ New with options...` to create a new Codespace with a configuration. Make sure to select a Machine type of at least `4-core`.
- Run `astro dev start -n --wait 5m` in the Codespaces terminal to start the Airflow environment using the Astro CLI. This can take a few minutes. Once you see the following printed to your terminal, the Airflow environment is ready to use:
```text
Airflow is starting up!

Project is running! All components are now available.
Airflow Webserver: http://localhost:8080
Postgres Database: localhost:5435/postgres
The default Airflow UI credentials are: admin:admin
The default Postgres DB credentials are: postgres:postgres
```
- Once the Airflow project has started, access the Airflow UI by clicking on the Ports tab and opening the forwarded URL for port `8080`.
- Log into the Airflow UI using `admin` as both the username and password. It is possible that after logging in you see an error; in this case, open the URL again from the Ports tab.
The exercise DAGs are located in the `dags/exercises` folder. They are tagged with `exercise` as well as their exercise number.
These exercises are designed to get you familiar with commonly used Airflow features and methods for authoring DAGs. Making use of these features helps make your DAGs more scalable, more reliable, and easier to troubleshoot. Feel free to use the following resources:
DAGs:

- `upstream_dag_1`: Retrieves weather data for a list of cities.
- `upstream_dag_2`: Retrieves historical weather data for a specific city and date.
- `downstream_dag`: Creates a report based on data generated from the two upstream DAGs.
TAG: `exercise_1`
With Datasets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps to make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.
Currently, the `downstream_dag` DAG depends on the data generated by the `upstream_dag_1` and `upstream_dag_2` DAGs; let's use Datasets to express this dependency in the schedule for the `downstream_dag`!
Define a schedule for the downstream DAG to run:
- every day at midnight UTC AND
- whenever both the "current_weather_data" and "max_temp_data" datasets are updated AND ONE OF the datasets "wind_speed_data" OR "wind_direction_data" is updated.
To implement this you will need to modify:
- The `create_weather_table` task in the `upstream_dag_1` DAG to produce an update to the "current_weather_data" dataset.
- The `get_max_wind` and `get_wind_direction` tasks in the `upstream_dag_2` DAG to produce updates to the "wind_speed_data" and "wind_direction_data" datasets respectively.
- The schedule of the `downstream_dag` DAG with a DatasetOrTimeSchedule.
See the DAG code comments for more hints.
For the solution see `solutions/`.
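As a reference for the syntax only (not the full workshop solution), here is a minimal sketch of dataset-aware scheduling, assuming Airflow 2.9+ where `DatasetOrTimeSchedule` and conditional dataset expressions are available; the task bodies are placeholders:

```python
# A minimal sketch of the syntax involved, assuming Airflow 2.9+ where
# DatasetOrTimeSchedule and conditional dataset expressions are available.
# Task bodies are placeholders, not the workshop implementation.
from airflow.datasets import Dataset
from airflow.decorators import dag, task
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from pendulum import datetime

current_weather_data = Dataset("current_weather_data")
max_temp_data = Dataset("max_temp_data")
wind_speed_data = Dataset("wind_speed_data")
wind_direction_data = Dataset("wind_direction_data")


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def producer_sketch():
    # In the upstream DAGs, tasks declare outlets so that a successful run
    # produces an update to the corresponding dataset.
    @task(outlets=[current_weather_data])
    def create_weather_table():
        ...

    create_weather_table()


@dag(
    start_date=datetime(2024, 1, 1),
    # Runs every day at midnight UTC, and also whenever both
    # current_weather_data AND max_temp_data are updated together with at
    # least one of the two wind datasets.
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(
            current_weather_data
            & max_temp_data
            & (wind_speed_data | wind_direction_data)
        ),
    ),
    catchup=False,
)
def consumer_sketch():
    @task
    def create_report():
        ...

    create_report()


producer_sketch()
consumer_sketch()
```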
With dynamic task mapping, you can write DAGs that adapt to your data at runtime! This is useful when you want to run a task for each item in a list.
Currently, the `upstream_dag_1` DAG is set up to retrieve weather data for just one city; let's modify it to retrieve weather data for multiple cities in parallel!
Dynamically map the `get_lat_long_for_one_city` task over a list of cities returned by the `get_cities` task in the `upstream_dag_1` DAG.
To implement this you will need to modify:
- The `get_cities` task in the `upstream_dag_1` DAG to return a list of cities.
- The `get_lat_long_for_one_city` task in the `upstream_dag_1` DAG to be dynamically mapped over the list of cities.
See the DAG code comments and the Dynamic Tasks guide for more hints.
For the solution see `solutions/upstream_dag_1.py`.
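For reference, here is a minimal, self-contained sketch of dynamic task mapping with `.expand()`; the city list and the lat/long lookup below are placeholders, not the workshop implementation:

```python
# A minimal sketch of dynamic task mapping; the city list and the lat/long
# lookup are placeholders, not the workshop implementation.
from airflow.decorators import dag, task
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def dynamic_mapping_sketch():
    @task
    def get_cities() -> list[str]:
        # Returning a list makes this task's output mappable.
        return ["Bern", "Lima", "Nairobi"]

    @task
    def get_lat_long_for_one_city(city: str) -> dict:
        # Placeholder for the geocoding API call in the exercise DAG.
        return {"city": city, "lat": 0.0, "long": 0.0}

    # .expand() creates one mapped task instance per city at runtime.
    get_lat_long_for_one_city.expand(city=get_cities())


dynamic_mapping_sketch()
```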
It is a best practice to let your tasks retry by default to handle task failures and prevent many consecutive failed DAG runs. This can be achieved by using DAG parameters.
For the `downstream_dag` DAG:
- Set all tasks in the DAG to retry 3 times by default and give them a new owner (you!).
- Make sure the DAG never has more than 6 consecutive failed runs.
See the DAG code comments for more hints.
For the solution see `solutions/downstream_dag.py`.
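A minimal sketch of the relevant DAG-level settings is shown below; `max_consecutive_failed_dag_runs` assumes Airflow 2.9+, and the task itself is a placeholder:

```python
# A minimal sketch of the relevant DAG-level settings;
# max_consecutive_failed_dag_runs assumes Airflow 2.9+.
# The task body is a placeholder.
from airflow.decorators import dag, task
from pendulum import datetime


@dag(
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    default_args={
        "retries": 3,    # every task in this DAG retries 3 times by default
        "owner": "you",  # the new owner shown in the Airflow UI
    },
    # Automatically pause the DAG after 6 consecutive failed runs.
    max_consecutive_failed_dag_runs=6,
)
def retries_sketch():
    @task
    def flaky_task():
        ...

    flaky_task()


retries_sketch()
```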
Top-level DAG code is an Airflow anti-pattern. Because the DAG processor executes the `.py` files that contain DAGs, any code that is not contained within the context of a task runs every time the file is parsed, which slows down parsing and can easily result in DAGs failing to parse.
Rewrite the `top_level_code` DAG to move the top-level DAG code into a task. Then you can proceed to calculate the meaning of life, the universe, and everything.
For the solution see `solutions/top_level_code_solution.py`.
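For illustration only, here is a minimal sketch of the pattern, with a hypothetical `slow_calculation` helper standing in for the top-level code from the exercise:

```python
# A minimal sketch of the pattern; slow_calculation is a hypothetical stand-in
# for whatever code currently runs at module level in the exercise DAG.
from airflow.decorators import dag, task
from pendulum import datetime


def slow_calculation() -> int:
    # Placeholder for the expensive code that used to live at the top level.
    return 42


# Anti-pattern: calling slow_calculation() here, outside of any task, would
# run it every time the DAG processor parses this file.


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def no_top_level_code_sketch():
    @task
    def compute_answer() -> int:
        # Inside a task, the expensive call only runs when the task executes.
        return slow_calculation()

    compute_answer()


no_top_level_code_sketch()
```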
Now that DAGs are written, we'll cover deploying DAGs and some of the Day 2 operations that Astro enables.
Feel free to use the following resources:
- Start an Astro trial using this link. You can choose a template project to deploy if you wish, or you can skip this step.
- Create a new, empty Airflow Deployment in your Astro workspace with default settings.
Now that you have a Deployment, you can deploy the code we just worked on. You have two options for this workshop:
- Deploy using the Astro CLI by running `astro login` to sign in to your trial, and then `astro deploy`.
- Use the GitHub Integration to connect your Astro workspace to your GitHub account and deploy by pushing the code to your fork of the repo. This way you can deploy code even if you don't have the Astro CLI installed.
Set up 3 environment variables for our API connections:
```text
AIRFLOW_CONN_HISTORICAL_WEATHER_API_CONN=http://https://archive-api.open-meteo.com%2Fv1%2F
AIRFLOW_CONN_WEATHER_API_CONN=http://https://api.open-meteo.com%2Fv1%2F
AIRFLOW_CONN_WILDCARD_CONN=http://https://air-quality-api.open-meteo.com%2Fv1%2Fair-quality%3Flatitude%3D46.9481%26longitude%3D7.4474%26hourly%3Dozone%26forecast_days%3D1/?__extra__=%7B%7D
```
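If you want to confirm that the connections resolve inside Airflow, here is a minimal sketch (not part of the workshop DAGs) that looks one of them up; Airflow derives the connection ID from the environment variable name, e.g. `AIRFLOW_CONN_WEATHER_API_CONN` becomes `weather_api_conn`:

```python
# A minimal sketch, not part of the workshop DAGs: Airflow derives the conn_id
# from the environment variable name, e.g.
# AIRFLOW_CONN_WEATHER_API_CONN -> "weather_api_conn".
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def connection_check():
    @task
    def print_weather_api_conn():
        conn = BaseHook.get_connection("weather_api_conn")
        # Prints the connection type and host parsed from the URI-style env var.
        print(conn.conn_type, conn.host)

    print_weather_api_conn()


connection_check()
```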
Astro alerts provide an additional layer of observability over Airflow's built-in alerting system. One of the DAGs in the `exercises/` folder helps highlight this functionality: `upstream_dag_1` is parameterized to run with user input, so you can simulate a failure of the API that data is retrieved from, or a delay in a task completing.
Set up two alerts in your Astro deployment:
- A DAG failure alert for the `upstream_dag_1` DAG.
- A task duration alert for the `create_weather_table` task in `upstream_dag_1` (try 1 minute).
For both alerts, choose email as the communication channel. Try out the alerts by running `upstream_dag_1` with the `Simulate API failure` param set to `True` for the first alert, and once again with the `simulate_task_delay` param set to `120` seconds for the second alert.
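For context, DAG-level params are what make this kind of simulation possible. The sketch below shows the general pattern only; the exact param names and defaults in `upstream_dag_1` may differ, and the task body is a placeholder:

```python
# A generic sketch of DAG params; the exact param names and defaults in
# upstream_dag_1 may differ, and the task body is a placeholder.
import time

from airflow.decorators import dag, task
from airflow.models.param import Param
from pendulum import datetime


@dag(
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    params={
        "simulate_api_failure": Param(False, type="boolean"),
        "simulate_task_delay": Param(0, type="integer"),  # seconds
    },
)
def params_sketch():
    @task
    def create_weather_table(**context):
        if context["params"]["simulate_api_failure"]:
            # A failed run like this one would trigger the DAG failure alert.
            raise ValueError("Simulated API failure")
        # A long enough delay here would trip the task duration alert.
        time.sleep(context["params"]["simulate_task_delay"])

    create_weather_table()


params_sketch()
```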
The Astro CLI includes commands that you can use to test and debug DAGs both inside and outside of a locally running Airflow environment. Tests can then be set up to run automatically as part of a CI/CD workflow. Implementing DAG validation tests helps you ensure that any new DAG code adheres to your organization's standards and won't cause issues in production.
Update the appropriate test in the `tests` directory of your Astro project to check that all DAGs have at least 3 retries by default. Run the test using the Astro CLI `astro dev pytest` command. See what happens if you run the test when retries are not set for one of the DAGs.