Airflow: How to deploy DAG and plugin changes without downtime.

What is this guide good for?

  • New DAGs
  • Changes to existing DAGs
  • New directories (dag_bags)
  • New custom plugins
  • Changes to custom plugins


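The DAG will:
  • git clone the whole repo into a temporary directory (if the process is running for the first time)
  • git pull changes if the repo has already been cloned into that temporary directory
  • rsync the repo's dags and plugins folders into Airflow's own dags and plugins folders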
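Choosing between the clone and the pull depends only on whether the temporary directory already has content. The following is a minimal plain-Python sketch of that decision, outside Airflow. The helper name plan_git_step is hypothetical, and it returns the command instead of running it:

```python
import os


def plan_git_step(target_dir, repo, branch):
    """Return the git command (as an argv list) needed to sync target_dir.

    First run: the directory is missing or empty, so clone only `branch`.
    Later runs: the clone already exists, so pull `branch` into it.
    """
    if not os.path.exists(target_dir):
        # Create the folder so `git clone` gets an empty target directory.
        os.makedirs(target_dir)
    if len(os.listdir(target_dir)) == 0:
        return ["git", "clone", "--single-branch", "--branch", branch, repo, target_dir]
    return ["git", "-C", target_dir, "pull", "origin", branch]
```

The returned list can be passed to subprocess.run(cmd, check=True). In the DAG further down, the same decision is split between a BranchPythonOperator and two BashOperators.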


The DAG reads its settings from these Airflow Variables:
{
    "current_branch_hash": "",
    "git_branch": "master",
    "git_repo": "git@github.your_repo.git"
}
  • current_branch_hash is only a quick reference showing which version of your master or staging branch you are running. The DAG updates it after every sync.
  • git_branch specifies which branch Airflow should stay in sync with. I set it to either master or staging.
  • git_repo is the SSH address of the git repo, e.g. git@github.your_repo.git.
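The user running Airflow also needs an SSH key with read access to the repo. Restrict the permissions:
chmod 400 ~/.ssh/key_with_access_to_git
chmod 400 ~/.ssh/
and point SSH at the key in ~/.ssh/config:
IdentityFile ~/.ssh/key_with_access_to_git
IdentitiesOnly yes
StrictHostKeyChecking no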
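You can type the three Variables into the Admin > Variables screen, or keep them in a JSON file and import them from the command line (airflow variables -i variables.json on Airflow 1.10, airflow variables import variables.json on Airflow 2). The following is a small sketch that writes such a file. The helper write_git_sync_variables and the file name are illustrative, and the repo address stays a placeholder:

```python
import json


def write_git_sync_variables(path, git_repo, git_branch="master"):
    """Write the Variables used by the git-sync DAG to a JSON file for import."""
    variables = {
        # Left empty here; the DAG overwrites it after every sync.
        "current_branch_hash": "",
        # Branch Airflow should stay in sync with (e.g. master or staging).
        "git_branch": git_branch,
        # SSH address of the repository.
        "git_repo": git_repo,
    }
    with open(path, "w") as f:
        json.dump(variables, f, indent=4)
    return variables
```

For example, write_git_sync_variables("variables.json", "git@github.your_repo.git", "staging") produces a file ready for import on a staging machine.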


import logging
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.sensors.bash_sensor import BashSensor

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 10, 16),
    'depends_on_past': False,
    'retries': 5,
    'retry_delay': timedelta(seconds=60),
    'email': [''],
    'email_on_failure': True,
    'email_on_retry': True,
    'provide_context': True,
}

# Airflow's own folders
home_directory = os.environ['AIRFLOW_HOME']
dags_directory = os.environ['AIRFLOW_HOME']+"/dags/"
plugins_directory = os.environ['AIRFLOW_HOME']+"/plugins/"
# Temporary clone of the repo, and the folders inside it that get synced
git_main_dags_directory = home_directory + "/git_dags"
git_dags_directory = git_main_dags_directory + "/src/dags/"
git_plugins_directory = git_main_dags_directory + "/src/plugins/"

git_branch = Variable.get("git_branch", default_var='staging')
git_repo = Variable.get("git_repo")


def validate_git_dags_folder(**kwargs):
    """Return the task_id to follow: clone into an empty folder, otherwise pull."""
    logging.info("validate_git_dags_folder STARTED")
    if not os.path.exists(git_main_dags_directory):
        os.makedirs(git_main_dags_directory)
    if len(os.listdir(git_main_dags_directory)) == 0:
        return 'git_clone'
    return 'git_pull'


def save_last_branch_hash(**context):
    """Copy the hash pushed to XCom by get_last_branch_version into an Airflow Variable."""
    return Variable.set('current_branch_hash',
                        context['task_instance'].xcom_pull(task_ids='get_last_branch_version'))


with DAG('01_dag_git_sync',
         default_args=default_args,
         schedule_interval='* * * * *',
         max_active_runs=1,  # keep a single sensor waiting at any time
         catchup=False) as dag:

    # Exit 0 (sensor succeeds) if the clone is missing or the remote branch has new commits
    sensor_dir_exists_cmd = """
# Check if the folder exists
if [ ! -d {git_dags_directory} ]; then
    echo "{git_dags_directory} does not exist. Will create it and git clone repo"
    exit 0
fi
# Check if there are changes in the branch
cd {git_dags_directory}
git fetch
local_hash=`git rev-parse {branch}`
remote_hash=`git rev-parse origin/{branch}`
if [ "$local_hash" != "$remote_hash" ]; then
    echo "{branch} is not updated... will pull recent changes"
    exit 0
fi
echo "Everything is updated with branch {branch}"
exit 1
""".format(branch=git_branch, git_dags_directory=git_main_dags_directory)

    sensor_dir_exists = BashSensor(task_id='git_changes_sensor',
                                   bash_command=sensor_dir_exists_cmd,
                                   poke_interval=60)

    # Validates if the git folder is empty or not
    validate_git_folder = BranchPythonOperator(task_id='validate_git_folder',
                                               python_callable=validate_git_dags_folder)

    # If the git folder is empty, clone the repo
    bash_command_clone = "git clone --single-branch --branch {} {} {}".format(git_branch, git_repo, git_main_dags_directory)
    logging.info("bash command sent to server: {}".format(bash_command_clone))
    git_clone = BashOperator(task_id='git_clone', bash_command=bash_command_clone)

    # If the git folder is not empty, pull the latest changes
    bash_command_pull = "git -C {} pull origin {}".format(git_main_dags_directory, git_branch)
    git_pull = BashOperator(task_id='git_pull', bash_command=bash_command_pull)

    # Join point after the branch (DO NOT DELETE, IT WOULD BREAK THE FLOW):
    # 'none_failed' lets it run even though one of git_clone / git_pull was skipped
    finished_pulling = DummyOperator(task_id='finished_pulling', trigger_rule='none_failed')

    # Sync DAGs directory
    bash_command_sync_dags = "rsync -rv --delete --exclude '*.pyc' --exclude '' {} {} ".format(git_dags_directory, dags_directory)
    git_sync_dags = BashOperator(task_id='git_sync_dags', bash_command=bash_command_sync_dags)

    # Sync plugins directory
    bash_command_sync_plugins = "rsync -rv --delete --exclude '*.pyc' {} {} ".format(git_plugins_directory, plugins_directory)
    git_sync_plugins = BashOperator(task_id='git_sync_plugins', bash_command=bash_command_sync_plugins)

    # Get the branch's last hash; xcom_push=True pushes the last stdout line to XCom
    bash_command_get_branch_hash = 'git -C {} rev-parse {}'.format(git_main_dags_directory, git_branch)
    get_branch_hash = BashOperator(task_id='get_last_branch_version',
                                   bash_command=bash_command_get_branch_hash,
                                   xcom_push=True)

    # Save the hash into an Airflow Variable
    save_branch_hash = PythonOperator(task_id='save_last_branch_version',
                                      python_callable=save_last_branch_hash,
                                      provide_context=True)

    # DAG flow
    sensor_dir_exists >> validate_git_folder >> [git_clone, git_pull] >> finished_pulling >> [git_sync_dags, git_sync_plugins] >> get_branch_hash >> save_branch_hash

Change paths: if your Airflow folders live somewhere else, adjust these lines of the DAG:
home_directory = os.environ['AIRFLOW_HOME']
dags_directory = os.environ['AIRFLOW_HOME']+"/dags/"
plugins_directory = os.environ['AIRFLOW_HOME']+"/plugins/"
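The "DO NOT DELETE" comment on finished_pulling exists because BranchPythonOperator runs only one of git_clone and git_pull and marks the other as skipped. Under the default all_success trigger rule, a task with a skipped upstream is skipped as well, so the rsync tasks would never run. none_failed lets the join run as long as nothing upstream failed. The following toy model shows both outcomes for this DAG's shape. It is plain Python, not Airflow's scheduler. It assumes every task that runs succeeds and models only the two trigger rules used here. The function names are made up:

```python
def resolve_state(trigger_rule, upstream_states):
    """Simplified model of two Airflow trigger rules (not Airflow's own code)."""
    failed = any(s in ("failed", "upstream_failed") for s in upstream_states)
    if trigger_rule == "all_success":
        if failed:
            return "upstream_failed"
        if "skipped" in upstream_states:
            return "skipped"  # a skipped parent propagates the skip
        return "success"
    if trigger_rule == "none_failed":
        return "upstream_failed" if failed else "success"
    raise ValueError("unsupported trigger rule: " + trigger_rule)


def simulate(branch_choice, join_rule):
    """Simulate the DAG from the branch onwards; branch_choice is 'git_clone' or 'git_pull'."""
    states = {}
    for task in ("git_clone", "git_pull"):
        # The branch operator skips every direct downstream task it did not pick.
        states[task] = "success" if task == branch_choice else "skipped"
    states["finished_pulling"] = resolve_state(
        join_rule, [states["git_clone"], states["git_pull"]])
    for task in ("git_sync_dags", "git_sync_plugins"):
        states[task] = resolve_state("all_success", [states["finished_pulling"]])
    states["get_last_branch_version"] = resolve_state(
        "all_success", [states["git_sync_dags"], states["git_sync_plugins"]])
    return states
```

simulate("git_pull", "none_failed") lets every sync task succeed. With "all_success" the skip from the unchosen branch flows all the way down, and nothing gets synced.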

Testing it

Get history of git branch hashes


  • New DAGs take a while to appear in the DAG list. The delay depends on the dag_dir_list_interval setting (300 seconds by default).
  • For some reason, Airflow takes longer to notice changes to plugins, but in my experience they are refreshed within 1 or 2 minutes.
  • This DAG is great for staging and production environments, but it's a real pain for local development. When working locally, make sure this DAG is turned off, or it will overwrite your local changes every minute.
  • The sensor checks for new changes every 60 seconds. You can change this through its poke_interval=60 argument.
  • The sensor fails after 7 days of continuous poking, because 7 days is the default timeout of Airflow sensors. The next scheduled run then immediately starts sensing for git changes again.
  • Keep in mind that an always-running sensor means 1 task slot is always taken. If your Airflow installation is set to run 16 tasks simultaneously, you'll only have 15 slots left for everything else.
    To fix this, raise that limit by one. The installation-wide limit is the parallelism setting (e.g. parallelism = 17). dag_concurrency only caps how many tasks a single DAG can run at once.
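Here is a quick check of the numbers behind the last three notes. It assumes the sensor keeps the default 7-day sensor timeout and the poke_interval=60 from the DAG, and it uses the 16-task limit from the example. The helper names are made up for illustration:

```python
# Default timeout of Airflow sensors: 7 days, expressed in seconds.
SENSOR_TIMEOUT_SECONDS = 60 * 60 * 24 * 7


def pokes_before_timeout(poke_interval, timeout=SENSOR_TIMEOUT_SECONDS):
    """Upper bound on how many times the sensor pokes before it times out."""
    return timeout // poke_interval


def free_task_slots(limit, always_running_sensors=1):
    """Task slots left for other work while the sensors hold theirs."""
    return limit - always_running_sensors


print(pokes_before_timeout(60))  # 10080
print(free_task_slots(16))       # 15
```

At most about 10,080 git fetches happen per sensor run before the 7-day timeout. Raising the limit to 17 gives back the 16 slots you originally planned for.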




Data Engineer


