Key Takeaways:

  • The traditional approach of managing Airflow with static, duplicated DAGs became cumbersome, prompting a shift to more efficient, dynamically generated DAGs using Python classes.
  • Centralizing configurations allowed Airflow to automatically adjust and regenerate DAGs, streamlining multi-tenant workflows with a standardized structure.
  • The new setup enhances scalability and ease of deployment, enabling GrowthLoop to efficiently manage diverse client needs on a single, multi-tenant Airflow cluster.

    If you’ve been on the hunt for a workflow management platform, my guess is you’ve come across Apache Airflow already. Originally hailing from Airbnb, this widely used project now lives under the Apache banner and is the tool of choice for many data teams. Airflow allows you to write complex workflows in a declarative manner and offers many out-of-the-box operators for complex tasks.

    At GrowthLoop, we use Airflow heavily for complex ETL pipelines, machine learning pipelines, and statistical analysis. Moreover, we manage multiple Airflow deployments and run massive multi-tenant Airflow clusters handling a plethora of workloads. As we began to push Airflow to its limits, we recently reworked how we deploy to our Airflow clusters — and, dare I say, found a better way to use Airflow.

    Our main goal was to move away from the declarative format of deploying Airflow and toward dynamically generated DAGs for flexibility and scalability — allowing us to quickly change what was running on Airflow with as little as a feature flag modification. This led to the inception of DAG factories.

    DAG Factories — using a factory pattern with Python classes that generate DAGs automatically based on dynamic input to the system.

    Enough with the backstory, it’s time to get to the exciting part. To set the stage, throughout this article we will assume that we want to execute two complex tasks in Airflow — process_messages & process_invoices.

    Before Picture

    Before we dive into the new setup, it’s important to take a quick detour to see how this would generally be done in Airflow. One possible approach is to have a monorepo with individual folders for each of your projects. You can either duplicate DAGs in each project folder or keep all your shared DAGs in a shared_dags folder. Your deployment pipeline can then pick the correct DAGs from the monorepo to push to each of the Airflow clusters. In practice, the file structure might look something like this snippet.

    CODE: https://gist.github.com/tameem92/65715c34fee1515c36f111717a40721e.js
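
    Purely for illustration (the actual layout is in the gist above), such a monorepo might be organized along these lines; the project and file names here are made up:

    dags/
    ├── project_a/
    │   ├── process_messages_dag.py
    │   └── process_invoices_dag.py
    ├── project_b/
    │   └── process_messages_dag.py   # near-duplicate of project_a's DAG
    └── shared_dags/
        └── helpers.py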

    As your project starts to grow, the complexity of the DAGs and of the deployment process increases quickly. In a multi-tenant cluster, you would need the same DAG duplicated multiple times for Airflow to treat it separately for each tenant.

    After Picture — DAG Factories

    Now, with DAG factories you get a better organizational structure that consolidates duplicated code across projects or tenants. By abstracting the code into Python classes that are responsible for generating the DAG objects, we make the setup more configurable. And best of all, it looks like Marie Kondo did a sweep of our monorepo and removed everything that didn’t spark joy.

    CODE: https://gist.github.com/tameem92/cd400701341cc62c3da1f4440e88eb3e.js
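
    Again as an illustration only (the gist above has the real structure), the reorganized repo might look roughly like this, with folder names assumed:

    dags/
    ├── dag_factory/
    │   ├── factory.py
    │   ├── process_messages.py
    │   └── process_invoices.py
    └── projects/
        ├── project_a/
        │   ├── config.py
        │   └── main.py
        └── project_b/
            ├── config.py
            └── main.py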

    Quick component breakdown 🕺🏽

    • projects/<name>/config.py — a file that fetches configuration from Airflow Variables or from a centralized config store
    • projects/<name>/main.py — the core file where we call the factory methods to generate the DAGs we want to run for a project
    • dag_factory — a folder containing all our DAG builders, written in a factory pattern with a standardized set of methods.

    Dynamic Configuration

    The first step towards this architecture was to get our Airflow clusters to talk to a centralized configuration store on our platform. This allows Airflow to dynamically fetch configuration from other services on the platform — like our web app, feature flags, or other business logic. We won’t dig too deep into the store itself, but this is usually accomplished with tools like etcd or Consul, or by building your own thin configuration API.

    On the Airflow side, communication with the config store happens in the config.py file. There we can fetch Airflow Variables or pull configuration through the config store’s API. As these values change, Airflow picks them up on its next DAG parse and automatically regenerates the DAGs.

    CODE: https://gist.github.com/tameem92/1e2eab3dbaa8693715bd4a2e36c94bb1.js
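
    For reference, here is a minimal sketch of what such a config.py could contain, assuming a simple HTTP config service; the endpoint, variable names, and keys are hypothetical, not taken from the gist:

    # projects/project_a/config.py (illustrative sketch; names are assumptions)
    import requests
    from airflow.models import Variable

    # Environment and feature flags pulled from Airflow Variables.
    environment = Variable.get("environment", default_var="staging")

    # Project and tenant settings pulled from a centralized config store.
    CONFIG_STORE_URL = "http://config-store.internal/api/v1/projects/project_a"

    def _fetch_remote_config():
        response = requests.get(CONFIG_STORE_URL, timeout=10)
        response.raise_for_status()
        return response.json()

    _remote = _fetch_remote_config()
    process_messages = _remote.get("process_messages", {})
    process_invoices = _remote.get("process_invoices", {})
    client_config = _remote.get("client_config", [])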

    The Factory

    Moving on to the centerpiece, all the heavy lifting is done in the dag_factory folder. The DAGFactory class is responsible for mapping our supported DAGs and dynamically calling the correct builder module based on the provided key.

    CODE: https://gist.github.com/tameem92/fb81df44b0c7b91266f3dc717bffa5e6.js

    As you can see in the gist above, the create() function simply returns the correctly mapped DAG builder, which can be called like this:

    factory = DAGFactory(config.environment)
    factory.create('ProcessMessages', config.process_messages)
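
    To make the mapping concrete, here is a minimal sketch of a factory along these lines. It is an assumption about the shape of the class, not GrowthLoop’s actual implementation:

    # dag_factory/factory.py (illustrative sketch)
    from dag_factory.process_messages import ProcessMessages
    from dag_factory.process_invoices import ProcessInvoices

    class DAGFactory:
        """Maps a string key to the builder class that knows how to build that DAG."""

        SUPPORTED_DAGS = {
            "ProcessMessages": ProcessMessages,
            "ProcessInvoices": ProcessInvoices,
        }

        def __init__(self, environment):
            self.environment = environment

        def create(self, name, config):
            """Return an instance of the mapped builder, ready to build()."""
            try:
                builder_cls = self.SUPPORTED_DAGS[name]
            except KeyError:
                raise ValueError(f"Unsupported DAG type: {name}")
            return builder_cls(self.environment, config)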

    The Python classes that generate the DAGs for our process_invoices and process_messages tasks follow a specific format so they can be triggered from the DAGFactory. Each file has the following standardized structure:

    • init() — sets up global and DAG-specific configuration parameters.
    • build() — builds and returns the DAG object with all the tasks under it. This is where the bulk of the functionality for a task is defined.

    As an example, in this snippet we take a look at the process_messages factory. It defines two tasks using KubernetesPodOperators to fetch and process messages. This DAG can now be built multiple times through our factory’s create() method with different variables — giving us dynamic variations of the same DAG.

    CODE: https://gist.github.com/tameem92/ac72dee0298f6190daddbccbca5543fc.js
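
    To illustrate the init()/build() structure, here is a rough sketch of what such a builder could look like, assuming Airflow 2.x with the cncf.kubernetes provider installed; the images, namespace, and config keys are made up:

    # dag_factory/process_messages.py (illustrative sketch)
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
        KubernetesPodOperator,
    )

    class ProcessMessages:
        def __init__(self, environment, config):
            # Global and DAG-specific configuration parameters.
            self.environment = environment
            self.config = config

        def build(self, dag_id="process_messages"):
            """Build and return the DAG object with all of its tasks."""
            dag = DAG(
                dag_id=dag_id,
                start_date=datetime(2021, 1, 1),
                schedule_interval=self.config.get("schedule", "@hourly"),
                catchup=False,
            )
            fetch = KubernetesPodOperator(
                task_id="fetch_messages",
                name="fetch-messages",
                namespace="airflow",
                image="example.io/fetch-messages:latest",  # hypothetical image
                dag=dag,
            )
            process = KubernetesPodOperator(
                task_id="process_messages",
                name="process-messages",
                namespace="airflow",
                image="example.io/process-messages:latest",  # hypothetical image
                dag=dag,
            )
            fetch >> process
            return dag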

    Putting it all together

    Everything eventually comes together in the projects/<project_name>/main.py files — where all that is left to do is call the build method on our factories and attach the returned DAG objects under a parent DAG. In this example, we can see how easily we can call our factory methods to generate the DAG objects and then add them to the main DAG using our trusty SubDagOperators.

    CODE: https://gist.github.com/tameem92/118521a9de3a274ddbb43d66a964ab71.js
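
    Here is a minimal sketch of what such a main.py could look like under the layout assumed earlier; the parent DAG id, schedule, and module paths are illustrative:

    # projects/project_a/main.py (illustrative sketch)
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.subdag import SubDagOperator  # Airflow 2.x path

    from dag_factory.factory import DAGFactory  # assumed module path
    from projects.project_a import config       # assumed project config module

    PARENT_DAG_ID = "project_a_main"

    parent = DAG(
        dag_id=PARENT_DAG_ID,
        start_date=datetime(2021, 1, 1),
        schedule_interval="@hourly",
        catchup=False,
    )

    factory = DAGFactory(config.environment)

    # SubDagOperator expects the child dag_id to be "<parent_dag_id>.<task_id>"
    # and the child to share the parent's schedule and start date.
    messages = factory.create("ProcessMessages", config.process_messages)
    SubDagOperator(
        task_id="process_messages",
        subdag=messages.build(dag_id=f"{PARENT_DAG_ID}.process_messages"),
        dag=parent,
    )

    invoices = factory.create("ProcessInvoices", config.process_invoices)
    SubDagOperator(
        task_id="process_invoices",
        subdag=invoices.build(dag_id=f"{PARENT_DAG_ID}.process_invoices"),
        dag=parent,
    )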

    Once everything comes together, this setup also allows us to run a single cluster with multi-tenant DAG workloads. Let’s assume that our configuration store returns client_config, an array of configurations for multiple clients. We can simply loop over the client configs and call our factories to build variations of each DAG for every client, as shown in the next snippet. 🤯

    CODE: https://gist.github.com/tameem92/14547743b03730fb8ef9e0e361d1a6ef.js
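
    As a sketch of that loop, assuming client_config is a list of dicts with a client_id and per-DAG settings (the exact shape is an assumption), each client gets its own top-level variation of the DAG; the same loop could instead hang each variation under a parent with SubDagOperators:

    # Multi-tenant variation (illustrative sketch)
    for client in config.client_config:
        builder = factory.create("ProcessMessages", client["process_messages"])
        dag_id = f"process_messages_{client['client_id']}"
        # Placing each DAG object in the module's global namespace is what lets
        # Airflow's parser discover every generated variation.
        globals()[dag_id] = builder.build(dag_id=dag_id)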

    The bottom line

    Just like with any other tool, once you start to scale up it’s imperative to take a pause and rethink how things can be improved or optimized. Our need for a more dynamic and configuration-driven approach to Airflow led us to build DAG factories, and hopefully this will be helpful to the wider community using Airflow. As for us, our Airflow clusters are humming along, automatically rebuilding DAGs as configurations change — allowing us to scale multi-tenant Airflow clusters.

    Love talking tech or data?

    You’re in luck, me too! If you want to chat about cutting-edge technology, entrepreneurship, or the perils of being a startup founder, find me on Twitter or on LinkedIn.

