Welcome to our comprehensive guide on Apache Airflow essentials! In this blog, we’ll delve into the core concepts and practical steps to create effective workflows using Apache Airflow, an open-source framework renowned for its versatility and scalability. Whether you’re a beginner venturing into workflow management or an experienced user seeking to enhance your skills, this guide is tailored to provide you with a solid understanding of Apache Airflow and its key components.
So, let’s embark on this journey into the realm of Apache Airflow and unlock the power of seamless workflow management!
Apache Airflow is a top choice for managing complex data workflows. Here’s why:
Workflow Management: Simplifies the definition, scheduling, and monitoring of workflows using DAGs (Directed Acyclic Graphs), which we discuss later in this blog.
Versatility: Runs Python functions, Bash commands, SQL queries, and more through built-in and provider-supplied operators.
Scalability: Scales horizontally with distributed executors such as the Celery and Kubernetes executors, and can orchestrate large-scale processing on systems like Apache Spark.
Dependency Management: Enforces task dependencies so that tasks always run in the correct order.
Monitoring and Alerting: Provides a web UI for tracking runs, supports alerts such as failure emails, and emits StatsD metrics that can be fed into monitoring tools such as Prometheus.
Customizability: Highly extensible; you can write custom operators, hooks, sensors, and plugins.
Active Community: Supported by a vibrant user community for continuous updates and support.
Create and activate a virtual environment, then install Airflow:
virtualenv env --python=python3
source env/bin/activate
pip install apache-airflow
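Install a specific version of Flask-Session. The Airflow project recommends installing with a constraints file that pins compatible dependency versions; without one, newer Flask-Session releases may break the Airflow webserver, so pin it explicitly:
pip install Flask-Session==0.5.0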
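Initializing the Airflow Database
airflow db init
(On Airflow 2.7 and later, airflow db init is deprecated; use airflow db migrate instead.)
This creates the Airflow home directory (~/airflow by default, or the path in the AIRFLOW_HOME environment variable) with the following contents:
airflow/
├── logs # where you find all your logs
├── airflow.db # the default SQLite metadata database
└── airflow.cfg # the Airflow configuration file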
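To check where these files live on your machine, here is a small sketch that mirrors Airflow’s default lookup: the AIRFLOW_HOME environment variable if it is set, otherwise ~/airflow. The helper names airflow_home and expected_paths are hypothetical and for illustration only; the paths listed are the defaults, and airflow.cfg can point the database elsewhere.

```python
import os
from pathlib import Path


def airflow_home() -> Path:
    """Return the Airflow home directory: $AIRFLOW_HOME if set, else ~/airflow."""
    return Path(os.environ.get("AIRFLOW_HOME", "~/airflow")).expanduser()


def expected_paths(home: Path) -> dict:
    """Map each default item in the Airflow home directory to its full path.

    These are default locations only; settings in airflow.cfg can move them.
    """
    return {
        "logs": home / "logs",
        "airflow.db": home / "airflow.db",
        "airflow.cfg": home / "airflow.cfg",
    }


if __name__ == "__main__":
    home = airflow_home()
    for name, path in expected_paths(home).items():
        status = "found" if path.exists() else "missing"
        print(f"{name:12} {status:8} {path}")
```

Running it after airflow db init should report each item as found; a "missing" entry usually means AIRFLOW_HOME points somewhere other than where you initialized the database.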
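Create the user
Create an admin account for the web UI (choose a stronger password for anything beyond local testing):
airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin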
Start the Airflow webserver
airflow webserver --port 8080
Start the Airflow scheduler (in a separate terminal, with the same virtual environment activated)
airflow scheduler
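Once both processes are running, you can also check them programmatically. The Airflow 2 webserver exposes a /health endpoint that reports the status of the metadata database and the scheduler as JSON. This sketch assumes that response shape and a webserver on localhost:8080; fetch_health and is_healthy are hypothetical helper names, and the check ignores any other components newer versions may report.

```python
import json
from urllib.request import urlopen


def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """GET the webserver's /health endpoint and decode the JSON body."""
    with urlopen(f"{base_url}/health", timeout=5) as response:
        return json.load(response)


def is_healthy(payload: dict) -> bool:
    """True only if the metadatabase and the scheduler both report 'healthy'.

    Other components that some Airflow versions include are ignored here.
    """
    components = ("metadatabase", "scheduler")
    return all(
        payload.get(name, {}).get("status") == "healthy" for name in components
    )


# Usage, with the webserver running:
# print(is_healthy(fetch_health()))
```

Keeping the network call separate from the status check makes the check easy to reuse, for example on a payload fetched by another monitoring tool.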
The Airflow web UI
Open http://localhost:8080 in your browser and sign in with the credentials you just created. You’ll land on the Airflow home page, which lists your DAGs.
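What is a DAG?
DAG stands for Directed Acyclic Graph. Here’s what each part means:
Directed: dependencies point one way, from upstream tasks to the downstream tasks that wait for them.
Acyclic: there are no loops, so a task can never end up depending on itself, directly or indirectly.
Graph: tasks are the nodes, and the dependencies between them are the edges.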
In Apache Airflow, a DAG is defined in a Python script, where you describe the structure of your workflow with Airflow operators (each operator instance becomes a task) and specify the dependencies between those tasks. These scripts are stored in the dags folder of your Airflow home directory.
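The “acyclic” requirement is what guarantees that a valid execution order always exists. Airflow checks this for you and rejects DAGs that contain cycles; the sketch below illustrates the same idea with Python’s standard-library graphlib module (Python 3.9+), not with Airflow’s own code. The helper execution_order and the task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict) -> list:
    """Return one valid run order for tasks given {task: {upstream tasks}}.

    Raises graphlib.CycleError if the dependencies contain a cycle, i.e. the
    graph is not a DAG and no valid order exists.
    """
    return list(TopologicalSorter(dependencies).static_order())


# "transform" waits for "extract", and "load" waits for "transform"
pipeline = {"transform": {"extract"}, "load": {"transform"}}
print(execution_order(pipeline))  # ['extract', 'transform', 'load']

# Making "extract" wait for "load" closes a loop, so no order exists
looped = {**pipeline, "extract": {"load"}}
try:
    execution_order(looped)
except CycleError as err:
    print("Not a DAG:", err.args[1])
```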
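DAGs let you define which tasks run and in what order, schedule the workflow to run at regular intervals, and monitor and retry individual tasks from the web UI. To create your first DAG: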
Create your DAGs directory: By default, Airflow’s home directory is ~/airflow (set AIRFLOW_HOME to change it). Create a dags folder inside it (~/airflow/dags); this is where you’ll store your DAG definition files.
mkdir -p ~/airflow/dags
Create a Python script for your DAG: Inside the dags directory, create a new Python script (e.g., ~/airflow/dags/my_dag.py). This script will contain the definition of your DAG.
Define your DAG: The following example reads text from a PDF file, splits it into two halves, and writes each half to a separate text file:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2 import path
from PyPDF2 import PdfReader

# Function to read text from a PDF file
def read_pdf():
    # Replace 'example.pdf' with the full path to your PDF file
    with open('example.pdf', 'rb') as file:
        # Create a PDF reader object
        pdf_reader = PdfReader(file)
        # Loop through each page and collect its text
        extracted_text = ""
        for page in pdf_reader.pages:
            extracted_text += page.extract_text() or ""
    # The return value is pushed to XCom so downstream tasks can pull it
    return extracted_text

# Function to split the text from read_pdf into two halves
def split_text(ti):
    # Pull the return value of the read_pdf task from XCom
    text = ti.xcom_pull(task_ids='read_pdf')
    mid_index = len(text) // 2
    return [text[:mid_index], text[mid_index:]]

# Function to write one half of the split text to a file
def write_text_to_file(ti, part_index, file_name):
    # Pull both halves from the split_text task and pick one
    parts = ti.xcom_pull(task_ids='split_text')
    with open(file_name, 'w') as file:
        file.write(parts[part_index])

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 2, 1),
}

# Define the DAG; the schedule is a DAG-level argument, not a task default
# (use schedule_interval='@daily' on Airflow versions before 2.4)
with DAG(
    'pdf_data_transfer_workflow',
    default_args=default_args,
    schedule='@daily',
    catchup=False,
) as dag:
    # Task 1: Read PDF
    task_read_pdf = PythonOperator(
        task_id='read_pdf',
        python_callable=read_pdf,
    )
    # Task 2: Split Text (Airflow passes `ti` because it is in the signature)
    task_split_text = PythonOperator(
        task_id='split_text',
        python_callable=split_text,
    )
    # Task 3: Write Part 1 to File
    task_write_part1 = PythonOperator(
        task_id='write_part1_to_file',
        python_callable=write_text_to_file,
        op_kwargs={'part_index': 0, 'file_name': 'text_file_part1.txt'},
    )
    # Task 4: Write Part 2 to File
    task_write_part2 = PythonOperator(
        task_id='write_part2_to_file',
        python_callable=write_text_to_file,
        op_kwargs={'part_index': 1, 'file_name': 'text_file_part2.txt'},
    )
    # Define task dependencies
    task_read_pdf >> task_split_text >> [task_write_part1, task_write_part2]
Note: Replace ‘example.pdf’ with the full path to your PDF file, and consider using full paths for the output files too; relative paths are resolved against the working directory of the process that runs the task.
In this DAG, we import the necessary modules: DAG, PythonOperator, datetime, and PdfReader from PyPDF2 for PDF processing. The default_args dictionary holds settings shared by all tasks, such as the owner and start date, while the daily schedule is passed directly to the DAG, which is named ‘pdf_data_transfer_workflow’. Each task’s return value is stored in XCom (Airflow’s mechanism for passing small amounts of data between tasks), and downstream tasks retrieve it with ti.xcom_pull. Because XCom values are stored in the metadata database, this pattern suits small documents; for large files, pass file paths between tasks instead.
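Because the task callables are plain Python, you can test their logic before Airflow ever runs them. The sketch below checks the midpoint split used in the DAG, including how an odd-length text is divided; split_in_half is a hypothetical standalone helper that mirrors the slicing in split_text and is not part of the DAG file.

```python
def split_in_half(text: str) -> tuple:
    """Split text at len(text) // 2, as the split_text task does."""
    mid_index = len(text) // 2
    return text[:mid_index], text[mid_index:]


first, second = split_in_half("Airflow")  # 7 characters, so mid_index is 3
print(repr(first), repr(second))  # 'Air' 'flow'
```

For an odd length, the extra character goes to the second half, and joining the two halves always reproduces the original text, so no content is lost between the two output files.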
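We perform the following tasks in the DAG:
Task 1 (read_pdf): reads the PDF and returns its text.
Task 2 (split_text): splits the text into two halves.
Task 3 (write_part1_to_file): writes the first half to text_file_part1.txt.
Task 4 (write_part2_to_file): writes the second half to text_file_part2.txt.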
Task Dependencies
Task 1 (read_pdf) must be completed before Task 2 (split_text) can start.
In other words, the text can only be split after the PDF has been read successfully.
Task 2 (split_text) must be completed before both Task 3 (write_part1_to_file) and Task 4 (write_part2_to_file) can start.
Task 2 is a prerequisite for both writing tasks: neither half can be written to a file until the text has been split. Tasks 3 and 4 do not depend on each other, so Airflow can run them in parallel if your executor supports it.
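The dependency line at the end of the DAG (read_pdf, then split_text, then both write tasks) can be modeled as a plain graph to see which tasks become runnable together. This sketch uses graphlib’s incremental get_ready/done API to group tasks into stages; it illustrates the ordering only and is not how Airflow’s scheduler is implemented. The function name run_stages is hypothetical.

```python
from graphlib import TopologicalSorter

# {task_id: set of upstream task_ids}, matching the DAG's dependencies
pdf_workflow = {
    "split_text": {"read_pdf"},
    "write_part1_to_file": {"split_text"},
    "write_part2_to_file": {"split_text"},
}


def run_stages(dependencies: dict) -> list:
    """Group tasks into stages; tasks within a stage can run in parallel."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every upstream task is done
        stages.append(ready)
        sorter.done(*ready)  # mark the whole stage as finished
    return stages


for number, stage in enumerate(run_stages(pdf_workflow), start=1):
    print(f"Stage {number}: {', '.join(stage)}")
# Stage 1: read_pdf
# Stage 2: split_text
# Stage 3: write_part1_to_file, write_part2_to_file
```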
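Run the webserver and scheduler commands, then visit the web interface.
If the web interface shows an import error for your DAG because PyPDF2 is not installed, install it with pip install PyPDF2 in the same virtual environment. Once installed, stop the webserver and scheduler, then start them again.
For local development, you can instead start all components with a single command:
airflow standalone
This command initializes the database, creates an admin user if one does not already exist, and starts the webserver and scheduler together. While it runs, edits to your DAG files and newly installed libraries are typically picked up automatically the next time the DAG file is parsed, so refreshing the web interface is usually enough.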
After installing the necessary libraries, you should see your DAG, pdf_data_transfer_workflow, in the DAG list.
New DAGs are paused by default, so unpause it with the toggle next to its name, then trigger a run by clicking the play (▶) button.
After triggering the DAG, you can follow the run’s status in the web interface: it shows as running while the tasks execute, and as success once all four tasks have completed.
Click the successful run to see how each task was executed, including the graph view and the task logs. To view the logs of a specific task, click that task in the graph.
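Task logs are also written to the logs folder under your Airflow home directory. Assuming the default log file layout of Airflow 2.3 and later (dag_id=<dag>/run_id=<run>/task_id=<task>/attempt=<n>.log), this sketch lists the log files for one task so you can inspect them outside the UI. find_task_logs is a hypothetical helper, and a customized log_filename_template in airflow.cfg would need a different pattern.

```python
from pathlib import Path


def find_task_logs(logs_dir: Path, dag_id: str, task_id: str) -> list:
    """Return log files for one task across all runs, sorted by path.

    Assumes Airflow's default layout:
    dag_id=<dag>/run_id=<run>/task_id=<task>/attempt=<n>.log
    (mapped tasks add a map_index=<i> folder, which this pattern skips).
    """
    pattern = f"dag_id={dag_id}/run_id=*/task_id={task_id}/attempt=*.log"
    return sorted(logs_dir.glob(pattern))


if __name__ == "__main__":
    # Assumes the default AIRFLOW_HOME of ~/airflow
    logs = Path.home() / "airflow" / "logs"
    for log_file in find_task_logs(logs, "pdf_data_transfer_workflow", "read_pdf"):
        print(log_file)
```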
If you’re looking to streamline your workflows with Apache Airflow, we can guide you through the setup and implementation process, ensuring it fits your needs perfectly.