Airflow で 過去の Task への依存を表す

airflowetl

定期的な集計の際に過去の集計値が必要になることがあるが、そのようなワークフローを単純に定期実行すると、処理が間に合わなかったり失敗した際にそれ以後の処理が連鎖的に失敗してしまう。 Airflow では次の方法で過去の Task への依存を記述することができ、これによって過去の集計が終わるのを待ったり、失敗時に依存しているものだけをまとめて再実行することができる。

BaseOperator.depends_on_past

True にすると過去分の同 Task が完了するまで実行されなくなる。

start_date を過去にして DAG を有効にすると現在までの時間の分が一斉に catchup されるので、 時間がかかる task3 の実行が保留されていることが確認しやすい。

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(
  'test_depends_on_past',
  start_date=days_ago(1),
  default_args={
    'depends_on_past': True,
  },
  schedule="@hourly",
  catchup=True
) as dag:

  t1 = DummyOperator(task_id='task1')
  t2 = DummyOperator(task_id='task2')
  t3 = BashOperator(
    task_id='task3',
    bash_command='sleep 20',
  )

ExternalTaskSensor と ExternalTaskMarker

過去の他の Task の完了を待つ場合は ExternalTaskSensor を用いる。 異なる DAG の Task を指定することもできる。

ExternalTaskMarker は Status が Recursive で Clear された際に対象の Task の Status も Clear することで併せて再実行されるようにする。

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor, ExternalTaskMarker

with DAG(
  'test_external_task',
  start_date=days_ago(1),
  schedule="@hourly",
  catchup=True
) as dag:

  t1 = DummyOperator(task_id='task1')
  t2 = DummyOperator(task_id='task2')
  t3 = BashOperator(
    task_id='task3',
    bash_command='sleep 20',
  )

  wait_past_t3 = ExternalTaskSensor(
    task_id="wait_past_task3",
    external_dag_id='{{ dag.dag_id }}',
    external_task_id=t3.task_id,
    execution_delta=timedelta(hours=1),
    timeout=600,
    allowed_states=['success'],
    failed_states=['failed', 'skipped'],
    mode="reschedule",
  )

  clear_future_wait_past_t3 = ExternalTaskMarker(
    task_id="clear_future_wait_past_task3",
    external_dag_id='{{ dag.dag_id }}',
    external_task_id=wait_past_t3.task_id,
    execution_date='{{ next_execution_date }}'
  )

  t1 >> [t2, t3]
  wait_past_t3 >> t2
  t3 >> clear_future_wait_past_t3

実行が進まない場合

実行している runs の数が core.max_active_runs_per_dag に当たると それ以外の runs が running にならないため、次のように上限に当たった状態で依存先の runs を clear するとタイムアウトまで待ち続けることになってしまう。 このような状態になってしまった場合は runs を clear するなどして空けると実行される。

あるいは設定を変更することもできて、そうする場合は併せて priority_weight や weight_rule を設定して Task の実行順序をコントロールしても良いかもしれない。