У меня в опыте три развертывания Apache Airflow в продакшн, и я ни разу не находил в Гугле способа хранить DAG-файлы в git и деплоить их в Airflow. Пришлось изобретать свой.
Цель #
Дать возможность разработчикам редактировать DAG-файлы, зависимости и т.д. локально на своих компьютерах, пушить код в git-репозиторий, и через несколько минут получать эти изменения в Airflow.
Так же передо мной стояла задача хранить конфигурацию Apache Airflow в git.
Дисклаймер
В этой статье я не рассматриваю способы тестирования DAG-файлов перед выкаткой на прод, хотя это очень важно.
У Airflow есть документация, в которой описаны различные best practices, включая способы тестирования.
Условия #
У меня установлен Apache Airflow согласно прекрасной документации. В моем случае я не использую Airflow в Docker, а запускаю его на виртуальной машине через systemd.
Что нужно #
- git-репозиторий в котором мы будем хранить наш код и пайплайн
- CI/CD система (GitLab CI, GitHub Actions, Drone CI, тысячи их)
- Настроенный runner для вашей CI/CD системы
- Установленный Ansible, вместе с модулем ansible.posix.synchronize на машине с runner-ом, и установленный rsync на обоих машинах
Готовим репозиторий #
Создал директорию dags
. В ней я храню все DAG-и, дополнительные файлы, например, скрипты. Вообще что угодно — это всё будет деплоиться в наш инстанс Airflow.
Создал в корне репозитория файлы:
- connections.json — здесь мы храним необходимые подключения для Airflow в следующем виде:
{
"test_connection": {
"conn_type": "postgres",
"description": "Тестовая БД",
"host": "localhost",
"login": "dev_ro",
"password": "super_password",
"schema": "mark",
"port": 5432,
"extra": "{\"cursor\": \"dictcursor\"}"
}
}
- requirements.txt — зависимости для Python, которые нужны для запуска DAG-ов. Записываю в стандартном для Python виде:
apache-airflow-providers-postgres==2.0.0
pytz==2021.1
pytzdata==2020.1
- variables.json — переменные, которые Airflow будет использовать в работе:
{
"my_var": "test",
"your_var": "hello"
}
Еще один дисклаймер:
я не рассматриваю сейчас способы хранения sensitive data в переменных и подключениях. Для этого нужно использовать немного другой способ. Например — шифровать файлы с помощью Ansible Vault.
- airflow.cfg — конфигурация Apache Airflow. По умолчанию можно записать данные из актуального файла
/etc/airflow/airflow.cfg
Плейбук #
Деплой я настроил с помощью любимого Ansible.
В корневой директории репозитория создал директорию ansible
, а в ней файл ansible.cfg
:
[defaults]
host_key_checking=False
ansible_ssh_private_key_file=~/.ssh/id_rsa
ansible_python_interpreter=/usr/bin/python3
callbacks_enabled = profile_tasks
forks = 25
Там же создал файл hosts.yml
и в нем указал мой хост с Airflow:
all:
children:
airflow:
hosts:
airflow.your.domain:
Теперь переходим к нашему плейбуку. У меня на сайте есть несколько статей по базовому Ansible, рекомендую посмотреть: первая статья, вторая статья.
Создал файл playbook.yml
и начал заполнять:
- name: Airflow deploy playbook
hosts: airflow
gather_facts: false
become: true
vars:
src_path: ".."
dst_path: /etc/airflow
airflow_home_env:
AIRFLOW_HOME: /etc/airflow
DAG #
Переходим к таскам. Их я разделил тегами, чтобы была возможность запускать разные команды из одного плейбука:
# SYNC
- name: Sync DAGs directory
ansible.posix.synchronize:
src: '{{ src_path }}/dags/'
dest: '{{ dst_path }}/dags/'
delete: true
recursive: true
tags:
- sync
- name: Change files permissions
ansible.builtin.file:
path: '{{ dst_path }}'
state: directory
recurse: true
owner: airflow
group: airflow
tags:
- sync
Опишу что тут происходит. В первом таске я использую модуль ansible.posix.sychronize
, который под капотом запускает rsync. Синхронизирую директорию ../dags
в директории с репозиторием и директорию /etc/airflow/dags
на машине с Airflow.
Я указываю директиву delete: true
, чтобы удалять те файлы, что были удалены из репозитория, и директиву recursive: true
, чтобы синхронизировать так же директории внутри dags
.
Во втором таске я проставляю корректные права на файлы.
Зависимости #
# Install requirements
- name: Copy requirements.txt file
ansible.builtin.copy:
src: "{{ src_path }}/requirements.txt"
dest: "{{ dst_path }}/"
owner: airflow
group: airflow
mode: 0775
tags:
- requirements
- name: Install requirements python modules
ansible.builtin.pip:
requirements: "{{ dst_path }}/requirements.txt"
tags:
- requirements
register: pip_output
become_user: airflow
- name: Print install output
ansible.builtin.debug:
var: pip_output.stdout_lines
tags:
- requirements
В этом блоке я копирую файл с зависимостями, запускаю его установку и вывожу результат.
Переменные #
# Import variables
- name: Copy variables.json file
ansible.builtin.copy:
src: "{{ src_path }}/variables.json"
dest: "{{ dst_path }}/"
owner: airflow
group: airflow
mode: 0775
tags:
- variables
- name: Import variables
ansible.builtin.shell:
cmd: /home/airflow/.local/bin/airflow variables import variables.json
chdir: '{{ dst_path }}'
become_user: airflow
tags:
- variables
environment: "{{ airflow_home_env }}"
register: variables_import_output
- name: Print variables import output
ansible.builtin.debug:
var: variables_import_output.stdout_lines
tags:
- variables
Первым делом я, уже стандартно, копирую файл с переменными. Затем совершенно некрасивый, но необходимый этап — bashsinble — запускаю команду импорта переменных, указывая chdir, пользователя airflow
и переменной AIRFLOW_HOME
.
Подключения #
Импорт подключений выглядит абсолютно так же как и импорт переменных. Меняется только тэг и команда.
Деплой в GitHub Actions #
В GHA мы храним пайплайны(workflows) в директории .github/workflows
. У меня она выглядит так:
> tree .github
.github
└── workflows
├── airflow_cfg.yml
├── connections.yml
├── dags.yml
├── requirements.yml
└── variables.yml
Разделил все на отдельные файлы, чтобы была возможность запускать их по отдельности вручную, если понадобится. Что в dags.yml
:
name: Deploy DAG files
on:
workflow_dispatch:
push:
branches:
- master
paths:
- 'dags/**'
jobs:
sync-dags:
runs-on: self-hosted
steps:
- name: Checkout repo
uses: actions/checkout@v3
- name: Run Common playbook
env:
PY_COLORS: '1'
ANSIBLE_FORCE_COLOR: '1'
run: |
cd ansible && \
ansible-playbook -i hosts.yml airflow-deploy.yml \
--tags sync
Разберу по частям:
workflow_dispatch:
позволяет запускать пайплайны вручную из меню Actions, а так же запускать их не из master ветки.branch: master
иpaths:
запустит пайплайн только в случае коммита в master ветку и при изменении в указанных файлахjobs:
указываю, что запускаться мы будем на self-hosted раннерах. Запускаю модуль Checkout для скачивания репозитория и запускаю плейбук с определенным тегом.env:
передаю несколько переменных, например для включения цветного вывода в GitHub
Остальные workflows выглядят абсолютно так же, меняется только название, директория в paths: и тэг в ansible.
Так же у меня в репозиторий добавлен скрипт для уведомлений о деплоях в Slack, который я вызываю после выполнения плейбука в зависимости от результата:
- name: Send OK branch notify
if: ${{ github.ref != 'refs/heads/master' && success() }}
run: python3 notify.py deploy_branch
- name: Send OK notify
if: ${{ github.ref == 'refs/heads/master' && success() }}
run: python3 notify.py deploy_ok
- name: Send FAIL notify
if: failure()
run: python3 notify.py deploy_failed
Итог #
Airflow периодически проверяет наличие новых файлов и изменения в старых. Интервал проверки настраивается в конфигурационном файле.
Теперь я могу запускать пайплайны вручную из вкладки Actions:
Разработчики коммитят код в ветки, делают Pull Requests, ревьювят свой код. Все довольны.
📱 Оставить комментарий к посту или задать любой вопрос ты можешь в моем Telegram-канале:
🇬🇧English version on Medium.