Book: Data Pipelines with Apache Airflow
Source code of the book: url
Chapter 1
Airflow’s key feature is that it enables you to easily build scheduled data pipelines using a flexible Python framework, while also providing many building blocks that allow you to stitch together the many different technologies encountered in modern technological landscapes.
Airflow is not a data processing tool in itself but orchestrates the different components responsible for processing your data in data pipelines.
The Airflow scheduler —Parses DAGs, checks their schedule interval, and (if the DAGs’ schedule has passed) starts scheduling the DAGs’ tasks for execution by passing them to the Airflow workers.
The Airflow workers—Pick up tasks that are scheduled for execution and execute them. As such, the workers are responsible for actually “doing the work.”
The Airflow webserver —Visualizes the DAGs parsed by the scheduler and provides the main interface for users to monitor DAG runs and their results.
This property of Airflow’s schedule intervals is invaluable for implementing efficient data pipelines, as it allows you to build incremental data pipelines. In these incremental pipelines, each DAG run processes only data for the corresponding time slot (the data’s delta) instead of having to reprocess the entire data set every time. Especially for larger data sets, this can provide significant time and cost benefits by avoiding expensive recomputation of existing results. Schedule intervals become even more powerful when combined with the concept of backfilling in Airflow, which allows you to execute a new DAG for historical schedule intervals that occurred in the past. This feature allows you to easily create (or backfill) new data sets with historical data simply by running your DAG for these past schedule intervals. Moreover, by clearing the results of past runs, you can also use this Airflow feature to easily rerun any historical tasks if you make changes to your task code, allowing you to easily reprocess an entire data set when needed.
Chapter 2
Anatomy of an Airflow DAG
download_launches >> get_pictures >> notify is our pipeline: a DAG for downloading and processing rocket launch data.
Each operator performs a single unit of work, and multiple operators together form a workflow or DAG in Airflow. Operators run independently of each other, although you can define the order of execution, which we call dependencies in Airflow: download_launches >> get_pictures >> notify
Tasks vs. operators In this context and throughout the Airflow documentation, we see the terms operator and task used interchangeably. From a user’s perspective, they refer to the same thing, and the two often substitute each other in discussions. Operators provide the implementation of a piece of work. Airflow has a class called BaseOperator and many subclasses inheriting from the BaseOperator, such as PythonOperator, EmailOperator, and OracleOperator.
NOTE It is unnecessary to restart the entire workflow. A nice feature of Airflow is that you can restart from the point of failure and onward, without having to restart any previously succeeded tasks.
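As a rough sketch of the structure described above (the three callables are placeholders; the book's versions download launch data from an API, fetch the rocket images, and send a notification), the DAG could look like this:

```python
# Hedged sketch of the rocket-launch DAG structure; task bodies are placeholders.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def _download_launches():
    print("download launch data")


def _get_pictures():
    print("fetch rocket pictures")


def _notify():
    print("notify that new images are available")


with DAG(
    dag_id="download_rocket_launches",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    download_launches = PythonOperator(task_id="download_launches", python_callable=_download_launches)
    get_pictures = PythonOperator(task_id="get_pictures", python_callable=_get_pictures)
    notify = PythonOperator(task_id="notify", python_callable=_notify)

    # Dependencies: run the tasks strictly in this order.
    download_launches >> get_pictures >> notify
```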
Scheduling in Airflow
In Airflow, we can use these execution dates by referencing them in our operators. For example, in the BashOperator, we can use Airflow’s templating functionality to include the execution dates dynamically in our Bash command. Templating is covered in detail in chapter 4.
Without an end date, Airflow will (in principle) keep executing our DAG on this daily schedule until the end of time. However, if we already know that our project has a fixed duration, we can tell Airflow to stop running our DAG after a certain date using the end_date parameter.
Airflow’s schedule_interval parameter accepts Linux cron syntax (as well as presets such as @daily and timedelta expressions).
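A minimal sketch of a cron-style schedule combined with start_date and end_date (the dag_id and dates are illustrative):

```python
# Hedged sketch: cron syntax for schedule_interval, with a fixed project duration.
import pendulum
from airflow import DAG

dag = DAG(
    dag_id="user_events",
    schedule_interval="0 0 * * *",             # cron syntax: run daily at midnight
    start_date=pendulum.datetime(2023, 1, 1),  # first interval starts here
    end_date=pendulum.datetime(2023, 3, 1),    # stop scheduling after this date
)
```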
Templating tasks using the Airflow context
In Airflow, you have a number of variables available at runtime from the task context. One of these variables is execution_date. Airflow uses the Pendulum (https://pendulum.eustace.io) library for datetimes, and execution_date is such a Pendulum datetime object. It is a drop-in replacement for native Python datetime, so all methods that can be applied to a Python datetime can also be applied to Pendulum. Just like you can do datetime.now().year, you get the same result with pendulum.now().year.
Bash Operator templating
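A minimal sketch of templating in the BashOperator (the dag_id is illustrative); Airflow renders the Jinja variables at runtime:

```python
# Hedged sketch: {{ ds }} and {{ execution_date }} are rendered per DAG run.
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="bash_templating_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    echo_date = BashOperator(
        task_id="echo_date",
        bash_command="echo 'Execution date is {{ ds }} ({{ execution_date }})'",
    )
```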
Python operator templating
In Apache Airflow, op_args and op_kwargs are both used to pass arguments to a PythonOperator. However, there is a key difference between the two: op_args is a list of positional arguments, while op_kwargs is a dictionary of keyword arguments.

- op_args: a list of positional arguments that will be unpacked when calling the callable. For example, if you have a Python function that takes two arguments, you can pass them to the PythonOperator using the op_args argument:
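A minimal sketch (my_function and the argument values are illustrative):

```python
# Hedged sketch: my_function and its arguments are illustrative names.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def my_function(arg1, arg2):
    print(arg1, arg2)


with DAG(
    dag_id="op_args_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    task = PythonOperator(
        task_id="my_task",
        python_callable=my_function,
        op_args=["arg1", "arg2"],  # unpacked as positional arguments
    )
```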
When the operator is run, the my_function function will be called with the arguments arg1 and arg2.
- op_kwargs: a dictionary of keyword arguments that will get unpacked in your function. For example, if you have a Python function that takes two keyword arguments, you can pass them to the PythonOperator using the op_kwargs argument:
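A minimal sketch, reusing the illustrative my_function and imports from the previous snippet:

```python
# Hedged sketch: the same my_function, now fed keyword arguments.
task = PythonOperator(
    task_id="my_task",
    python_callable=my_function,
    op_kwargs={"arg1": "arg1_value", "arg2": "arg2_value"},  # unpacked as keyword arguments
)
```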
When the operator is run, the my_function function will be called with the arguments arg1="arg1_value" and arg2="arg2_value".
- Which one should you use? In general, you should use op_kwargs if you need to pass keyword arguments to your Python function. However, if you only need to pass positional arguments, you can use op_args.
Here is a table that summarizes the differences between op_args and op_kwargs:

| Argument | Description |
| --- | --- |
| op_args | A list of positional arguments that will be unpacked when calling the callable. |
| op_kwargs | A dictionary of keyword arguments that will get unpacked in your function. |
This code currently prints the found pageview count, and now we want to connect the dots by writing those results to the Postgres table. The PythonOperator currently prints the results but does not write to the database, so we’ll need a second task to write the results. In Airflow, there are two ways of passing data between tasks:

- By using the Airflow metastore to write and read results between tasks. This is called XCom and covered in chapter 5.
- By writing results to and from a persistent location (e.g., disk or database) between tasks.

Airflow tasks run independently of each other, possibly on different physical machines depending on your setup, and therefore cannot share objects in memory. Data between tasks must therefore be persisted elsewhere, where it resides after a task finishes and can be read by another task.
Airflow provides one mechanism out of the box called XCom, which allows storing and later reading any picklable object in the Airflow metastore. Pickle is Python’s serialization protocol, and serialization means converting an object in memory to a format that can be stored on disk to be read again later, possibly by another process. By default, all objects built from basic Python types (e.g., string, int, dict, list) can be pickled.
By default, Airflow will schedule and run any past schedule intervals that have not been run. As such, specifying a past start date and activating the corresponding DAG will result in all intervals that have passed before the current time being executed. This behavior is controlled by the DAG catchup parameter and can be disabled by setting catchup to false
Code for no catchup
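A minimal sketch (the dag_id and dates are illustrative):

```python
# Hedged sketch: turning off catchup so Airflow only schedules from the
# current interval onward instead of backfilling all past intervals.
import pendulum
from airflow import DAG

dag = DAG(
    dag_id="no_catchup_example",
    schedule_interval="@daily",
    start_date=pendulum.datetime(2023, 1, 1),
    catchup=False,  # do not run the intervals that lie in the past
)
```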
Best practices for designing tasks
Airflow tasks: atomicity and idempotency.
Atomicity
The term atomicity is frequently used in database systems, where an atomic transaction is considered an indivisible and irreducible series of database operations such that either all occur or nothing occurs. Similarly, in Airflow, tasks should be defined so that they either succeed and produce some proper result or fail in a manner that does not affect the state of the system.

(Figure 3.8 Backfilling in Airflow. By default, Airflow will run tasks for all past intervals up to the current time. This behavior can be disabled by setting the catchup parameter of a DAG to false, in which case Airflow will only start executing tasks from the current interval.)
Example: Sending an email after writing to CSV creates two pieces of work in a single function, which breaks the atomicity of the task. To implement this functionality in an atomic fashion, we could simply split the email functionality into a separate task.
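A hedged sketch of that split (the callables are placeholders):

```python
# Hedged sketch: instead of one callable that both writes the CSV and sends
# the email (non-atomic), each piece of work gets its own task.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def _write_stats_to_csv(**context):
    print("write the calculated stats to a CSV file")  # placeholder


def _send_email(**context):
    print("email that new stats are available")  # placeholder


with DAG(
    dag_id="atomic_tasks_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    write_stats = PythonOperator(task_id="write_stats_to_csv", python_callable=_write_stats_to_csv)
    send_email = PythonOperator(task_id="send_email", python_callable=_send_email)

    # Either task can now fail and be retried without redoing the other.
    write_stats >> send_email
```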
Idempotency
Tasks are said to be idempotent if calling the same task multiple times with the same inputs has no additional effect. This means that rerunning a task without changing the inputs should not change the overall output.
Rerunning this task for a given date would result in the task fetching the same set of events as its previous execution (assuming the date is within our 30-day window), and overwriting the existing JSON file in the /data/events folder, producing the same result. As such, this implementation of the fetch events task is clearly idempotent.
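A small sketch of such an idempotent fetch (the endpoint and output path are illustrative, loosely following the book's events example):

```python
# Hedged sketch: rerunning fetch_events for the same date overwrites the same
# file with the same content, producing the same result.
import urllib.request


def fetch_events(date: str) -> None:
    url = f"http://localhost:5000/events?start_date={date}&end_date={date}"  # illustrative endpoint
    output_path = f"/data/events/{date}.json"  # one file per date, overwritten on rerun
    urllib.request.urlretrieve(url, output_path)
```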
- DAGs can run at regular intervals by setting the schedule interval.
- The work for an interval is started at the end of the interval.
- The schedule interval can be configured with cron and timedelta expressions.
- Data can be processed incrementally by dynamically setting variables with templating.
- The execution date refers to the start datetime of the interval, not to the actual time of execution.
- A DAG can be run back in time with backfilling.
- Idempotency ensures tasks can be rerun while producing the same output results.
Templating tasks using the Airflow context
Providing User defined Variables to Python Operators
In Apache Airflow, op_args and op_kwargs are parameters of the PythonOperator used in the task definition to pass arguments to its callable.

Operators in Airflow are the building blocks of workflows, representing individual tasks that need to be executed. Each operator has a set of arguments that define its behavior. However, in some cases, you may want to pass dynamic or variable values to these arguments when defining the tasks. That’s where op_args and op_kwargs come in.

op_args is used to pass a list of arguments to the callable. These arguments are positional and must be provided in the correct order expected by the function, for example op_args=["value1", "value2"].
On the other hand, op_kwargs is used to pass a dictionary of keyword arguments to the callable. This allows you to specify the arguments by their names, regardless of their order, for example op_kwargs={"arg1": "value1", "arg2": "value2"}.
Both op_args and op_kwargs can be used together, allowing you to pass a combination of positional and keyword arguments to the callable.
When defining a task in Airflow, you can use these parameters to pass arguments to the operator. Here’s an example of how you can use op_args and op_kwargs while defining a task:
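A hedged sketch; MyOperator here is a hypothetical thin subclass of PythonOperator, defined only so the names match the explanation that follows:

```python
# Hedged sketch: "MyOperator" is hypothetical, a thin wrapper around PythonOperator.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def my_function(arg1, arg2):
    print(f"arg1={arg1}, arg2={arg2}")


class MyOperator(PythonOperator):
    """Hypothetical operator that simply runs my_function."""

    def __init__(self, **kwargs):
        super().__init__(python_callable=my_function, **kwargs)


with DAG(
    dag_id="op_args_kwargs_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    my_task = MyOperator(
        task_id="my_task",
        op_args=["value1"],            # positional argument -> arg1
        op_kwargs={"arg2": "value2"},  # keyword argument -> arg2
    )
```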
In this example, my_task is an instance of the MyOperator class, and it will receive 'value1' as a positional argument and 'value2' as a keyword argument with the name 'arg2'. The operator can then use these values during its execution.
Using op_args and op_kwargs provides flexibility in passing dynamic values to operators, allowing you to customize their behavior based on the specific context or requirements of your workflow.
A useful tool to debug issues with templated arguments is the Airflow UI. You can inspect the templated argument values after running a task by selecting it in either the graph or tree view and clicking the Rendered Template button
The CLI provides us with exactly the same information as shown in the Airflow UI, without having to run a task, which makes it easier to inspect the result. The command to render templates using the CLI is airflow tasks render [dag_id] [task_id] [execution_date].
Hooking up other systems
It’s typically advised to apply XComs only for transferring small pieces of data such as a handful of strings (e.g., a list of names).
What is XCom and how does it work?
In Apache Airflow, Xcom (short for cross-communication) is a mechanism that allows tasks to exchange small amounts of data between them. It serves as a communication channel for sharing information or passing values between different tasks within a workflow.
The XCom system in Airflow works as follows:

- During the execution of a task, an operator can produce output or results that need to be shared with other tasks. This output could be a value, a small dataset, or any other piece of information.
- The task can use the xcom_push() method to push the output to the XCom system. This method takes a key and a value as parameters. The key is used to identify the output data, while the value represents the actual data to be shared.
- Other tasks in the workflow can retrieve the output of a previous task by using the xcom_pull() method. This method takes the task_ids and an optional key parameter. It returns the value associated with the specified key from the specified task.
- By default, the XCom system stores the output data in Airflow's metadata database (typically a relational database such as MySQL or PostgreSQL); for scalability you can also configure a custom XCom backend that stores the data elsewhere.
Here’s an example that demonstrates the usage of Xcom in Airflow:
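A hedged reconstruction of that example (the dag_id, key, and message are illustrative; the task and function names follow the description below):

```python
# Hedged sketch: push_data pushes a value into XCom, pull_data reads it back.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def push_data(**context):
    # Store a value in XCom under an explicit key.
    context["ti"].xcom_push(key="message", value="Hello, Airflow!")


def pull_data(**context):
    # Read the value that push_task stored.
    message = context["ti"].xcom_pull(task_ids="push_task", key="message")
    print(message)


with DAG(
    dag_id="xcom_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    push_task = PythonOperator(task_id="push_task", python_callable=push_data)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_data)

    push_task >> pull_task
```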
In this example, the push_data() function is a Python callable used as an operator. It pushes the string “Hello, Airflow!” to the XCom system using the xcom_push() method.

The pull_data() function is another Python callable used as an operator. It retrieves the value from the XCom system using the xcom_pull() method and prints it.

The push_task and pull_task are instances of the PythonOperator class, representing the tasks in the workflow. The output of the push_task is shared with the pull_task using the XCom system.

When the workflow is executed, the push_task pushes the data to XCom, and the pull_task pulls the data from XCom and prints it.
Xcom provides a simple way to share information between tasks, enabling coordination and data sharing within an Airflow workflow.
- Some arguments of operators can be templated.
- Templating happens at runtime.
- Templating the PythonOperator works differently from other operators; variables are passed to the provided callable.
- The result of templated arguments can be checked with airflow tasks render.
- Operators can communicate with other systems via hooks.
- Operators describe what to do; hooks determine how to do the work.
Defining dependencies between tasks
BranchPythonOperator
In Apache Airflow, the BranchPythonOperator is an operator that allows you to create conditional branching within your workflows. It enables you to execute different tasks or branches based on the result of a Python function that you define.

The BranchPythonOperator works as follows:
- When defining your workflow, you specify a BranchPythonOperator task, which includes the following parameters:
  - task_id: a unique identifier for the task.
  - python_callable: a Python function that determines the branching logic. This function should return the task ID of the next task to execute based on the current context.
  - Other optional parameters, such as provide_context to pass the context to the Python function (only needed in Airflow 1; in Airflow 2 the context is passed automatically).
- During task execution, the BranchPythonOperator calls the specified python_callable function, passing the context as an argument. The context includes information such as the current execution date, task instance, and other relevant details.
- The python_callable function evaluates the necessary conditions based on the context and returns the task ID of the next task to execute. The returned task ID should match the task_id of one of the downstream tasks.
- The BranchPythonOperator uses the returned task ID to determine which downstream task to execute; the other direct downstream tasks are skipped.
Here’s an example to illustrate the usage of BranchPythonOperator:
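A hedged reconstruction of that example (the hour-based rule and task names follow the description below):

```python
# Hedged sketch: decide_branch returns the task_id of the branch to follow;
# the branch that is not chosen gets skipped.
import pendulum
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator


def decide_branch(**context):
    if pendulum.now().hour < 12:
        return "morning_task"
    return "afternoon_task"


with DAG(
    dag_id="branching_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    decide_branch_task = BranchPythonOperator(
        task_id="decide_branch_task",
        python_callable=decide_branch,
    )
    morning_task = PythonOperator(task_id="morning_task", python_callable=lambda: print("morning"))
    afternoon_task = PythonOperator(task_id="afternoon_task", python_callable=lambda: print("afternoon"))

    decide_branch_task >> [morning_task, afternoon_task]
```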
In this example, the decide_branch() function is the Python callable that determines the branching logic. It checks the current hour and returns the task ID of either 'morning_task' or 'afternoon_task' based on the result.

The decide_branch_task is an instance of the BranchPythonOperator class, representing the branching task in the workflow. It uses the decide_branch() function to determine the next task to execute dynamically.

The morning_task and afternoon_task are downstream tasks; which one runs is determined by the result of the decide_branch_task (the other is skipped).
By using the BranchPythonOperator, you can create dynamic and conditional workflows in Airflow, allowing different branches of the workflow to be executed based on the outcome of the Python function.
Branching example from book
Take a look at this line carefully
In Apache Airflow, Trigger Rules are used to define the conditions under which a task should be triggered or skipped during workflow execution. Each task in Airflow can have a trigger rule associated with it, which determines how the task’s execution is affected by the status of its upstream tasks.
Here are the available trigger rules in Airflow:

- all_success (default): the task will be triggered only if all of its upstream tasks have succeeded. If any upstream task has failed, been skipped, or is in a state other than success, the task will not run.
- all_failed: the task will be triggered only if all of its upstream tasks have failed. If any upstream task has succeeded, been skipped, or is in a state other than failure, the task will be skipped.
- all_done: the task will be triggered once all of its upstream tasks have completed execution, regardless of their resulting state (success, failure, or skipped).
- one_success: the task will be triggered as soon as at least one of its upstream tasks has succeeded. It will be skipped only if all of its upstream tasks have failed or have been skipped.
- one_failed: the task will be triggered as soon as at least one of its upstream tasks has failed. It will be skipped only if all of its upstream tasks have succeeded or have been skipped.
- none_failed: the task will be triggered if none of its upstream tasks have failed (they either succeeded or were skipped). It will not run if any of its upstream tasks have failed, even if others have succeeded or been skipped.
To apply a trigger rule to a task in Airflow, you can set the trigger_rule parameter when defining the task. Here’s an example:
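A hedged sketch matching the description below (the dag_id and callables are illustrative):

```python
# Hedged sketch: task2 uses all_done, task3 uses one_failed, both depend on task1.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def _work(name):
    print(f"running {name}")


with DAG(
    dag_id="trigger_rule_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    task1 = PythonOperator(task_id="task1", python_callable=_work, op_args=["task1"])
    task2 = PythonOperator(
        task_id="task2",
        python_callable=_work,
        op_args=["task2"],
        trigger_rule="all_done",  # run once task1 has finished, whatever its state
    )
    task3 = PythonOperator(
        task_id="task3",
        python_callable=_work,
        op_args=["task3"],
        trigger_rule="one_failed",  # run only if an upstream task failed
    )

    task1 >> [task2, task3]
```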
In this example, we have three tasks: task1, task2, and task3. task1 is connected to both task2 and task3.

- task2 has a trigger rule of 'all_done', so it will be triggered once its upstream task (task1) has completed, regardless of its status.
- task3 has a trigger rule of 'one_failed', so it will be triggered if at least one upstream task (task1) has failed. It will be skipped only if all upstream tasks have succeeded or been skipped.
By setting different trigger rules for tasks, you can define complex dependencies and conditions within your workflows, ensuring that tasks are executed or skipped based on the desired logic and the status of their upstream tasks.
Conditional tasks
Sharing data between tasks
Sharing data using XComs
Chaining Python tasks with the Taskflow API
Airflow 2 provides a decorator-based way of defining Python tasks and their dependencies called the Taskflow API. Although not without its flaws, the Taskflow API can considerably simplify your code if you’re primarily using PythonOperators and passing data between them as XComs.
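A minimal sketch of the Taskflow API (the dag_id and the extract/transform/load functions are illustrative):

```python
# Hedged sketch: @task-decorated functions become tasks, and passing return
# values between them creates XCom-backed dependencies automatically.
import pendulum
from airflow.decorators import dag, task


@dag(
    dag_id="taskflow_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
)
def taskflow_example():
    @task
    def extract():
        return {"value": 42}  # returned data is passed via XCom

    @task
    def transform(data: dict) -> int:
        return data["value"] * 2

    @task
    def load(result: int):
        print(f"result: {result}")

    # Calling the tasks like functions wires up the dependencies.
    load(transform(extract()))


dag_instance = taskflow_example()
```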
PART 2 Beyond basics
Triggering Workflows
Polling conditions with sensors
implementing with FileSensor
By default, the sensor timeout is set to seven days. If the DAG schedule_interval is set to once a day, this will lead to an undesired snowball effect—which is surprisingly easy to encounter with many DAGs! The DAG runs once a day, and supermarkets 2, 3, and 4 will fail after seven days, as shown in figure 6.7. However, new DAG runs are added every day and the sensors for those respective days are started, and as a result more and more tasks start running. Here’s the catch: there’s a limit to the number of tasks Airflow can handle and will run (on various levels).
Setting the maximum number of concurrent tasks in a DAG
- Day 1: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 3 tasks.
- Day 2: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 6 tasks.
- Day 3: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 9 tasks.
- Day 4: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 12 tasks.
- Day 5: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 15 tasks.
- Day 6: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 16 tasks; two new tasks cannot run, and any other task trying to run is blocked.

This behavior is often referred to as sensor deadlock. In this example, the maximum number of running tasks in the supermarket couponing DAG is reached, and thus the impact is limited to that DAG, while other DAGs can still run. However, once the global Airflow limit of maximum tasks is reached, your entire system is stalled, which is obviously undesirable. This issue can be solved in various ways.
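One common mitigation, sketched below with assumed paths and a shortened timeout, is to cap the number of concurrently running tasks per DAG and to run sensors in reschedule mode so they free their worker slot between pokes:

```python
# Hedged sketch: limiting concurrent tasks per DAG and using a rescheduling
# FileSensor so waiting sensors do not hold worker slots the whole time.
import pendulum
from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="couponing_app",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
    concurrency=16,  # max concurrent tasks for this DAG (max_active_tasks in newer Airflow)
) as dag:
    wait_for_supermarket_1 = FileSensor(
        task_id="wait_for_supermarket_1",
        filepath="/data/supermarket1/data.csv",  # illustrative path
        poke_interval=60 * 10,   # check every 10 minutes
        timeout=60 * 60 * 24,    # give up after one day instead of the 7-day default
        mode="reschedule",       # release the worker slot between checks
    )
```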
TriggerDagRunOperator
The TriggerDagRunOperator is an operator in Apache Airflow that allows you to trigger the execution of another DAG (directed acyclic graph) from within your workflow. It enables you to programmatically start the execution of a separate DAG, providing flexibility and the ability to orchestrate complex workflows.

Here’s how the TriggerDagRunOperator works:
- When defining your main DAG, you include a TriggerDagRunOperator task, specifying the DAG ID of the target DAG that you want to trigger.
- During task execution, the TriggerDagRunOperator triggers the execution of the target DAG by creating a new DagRun for that DAG. A DagRun is an instance of a DAG that represents a specific run or execution of the DAG.
- You can provide additional parameters to the TriggerDagRunOperator to customize the triggered DagRun. These parameters can include configuration variables, execution dates, and other context variables that will be passed to the triggered DAG.
- Once the DagRun is created, the Airflow scheduler takes over and starts executing the tasks within the triggered DAG, following the defined dependencies and scheduling parameters.
Here’s an example to illustrate the usage of TriggerDagRunOperator:
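A hedged reconstruction matching the description below (both DAG ids are illustrative):

```python
# Hedged sketch: main_dag contains a trigger_task that starts 'target_dag'
# with the same execution date.
import pendulum
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="main_dag",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
) as main_dag:
    trigger_task = TriggerDagRunOperator(
        task_id="trigger_task",
        trigger_dag_id="target_dag",            # the DAG to start
        execution_date="{{ execution_date }}",  # reuse the main DAG's execution date
    )
```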
In this example, the main_dag includes a TriggerDagRunOperator task named trigger_task. It is configured to trigger the DAG with the ID 'target_dag'.

The execution_date parameter is set to "{{ execution_date }}", which is a Jinja template variable that passes the current execution date to the triggered DAG. This allows the triggered DAG to use the same execution date as the main DAG.

When the trigger_task is executed, it triggers the execution of 'target_dag', creating a new DagRun for that DAG. The target DAG will start executing its tasks based on its own schedule and dependencies.

By using the TriggerDagRunOperator, you can create complex workflows that orchestrate the execution of multiple DAGs, enabling you to modularize and manage your workflows more effectively.
ExternalTaskSensor
Starting workflows with REST/CLI
Communicating with external systems
Moving data between systems
Let’s imagine we have a very large job that would take all resources on the machine Airflow is running on. In this case, it’s better to run the job elsewhere; Airflow will start the job and wait for it to complete. The idea is that there should be a strong separation between orchestration and execution, which we can achieve by having Airflow start the job and wait for its completion while a data-processing framework such as Spark performs the actual work.
In Spark, there are various ways to start a job:

- Using the SparkSubmitOperator: this requires a spark-submit binary and YARN client config on the Airflow machine to find the Spark instance.
- Using the SSHOperator: this requires SSH access to a Spark instance but does not require Spark client config on the Airflow instance.
- Using the SimpleHTTPOperator: this requires running Livy, a REST API for Apache Spark, to access Spark.
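A hedged sketch of the first option (the application path and connection id are assumptions, and the Spark provider package plus a spark-submit binary must be available on the Airflow host):

```python
# Hedged sketch: submitting a Spark job via SparkSubmitOperator.
import pendulum
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_submit_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    run_spark_job = SparkSubmitOperator(
        task_id="run_spark_job",
        application="/path/to/job.py",  # illustrative path to the Spark application
        conn_id="spark_default",        # Spark connection configured in Airflow
    )
```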
Building custom components
How to store API keys or other connection details in Apache Airflow
In Apache Airflow, you can store API tokens or any other sensitive information using Airflow’s built-in feature called Connections. Connections allow you to securely store and manage credentials, API tokens, and other connection details used by your Airflow tasks.
Here are the steps to store API tokens using Airflow’s Connections feature:

- Access the Airflow web UI: open your web browser and navigate to the Airflow web interface.
- Go to the Admin section: click on the “Admin” menu on the top navigation bar. You will need the necessary permissions to access this section.
- Select “Connections”: in the Admin section, find and click on the “Connections” option. This will open the Connections management page.
- Add a new connection: on the Connections page, click on the “+ Add a Connection” button to create a new connection.
- Enter connection details: in the connection creation form, fill in the necessary details for your API token. You can provide a connection name, connection type, host, login, password, and any other relevant information.
- Save the connection: once you have entered the connection details, click on the “Save” button to save the connection.
- Use the connection in your tasks: in your Airflow DAGs, you can access the stored API token by referencing the connection you created. You can use the connection’s name (conn_id) to retrieve the necessary credentials or API token within your task code.
By storing the API token as a connection in Airflow, you can avoid hardcoding sensitive information in your code and manage credentials centrally through the Airflow UI.
Note: It’s important to ensure that only authorized users have access to the Airflow web UI and appropriate permissions to view and manage connections, as they contain sensitive information.
And how can I retrieve it in Python code?
To retrieve the stored API token or connection information from Apache Airflow within your Python code, you can use the BaseHook class provided by Airflow. The BaseHook class provides a method to retrieve connection details based on the connection id (conn_id).

Here’s an example of how you can retrieve the API token using BaseHook:
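A minimal sketch ("my_api_connection" is an assumed conn_id, matching the explanation below):

```python
# Hedged sketch: "my_api_connection" is the conn_id chosen when creating the
# connection in the Airflow UI; the token is stored in its password field.
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("my_api_connection")
api_token = conn.password  # the secret stored in the connection
base_url = conn.host       # other connection fields are available as well
print(f"calling {base_url} with a token of length {len(api_token or '')}")
```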
In the above example, replace "my_api_connection" with the actual name (conn_id) of the connection that holds the API token. The BaseHook.get_connection() method retrieves the connection object based on the conn_id provided. Then, you can access the API token using the password attribute of the connection object.
Note that the conn_id you pass to BaseHook.get_connection() is the same identifier (“Conn Id”) you entered when creating the connection in the UI, so there is no separate lookup by numeric ID needed.
By using the BaseHook class and its methods, you can easily retrieve the stored API token or any other connection details within your Python code in Apache Airflow.
Custom hooks
Building Custom Operator
Although building a MovielensHook has allowed us to move a lot of complexity from our DAG into the hook, we still have to write a considerable amount of boilerplate code for defining start/end dates and writing the ratings to an output file. This means that, if we want to reuse this functionality in multiple DAGs, we will still have some considerable code duplication and extra effort involved. Fortunately, Airflow also allows us to build custom operators, which can be used to perform repetitive tasks with a minimal amount of boilerplate code. In this case, we could, for example, use this functionality to build a MovielensFetchRatingsOperator, which would allow us to fetch movie ratings using a specialized operator class.
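A hedged skeleton of what such an operator could look like. The hook import path, the get_ratings() signature, and the parameter names are assumptions loosely based on the book's example:

```python
# Hedged skeleton of a custom operator; MovielensHook, its import path, and
# get_ratings() are assumptions based on the book's example, not verified code.
import json

from airflow.models import BaseOperator

from custom.hooks import MovielensHook  # assumed location of the custom hook


class MovielensFetchRatingsOperator(BaseOperator):
    """Fetches movie ratings for a given date range and writes them to a JSON file."""

    # Allow the dates and output path to be templated (e.g., with {{ ds }}).
    template_fields = ("_start_date", "_end_date", "_output_path")

    def __init__(self, conn_id, start_date, end_date, output_path, **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date
        self._output_path = output_path

    def execute(self, context):
        hook = MovielensHook(self._conn_id)
        ratings = list(hook.get_ratings(start_date=self._start_date, end_date=self._end_date))
        with open(self._output_path, "w") as file_:
            json.dump(ratings, file_)
```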
Building custom sensors
NEED TO READ AND PRACTICE IN TESTING UNIT 9
Running tasks in containers Unit 10
Airflow deployments can be difficult to manage if they involve many different operators, as this requires knowledge of the different APIs and complicates debugging and dependency management. One way of tackling this issue is to use container technologies such as Docker to encapsulate your tasks inside container images and run these images from within Airflow. This containerized approach has several advantages, including easier dependency management, a more uniform interface for running tasks, and improved testability of tasks.

- Using the DockerOperator, you can run tasks in container images directly using Docker, similar to the docker run CLI command.
- You can use the KubernetesPodOperator to run containerized tasks in pods on a Kubernetes cluster. Kubernetes allows you to scale your containerized tasks across a compute cluster, which provides (among other things) greater scalability and more flexibility in terms of computing resources.
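A hedged sketch of the DockerOperator approach (the image name and command are examples, and the Docker provider package must be installed):

```python
# Hedged sketch: running a task as a container, similar to `docker run`.
import pendulum
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

with DAG(
    dag_id="docker_task_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    fetch_ratings = DockerOperator(
        task_id="fetch_ratings",
        image="myorg/fetch-ratings:latest",               # illustrative container image
        command=["fetch-ratings", "--date", "{{ ds }}"],  # command arguments are templated
    )
```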