Express dependencies on past tasks in Airflow


When a periodic aggregation needs values aggregated in earlier runs and the workflow is simply executed on a schedule, a single run that fails or does not finish in time makes all subsequent runs fail in a chain reaction. Airflow lets you express dependencies on past tasks in the following ways, so a run can wait for the previous aggregation to finish, and when something fails you can re-execute only the dependent tasks together.

BaseOperator.depends_on_past

If set to True, a task instance is not executed until the same task has completed successfully in the previous DAG run.

If you set start_date in the past and enable the DAG, catchup creates every run up to the present, and you can easily observe that the later instances of task3, which takes a long time, stay pending until the earlier ones finish.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(
  'test_depends_on_past',
  start_date=days_ago(1),
  default_args={
    # each task instance waits until the same task has succeeded in the previous run
    'depends_on_past': True,
  },
  schedule="@hourly",
  catchup=True
) as dag:

  t1 = DummyOperator(task_id='task1')
  t2 = DummyOperator(task_id='task2')
  # a deliberately slow task, so the pending catchup runs are easy to observe
  t3 = BashOperator(
    task_id='task3',
    bash_command='sleep 20',
  )

ExternalTaskSensor and ExternalTaskMarker

You can use ExternalTaskSensor to wait for the completion of a task in a past run. It also supports waiting on a task in another DAG.

When an ExternalTaskMarker is cleared with the Recursive option, the state of the task it points to is cleared as well, and that task is re-executed.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor, ExternalTaskMarker

with DAG(
  'test_external_task',
  start_date=days_ago(1),
  schedule="@hourly",
  catchup=True
) as dag:

  t1 = DummyOperator(task_id='task1')
  t2 = DummyOperator(task_id='task2')
  t3 = BashOperator(
    task_id='task3',
    bash_command='sleep 20',
  )

  # wait until task3 of the run one hour earlier (the previous hourly run of
  # this same DAG) has succeeded
  wait_past_t3 = ExternalTaskSensor(
    task_id="wait_past_task3",
    external_dag_id='{{ dag.dag_id }}',
    external_task_id=t3.task_id,
    execution_delta=timedelta(hours=1),
    timeout=600,
    allowed_states=['success'],
    failed_states=['failed', 'skipped'],
    mode="reschedule",
  )

  # clearing task3 with "Recursive" also clears the sensor in the next run
  # (one hour later), so its downstream tasks are re-executed as well
  clear_future_wait_past_t3 = ExternalTaskMarker(
    task_id="clear_future_wait_past_task3",
    external_dag_id='{{ dag.dag_id }}',
    external_task_id=wait_past_t3.task_id,
    execution_date='{{ next_execution_date }}',
  )

  t1 >> [t2, t3]
  wait_past_t3 >> t2
  t3 >> clear_future_wait_past_t3

If tasks do not progress

If the number of active runs reaches core.max_active_runs_per_dag, further runs never enter the running state. So if you clear dependent runs while the limit is already reached, the cleared tasks simply wait until the sensor times out. When this happens, you need to clear out some of the active runs so that new ones can be executed.
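For reference, this limit comes from max_active_runs_per_dag in the [core] section of airflow.cfg, and it can also be overridden per DAG with the max_active_runs argument. A minimal sketch, with an arbitrary value:

from airflow import DAG
from airflow.utils.dates import days_ago

with DAG(
  'test_external_task',
  start_date=days_ago(1),
  schedule="@hourly",
  catchup=True,
  # per-DAG cap on simultaneously active runs; the global default comes from
  # [core] max_active_runs_per_dag in airflow.cfg
  max_active_runs=8,
) as dag:
  ...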

Alternatively, you can raise these limits. If you do, it is probably better to control the order in which worker slots are assigned to tasks with priority_weight and weight_rule, as in the sketch below.
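For example, inside the DAG above, the slow task could be given precedence over the others. This is only a sketch; the values are illustrative, not taken from the original example.

  t3 = BashOperator(
    task_id='task3',
    bash_command='sleep 20',
    # higher weight gets a worker slot before lower-weight tasks in the same pool
    priority_weight=10,
    # 'absolute' uses the value as-is; the default 'downstream' adds the
    # weights of all downstream tasks to the effective weight
    weight_rule='absolute',
  )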