Apache Airflow is open-source software for scheduling workflows and visualizing pipelines. It is scalable and feature-rich, and it can be extended with your own Operators in addition to the third-party provider packages (such as those for AWS and Slack) available in the official repository.
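As a rough idea of what such an extension looks like, a custom Operator is a subclass of BaseOperator that implements execute. The class name and greeting below are made up for illustration; this is a minimal sketch, not a production operator.
from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    """Hypothetical operator that logs a greeting."""

    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello, {self.name}!"
        self.log.info(message)
        # The return value is pushed to XCom (do_xcom_push defaults to True).
        return message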
Run Airflow
Download docker-compose.yaml.
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.4.3/docker-compose.yaml'
$ cat docker-compose.yaml
...
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
  ...
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  ...
services:
  postgres:
    image: postgres:13
    ...
  redis:
    image: redis:latest
    ...
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    ...
  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    ...
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    ...
  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    ...
  airflow-init:
    <<: *airflow-common
    ...
  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    ...
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    ...
volumes:
  postgres-db-volume:
First, run “docker compose up airflow-init” to initialize the metadata database and create the default account. (The official guide also recommends creating the mounted dags, logs, and plugins directories beforehand and, on Linux, setting AIRFLOW_UID in a .env file.)
$ mkdir airflow-test; cd airflow-test
$ docker compose up airflow-init
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
00584b9d2f4e postgres:13 "docker-entrypoint.s…" 34 seconds ago Up 33 seconds (healthy) 5432/tcp airflow-test-postgres-1
2bb37e1f7221 redis:latest "docker-entrypoint.s…" 34 seconds ago Up 33 seconds (healthy) 6379/tcp airflow-test-redis-1
Next, run “docker-compose up” to start the other components.
$ docker-compose up
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8f066461ed75 apache/airflow:2.4.3 "/usr/bin/dumb-init …" About a minute ago Up About a minute (healthy) 8080/tcp airflow-test-airflow-triggerer-1
15f3c0cab1e9 apache/airflow:2.4.3 "/usr/bin/dumb-init …" About a minute ago Up About a minute (healthy) 0.0.0.0:8080->8080/tcp airflow-test-airflow-webserver-1
da02685c233f apache/airflow:2.4.3 "/usr/bin/dumb-init …" About a minute ago Up About a minute (healthy) 8080/tcp airflow-test-airflow-worker-1
4be546b0869e apache/airflow:2.4.3 "/usr/bin/dumb-init …" About a minute ago Up About a minute (healthy) 8080/tcp airflow-test-airflow-scheduler-1
00584b9d2f4e postgres:13 "docker-entrypoint.s…" 2 minutes ago Up 2 minutes (healthy) 5432/tcp airflow-test-postgres-1
2bb37e1f7221 redis:latest "docker-entrypoint.s…" 2 minutes ago Up 2 minutes (healthy) 6379/tcp airflow-test-redis-1
$ open http://localhost:8080
The default username and password for the web interface are both “airflow”.
Define a workflow
Construct a DAG by expressing dependencies between tasks (the units of execution) with >>. Multiple dependencies can be expressed on one line by connecting a task and a list, but two lists cannot be connected directly, so in that case write them on separate lines as follows.
task1 >> [task2, task3]           # ok
[task1, task2] >> task3           # ok
[task1, task2] >> [task3, task4]  # ng: a list cannot be shifted into another list

# ok: split into two statements
task1 >> [task3, task4]
task2 >> [task3, task4]
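As a side note, Airflow also ships a helper for exactly this list-to-list pattern. A minimal sketch using cross_downstream (the task names are the same placeholders as above):
from airflow.models.baseoperator import cross_downstream

# Equivalent to: task1 >> [task3, task4] and task2 >> [task3, task4]
cross_downstream([task1, task2], [task3, task4])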
Values can be passed between tasks by pushing to and pulling from XComs. BashOperator inherits the default do_xcom_push=True from BaseOperator, so unless it is explicitly disabled, the last line written to stdout is pushed automatically.
$ cat dags/test.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'test',
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Test DAG',
    start_date=datetime(2022, 11, 17),
) as dag:
    t1 = BashOperator(
        task_id='task1',
        bash_command='date',
    )
    t2 = BashOperator(
        task_id='task2',
        bash_command='echo current time is $AAAA',
        # Pull the value task1 pushed to XCom (its last line of stdout).
        env={
            "AAAA": '{{ ti.xcom_pull(task_ids="task1") }}',
        },
    )
    t3 = BashOperator(
        task_id='task3',
        bash_command='sleep 20',
    )

    t1 >> [t2, t3]
Place the file in the mounted dags directory and the scheduler will import it automatically.
Trigger the DAG and look at the task logs to confirm that the value is passed from task1 to task2.
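The same value passing works without templates as well. As a minimal sketch (the DAG id, file name such as dags/xcom_test.py, and task names are hypothetical), a PythonOperator's return value is pushed to XCom and can be pulled explicitly from the task instance:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def produce():
    # The return value is pushed to XCom under the key "return_value".
    return "hello from task1"

def consume(ti):
    # Pull the value that task1 pushed.
    value = ti.xcom_pull(task_ids="task1")
    print(f"received: {value}")

with DAG(
    'xcom_test',
    start_date=datetime(2022, 11, 17),
) as dag:
    t1 = PythonOperator(task_id='task1', python_callable=produce)
    t2 = PythonOperator(task_id='task2', python_callable=consume)

    t1 >> t2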