Airflow: How to deploy DAG and plugin changes without downtime

What is this guide good for?

  • New DAGs
  • Changes in existing DAGs
  • New directories (dag_bags)
  • New custom plugins
  • Changes in custom plugins

Workflow

01_dag_git_sync
  • git clone the whole repo into a temporary directory (if the process is running for the first time)
  • git pull changes if the repo has already been cloned into that temporary directory

Requirements:

{
  "current_branch_hash": "",
  "git_branch": "master",
  "git_repo": "git@github.your_repo.git"
}
  • current_branch_hash works only as a quick reference to know which version of your master or staging branch you are currently running.
  • git_branch specifies which branch Airflow should stay in sync with; I keep either master or staging in this variable.
  • git_repo is the SSH address of the repository (the git@github.com:... form).

The user running Airflow also needs an SSH key with read access to that repository:

~/.ssh/key_with_access_to_git
~/.ssh/key_with_access_to_git.pub

chmod 400 ~/.ssh/key_with_access_to_git
chmod 400 ~/.ssh/key_with_access_to_git.pub

And an entry in ~/.ssh/config so git uses that key when talking to GitHub:

Host github.com
  IdentityFile ~/.ssh/key_with_access_to_git
  IdentitiesOnly yes
  StrictHostKeyChecking no
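
A quick way to load these Variables and confirm the key works is the CLI. This is only a sketch, assuming the Airflow 1.10 CLI and that the JSON above was saved as variables.json (a hypothetical filename); on Airflow 2.x the equivalent is roughly airflow variables import.

# Import the Variables from the JSON file shown above (hypothetical filename)
airflow variables -i variables.json

# Verify that GitHub accepts the deploy key configured in ~/.ssh/config
ssh -T git@github.com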

DAG CODE

import logging
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.sensors.bash_sensor import BashSensor

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 10, 16, 0, 0, 0),
    'depends_on_past': False,
    'catchup': False,
    'retries': 5,
    'retry_delay': timedelta(seconds=60),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'provide_context': True
}

home_directory = os.environ['AIRFLOW_HOME']
dags_directory = os.environ['AIRFLOW_HOME'] + "/dags/"
plugins_directory = os.environ['AIRFLOW_HOME'] + "/plugins/"
git_main_dags_directory = home_directory + "/git_dags"
git_dags_directory = git_main_dags_directory + "/src/dags/"
git_plugins_directory = git_main_dags_directory + "/src/plugins/"
git_branch = Variable.get("git_branch", default_var='staging')
git_repo = Variable.get("git_repo")
logging.info(git_repo)


def validate_git_dags_folder(**kwargs):
    """ Decide whether the repo needs to be cloned or just pulled """
    logging.info("validate_git_dags_folder STARTED")
    if not os.path.exists(git_main_dags_directory):
        os.makedirs(git_main_dags_directory)
    if len(os.listdir(git_main_dags_directory)) == 0:
        return 'git_clone'
    return 'git_pull'


def save_branch_hash(**context):
    """ Store the hash pushed by get_last_branch_version into an Airflow Variable """
    return Variable.set(
        'current_branch_hash',
        context['task_instance'].xcom_pull(task_ids='get_last_branch_version'))


with DAG('01_dag_git_sync',
         default_args=default_args,
         schedule_interval='* * * * *',
         catchup=False) as dag:

    sensor_dir_exists_cmd = """
    # Check if the folder exists
    if [ ! -d {git_dags_directory} ]; then
        echo "{git_dags_directory} does not exist. Will create it and git clone repo"
        exit 0
    fi
    # Check if there are changes in the branch
    cd {git_dags_directory}
    git fetch
    local_hash=`git rev-parse {branch}`
    remote_hash=`git rev-parse origin/{branch}`
    if [ $local_hash != $remote_hash ]
    then
        echo "{branch} is not updated... will pull recent changes"
        exit 0
    else
        echo "Everything is updated with branch {branch}"
        exit 1
    fi
    """.format(branch=git_branch, git_dags_directory=git_main_dags_directory)

    sensor_dir_exists = BashSensor(task_id='git_changes_sensor',
                                   bash_command=sensor_dir_exists_cmd,
                                   poke_interval=60)

    """ Validates if the git folder is empty or not """
    validate_git_folder = BranchPythonOperator(task_id='validate_git_folder',
                                               python_callable=validate_git_dags_folder)

    """ If the git folder is empty, clone the repo """
    bash_command_clone = "git clone --single-branch --branch {} {} {}".format(
        git_branch, git_repo, git_main_dags_directory)
    logging.info("bash command sent to server: {}".format(bash_command_clone))
    git_clone = BashOperator(task_id='git_clone', bash_command=bash_command_clone)

    """ If the git folder is not empty, pull the latest changes """
    bash_command_pull = "git -C {} pull origin {}".format(git_main_dags_directory, git_branch)
    git_pull = BashOperator(task_id='git_pull', bash_command=bash_command_pull)

    """ Dummy operator (DO NOT DELETE, IT WOULD BREAK THE FLOW) """
    finished_pulling = DummyOperator(task_id='finished_pulling', dag=dag,
                                     trigger_rule='none_failed')

    """ Sync the DAGs directory """
    bash_command_sync_dags = ("rsync -rv --delete --exclude '*.pyc' "
                              "--exclude 'SYNC_FILE-DONT_DELETE.py' {} {} ").format(
                                  git_dags_directory, dags_directory)
    git_sync_dags = BashOperator(task_id='git_sync_dags', bash_command=bash_command_sync_dags)

    """ Sync the plugins directory """
    bash_command_sync_plugins = "rsync -rv --delete --exclude '*.pyc' {} {} ".format(
        git_plugins_directory, plugins_directory)
    git_sync_plugins = BashOperator(task_id='git_sync_plugins',
                                    bash_command=bash_command_sync_plugins)

    """ Get the branch's last hash and push it as an XCom """
    bash_command_get_branch_hash = 'git -C {} rev-parse {}'.format(
        git_main_dags_directory, git_branch)
    get_branch_hash = BashOperator(task_id='get_last_branch_version',
                                   bash_command=bash_command_get_branch_hash,
                                   xcom_push=True)

    """ Save the hash into an Airflow Variable """
    save_branch_hash = PythonOperator(task_id='save_last_branch_version',
                                      python_callable=save_branch_hash,
                                      provide_context=True)

    """ DAG Flow """
    sensor_dir_exists >> validate_git_folder >> [git_clone, git_pull] >> finished_pulling \
        >> [git_sync_dags, git_sync_plugins] >> get_branch_hash >> save_branch_hash

Change Paths:

Adjust these to match your Airflow installation:
home_directory = os.environ['AIRFLOW_HOME']
dags_directory = os.environ['AIRFLOW_HOME']+"/dags/"
plugins_directory = os.environ['AIRFLOW_HOME']+"/plugins/"
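
The same applies to the repository-side paths. As a sketch, if your repo kept DAGs and plugins under a hypothetical airflow/ folder instead of src/, only these two assignments would change:

git_dags_directory = git_main_dags_directory + "/airflow/dags/"        # hypothetical repo layout
git_plugins_directory = git_main_dags_directory + "/airflow/plugins/"  # hypothetical repo layout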

Testing it
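
To exercise the DAG end to end you can trigger it from the CLI and then push a trivial commit to the tracked branch. This is only a sketch, assuming the Airflow 1.10 command names (Airflow 2.x uses airflow dags unpause / airflow dags trigger instead):

# Enable and trigger the sync DAG once
airflow unpause 01_dag_git_sync
airflow trigger_dag 01_dag_git_sync

# Push a small commit to the tracked branch, then watch git_changes_sensor
# succeed on its next poke, followed by git_pull, git_sync_dags and git_sync_plugins.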

Get history of git branch hashes
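
A couple of commands that can answer this, assuming the clone lives at $AIRFLOW_HOME/git_dags as configured above:

# Recent commits on the tracked branch (what the sync has picked up, or will pick up next)
git -C $AIRFLOW_HOME/git_dags fetch
git -C $AIRFLOW_HOME/git_dags log --oneline -10

# The hash currently recorded by the DAG
airflow variables -g current_branch_hash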

Notes

  • New DAGs will take a while to appear in the DAG list; this depends on the dag_dir_list_interval configuration (see the sample configuration snippet after this list).
  • For some reason, Airflow takes longer to notice changes to plugins; in my experience they take about one or two minutes to be refreshed.
  • This DAG is great for staging and production environments, but it is a real pain for local changes, so when working locally make sure this DAG is turned off, or it will overwrite your local changes every minute.
  • The sensor checks for new changes every 60 seconds; this can be changed through the poke_interval=60 argument of the BashSensor.
  • The sensor will fail after 7 days of running constantly (I guess it is an Airflow task timeout setting), but it will immediately run again and keep sensing for git changes.
  • Keep in mind that an always-running sensor means one task is always running; if your Airflow configuration is set to run 16 tasks simultaneously, you will always be using one of them and effectively have 15 task slots available.
    This can easily be fixed by raising the dag_concurrency configuration: dag_concurrency = 17
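
For reference, a minimal airflow.cfg sketch touching the two settings mentioned above (section names as in Airflow 1.10; the interval value is only an example, not a recommendation from the original setup):

[core]
# raise the per-DAG concurrency by one to compensate for the slot held by the always-running sensor
dag_concurrency = 17

[scheduler]
# how often (in seconds) the scheduler scans the dags folder for new files
dag_dir_list_interval = 30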
