Apache Airflowはワークフローのスケジューリングやパイプラインの可視化などを行うOSS。 スケーラブルで豊富な機能を持ち、 リポジトリに含まれている AWS や Slack といったサードパーティの Providers packages に加えて、 自前の Operator を実装して拡張できるようになっている。
Airflow の起動
docker-compose.yaml をダウンロードしてくる。
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.4.3/docker-compose.yaml'
$ cat docker-compose.yaml
...
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
...
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
...
services:
postgres:
image: postgres:13
...
redis:
image: redis:latest
...
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
...
airflow-scheduler:
<<: *airflow-common
command: scheduler
...
airflow-worker:
<<: *airflow-common
command: celery worker
...
airflow-triggerer:
<<: *airflow-common
command: triggerer
...
airflow-init:
<<: *airflow-common
...
airflow-cli:
<<: *airflow-common
profiles:
- debug
...
flower:
<<: *airflow-common
command: celery flower
profiles:
- flower
ports:
- 5555:5555
...
volumes:
postgres-db-volume:
まず docker compose up airflow-init するとデータストアの初期化が行われ、
$ mkdir airflow-test; cd airflow-test
$ docker compose up airflow-init
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
00584b9d2f4e postgres:13 "docker-entrypoint.s…" 34 seconds ago Up 33 seconds (healthy) 5432/tcp airflow-test-postgres-1
2bb37e1f7221 redis:latest "docker-entrypoint.s…" 34 seconds ago Up 33 seconds (healthy) 6379/tcp airflow-test-redis-1
docker-compose up すると残りのコンポーネントが立ち上がる。
$ docker-compose up
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8f066461ed75 apache/airflow:2.4.3 "/usr/bin/dumb-init …" About a minute ago Up About a minute (healthy) 8080/tcp airflow-test-airflow-triggerer-1
15f3c0cab1e9 apache/airflow:2.4.3 "/usr/bin/dumb-init …" About a minute ago Up About a minute (healthy) 0.0.0.0:8080->8080/tcp airflow-test-airflow-webserver-1
da02685c233f apache/airflow:2.4.3 "/usr/bin/dumb-init …" About a minute ago Up About a minute (healthy) 8080/tcp airflow-test-airflow-worker-1
4be546b0869e apache/airflow:2.4.3 "/usr/bin/dumb-init …" About a minute ago Up About a minute (healthy) 8080/tcp airflow-test-airflow-scheduler-1
00584b9d2f4e postgres:13 "docker-entrypoint.s…" 2 minutes ago Up 2 minutes (healthy) 5432/tcp airflow-test-postgres-1
2bb37e1f7221 redis:latest "docker-entrypoint.s…" 2 minutes ago Up 2 minutes (healthy) 6379/tcp airflow-test-redis-1
$ open http://localhost:8080
Web interface には Username, Password 共に airflow でログインできる。
ワークフローの定義
処理の単位である Task を >> で繋げることで依存関係を表現して DAG を構築していく。 Task と配列を繋げることで1行で複数の依存を表現できるが、配列同士を繋ぐことはできないのでその場合は次のように分けて書く。
task1 >> [task2, task3] # ok
[task1, task2] >> task3 # ok
[task1, task2] >> [task3 >> task4] # ng
# ok
task1 >> [task3, task4]
task2 >> [task3, task4]
Task 間の値の受け渡しは XComs への push/pull で行う。 BashOperator の do_xcom_push のデフォルト値は BaseOperator と同じく True なので、何も渡さなければ標準出力の最後の行が push される。
$ cat dags/test.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
'test',
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
description='Test DAG',
start_date=datetime(2022, 11, 17),
) as dag:
t1 = BashOperator(
task_id='task1',
bash_command='date',
)
t2 = BashOperator(
task_id='task2',
bash_command='echo current time is $AAAA',
env={
"AAAA": '{{ ti.xcom_pull(task_ids="task1") }}',
},
)
t3 = BashOperator(
task_id='task3',
bash_command='sleep 20',
)
t1 >> [t2, t3]
マウントされている dags ディレクトリに置くと Import される。
実行してログを見ると Task 間で値が受け渡されていることが確認できる。