Managed Apache Airflow with Azure Data Factory

Miguel Mioskowski
April 5, 2023
Reading time: 2 min

What is Apache Airflow?

Apache Airflow is an open-source platform that helps to manage and automate complex workflows including ETL job orchestration. It allows you to easily schedule, monitor, and manage tasks involved in ETL processes, making it a valuable tool for businesses that need to streamline and organize their processes. With Airflow, you can visualize and manage your workflows as directed acyclic graphs (DAGs) and programmatically specify dependencies between tasks with the help of Python. 

What is Azure Data Factory?

Azure Data Factory is a cloud-based data integration service provided by Microsoft Azure. It helps organizations manage and process large amounts of data from various sources. With Azure Data Factory, you can create workflows to extract, transform, and load (ETL) data into the desired data stores. This service can be used to move and integrate data from a variety of sources, including on-premises databases, cloud-based storage solutions, and big data platforms. Azure Data Factory also provides built-in data processing capabilities, allowing you to clean, transform, and process data before loading it into your data stores. The service is highly scalable and secure, and it can be managed and monitored through a user-friendly interface, making it accessible to non-technical users as well. In short, Azure Data Factory is a powerful solution for managing and processing data, giving organizations the ability to unlock insights and make data-driven decisions.

Integration of Airflow into Azure Data Factory

In February 2023, Microsoft announced the launch of Managed Airflow, which allows users to run Apache Airflow within Azure Data Factory. This is particularly valuable for customers who have already invested in Airflow.

The integration provides various benefits, such as:

  • Creating and managing Airflow environments with just a click
  • Easy migration of existing Airflow DAGs into Azure (lift and shift)
  • Built-in authentication and authorization with Azure Active Directory
  • Improved scalability and security

To demonstrate how simple it is to get started with Managed Airflow, we at the Big Data & Analytics Business Unit have created a comprehensive step-by-step guide that outlines the process involved in running an environment.

Step-by-Step: Mastering Managed Airflow

Image 1: Steps to get started (https://techcommunity.microsoft.com/t5/azure-data-factory-blog/introducing-managed-airflow-in-azure-data-factory/ba-p/3730151)

Prerequisites

To proceed with this guide, you will need to set up several prerequisites:

  • Azure Account: If you don’t already have one, you can sign up for a Free Trial
  • Azure Blob Storage: Cost-effective storage for unstructured files (needed for our DAGs)
  • Azure Data Factory: Create or use an existing Data Factory in a region where Managed Airflow is available (East US, South Central US, West US, UK South, North Europe, Southeast Asia, Germany West Central)

Step 1: Create an Airflow environment

  1. Launch Azure Data Factory Studio, and on the home screen (represented by a house symbol on the left), navigate to the Manage hub (represented by a bag symbol).
  2. Under the Airflow (Preview) option, select Airflow.
  3. To create your environment, click the New button, and a setup dialog box will appear. You can leave all settings at their default values, except for choosing a name for your environment. 
Image 2: Navigating to Managed Airflow in Azure Data Factory


4. After clicking the create button, you should see your new Airflow environment displayed on the main screen.

Image 3: Airflow setup dialog

Step 2: Create a blob for the DAGs

Because Managed Airflow does not store our DAGs directly, we need to save them in Blob Storage and connect them to our Airflow environment later via a linked service.

To do so, follow these two steps:

  1. Create or select a container in Azure Blob Storage (for example, you can create a container called “apache-airflow-playground”).
  2. Within this container, create two folders named dags and plugins (if they are not named exactly like this, the connection will not work).

If you already have existing DAG files, you can upload them into the dags folder instead of following the next step, and then proceed with Step 4: Connect the DAGs with Managed Airflow.
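If you prefer to script the container setup and upload instead of using the portal, the azure-storage-blob Python SDK can do the same thing. The following is only a minimal sketch; the connection string placeholder, the container name, and the file name random_numbers_dag.py are assumptions for illustration:

from azure.storage.blob import BlobServiceClient

# Connection string from the storage account's access keys (placeholder)
conn_str = "<your-storage-connection-string>"

service = BlobServiceClient.from_connection_string(conn_str)
container = service.get_container_client("apache-airflow-playground")

# Create the container if it does not exist yet
if not container.exists():
    container.create_container()

# Upload the DAG file under the dags/ prefix; Managed Airflow expects the
# dags and plugins folders at the top level of the container
with open("random_numbers_dag.py", "rb") as f:
    container.upload_blob(name="dags/random_numbers_dag.py", data=f, overwrite=True)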

Step 3: Create a DAG

We want to create a DAG that consists of 3 tasks:

  • generate_numbers: A Bash script that generates 10 random numbers in an array and sleeps for 3 seconds
  • square_numbers: A Python script that receives an array of numbers and returns an array with the square of each number
  • cube_numbers: A Python script that receives an array of numbers and returns an array with the cube of each number

First, we want to execute generate_numbers; after this task finishes, square_numbers and cube_numbers should start in parallel, as in this visualization:

Image 4: Visualization of DAG to build

DAG definition

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 14),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('random_numbers', default_args=default_args, schedule_interval=None)

default_args defines default arguments that are used by all tasks in the DAG unless an individual task overrides them (a short sketch of such an override follows the list below). In this example, the following arguments are especially important:

  • depends_on_past: When set to True, prevents a task from being triggered if the previous run of that task hasn't succeeded
  • start_date: This is the date and time when the first run of the DAG will occur (datetime object)
  • retries: The number of times a task should be retried in case of failure
  • retry_delay: The amount of time to wait before retrying a failed task (timedelta object)
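As a quick, illustrative sketch of such an override (not part of the DAG we build below; the task id is made up, and the BashOperator is only introduced in the next section), an individual task can simply pass its own values:

# Sketch: this task retries 3 times with a 1-minute delay, overriding the defaults above
flaky_task = BashOperator(
    task_id='flaky_task',
    bash_command='exit 0',
    retries=3,
    retry_delay=timedelta(minutes=1),
    dag=dag,
)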

After that you can define the DAG itself, using the DAG class provided by Airflow:

  • 'random_numbers': The name (dag_id) of the DAG
  • default_args=default_args: Sets the default arguments we previously defined
  • schedule_interval=None: The frequency of the DAG runs. Setting this to None means we have to trigger the DAG manually (see the sketch below for a scheduled alternative)
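For reference, if you wanted the DAG to run on a schedule instead of only manually, you could pass a cron expression or a preset such as '@daily'. A small sketch (the dag_id random_numbers_scheduled is made up for illustration):

# Sketch: the same definition, but scheduled to run every day at 06:00 UTC
scheduled_dag = DAG(
    'random_numbers_scheduled',
    default_args=default_args,
    schedule_interval='0 6 * * *',
    catchup=False,  # do not back-fill runs between start_date and now
)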

Tasks definition

We define tasks in Apache Airflow using operators such as BashOperator or PythonOperator, which are wrapper classes that execute their respective scripts.

Every task gets a task_id and a reference to the DAG. Other arguments depend on the type of operator you are going to use. We won’t go over every line of code in the following sections, so please refer to the documentation for more detailed information.

Generate numbers

The following code will generate 10 random numbers in an array, echo them, and sleep for 3 seconds:

generate_numbers = BashOperator(
    task_id='generate_numbers',
    bash_command='my_numbers=(); for i in {1..10}; do my_numbers+=($RANDOM); done; echo "${my_numbers[@]}"; sleep 3;',
    dag=dag,
)

Square and cube numbers

These two tasks are very similar. Here it is important to reference the numbers generated by the previous task and to cast them to integers: the BashOperator pushes the last line of its stdout to XCom by default, and we pull that value with a Jinja template in op_kwargs. You can define a function outside the operator and reference it via python_callable:

def square_numbers(numbers):
    numbers = [int(n) for n in numbers.split()]
    return [n ** 2 for n in numbers]

square_op = PythonOperator(
    task_id='square_numbers',
    python_callable=square_numbers,
    op_kwargs={'numbers': '{{ task_instance.xcom_pull(task_ids="generate_numbers") }}'},
    dag=dag,
)

def cube_numbers(numbers):
    numbers = [int(n) for n in numbers.split()]
    return [n ** 3 for n in numbers]

cube_op = PythonOperator(
    task_id='cube_numbers',
    python_callable=cube_numbers,
    op_kwargs={'numbers': '{{ task_instance.xcom_pull(task_ids="generate_numbers") }}'},
    dag=dag,
)

Set task dependencies

Now we have to define which task gets executed when and how it depends on previous tasks. In our example, we want to execute generate_numbers first and afterwards execute square_numbers and cube_numbers in parallel. We define these dependencies using the >> and [] operators:

  • A >> B: Task B runs after task A
  • [A, B]: Tasks A and B run in parallel

In our case: 

generate_numbers >> [square_op, cube_op]
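If you prefer explicit method calls over the bitshift syntax, the same dependency can be expressed with set_downstream, which is equivalent:

# Equivalent to: generate_numbers >> [square_op, cube_op]
generate_numbers.set_downstream([square_op, cube_op])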

Now we are done! The resulting DAG file should look like this:

import random
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 14),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Define the DAG
dag = DAG('random_numbers', default_args=default_args, schedule_interval=None)

# Define a BashOperator that generates 10 random numbers and sleeps for 3 seconds
generate_numbers = BashOperator(
    task_id='generate_numbers',
    bash_command='my_numbers=(); for i in {1..10}; do my_numbers+=($RANDOM); done; echo "${my_numbers[@]}"; sleep 3;',
    dag=dag,
)

# Define two PythonOperators that square and cube the generated numbers, respectively
def square_numbers(numbers):
    numbers = [int(n) for n in numbers.split()]
    return [n ** 2 for n in numbers]

square_op = PythonOperator(
    task_id='square_numbers',
    python_callable=square_numbers,
    op_kwargs={'numbers': '{{ task_instance.xcom_pull(task_ids="generate_numbers") }}'},
    dag=dag,
)

def cube_numbers(numbers):
    numbers = [int(n) for n in numbers.split()]
    return [n ** 3 for n in numbers]

cube_op = PythonOperator(
    task_id='cube_numbers',
    python_callable=cube_numbers,
    op_kwargs={'numbers': '{{ task_instance.xcom_pull(task_ids="generate_numbers") }}'},
    dag=dag,
)

# Set the task dependencies
generate_numbers >> [square_op, cube_op]
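Before uploading the file, it can be worth checking locally that it parses without import errors. Here is a minimal sketch, assuming you have Airflow installed locally (pip install apache-airflow) and the DAG file saved in the current directory; the file layout is an assumption:

# Quick local sanity check for the DAG file
from airflow.models import DagBag

dag_bag = DagBag(dag_folder='.', include_examples=False)

# import_errors maps file paths to exception messages and should be empty
assert not dag_bag.import_errors, dag_bag.import_errors
assert 'random_numbers' in dag_bag.dags
print('DAG loaded successfully:', dag_bag.dags['random_numbers'])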

Step 4: Connect the DAGs with Managed Airflow

To inform Azure Data Factory where our DAGs are, we must create a linked service by following these steps:

  1. Hover over the Airflow environment and select “Import Files” 
Image 5: Location of importing DAGs in environment


2. A dialog will open, and you must create or use a linked service:

Image 6: Import DAG dialog


3. A linked service for Azure Blob Storage will be created after you fill in all necessary information.

Image 7: Linked service dialog


4. After creating the linked service, the previous dialog will appear again. To find the right container and folder in your storage account, hit the “Browse” button and navigate to the specific location:

Image 8: Import DAG dialog with file browser location

If everything worked fine, the Import button should light up and your DAGs can be imported. The import process may take around a minute.

Step 5: Run and monitor

To start the UI, just click the Monitor button (a speedometer symbol):

Image 9: Button to start Managed Airflow UI

A new tab will open and welcome you with this overview screen:

Image 10: Main screen of Airflow UI

Here you can see all your created DAGs, run or delete them, inspect them further, or just check their current status. If you followed Step 3, our random_numbers DAG will appear there.

Run the DAG

If you have not specified a schedule_interval, you’ll have to run the DAG manually. Otherwise, it will be executed automatically at the specified time. Because we did not set an interval, we must hit the play button on the right and trigger it:

Image 11: Button to trigger a DAG manually

Monitor current status

After starting the DAG, you will see coloured circles that represent the current status of each run and task.

A run begins as queued (grey), changes to running (light green), and ends with success (dark green). Here are all possible colours for reference:

Image 12: DAG status colors

To inspect the DAG in detail, you can click its name. You will then see a more detailed status page:

Image 13: DAG detail page

At the top there are multiple tabs like Grid (the current page), Graph, Calendar and many more. Check out the Graph tab:

Image 14: DAG visualized in a graph view

This will give you a good visual representation of your DAG and where it might fail.

See the results

To see the results (the numbers) that our tasks have generated, we use XComs (“cross-communications”), a mechanism that lets tasks talk to each other. There are multiple ways to see them:

Accessing XComs via the Graph View

If you are in the Graph View as mentioned previously, follow these steps:

  1. Select your task of interest
  2. Select the XCom button
  3. See the results under return_value

Accessing XComs via the Admin View

If you want to see all XComs of all DAGs at once, you can do that, too: just open Admin -> XComs in the top bar.

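Beyond the UI, downstream tasks can also consume these XComs programmatically. As an optional, illustrative extension of our DAG (the summarize task is not part of the file above), a task could pull both results and print them:

# Optional, illustrative task: pull the results of both tasks and print them
def summarize(**context):
    ti = context['ti']
    squares = ti.xcom_pull(task_ids='square_numbers')
    cubes = ti.xcom_pull(task_ids='cube_numbers')
    print('squares:', squares)
    print('cubes:', cubes)

summarize_op = PythonOperator(
    task_id='summarize',
    python_callable=summarize,
    dag=dag,
)

# Run it after both computations have finished
[square_op, cube_op] >> summarize_op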

Conclusion

In this post, we explored how Apache Airflow can provide a solid foundation for data workflow management and how easily you can get started with the platform. We also saw how quickly you can take your existing DAGs to the next level by moving them to Azure Data Factory to enjoy the scalability and cost benefits of Managed Airflow and to reduce the management overhead of on-premises Airflow. By leveraging the power and flexibility of Apache Airflow and Azure Data Factory, you can supercharge your data processing and management and achieve your goals faster and more efficiently than ever before.

Ready to take your data workflows to the next level? Make the move from on-premises Airflow to Azure Data Factory today and enjoy the power and flexibility of the platform. Our team can help you make the transition smoothly and efficiently. Contact us now to learn more!