Перейти к основному содержимому

Apache Airflow: деплой DAG файлов из git

·1037 слов·5 минут
Гайды
Юрий Семеньков
Автор
Юрий Семеньков
DevOps, tech, geek, mentor
Оглавление

У меня в опыте три развертывания Apache Airflow в продакшн, и я ни разу не находил в Гугле способа хранить DAG-файлы в git и деплоить их в Airflow. Пришлось изобретать свой.

Цель
#

Дать возможность разработчикам редактировать DAG-файлы, зависимости и т.д. локально на своих компьютерах, пушить код в git-репозиторий, и через несколько минут получать эти изменения в Airflow.

Так же передо мной стояла задача хранить конфигурацию Apache Airflow в git.

Дисклаймер

В этой статье я не рассматриваю способы тестирования DAG-файлов перед выкаткой на прод, хотя это очень важно.
У Airflow есть документация, в которой описаны различные best practices, включая способы тестирования.

Условия
#

У меня установлен Apache Airflow согласно прекрасной документации. В моем случае я не использую Airflow в Docker, а запускаю его на виртуальной машине через systemd.

Что нужно
#

  • git-репозиторий в котором мы будем хранить наш код и пайплайн
  • CI/CD система (GitLab CI, GitHub Actions, Drone CI, тысячи их)
  • Настроенный runner для вашей CI/CD системы
  • Установленный Ansible, вместе с модулем ansible.posix.synchronize на машине с runner-ом, и установленный rsync на обоих машинах

Готовим репозиторий
#

Создал директорию dags. В ней я храню все DAG-и, дополнительные файлы, например, скрипты. Вообще что угодно — это всё будет деплоиться в наш инстанс Airflow.

Создал в корне репозитория файлы:

  • connections.json — здесь мы храним необходимые подключения для Airflow в следующем виде:
{
    "test_connection": {
        "conn_type": "postgres",
        "description": "Тестовая БД",
        "host": "localhost",
        "login": "dev_ro",
        "password": "super_password",
        "schema": "mark",
        "port": 5432,
        "extra": "{\"cursor\": \"dictcursor\"}"
      }
}
  • requirements.txt — зависимости для Python, которые нужны для запуска DAG-ов. Записываю в стандартном для Python виде:
apache-airflow-providers-postgres==2.0.0
pytz==2021.1
pytzdata==2020.1
  • variables.json — переменные, которые Airflow будет использовать в работе:
{
  "my_var": "test",
  "your_var": "hello"
}

Еще один дисклаймер:

я не рассматриваю сейчас способы хранения sensitive data в переменных и подключениях. Для этого нужно использовать немного другой способ. Например — шифровать файлы с помощью Ansible Vault.

  • airflow.cfg — конфигурация Apache Airflow. По умолчанию можно записать данные из актуального файла /etc/airflow/airflow.cfg

Плейбук
#

Деплой я настроил с помощью любимого Ansible.

В корневой директории репозитория создал директорию ansible, а в ней файл ansible.cfg:

[defaults]
host_key_checking=False
ansible_ssh_private_key_file=~/.ssh/id_rsa
ansible_python_interpreter=/usr/bin/python3
callbacks_enabled = profile_tasks
forks = 25

Там же создал файл hosts.yml и в нем указал мой хост с Airflow:

all:
  children:
    airflow:
      hosts:
        airflow.your.domain:

Теперь переходим к нашему плейбуку. У меня на сайте есть несколько статей по базовому Ansible, рекомендую посмотреть: первая статья, вторая статья.

Создал файл playbook.yml и начал заполнять:

- name: Airflow deploy playbook
  hosts: airflow
  gather_facts: false
  become: true

  vars:
    src_path: ".."
    dst_path: /etc/airflow
    airflow_home_env:
      AIRFLOW_HOME: /etc/airflow

DAG
#

Переходим к таскам. Их я разделил тегами, чтобы была возможность запускать разные команды из одного плейбука:

# SYNC
    - name: Sync DAGs directory
      ansible.posix.synchronize:
        src: '{{ src_path }}/dags/'
        dest: '{{ dst_path }}/dags/'
        delete: true
        recursive: true
      tags:
        - sync
        
    - name: Change files permissions
      ansible.builtin.file:
        path: '{{ dst_path }}'
        state: directory
        recurse: true
        owner: airflow
        group: airflow
      tags:
        - sync

Опишу что тут происходит. В первом таске я использую модуль ansible.posix.sychronize, который под капотом запускает rsync. Синхронизирую директорию ../dags в директории с репозиторием и директорию /etc/airflow/dags на машине с Airflow.

Я указываю директиву delete: true, чтобы удалять те файлы, что были удалены из репозитория, и директиву recursive: true, чтобы синхронизировать так же директории внутри dags.

Во втором таске я проставляю корректные права на файлы.

Зависимости
#

# Install requirements
    - name: Copy requirements.txt file
      ansible.builtin.copy:
        src: "{{ src_path }}/requirements.txt"
        dest: "{{ dst_path }}/"
        owner: airflow
        group: airflow
        mode: 0775
      tags:
        - requirements
        
    - name: Install requirements python modules
      ansible.builtin.pip:
        requirements: "{{ dst_path }}/requirements.txt"
      tags:
        - requirements
      register: pip_output
      become_user: airflow
      
    - name: Print install output
      ansible.builtin.debug:
        var: pip_output.stdout_lines
      tags:
        - requirements

В этом блоке я копирую файл с зависимостями, запускаю его установку и вывожу результат.

Переменные
#

# Import variables
    - name: Copy variables.json file
      ansible.builtin.copy:
        src: "{{ src_path }}/variables.json"
        dest: "{{ dst_path }}/"
        owner: airflow
        group: airflow
        mode: 0775
      tags:
        - variables
        
    - name: Import variables
      ansible.builtin.shell:
        cmd: /home/airflow/.local/bin/airflow variables import variables.json
        chdir: '{{ dst_path }}'
      become_user: airflow
      tags:
        - variables
      environment: "{{ airflow_home_env }}"
      register: variables_import_output
      
    - name: Print variables import output
      ansible.builtin.debug:
        var: variables_import_output.stdout_lines
      tags:
        - variables

Первым делом я, уже стандартно, копирую файл с переменными. Затем совершенно некрасивый, но необходимый этап — bashsinble — запускаю команду импорта переменных, указывая chdir, пользователя airflow и переменной AIRFLOW_HOME.

Подключения
#

Импорт подключений выглядит абсолютно так же как и импорт переменных. Меняется только тэг и команда.

Деплой в GitHub Actions
#

В GHA мы храним пайплайны(workflows) в директории .github/workflows. У меня она выглядит так:

> tree .github
.github
└── workflows
    ├── airflow_cfg.yml
    ├── connections.yml
    ├── dags.yml
    ├── requirements.yml
    └── variables.yml

Разделил все на отдельные файлы, чтобы была возможность запускать их по отдельности вручную, если понадобится. Что в dags.yml:

name: Deploy DAG files

on:
  workflow_dispatch:
  push:
    branches:
      - master
    paths:
      - 'dags/**'
  
jobs:
  sync-dags:
    runs-on: self-hosted
    steps:
      - name: Checkout repo
        uses: actions/checkout@v3
      - name: Run Common playbook
        env:
          PY_COLORS: '1'
          ANSIBLE_FORCE_COLOR: '1'
        run: |
          cd ansible && \
          ansible-playbook -i hosts.yml airflow-deploy.yml \
          --tags sync          

Разберу по частям:

  • workflow_dispatch: позволяет запускать пайплайны вручную из меню Actions, а так же запускать их не из master ветки.
  • branch: master и paths: запустит пайплайн только в случае коммита в master ветку и при изменении в указанных файлах
  • jobs: указываю, что запускаться мы будем на self-hosted раннерах. Запускаю модуль Checkout для скачивания репозитория и запускаю плейбук с определенным тегом.
  • env: передаю несколько переменных, например для включения цветного вывода в GitHub

Остальные workflows выглядят абсолютно так же, меняется только название, директория в paths: и тэг в ansible.

Так же у меня в репозиторий добавлен скрипт для уведомлений о деплоях в Slack, который я вызываю после выполнения плейбука в зависимости от результата:

      - name: Send OK branch notify
        if: ${{ github.ref != 'refs/heads/master' && success() }}
        run: python3 notify.py deploy_branch
        
      - name: Send OK notify
        if: ${{ github.ref == 'refs/heads/master' && success() }}
        run: python3 notify.py deploy_ok
        
      - name: Send FAIL notify
        if: failure()
        run: python3 notify.py deploy_failed

Итог
#

Airflow периодически проверяет наличие новых файлов и изменения в старых. Интервал проверки настраивается в конфигурационном файле.

Теперь я могу запускать пайплайны вручную из вкладки Actions:

Разработчики коммитят код в ветки, делают Pull Requests, ревьювят свой код. Все довольны.

📱 Оставить комментарий к посту или задать любой вопрос ты можешь в моем Telegram-канале:

@etogeek

🇬🇧English version on Medium.

Related

Перенос git репозитория
·135 слов·1 минута
Софт Гайды
Как посмотреть внешний IP в Linux
·126 слов·1 минута
Гайды
caffeinate — macbook, не спи
·210 слов·1 минута
Гайды