Apache Airflow is an open-source platform that helps you manage and automate complex workflows, including ETL job orchestration. It allows you to easily schedule, monitor, and manage the tasks involved in ETL processes, making it a valuable tool for businesses that need to streamline and organize their processes. With Airflow, you can visualize and manage your workflows as directed acyclic graphs (DAGs) and define dependencies between tasks programmatically in Python.
Azure Data Factory is a cloud-based data integration service provided by Microsoft Azure. It helps organizations manage and process large amounts of data from various sources. With Azure Data Factory, you can create workflows to extract, transform, and load (ETL) data into the desired data stores. The service can move and integrate data from a variety of sources, including on-premises databases, cloud-based storage solutions, and big data platforms. It also provides built-in data processing capabilities, allowing you to clean, transform, and process data before loading it into your data stores. Azure Data Factory is highly scalable and secure, and it can be managed and monitored through a user-friendly interface, making it accessible to non-technical users as well. In short, it is a powerful solution for managing and processing data, giving organizations the ability to unlock insights and make data-driven decisions.
In February 2023, Microsoft announced the launch of Managed Airflow, which allows users to run Apache Airflow within Azure Data Factory. This is particularly valuable for customers with an existing Airflow investment.
The integration provides various benefits, such as automatic setup and scaling of the Airflow environment, reduced operational overhead, and integration with Azure security and monitoring.
To demonstrate how simple it is to get started with Managed Airflow, we at the Big Data & Analytics Business Unit have created a comprehensive step-by-step guide that walks through setting up and running an environment.
To proceed with this guide, you will need to set up a few prerequisites: an Azure subscription, an Azure Data Factory instance, and an Azure Blob Storage account that will hold your DAG files.
4. After clicking the create button, you should see your new Airflow environment displayed on the main screen.
Because Managed Airflow does not store our DAGs directly, we need to store them in Azure Blob Storage and later connect them to our Airflow environment through a linked service.
To do so, follow these two steps:
If you already have existing DAG files, you have the option to upload them into the dags blob instead of following the next step, and then proceed with Step 4: Connect the DAGs with Managed Airflow.
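If you want to script this upload instead of using the portal, the following sketch uses the azure-storage-blob Python package. The connection string, the container name ('airflow'), and the file name are assumptions you will need to adapt to your own setup:

from azure.storage.blob import BlobServiceClient

# Assumed values - replace them with your own storage account details
CONNECTION_STRING = '<your-storage-account-connection-string>'
CONTAINER_NAME = 'airflow'           # the container your linked service will point to
LOCAL_DAG_FILE = 'random_numbers.py'

# Connect to the storage account and upload the DAG file into the dags folder
blob_service = BlobServiceClient.from_connection_string(CONNECTION_STRING)
blob_client = blob_service.get_blob_client(container=CONTAINER_NAME, blob=f'dags/{LOCAL_DAG_FILE}')

with open(LOCAL_DAG_FILE, 'rb') as data:
    blob_client.upload_blob(data, overwrite=True)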
We want to create a DAG that consists of three tasks: generate_numbers, square_numbers, and cube_numbers.
First, we want to execute generate_numbers; after this task finishes, square_numbers and cube_numbers should start in parallel, like in this visualization:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 14),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('random_numbers', default_args=default_args, schedule_interval=None)
default_args defines default arguments that are used by all tasks in the DAG, unless an individual task overrides them. In this example, start_date (the earliest date the DAG can run for), retries (how often a failed task is retried), and retry_delay (how long Airflow waits between retries) are especially important.
After that, you can define the DAG itself using the DAG class provided by Airflow, as shown in the last line of the snippet above. Because schedule_interval is set to None, the DAG will only run when triggered manually.
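To illustrate how a task can override these defaults, here is a minimal sketch of a hypothetical task (not part of our example DAG, and assuming the imports shown in the complete file below):

# Hypothetical task that overrides the default retry count from default_args
flaky_task = BashOperator(
    task_id='flaky_call',
    bash_command='exit 0',
    retries=3,  # takes precedence over retries=1 in default_args
    dag=dag,
)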
We define tasks in Apache Airflow using operators such as BashOperator or PythonOperator, wrapper classes that execute a bash command or a Python callable, respectively.
Every task gets a task_id and a reference to the DAG. Other arguments depend on the type of operator you are going to use. We won’t go over every line of code in the following sections, so please refer to the documentation for more detailed information.
The following code will generate 10 random numbers in a bash array, echo them (the last line written to stdout is automatically pushed to XCom, so downstream tasks can pick it up), and sleep for 3 seconds:
generate_numbers = BashOperator(
    task_id='generate_numbers',
    bash_command='my_numbers=(); for i in {1..10}; do my_numbers+=($RANDOM); done; echo "${my_numbers[@]}"; sleep 3;',
    dag=dag,
)
These two tasks are very similar. Here it is important to reference the generated numbers from the previous task and to cast them to integers. You can define a function outside the operator and pass it via python_callable:
def square_numbers(numbers):
    numbers = [int(n) for n in numbers.split()]
    return [n ** 2 for n in numbers]

square_op = PythonOperator(
    task_id='square_numbers',
    python_callable=square_numbers,
    op_kwargs={'numbers': '{{ task_instance.xcom_pull(task_ids="generate_numbers") }}'},
    dag=dag,
)
def cube_numbers(numbers):
    numbers = [int(n) for n in numbers.split()]
    return [n ** 3 for n in numbers]

cube_op = PythonOperator(
    task_id='cube_numbers',
    python_callable=cube_numbers,
    op_kwargs={'numbers': '{{ task_instance.xcom_pull(task_ids="generate_numbers") }}'},
    dag=dag,
)
Now we have to define which task gets executed when and how it depends on previous tasks. In our example we want to execute generate_numbers first and afterwards run square_numbers and cube_numbers in parallel. We define these dependencies using the >> operator and a task list:
In our case:
generate_numbers >> [square_op, cube_op]
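The same dependencies could also be expressed with separate statements or with the explicit set_downstream method; all of the following variants are equivalent:

# Equivalent ways to declare the same dependencies
generate_numbers >> square_op
generate_numbers >> cube_op

# or, using the explicit method instead of the >> operator
generate_numbers.set_downstream([square_op, cube_op])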
Now we are done! The resulting DAG file should look like this:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 14),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Define the DAG
dag = DAG('random_numbers', default_args=default_args, schedule_interval=None)

# Define a BashOperator that generates 10 random numbers and sleeps for 3 seconds
generate_numbers = BashOperator(
    task_id='generate_numbers',
    bash_command='my_numbers=(); for i in {1..10}; do my_numbers+=($RANDOM); done; echo "${my_numbers[@]}"; sleep 3;',
    dag=dag,
)

# Define two PythonOperators that square and cube the generated numbers, respectively
def square_numbers(numbers):
    numbers = [int(n) for n in numbers.split()]
    return [n ** 2 for n in numbers]

square_op = PythonOperator(
    task_id='square_numbers',
    python_callable=square_numbers,
    op_kwargs={'numbers': '{{ task_instance.xcom_pull(task_ids="generate_numbers") }}'},
    dag=dag,
)

def cube_numbers(numbers):
    numbers = [int(n) for n in numbers.split()]
    return [n ** 3 for n in numbers]

cube_op = PythonOperator(
    task_id='cube_numbers',
    python_callable=cube_numbers,
    op_kwargs={'numbers': '{{ task_instance.xcom_pull(task_ids="generate_numbers") }}'},
    dag=dag,
)

# Set the task dependencies
generate_numbers >> [square_op, cube_op]
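Before uploading the file, it can be worth sanity-checking it on a machine with Airflow installed. This optional sketch uses Airflow's DagBag to verify that the file imports without errors (the local folder path is an assumption):

from airflow.models import DagBag

# Parse all DAG files in the local folder that contains random_numbers.py
dag_bag = DagBag(dag_folder='.', include_examples=False)

# Any syntax or import problems will show up here before you upload anything
print('Import errors:', dag_bag.import_errors)
print('Loaded DAGs:', list(dag_bag.dags))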
To inform Azure Data Factory where our DAGs are, we must create a linked service by following these steps:
2. A dialog will open, and you must create a new linked service or select an existing one:
3. A linked service for Azure Blob Storage will be created after you fill in all the necessary information.
4. After creating the linked service, the previous dialog will appear again. To find the right container and blob in your storage, hit the “Browse” button and navigate to the specific location:
If everything worked fine, the Import button should light up and your DAGs can be imported. The import process may take around a minute.
To start the UI, just click the Monitor button (a speedometer symbol).
A new tab will open and welcome you with this overview screen:
Here you can see all your created DAGs, run or delete them, inspect them further or just check their current status. If you followed Step 3, our random_numbers DAG will appear there.
If you have not specified a schedule_interval, you’ll have to manually run the DAG. Otherwise, it will be executed automatically at the specified time. Because we did not set an interval, we must hit the play button at the right and trigger it:
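As a side note: if you want the DAG to run on a schedule instead of relying on manual triggers, you can pass a cron expression or a timedelta as schedule_interval when defining the DAG. This is just an illustrative variation of the definition used above:

# Run the DAG every day at 06:00 instead of only on manual triggers
dag = DAG(
    'random_numbers',
    default_args=default_args,
    schedule_interval='0 6 * * *',  # alternatively: timedelta(days=1)
)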
After starting the DAG, you will see circles whose colour represents the current status of each task.
It begins as queued (grey), changes to running (light green), and ends with success (dark green). Here are all possible colours for reference:
To inspect the DAG in detail, you can click its name. After doing this, you will see a more detailed status page:
At the top are multiple tabs like Grid (the current page), Graph, Calendar and many more. Check out the Graph tab:
This will give you a good visual representation of your DAG and where it might fail.
To see the results (like our numbers) that the tasks have generated, we use XComs (“cross-communications”), a mechanism that lets tasks talk to each other. There are multiple ways to view them:
Accessing XComs via the Graph View
If you are in the Graph View as mentioned previously, follow these steps:
1. Select your task of interest
2. Select the XCom button
3. See the results under return_value
Accessing XComs via the Admin View
If you want to see all XComs of all DAGs at once, you can do that, too!
Just click Admin -> XComs in the top bar:
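If you prefer to consume these values in code rather than in the UI, a downstream task can pull them directly. The following sketch adds a hypothetical summary task that is not part of the DAG above; it pulls the results of both tasks through the task instance:

def summarize(**context):
    ti = context['ti']
    # Pull the XCom return values of the two upstream tasks
    squares = ti.xcom_pull(task_ids='square_numbers')
    cubes = ti.xcom_pull(task_ids='cube_numbers')
    print('Squares:', squares)
    print('Cubes:', cubes)

summary_op = PythonOperator(
    task_id='summarize_results',
    python_callable=summarize,
    dag=dag,
)

# The summary task runs only after both upstream tasks have finished
[square_op, cube_op] >> summary_op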
In this post, we explored how Apache Airflow can provide a solid foundation for data workflow management and how easily you can get started with the platform. We also explored how quickly you can take your existing DAGs to the next level by moving them to Microsoft Azure Data Factory to enjoy the scalability and cost benefits of Managed Airflow and to reduce the management overhead of running Airflow on-premises. By leveraging the power and flexibility of Apache Airflow and Azure Data Factory, you can supercharge your data processing and management and achieve your goals faster and more efficiently than ever before.
Ready to take your data workflows to the next level? Make the move from on-premises Airflow to Azure Data Factory today and enjoy the power and flexibility of the platform. Our team can help you make the transition smoothly and efficiently. Contact us now to learn more!