Introduction
Welcome to our comprehensive guide on Apache Airflow essentials! In this blog, we’ll delve into the core concepts and practical steps to create effective workflows using Apache Airflow, an open-source framework renowned for its versatility and scalability. Whether you’re a beginner venturing into workflow management or an experienced user seeking to enhance your skills, this guide is tailored to provide you with a solid understanding of Apache Airflow and its key components.
So, let’s embark on this journey into the realm of Apache Airflow and unlock the power of seamless workflow management!
What is Apache Airflow Framework?
Apache Airflow serves as an open-source solution for crafting, organizing, and overseeing batch-oriented workflows. Its adaptable Python framework allows for the construction of workflows that interface with a wide array of technologies. Through a user-friendly web interface, the state of these workflows can be effectively managed. Airflow can be deployed in various configurations, ranging from a single process on a personal computer to a distributed setup capable of handling extensive workflows.
Key Features of Airflow: Why Opt for Seamless Workflow Management?
Apache Airflow is a top choice for managing complex data workflows. Here’s why:
Workflow Management: Simplifies the definition, scheduling, and monitoring of workflows using DAGs (Directed Acyclic Graphs). We will discuss DAGs in more detail later in this blog.
Versatility: Integrates Python, Bash, and SQL seamlessly.
Scalability: Scales horizontally for large data processing across systems like Apache Spark and Kubernetes.
Dependency Management: Automates task dependencies for correct execution order.
Monitoring and Alerting: Offers a web UI and integrates with tools like Prometheus.
Customizability: Highly extensible, allowing users to develop custom components.
Active Community: Supported by a vibrant user community for continuous updates and support.
Core Components of Apache Airflow
Airflow has the following core components:
Web server
Scheduler
Metastore
Trigger
Executor
Web server
The Airflow Web Server provides a web-based UI for users to interact with and monitor their workflows.
It allows users to view DAGs, inspect task instances, review logs, trigger and manage DAG runs, and monitor the overall status of their workflows.
The web server also provides a REST API for programmatic interaction with Airflow.
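As an illustration, here is a minimal sketch of calling that REST API with Python's requests library to list the DAGs Airflow knows about. It assumes the webserver from this guide is running on localhost:8080, that the basic-auth API backend is enabled in airflow.cfg (auth_backends = airflow.api.auth.backend.basic_auth), and that the admin/admin user created later in this post exists; adjust the host and credentials to your setup.
import requests

# List all DAGs via the Airflow REST API (assumes basic auth is enabled
# and an admin/admin user exists; adjust credentials and host as needed)
response = requests.get(
    "http://localhost:8080/api/v1/dags",
    auth=("admin", "admin"),
)
response.raise_for_status()
for dag in response.json()["dags"]:
    print(dag["dag_id"], "| paused:", dag["is_paused"])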
Scheduler
The Scheduler is responsible for triggering task instances based on the defined schedule in the DAGs.
It continuously monitors DAGs and determines when to execute individual tasks based on their dependencies and schedule_interval (see the short sketch below).
Airflow supports multiple schedulers for high availability and scalability. This allows for workload distribution and ensures that the scheduler is not a single point of failure.
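To make this concrete, here is a minimal sketch of what the scheduler works from: a DAG with an hourly schedule_interval and a single dependency. The DAG name, task names, and bash commands are made up for the example.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# A minimal DAG: the scheduler creates a run every hour (after start_date)
# and executes 'extract' before 'load' because of the dependency below.
with DAG(
    'hourly_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly',
    catchup=False,
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo "extract"')
    load = BashOperator(task_id='load', bash_command='echo "load"')
    extract >> load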
Metastore
The Metastore is the persistent storage system used by Airflow to store metadata about DAGs, tasks, and their status.
Metadata includes details about DAGs, task instances, historical runs, and connection configurations.
By default, Airflow uses a relational database as its metastore, and you can configure it to use databases like SQLite, MySQL, or PostgreSQL.
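For example, switching the metastore to PostgreSQL is a matter of changing the connection string in airflow.cfg. In recent Airflow 2.x releases the setting lives under the [database] section (older releases used [core]); the host and credentials below are placeholders.
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow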
Trigger
In Airflow, a trigger is an event or condition that initiates the execution of a DAG or a specific task within a DAG.
DAGs can be triggered manually by users through the web interface or programmatically using the command line or the API.
Additionally, DAGs can be scheduled to run automatically based on a predefined schedule_interval.
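For instance, once the example DAG built later in this post (pdf_data_transfer_workflow) is loaded, you could trigger it manually from the command line:
airflow dags trigger pdf_data_transfer_workflow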
Executor
The Executor is responsible for running task instances. It specifies how and where tasks should be executed.
Airflow supports multiple executors, including the SequentialExecutor (for single-node execution), LocalExecutor (for parallel execution on a single machine), and CeleryExecutor (for distributed execution using Celery).
The choice of executor depends on the scale and requirements of your workflow.
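The executor is selected in airflow.cfg (or via the matching environment variable). For example, to switch from the default to the LocalExecutor, which requires a metastore other than SQLite, you would set:
[core]
executor = LocalExecutor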
Installation and setup
Step 1: Create a Virtual Environment
First things first, it’s essential to create a virtual environment to keep your project dependencies separate from the system’s Python environment.
Open your terminal and run the following command for macOS/Linux:
virtualenv env --python=python3
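Then activate the environment so that everything you install next stays inside it:
source env/bin/activate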
Step 2: Install Apache Airflow
Next, install Apache Airflow itself. You can easily do this using pip.
pip install apache-airflow
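If pip runs into dependency conflicts, the Airflow documentation recommends installing with a constraints file. As a sketch (the Airflow and Python versions below are examples; substitute the ones you are actually using):
pip install "apache-airflow==2.8.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.8.txt"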
Install a specific version of Flask-Session
Also, install a pinned version of Flask-Session to ensure compatibility with Apache Airflow.
pip install Flask-Session==0.5.0
Initializing Airflow Database
Initialize the Airflow metadata database, where Airflow stores its configurations, metadata, and historical job information.
airflow db init
After running this command, you will find an airflow folder in your home directory. It contains the configuration file, the metadata database, and the logs directory:
airflow/
├── logs # where you find all your logs
├── airflow.db # the metadata database (SQLite by default)
└── airflow.cfg # the Airflow configuration file
Create the user
Now, create an admin user to log in to the web interface. You can change the username and password as needed.
airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin
Start the Airflow webserver
Start the web server by running the command below; it serves the Airflow web interface.
airflow webserver --port 8080
Start the Airflow Scheduler
Start the scheduler in a separate terminal window or background process. This command triggers and manages the execution of your workflow tasks.
airflow scheduler
Step 3: Access the Web Interface
Open your web browser and navigate to the specified address to interact with Airflow through the web interface.
Visit http://localhost:8080 in your browser to access the Airflow web interface.
The Airflow web UI
Once you’ve set up Apache Airflow, you’ll be able to access its user-friendly web interface. Sign in with the username and password you created, and you’ll be directed to the Airflow home page.
What is a DAG?
A DAG (Directed Acyclic Graph) in the context of Apache Airflow refers to a collection of tasks and dependencies between them. It’s a way to represent a workflow or a data pipeline as a series of tasks that need to be executed in a particular order.
Here’s what each part means:
Directed: Each task has a defined direction or order in which it needs to be executed.
Acyclic: There are no cycles or loops in the graph. This means no task can depend, directly or indirectly, on itself, so every run of the workflow eventually completes.
In Apache Airflow, a DAG is represented as a Python script where you define the structure of your workflow using Airflow’s operators (tasks) and specify the dependencies between them. These scripts are usually stored in the dags directory in your Airflow installation.
DAGs allow you to:
Define the structure of your workflow.
Specify the order in which tasks should be executed.
Handle dependencies between tasks, ensuring that a task is only executed once its dependencies have been completed.
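As a small sketch of what this looks like in code, dependencies are declared with the >> and << bitshift operators. The DAG and task names here are placeholders, and EmptyOperator (called DummyOperator in older releases) is used just to illustrate the wiring.
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Illustrative DAG: 'download' runs first, the two 'process' tasks run
# after it (possibly in parallel), and 'report' runs only when both finish.
with DAG('dependency_example', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    download = EmptyOperator(task_id='download')
    process_a = EmptyOperator(task_id='process_a')
    process_b = EmptyOperator(task_id='process_b')
    report = EmptyOperator(task_id='report')

    download >> [process_a, process_b] >> report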
How to create the DAG file?
To create the DAG file, follow these steps:
Create your DAGs directory: In your Airflow installation, there is typically an airflow directory in your home folder. Create a dags folder inside it (~/airflow/dags); this is where you’ll store your DAG definition files.
Create a Python script for your DAG: Inside the dags directory, create a new Python script (e.g., ~/airflow/dags/my_dag.py). This script will contain the definition of your DAG.
Define your DAG: Please refer to the following simple code to define your DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from PyPDF2 import PdfReader

# Function to read data from a PDF file
def read_pdf():
    # Open the PDF file (replace 'example.pdf' with the full path to your PDF file)
    with open('example.pdf', 'rb') as file:
        # Create a PDF reader object
        pdf_reader = PdfReader(file)
        # Initialize variable to store the extracted data
        extracted_text = ""
        # Loop through each page and extract its text
        for page in pdf_reader.pages:
            extracted_text += page.extract_text()
    # The return value is pushed to XCom automatically
    return extracted_text

# Function to split the extracted text into two parts
def split_text(ti):
    # Pull the text returned by the read_pdf task from XCom
    text = ti.xcom_pull(task_ids='read_pdf')
    mid_index = len(text) // 2
    return text[:mid_index], text[mid_index:]

# Function to write one part of the split text to a file
def write_text_to_file(ti, part_index, file_name):
    # Pull the two parts returned by the split_text task from XCom
    parts = ti.xcom_pull(task_ids='split_text')
    with open(file_name, 'w') as file:
        file.write(parts[part_index])

# Define default arguments shared by all tasks in the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 2, 1),
}

# Define the DAG with its name, schedule, and default arguments
with DAG(
    'pdf_data_transfer_workflow',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Task 1: Read PDF
    task_read_pdf = PythonOperator(
        task_id='read_pdf',
        python_callable=read_pdf
    )
    # Task 2: Split Text
    task_split_text = PythonOperator(
        task_id='split_text',
        python_callable=split_text
    )
    # Task 3: Write Part 1 to File
    task_write_part1 = PythonOperator(
        task_id='write_part1_to_file',
        python_callable=write_text_to_file,
        op_kwargs={'part_index': 0, 'file_name': 'text_file_part1.txt'}
    )
    # Task 4: Write Part 2 to File
    task_write_part2 = PythonOperator(
        task_id='write_part2_to_file',
        python_callable=write_text_to_file,
        op_kwargs={'part_index': 1, 'file_name': 'text_file_part2.txt'}
    )
    # Define task dependencies
    task_read_pdf >> task_split_text >> [task_write_part1, task_write_part2]
Note: Be sure to use the full path to your PDF file in place of ‘example.pdf’.
In this DAG, we import the necessary modules, including DAG, PythonOperator, datetime, and PyPDF2 for PDF processing. We put settings shared by all tasks, such as the owner and start date, in default_args, and define the DAG with the name ‘pdf_data_transfer_workflow’, a daily schedule_interval, and catchup disabled. The tasks pass data to each other through XComs: each function’s return value is pushed automatically, and downstream tasks retrieve it with ti.xcom_pull.
We perform the following tasks in the DAG:
Task 1 (read_pdf): Calls the read_pdf function to extract text from the PDF file.
Task 2 (split_text): Pulls the extracted text from XCom and splits it into two parts.
Task 3 (write_part1_to_file): Writes the first part of the text to a file named text_file_part1.txt.
Task 4 (write_part2_to_file): Writes the second part of the text to a file named text_file_part2.txt.
Task Dependencies
Task 1 (read_pdf) must be completed before Task 2 (split_text) can start.
In other words, you cannot start splitting the text until the PDF has been read successfully.
Task 2 (split_text) must be completed before both Task 3 (write_part1_to_file) and Task 4 (write_part2_to_file) can start.
Splitting the text is therefore a prerequisite for both Task 3, which writes part 1 of the split text to a file, and Task 4, which writes part 2 to a file.
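The same dependencies can also be declared without the bitshift operators, which some people find more readable for longer pipelines. As a sketch using Airflow’s chain helper, equivalent to the dependency line in the DAG above:
from airflow.models.baseoperator import chain

# Equivalent to: task_read_pdf >> task_split_text >> [task_write_part1, task_write_part2]
chain(task_read_pdf, task_split_text, [task_write_part1, task_write_part2])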
Run the webserver and scheduler commands and visit the web interface.
After running these commands, check the web interface to view your DAG file. If you encounter an import error, resolve it by installing PyPDF2 in the same environment. In the terminal, execute:
pip install PyPDF2
Once it is installed, stop the webserver and scheduler, then start them again.
Performing these stop-start actions for each change can be tedious and time-consuming. To streamline the process, stop the webserver and scheduler and instead run the following single command:
airflow standalone
This command runs the webserver, scheduler, and the other required components together, which makes iterating on your DAG files much easier. DAG files are re-parsed automatically, so after installing a new library you usually only need to refresh the web interface.
After installing the necessary libraries, you should be able to locate your DAG file.
You can run the DAG by clicking the play (trigger) button.
After triggering the DAG, you can check its status in the web interface: while it is executing, the run is shown as running, and once it finishes, the status changes to success.
By clicking on the successful run, you can view how the DAG was executed, including the graph view and the logs.
To view the logs of a specific task, simply click on it in the graph.