Run Apache Airflow with Docker Compose and execute a workflow


Apache Airflow is open-source software for scheduling workflows and visualizing pipelines. It is scalable and feature-rich, and besides the third-party provider packages available in its repository (such as the AWS and Slack providers), it can also be extended with your own Operators.
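For example, a custom Operator is just a subclass of BaseOperator with an execute method. A minimal sketch (the class and message below are made up for illustration, not part of Airflow):

from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
  # A hypothetical Operator that logs a greeting and pushes it to XCom.
  def __init__(self, name: str, **kwargs):
    super().__init__(**kwargs)
    self.name = name

  def execute(self, context):
    message = f'Hello, {self.name}!'
    self.log.info(message)
    return message  # the return value is pushed to XCom by default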

Run Airflow

Download docker-compose.yaml.

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.4.3/docker-compose.yaml'
$ cat docker-compose.yaml
...
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
  ...
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  ...

services:
  postgres:
    image: postgres:13
    ...

  redis:
    image: redis:latest
    ...

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    ...

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    ...

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    ...

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    ...

  airflow-init:
    <<: *airflow-common
    ...

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    ...

  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    ...

volumes:
  postgres-db-volume:

First, run “docker compose up airflow-init” in the directory containing docker-compose.yaml. This initializes the metadata database and creates the default user account.

$ mkdir airflow-test; cd airflow-test   # place the downloaded docker-compose.yaml here
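# (optional, from the official quick-start) create the mounted directories and,
# on Linux, set AIRFLOW_UID in .env so files created in the bind mounts are owned by your host user
$ mkdir -p ./dags ./logs ./plugins
$ echo -e "AIRFLOW_UID=$(id -u)" > .env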
$ docker compose up airflow-init
$ docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED          STATUS                    PORTS      NAMES
00584b9d2f4e   postgres:13    "docker-entrypoint.s…"   34 seconds ago   Up 33 seconds (healthy)   5432/tcp   airflow-test-postgres-1
2bb37e1f7221   redis:latest   "docker-entrypoint.s…"   34 seconds ago   Up 33 seconds (healthy)   6379/tcp   airflow-test-redis-1

Next, run “docker compose up” (add -d to run it in the background); this starts the remaining components: webserver, scheduler, worker, and triggerer.

$ docker compose up
$ docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED              STATUS                        PORTS                    NAMES
8f066461ed75   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-triggerer-1
15f3c0cab1e9   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   0.0.0.0:8080->8080/tcp   airflow-test-airflow-webserver-1
da02685c233f   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-worker-1
4be546b0869e   apache/airflow:2.4.3   "/usr/bin/dumb-init …"   About a minute ago   Up About a minute (healthy)   8080/tcp                 airflow-test-airflow-scheduler-1
00584b9d2f4e   postgres:13            "docker-entrypoint.s…"   2 minutes ago        Up 2 minutes (healthy)        5432/tcp                 airflow-test-postgres-1
2bb37e1f7221   redis:latest           "docker-entrypoint.s…"   2 minutes ago        Up 2 minutes (healthy)        6379/tcp                 airflow-test-redis-1

$ open http://localhost:8080 

Both the username and the password for the web interface are “airflow” by default; they come from the _AIRFLOW_WWW_USER_USERNAME and _AIRFLOW_WWW_USER_PASSWORD variables defined in the compose file.

Define a workflow

Construct a DAG by expressing dependencies between tasks, the execution units, with >>. Multiple dependencies can be expressed in one line by connecting a task to a list, but two lists cannot be connected to each other, so in that case write them separately as follows.

task1 >> [task2, task3]            # ok
[task1, task2] >> task3            # ok
[task1, task2] >> [task3, task4]   # not allowed: list >> list raises TypeError
# ok: express the list-to-list dependency separately
task1 >> [task3, task4]
task2 >> [task3, task4]
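A full list-to-list dependency can also be declared with the helper functions in airflow.models.baseoperator; a short sketch:

from airflow.models.baseoperator import chain, cross_downstream

# equivalent to: task1 >> [task3, task4] and task2 >> [task3, task4]
cross_downstream([task1, task2], [task3, task4])

# chain() builds a linear dependency: task1 >> task2 >> task3
chain(task1, task2, task3)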

Values can be passed between tasks by pushing them to and pulling them from XCom. The default value of do_xcom_push for BashOperator is True, the same as for BaseOperator, so if nothing is specified, the last line of stdout is pushed automatically.

$ cat dags/test.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
  'test',
  default_args={
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
  },
  description='Test DAG',
  start_date=datetime(2022, 11, 17),
) as dag:
  t1 = BashOperator(
    task_id='task1',
    bash_command='date',
  )

  t2 = BashOperator(
    task_id='task2',
    bash_command='echo current time is $AAAA',
    env={
      "AAAA": '{{ ti.xcom_pull(task_ids="task1") }}',
    },
  )

  t3 = BashOperator(
    task_id='task3',
    bash_command='sleep 20',
  )

  t1 >> [t2, t3]
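As an aside (not part of the DAG file above), the same value can be pulled explicitly in Python instead of through a Jinja template. A minimal sketch, assuming it is placed inside the same with DAG block so that t1 is in scope:

from airflow.operators.python import PythonOperator

def show_time(ti):
  # pull the value task1 pushed to XCom (the last line of its stdout)
  print('current time is', ti.xcom_pull(task_ids='task1'))

t4 = PythonOperator(
  task_id='task4',
  python_callable=show_time,
)

t1 >> t4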

Place the file in the mounted dags directory, and the scheduler will pick it up and import it automatically.

Trigger the DAG (it is paused by default, so unpause it first) and look at the task logs; you can confirm that the value is passed between tasks: task2's log echoes the date printed by task1.
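The DAG can also be unpaused and triggered from the command line instead of the web UI, assuming the default service names from the compose file:

$ docker compose exec airflow-webserver airflow dags unpause test
$ docker compose exec airflow-webserver airflow dags trigger test

Task logs also appear under the mounted ./logs directory on the host.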