Docker Compose で Apache Airflow を起動しワークフローを実行する

airflowdockeretl

Apache Airflowはワークフローのスケジューリングやパイプラインの可視化などを行うOSS。 スケーラブルで豊富な機能を持ち、 リポジトリに含まれている AWS や Slack といったサードパーティの Providers packages に加えて、 自前の Operator を実装して拡張できるようになっている。

Airflow の起動

docker-compose.yaml をダウンロードしてくる。

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.4.3/docker-compose.yaml'
$ cat docker-compose.yaml
...
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
  ...
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  ...

services:
  postgres:
    image: postgres:13
    ...

  redis:
    image: redis:latest
    ...

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    ...

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    ...

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    ...

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    ...

  airflow-init:
    <<: *airflow-common
    ...

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    ...

  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    ...

volumes:
  postgres-db-volume:

まず docker compose up airflow-init するとデータストアの初期化が行われ、

$ mkdir airflow-test; cd airflow-test 
$ docker compose up airflow-init
$ docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED          STATUS                    PORTS      NAMES
00584b9d2f4e   postgres:13    "docker-entrypoint.s…"   34 seconds ago   Up 33 seconds (healthy)   5432/tcp   airflow-test-postgres-1
2bb37e1f7221   redis:latest   "docker-entrypoint.s…"   34 seconds ago   Up 33 seconds (healthy)   6379/tcp   airflow-test-redis-1

docker-compose up すると残りのコンポーネントが立ち上がる。

$ docker-compose up
$  docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED              STATUS                        PORTS                    NAMES
8f066461ed75   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-triggerer-1
15f3c0cab1e9   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   0.0.0.0:8080->8080/tcp   airflow-test-airflow-webserver-1
da02685c233f   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-worker-1
4be546b0869e   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-scheduler-1
00584b9d2f4e   postgres:13            "docker-entrypoint.s…"   2 minutes ago        Up 2 minutes (healthy)        5432/tcp                 airflow-test-postgres-1
2bb37e1f7221   redis:latest           "docker-entrypoint.s…"   2 minutes ago        Up 2 minutes (healthy)        6379/tcp                 airflow-test-redis-1

$ open http://localhost:8080 

Web interface には Username, Password 共に airflow でログインできる。

ワークフローの定義

処理の単位である Task を >> で繋げることで依存関係を表現して DAG を構築していく。 Task と配列を繋げることで1行で複数の依存を表現できるが、配列同士を繋ぐことはできないのでその場合は次のように分けて書く。

task1 >> [task2, task3] # ok
[task1, task2] >> task3 # ok
[task1, task2] >> [task3 >> task4] # ng
# ok
task1 >> [task3, task4] 
task2 >> [task3, task4]

Task 間の値の受け渡しは XComs への push/pull で行う。 BashOperator の do_xcom_push のデフォルト値は BaseOperator と同じく True なので、何も渡さなければ標準出力の最後の行が push される。

$ cat dags/test.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
  'test',
  default_args={
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
  },
  description='Test DAG',
  start_date=datetime(2022, 11, 17),
) as dag:
  t1 = BashOperator(
    task_id='task1',
    bash_command='date',
  )

  t2 = BashOperator(
    task_id='task2',
    bash_command='echo current time is $AAAA',
    env={
      "AAAA": '{{ ti.xcom_pull(task_ids="task1") }}',
    },
  )

  t3 = BashOperator(
    task_id='task3',
    bash_command='sleep 20',
  )

  t1 >> [t2, t3]

マウントされている dags ディレクトリに置くと Import される。

実行してログを見ると Task 間で値が受け渡されていることが確認できる。