Airflow: How to deploy DAG and plugin changes without downtime.
I am writing this little guide with the hope of making life easier for my fellow Data Engineers.
Please keep in mind that English is not my first language and that this is the very first time I have dared to write anything like this on a blog.
What is this guide good for?
With this DAG you’ll be able to keep your dags and plugins directories up to date automatically, avoiding having to redeploy a complete Airflow instance for anything from small to large DAG / plugin changes.
I have been using this approach in both staging and production environments without problems for these use cases:
DAGS:
- New DAGs
- Changes in existing DAGs
- New directories (dag_bags)
CUSTOM PLUGINS:
- New custom plugins
- Changes in custom plugins
Workflow
The complete workflow is as follows:

1- A bash sensor checks the master or staging branch for changes.
2- Once a change is detected, a branch python operator checks whether the DAG is running for the first time and will either:
git clone the whole repo into a temporary directory (if the process is running for the first time), or
git pull the changes if the repo has already been cloned into that temporary directory.
3- A dummy operator just lets us know that the previous step finished.
4- Two bash tasks run in parallel and rsync the dags and plugins directories with the latest changes.
5- A bash operator gets the most recent git hash and saves it into an XCom (this will help us keep a change log and get back to a working git hash if necessary).
6- A python operator saves the latest hash into an Airflow Variable, for easier and faster access to the currently running git hash.
Requirements:
This guide assumes you already have Airflow installed and running.
1- Airflow Variables
Create or import these 3 Airflow variables:
{
"current_branch_hash": "",
"git_branch": "master",
"git_repo": "git@github.your_repo.git"
}
current_branch_hash works only as a quick reference to know which version of your master or staging branch you are currently running.
git_branch specifies which branch Airflow should stay in sync with; I keep either master or staging in this variable.
git_repo is the SSH address of your git repo, as in:
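You can create these variables in the UI (Admin > Variables) or import them from a JSON file with the CLI. A minimal sketch, assuming Airflow 1.10 CLI syntax and that the JSON above was saved as /tmp/airflow_variables.json (a hypothetical path; newer Airflow versions use airflow variables import):

airflow variables -i /tmp/airflow_variables.json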

2- Git Access:
This approach connects to my Git repo through an SSH key, so make sure your Airflow instance has an SSH key at:
~/.ssh/key_with_access_to_git
~/.ssh/key_with_access_to_git.pub
Make sure both keys have the right permissions:
chmod 400 ~/.ssh/key_with_access_to_git
chmod 400 ~/.ssh/key_with_access_to_git.pub
Also, you’ll need to add the GitHub host to your ~/.ssh/config file:
Host github.com
IdentityFile ~/.ssh/key_with_access_to_git
IdentitiesOnly yes
StrictHostKeyChecking no
Next, register the public SSH key with your Git repo.

If you are stuck at this part, follow this guide to create SSH keys and link them to GitHub (just remember to generate the key without a passphrase).
This Stack Overflow question helped me a lot too.
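Before letting the DAG pull anything, it’s worth confirming that the key actually authenticates. A quick check, assuming GitHub and that you run it as the same user that runs Airflow:

ssh -T git@github.com
# Expected: "Hi <user>! You've successfully authenticated, but GitHub does not provide shell access."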
DAG CODE
Copy and paste the following code into your dags directory with the filename SYNC_FILE-DONT_DELETE.py.
DO NOT RENAME IT: the filename is referenced in the rsync exclude of the git_sync_dags task below, so the sync step never deletes this file.
import logging, os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.sensors.bash_sensor import BashSensor

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 10, 16, 00, 00, 00),
    'depends_on_past': False,
    'catchup': False,
    'retries': 5,
    'retry_delay': timedelta(seconds=60),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'provide_context': True
}

home_directory = os.environ['AIRFLOW_HOME']
dags_directory = os.environ['AIRFLOW_HOME'] + "/dags/"
plugins_directory = os.environ['AIRFLOW_HOME'] + "/plugins/"

git_main_dags_directory = home_directory + "/git_dags"
git_dags_directory = git_main_dags_directory + "/src/dags/"
git_plugins_directory = git_main_dags_directory + "/src/plugins/"

git_branch = Variable.get("git_branch", default_var='staging')
git_repo = Variable.get("git_repo")
logging.info(git_repo)


def validate_git_dags_folder(**kwargs):
    """ Decides whether the repo has to be cloned or just pulled """
    logging.info("validate_git_dags_folder STARTED")
    if not os.path.exists(git_main_dags_directory):
        os.makedirs(git_main_dags_directory)
    if len(os.listdir(git_main_dags_directory)) == 0:
        return 'git_clone'
    return 'git_pull'


def save_branch_hash(**context):
    """ Stores the hash pushed to XCom by get_last_branch_version into an Airflow Variable """
    return Variable.set('current_branch_hash',
                        context['task_instance'].xcom_pull(task_ids='get_last_branch_version'))


with DAG('01_dag_git_sync',
         default_args=default_args,
         schedule_interval='* * * * *',
         catchup=False) as dag:

    sensor_dir_exists_cmd = """
    # Check if the folder exists
    if [ ! -d {git_dags_directory} ]; then
        echo "{git_dags_directory} does not exist. Will create it and git clone repo"
        exit 0
    fi

    # Check if there are changes in the branch
    cd {git_dags_directory}
    git fetch
    local_hash=`git rev-parse {branch}`
    remote_hash=`git rev-parse origin/{branch}`

    if [ $local_hash != $remote_hash ]
    then
        echo "{branch} is not updated... will pull recent changes"
        exit 0
    else
        echo "Everything is updated with branch {branch}"
        exit 1
    fi
    """.format(branch=git_branch, git_dags_directory=git_main_dags_directory)

    sensor_dir_exists = BashSensor(task_id='git_changes_sensor',
                                   bash_command=sensor_dir_exists_cmd,
                                   poke_interval=60)

    """ Validates if the git folder is empty or not """
    validate_git_folder = BranchPythonOperator(task_id='validate_git_folder',
                                               python_callable=validate_git_dags_folder)

    """ If the git folder is empty, clone the repo """
    bash_command_clone = "git clone --single-branch --branch {} {} {}".format(git_branch, git_repo, git_main_dags_directory)
    logging.info("bash command sent to server: {}".format(bash_command_clone))
    git_clone = BashOperator(task_id='git_clone', bash_command=bash_command_clone)

    """ If the git folder is not empty, pull the latest changes """
    bash_command_pull = "git -C {} pull origin {}".format(git_main_dags_directory, git_branch)
    git_pull = BashOperator(task_id='git_pull', bash_command=bash_command_pull)

    """ Dummy operator (DO NOT DELETE, IT WOULD BREAK THE FLOW) """
    finished_pulling = DummyOperator(task_id='finished_pulling', dag=dag, trigger_rule='none_failed')

    """ Sync DAGs directory """
    bash_command_sync_dags = "rsync -rv --delete --exclude '*.pyc' --exclude 'SYNC_FILE-DONT_DELETE.py' {} {}".format(git_dags_directory, dags_directory)
    git_sync_dags = BashOperator(task_id='git_sync_dags', bash_command=bash_command_sync_dags)

    """ Sync plugins directory """
    bash_command_sync_plugins = "rsync -rv --delete --exclude '*.pyc' {} {}".format(git_plugins_directory, plugins_directory)
    git_sync_plugins = BashOperator(task_id='git_sync_plugins', bash_command=bash_command_sync_plugins)

    """ Get the branch's last hash and save it into an XCom """
    bash_command_get_branch_hash = 'git -C {} rev-parse {}'.format(git_main_dags_directory, git_branch)
    get_branch_hash = BashOperator(task_id='get_last_branch_version',
                                   bash_command=bash_command_get_branch_hash,
                                   xcom_push=True)

    """ Save the hash into an Airflow Variable """
    save_branch_hash = PythonOperator(task_id='save_last_branch_version',
                                      python_callable=save_branch_hash,
                                      provide_context=True)

    """ DAG flow """
    sensor_dir_exists >> validate_git_folder >> [git_clone, git_pull] >> finished_pulling >> [git_sync_dags, git_sync_plugins] >> get_branch_hash >> save_branch_hash

Change Paths:
If you don’t have an AIRFLOW_HOME environment variable, change the home_directory, dags_directory, and plugins_directory assignments near the top of the DAG to match your Airflow home directory.
In those same lines, make sure the paths for the dags and plugins directories are correct:
home_directory = os.environ['AIRFLOW_HOME']
dags_directory = os.environ['AIRFLOW_HOME']+"/dags/"
plugins_directory = os.environ['AIRFLOW_HOME']+"/plugins/"
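Alternatively, if AIRFLOW_HOME simply isn’t set, you can export it in the environment of the scheduler and webserver so the code above works unchanged. A sketch, assuming /home/airflow/airflow is your installation path (adjust to your setup):

export AIRFLOW_HOME=/home/airflow/airflow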
Testing it
After deploying your DAG, it should trigger automatically the first time it is scheduled and clone the repo.
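If it doesn’t start on its own, you can trigger it manually from the CLI. This assumes Airflow 1.10 syntax (newer versions use airflow dags trigger):

airflow trigger_dag 01_dag_git_sync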
Make a simple change in your master or staging branch and push it to origin.
The sensor should detect the changes and trigger the whole process again.
Get history of git branch hashes
In Airflow go to: Admin > XComs and add this filter:
Task Id = get_last_branch_version

This will show you the full history of synced hashes, which should help you go back to a working version of the branch if needed.
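If you only need the hash that is currently deployed, you can also read the Airflow Variable from the CLI. This assumes Airflow 1.10 syntax (newer versions use airflow variables get):

airflow variables -g current_branch_hash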
Notes
- New DAGs will take a while to appear in the DAGs list; this depends on the dag_dir_list_interval configuration.
- For some reason, Airflow takes longer to notice changes to plugins, but in my experience they take about 1 or 2 minutes to be refreshed.
- This DAG is great for staging and production environments, but it’s a real pain for local development, so when working locally make sure to have this DAG turned off or it will overwrite your local changes every minute.
- The sensor checks for new changes every 60 seconds; this can be changed through the poke_interval=60 argument of the BashSensor.
- The sensor will fail after 7 days of constantly running (I guess it’s an Airflow task timeout setting), but it will immediately run again and keep sensing for git changes.
- Have in mind that an always-running sensor means that 1 task is always running; hence, if your Airflow configuration is set to run 16 tasks simultaneously, you’ll always be using 1 and have only 15 task slots available. This can easily be fixed by changing the dag_concurrency configuration to dag_concurrency = 17 (see the example airflow.cfg snippet below).
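For reference, here is a minimal airflow.cfg sketch of the two settings mentioned in these notes (section names as in Airflow 1.10; the values are only examples, not recommendations):

[core]
# Bump the per-DAG concurrency by one so the always-running sensor does not eat into your usual capacity
dag_concurrency = 17

[scheduler]
# How often (in seconds) the scheduler scans the dags folder for new files
dag_dir_list_interval = 300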
I hope this guide was clear enough and that it helps more than one person.
Reach out in the comments if you have questions.
Thanks for reading!