Docker Compose で Apache Airflow を起動しワークフローを実行する

airflowdockeretl

Apache Airflowはワークフローのスケジューリングやパイプラインの可視化などを行うOSS。 スケーラブルで豊富な機能を持ち、 リポジトリに含まれている AWS や Slack といったサードパーティの Providers packages に加えて、 自前の Operator を実装して拡張できるようになっている。

Airflow の起動

docker-compose.yaml をダウンロードしてくる。

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.4.3/docker-compose.yaml'
$ cat docker-compose.yaml
...
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
  ...
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  ...

services:
  postgres:
    image: postgres:13
    ...

  redis:
    image: redis:latest
    ...

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    ...

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    ...

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    ...

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    ...

  airflow-init:
    <<: *airflow-common
    ...

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    ...

  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    ...

volumes:
  postgres-db-volume:

まず docker compose up airflow-init するとデータストアの初期化が行われ、

$ mkdir airflow-test; cd airflow-test 
$ docker compose up airflow-init
$ docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED          STATUS                    PORTS      NAMES
00584b9d2f4e   postgres:13    "docker-entrypoint.s…"   34 seconds ago   Up 33 seconds (healthy)   5432/tcp   airflow-test-postgres-1
2bb37e1f7221   redis:latest   "docker-entrypoint.s…"   34 seconds ago   Up 33 seconds (healthy)   6379/tcp   airflow-test-redis-1

docker-compose up すると残りのコンポーネントが立ち上がる。

$ docker-compose up
$  docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED              STATUS                        PORTS                    NAMES
8f066461ed75   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-triggerer-1
15f3c0cab1e9   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   0.0.0.0:8080->8080/tcp   airflow-test-airflow-webserver-1
da02685c233f   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-worker-1
4be546b0869e   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-scheduler-1
00584b9d2f4e   postgres:13            "docker-entrypoint.s…"   2 minutes ago        Up 2 minutes (healthy)        5432/tcp                 airflow-test-postgres-1
2bb37e1f7221   redis:latest           "docker-entrypoint.s…"   2 minutes ago        Up 2 minutes (healthy)        6379/tcp                 airflow-test-redis-1

$ open http://localhost:8080 

Web interface には Username, Password 共に airflow でログインできる。

ワークフローの定義

処理の単位である Task を >> で繋げて DAG を構築していく。 Task 間の値の受け渡しは XComs への push/pull で行う。 BashOperator の do_xcom_push のデフォルト値は BaseOperator と同じく True なので、何も渡さなければ標準出力の最後の行が push される。

$ cat dags/test.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
  'test',
  default_args={
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
  },
  description='Test DAG',
  schedule=timedelta(minutes=1),
  start_date=datetime(2022, 11, 17),
) as dag:
  t1 = BashOperator(
    task_id='task1',
    bash_command='date',
    xcom_push=True,
  )

  t2 = BashOperator(
    task_id='task2',
    bash_command='echo current time is $AAAA',
    env={
      "AAAA": '{{ ti.xcom_pull(task_ids="task1") }}',
    },
  )

  t3 = BashOperator(
    task_id='task3',
    bash_command='sleep 20',
  )

  t1 >> [t2, t3]

マウントされている dags ディレクトリに置くと Import される。

実行してログを見ると Task 間で値が受け渡されていることが確認できる。