Airflow: How to deploy DAG and plugin changes without downtime.

I am writing this little guide with the hope of making life easier for my fellow Data Engineers.

Please keep in mind that English is not my first language and that this is the very first time I have dared to write anything like this on a blog.

What is this guide good for?

I have been using this approach in staging and production environments without problems for these use cases:

DAGS:

  • New DAGs
  • Changes to existing DAGs
  • New directories (dag_bags)

CUSTOM PLUGINS:

  • New custom plugins
  • Changes in custom plugins

Workflow

01_dag_git_sync

1- A bash sensor checks the master or staging branch for changes.

2- Once a change is detected, a branch Python operator checks whether the process is running for the first time and will either:

  • git clone the whole repo into a temporary directory (if the process is running for the first time), or
  • git pull the changes if the repo has already been cloned into said temporary directory.

3- A dummy operator simply signals that the previous step finished.

4- Two bash tasks run in parallel and rsync the DAGs and plugins directories with the latest changes.

5- A bash operator gets the most recent git hash and saves it into an XCom (this helps us keep a change log and get back to a working commit hash if necessary).

6- A Python operator saves the latest hash into an Airflow Variable, for easier and faster access to the currently deployed commit hash.
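
As a quick sanity check once steps 5 and 6 have run at least once, you can read that Variable back from the command line (a small sketch assuming the Airflow 1.10 CLI; the variable name is the one defined in the Requirements section below):

# Print the commit hash the sync DAG last deployed (saved in step 6)
airflow variables -g current_branch_hash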

Requirements:

1- Airflow Variables

Create or import these 3 Airflow variables:

{
  "current_branch_hash": "",
  "git_branch": "master",
  "git_repo": "git@github.your_repo.git"
}
  • current_branch_hash works only as a quick reference to know which version of your master or staging branch you are currently running.
  • git_branch specifies which branch Airflow should stay in sync with; I keep either master or staging in this variable.
  • git_repo is the SSH address of your git repo, as in the example above.
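
If you prefer the command line over the Admin UI, here is a rough sketch of creating them with the Airflow 1.10 CLI (the file path and repo address are placeholders you should replace with your own):

# Option 1: save the JSON above to a file and import it
airflow variables -i /path/to/airflow_variables.json

# Option 2: set the variables one by one
airflow variables -s current_branch_hash ""
airflow variables -s git_branch master
airflow variables -s git_repo git@github.com:your_org/your_repo.git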

2- Git Access:

This approach connects to my git repo through an SSH key, so make sure your Airflow instance has an SSH key pair in:

~/.ssh/key_with_access_to_git
~/.ssh/key_with_access_to_git.pub

Make sure both keys have the right permissions:

chmod 400 ~/.ssh/key_with_access_to_git
chmod 400 ~/.ssh/key_with_access_to_git.pub

Also, you’ll need to add the GitHub host to your ~/.ssh/config file:

Host github.com
    IdentityFile ~/.ssh/key_with_access_to_git
    IdentitiesOnly yes
    StrictHostKeyChecking no

Next, register the SSH public key in your git repo.

If you are stuck on this part, follow this guide to create SSH keys and link them to GitHub (just remember to generate the key without a passphrase).
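
If you need to create the key pair from scratch, something like this should work (the key name matches the one above, and -N "" is what makes it passphrase-less):

# Generate a passphrase-less key pair for Airflow's git access
ssh-keygen -t rsa -b 4096 -N "" -f ~/.ssh/key_with_access_to_git

# Print the public key so you can register it with your git provider (deploy key or account SSH key)
cat ~/.ssh/key_with_access_to_git.pub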

This stack overflow question helped me a lot too.

DAG CODE

DO NOT RENAME THIS FILE: its filename (SYNC_FILE-DONT_DELETE.py) is hard-coded in the rsync exclude of the git_sync_dags task below, so the sync does not delete the DAG file itself.

import logging, os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.sensors.bash_sensor import BashSensor

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 10, 16, 00, 00, 00),
    'depends_on_past': False,
    'catchup': False,
    'retries': 5,
    'retry_delay': timedelta(seconds=60),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'provide_context': True
}

home_directory = os.environ['AIRFLOW_HOME']
dags_directory = os.environ['AIRFLOW_HOME'] + "/dags/"
plugins_directory = os.environ['AIRFLOW_HOME'] + "/plugins/"
git_main_dags_directory = home_directory + "/git_dags"
git_dags_directory = git_main_dags_directory + "/src/dags/"
git_plugins_directory = git_main_dags_directory + "/src/plugins/"
git_branch = Variable.get("git_branch", default_var='staging')
git_repo = Variable.get("git_repo")
logging.info(git_repo)


def validate_git_dags_folder(**kwargs):
    logging.info("validate_git_dags_folder STARTED")
    if not os.path.exists(git_main_dags_directory):
        os.makedirs(git_main_dags_directory)
    if len(os.listdir(git_main_dags_directory)) == 0:
        return 'git_clone'
    return 'git_pull'


def save_branch_hash(**context):
    return Variable.set('current_branch_hash',
                        context['task_instance'].xcom_pull(task_ids='get_last_branch_version'))


with DAG('01_dag_git_sync',
         default_args=default_args,
         schedule_interval='* * * * *',
         catchup=False) as dag:

    sensor_dir_exists_cmd = """
    # Check if the folder exists
    if [ ! -d {git_dags_directory} ]; then
        echo "{git_dags_directory} does not exist. Will create it and git clone repo"
        exit 0
    fi

    # Check if there are changes in the branch
    cd {git_dags_directory}
    git fetch
    local_hash=`git rev-parse {branch}`
    remote_hash=`git rev-parse origin/{branch}`
    if [ $local_hash != $remote_hash ]
    then
        echo "{branch} is not updated... will pull recent changes"
        exit 0
    else
        echo "Everything is updated with branch {branch}"
        exit 1
    fi
    """.format(branch=git_branch, git_dags_directory=git_main_dags_directory)
    sensor_dir_exists = BashSensor(task_id='git_changes_sensor',
                                   bash_command=sensor_dir_exists_cmd,
                                   poke_interval=60)

    """ Validates if the git folder is empty or not """
    validate_git_folder = BranchPythonOperator(task_id='validate_git_folder',
                                               python_callable=validate_git_dags_folder)

    """ If the git folder is empty, clone the repo """
    bash_command_clone = "git clone --single-branch --branch {} {} {}".format(git_branch, git_repo, git_main_dags_directory)
    logging.info("bash command sent to server: {}".format(bash_command_clone))
    git_clone = BashOperator(task_id='git_clone', bash_command=bash_command_clone)

    """ If the git folder is not empty, pull the latest changes """
    bash_command_pull = "git -C {} pull origin {}".format(git_main_dags_directory, git_branch)
    git_pull = BashOperator(task_id='git_pull', bash_command=bash_command_pull)

    """ Dummy operator (DO NOT DELETE, IT WOULD BREAK THE FLOW) """
    finished_pulling = DummyOperator(task_id='finished_pulling', dag=dag, trigger_rule='none_failed')

    """ Sync Dags Directory """
    bash_command_sync_dags = "rsync -rv --delete --exclude '*.pyc' --exclude 'SYNC_FILE-DONT_DELETE.py' {} {} ".format(git_dags_directory, dags_directory)
    git_sync_dags = BashOperator(task_id='git_sync_dags', bash_command=bash_command_sync_dags)

    """ Sync Plugin Directory """
    bash_command_sync_plugins = "rsync -rv --delete --exclude '*.pyc' {} {} ".format(git_plugins_directory, plugins_directory)
    git_sync_plugins = BashOperator(task_id='git_sync_plugins', bash_command=bash_command_sync_plugins)

    """ Get branch last hash and save it into an XCOM """
    bash_command_get_branch_hash = 'git -C {} rev-parse {}'.format(git_main_dags_directory, git_branch)
    get_branch_hash = BashOperator(task_id='get_last_branch_version',
                                   bash_command=bash_command_get_branch_hash,
                                   xcom_push=True)

    """ Save HASH version into Airflow Variable """
    save_branch_hash = PythonOperator(task_id='save_last_branch_version',
                                      python_callable=save_branch_hash,
                                      provide_context=True)

    """ DAG Flow """
    sensor_dir_exists >> validate_git_folder >> [git_clone, git_pull] >> finished_pulling >> [git_sync_dags, git_sync_plugins] >> get_branch_hash >> save_branch_hash

Change Paths:

If you don’t have an AIRFLOW_HOME environment variable, change these path definitions near the top of the DAG to match your Airflow home directory.

While you are at it, make sure the paths for the dags and plugins directories are correct:

home_directory = os.environ['AIRFLOW_HOME']
dags_directory = os.environ['AIRFLOW_HOME']+"/dags/"
plugins_directory = os.environ['AIRFLOW_HOME']+"/plugins/"
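
Alternatively, if you prefer not to touch the code, you can export the variable before starting the scheduler and webserver (the path below is only an example):

# Example only: point AIRFLOW_HOME at your actual Airflow installation directory
export AIRFLOW_HOME=/home/airflow/airflow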

Testing it

Make a simple change in your master or staging branch and push changes to origin.

The sensor should detect the changes and trigger the whole process again.
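
For example, assuming your git_branch variable is set to staging, even an empty commit is enough to trigger the sync:

git checkout staging
git commit --allow-empty -m "test: trigger airflow git sync"
git push origin staging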

Get history of git branch hashes

Task Id = get_last_branch_version

Browsing this task’s XCom history in the UI gives you every commit hash that has been synced, which should help you go back to a working version if needed.
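
If you ever need to roll back, here is a rough sketch of doing it by hand with one of those hashes. This is not part of the DAG: <good_hash> is a placeholder for a hash taken from the XCom history, and you should pause 01_dag_git_sync first so the sensor doesn’t immediately re-sync to the branch head.

cd $AIRFLOW_HOME/git_dags
git checkout <good_hash>
rsync -rv --delete --exclude '*.pyc' --exclude 'SYNC_FILE-DONT_DELETE.py' src/dags/ $AIRFLOW_HOME/dags/
rsync -rv --delete --exclude '*.pyc' src/plugins/ $AIRFLOW_HOME/plugins/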

Notes

  • For some reason, Airflow takes longer to notice changes to plugins; in my experience they take about 1 or 2 minutes to be refreshed in Airflow.
  • This DAG is great for staging and production environments, but it’s a real pain for local changes, so when working locally make sure this DAG is turned off, or it will overwrite your local changes every minute.
  • The sensor checks for new changes every 60 seconds; this can be changed through the poke_interval=60 argument of the BashSensor.
  • The sensor will fail after 7 days of constantly running (I guess it’s an Airflow task timeout setting), but it will immediately start again, sensing for git changes.
  • Keep in mind that an always-running sensor means one task is always running; hence, if your Airflow configuration is set to run 16 tasks simultaneously, you’ll always be using one and will only have 15 task slots available.
    This can easily be fixed by changing the dag_concurrency configuration to dag_concurrency = 17, as shown in the sketch right after this list.
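
If you want to apply that, here is a sketch of the two usual ways to change the setting (assuming Airflow 1.10; restart the scheduler afterwards):

# Option 1: edit $AIRFLOW_HOME/airflow.cfg and, under [core], set:
#   dag_concurrency = 17
# Option 2: export the equivalent environment variable before starting the scheduler
export AIRFLOW__CORE__DAG_CONCURRENCY=17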

I hope this guide was clear enough and that it helps more than one person.

Reach me in the comments if you have questions.

Thanks for reading

Data Engineer