When running a job on an EMR cluster, a common pattern is to add a Step to the cluster with EmrAddStepsOperator and then wait for it to finish with EmrStepSensor. When the Step fails, however, only the Sensor fails. Retrying the Sensor just polls the same failed Step again, so the Step itself is never re-executed.
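A toy model of the problem, in plain Python with no Airflow or AWS calls. `FakeEmrCluster` is a hypothetical stand-in: `add_step` plays the role of EmrAddStepsOperator and `describe_step` the role of EmrStepSensor. It only illustrates why retrying the sensor alone cannot change the outcome.

```python
from __future__ import annotations

import itertools


class FakeEmrCluster:
    """Hypothetical stand-in for an EMR cluster; no AWS calls are made.

    Each submitted Step takes the next outcome from `outcomes`. Once a Step
    has an outcome it never changes, just as polling a finished Step does
    not re-run it.
    """

    def __init__(self, outcomes: list[str]) -> None:
        self._outcomes = iter(outcomes)
        self._ids = itertools.count(1)
        self.steps: dict[str, str] = {}

    def add_step(self) -> str:
        # Role of EmrAddStepsOperator: submit work and get a Step id back.
        step_id = f"s-{next(self._ids)}"
        self.steps[step_id] = next(self._outcomes)
        return step_id

    def describe_step(self, step_id: str) -> str:
        # Role of EmrStepSensor: it only reads the state of an existing Step.
        return self.steps[step_id]


cluster = FakeEmrCluster(["FAILED", "COMPLETED"])
step_id = cluster.add_step()

# Retrying only the sensor polls the same failed Step every time.
sensor_attempts = [cluster.describe_step(step_id) for _ in range(3)]

# Re-submitting the Step is what actually gives the work another chance.
new_step_id = cluster.add_step()
resubmitted = cluster.describe_step(new_step_id)
print(sensor_attempts, new_step_id, resubmitted)
```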
Putting the Tasks that make up one process into a TaskGroup groups them in the UI, but a TaskGroup currently cannot be given retries the way a single Task can. SubDAGs, the older alternative, have performance and other problems and are already deprecated. So instead, I retry the Tasks together from callbacks that Airflow calls when a Task fails or is retried.
The callback receives the task's context as a dictionary. Printing it from an on_failure_callback gives the following:
```
# def on_failure(context):
#     print(context)
{
    'conf': <***.configuration.AirflowConfigParser object at 0xffffa02942d0>,
    'dag': <DAG: test_callback>,
    'dag_run': <DagRun test_callback @ 2022-12-18 03:11:29.683523+00:00: manual__2022-12-18T03:11:29.683523+00:00, state:running, queued_at: 2022-12-18 03:11:29.759660+00:00. externally triggered: True>,
    'data_interval_end': DateTime(2022, 12, 18, 3, 0, 0, tzinfo=Timezone('UTC')),
    'data_interval_start': DateTime(2022, 12, 18, 2, 0, 0, tzinfo=Timezone('UTC')),
    'ds': '2022-12-18',
    'ds_nodash': '20221218',
    'execution_date': DateTime(2022, 12, 18, 3, 11, 29, 683523, tzinfo=Timezone('UTC')),
    'inlets': [],
    'logical_date': DateTime(2022, 12, 18, 3, 11, 29, 683523, tzinfo=Timezone('UTC')),
    'macros': <module '***.macros' from '/home/***/.local/lib/python3.7/site-packages/***/macros/__init__.py'>,
    'next_ds': '2022-12-18',
    'next_ds_nodash': '20221218',
    'next_execution_date': DateTime(2022, 12, 18, 3, 11, 29, 683523, tzinfo=Timezone('UTC')),
    'outlets': [],
    'params': {},
    'prev_data_interval_start_success': DateTime(2022, 12, 18, 2, 0, 0, tzinfo=Timezone('UTC')),
    'prev_data_interval_end_success': DateTime(2022, 12, 18, 3, 0, 0, tzinfo=Timezone('UTC')),
    'prev_ds': '2022-12-18',
    'prev_ds_nodash': '20221218',
    'prev_execution_date': DateTime(2022, 12, 18, 3, 11, 29, 683523, tzinfo=Timezone('UTC')),
    'prev_execution_date_success': DateTime(2022, 12, 18, 3, 0, 9, 505185, tzinfo=Timezone('UTC')),
    'prev_start_date_success': DateTime(2022, 12, 18, 3, 0, 9, 577917, tzinfo=Timezone('UTC')),
    'run_id': 'manual__2022-12-18T03:11:29.683523+00:00',
    'task': <Task(BashOperator): task2>,
    'task_instance': <TaskInstance: test_callback.task2 manual__2022-12-18T03:11:29.683523+00:00 [failed]>,
    'task_instance_key_str': 'test_callback__task2__20221218',
    'test_mode': False,
    'ti': <TaskInstance: test_callback.task2 manual__2022-12-18T03:11:29.683523+00:00 [failed]>,
    'tomorrow_ds': '2022-12-19',
    'tomorrow_ds_nodash': '20221219',
    'triggering_dataset_events': <Proxy at 0xffff8b9e2a50 with factory <function TaskInstance.get_template_context.<locals>.get_triggering_events at 0xffff8ba4b950>>,
    'ts': '2022-12-18T03:11:29.683523+00:00',
    'ts_nodash': '20221218T031129',
    'ts_nodash_with_tz': '20221218T031129.683523+0000',
    'var': {'json': None, 'value': None},
    'conn': None,
    'yesterday_ds': '2022-12-17',
    'yesterday_ds_nodash': '20221217',
    'exception': AirflowException('Bash command failed. The command returned a non-zero exit code 1.')
}
```
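Since the context is a plain dictionary, a callback can pick out just the keys it needs. The sketch below builds a one-line summary from `ti`, `run_id` and `exception`. The function name `describe_failure` is hypothetical, and the demo uses `SimpleNamespace` and a `RuntimeError` in place of a real TaskInstance and AirflowException so that it runs without Airflow.

```python
from types import SimpleNamespace


def describe_failure(context: dict) -> str:
    """One-line summary of a failed task, built from the callback context."""
    ti = context["ti"]
    # Use .get(): "exception" may not be present in every kind of callback.
    exc = context.get("exception")
    return f"{ti.dag_id}.{ti.task_id} failed in {context['run_id']}: {exc}"


# Stand-ins for Airflow objects, only so the sketch runs without Airflow.
context = {
    "ti": SimpleNamespace(dag_id="test_callback", task_id="task2"),
    "run_id": "manual__2022-12-18T03:11:29.683523+00:00",
    "exception": RuntimeError(
        "Bash command failed. The command returned a non-zero exit code 1."
    ),
}
summary = describe_failure(context)
print(summary)
```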
In the following DAG, the second Task can fail, and, as in the EMR example, retrying it alone does not change the result, because its exit code is decided by the first Task's output. By updating the state of the upstream Task when the downstream Task is retried or fails, the two can be retried or failed as if they were one Task.
```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.state import State


def retry_upstream(context):
    # Called when this task is about to be retried: put its direct upstream
    # tasks back into up_for_retry so that they run again as well.
    for ti in context["dag_run"].get_task_instances():
        if ti.task_id in context["task"].upstream_task_ids:
            ti.set_state(State.UP_FOR_RETRY)


def fail_upstream(context):
    # Called when this task has run out of retries: mark its direct upstream
    # tasks as failed too, so that the pair fails as one unit.
    for ti in context["dag_run"].get_task_instances():
        if ti.task_id in context["task"].upstream_task_ids:
            ti.set_state(State.FAILED)


with DAG(
    'test_callback',
    start_date=days_ago(1),
    default_args={
        'retries': 1,
        'retry_delay': timedelta(seconds=10),
        'retry_exponential_backoff': True,
    },
) as dag:
    # Prints 0, 1 or 2; the last line of stdout is pushed to XCom.
    t1 = BashOperator(
        task_id='task1',
        bash_command='echo $(expr $RANDOM % 3)',
    )
    # Exits with the value task1 printed, so it fails about two times out of
    # three, and only a fresh run of task1 can change the outcome.
    t2 = BashOperator(
        task_id='task2',
        bash_command='exit {{ ti.xcom_pull(task_ids="task1") }}',
        on_retry_callback=retry_upstream,
        on_failure_callback=fail_upstream,
    )
    t1 >> t2
```
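`retry_upstream` and `fail_upstream` differ only in the target state, and they only reach the direct upstream Tasks. For a process made of more than two Tasks, one possible sketch takes the state as an argument and walks upstream transitively, stopping at the edge of the group. `make_group_state_callback`, its `group` argument and the task ids in the demo are hypothetical. Besides the calls used in the DAG, it relies on `task.dag` and `DAG.get_task`; the demo replaces all Airflow objects with stand-ins so it runs without Airflow.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any, Callable


def make_group_state_callback(state: Any, group: set[str]) -> Callable[[dict], None]:
    """Return a callback that sets every task in `group` that is upstream
    (directly or transitively) of the calling task to `state`."""

    def callback(context: dict) -> None:
        task = context["task"]
        dag = task.dag
        # Depth-first walk upstream, never leaving the group.
        targets: set[str] = set()
        stack = list(task.upstream_task_ids)
        while stack:
            task_id = stack.pop()
            if task_id in group and task_id not in targets:
                targets.add(task_id)
                stack.extend(dag.get_task(task_id).upstream_task_ids)
        # Update the matching task instances of the current DAG run only.
        for ti in context["dag_run"].get_task_instances():
            if ti.task_id in targets:
                ti.set_state(state)

    return callback


# --- Demo with stand-in objects (not Airflow's classes) ---
class FakeTI:
    def __init__(self, task_id: str, state: str) -> None:
        self.task_id = task_id
        self.state = state

    def set_state(self, state: str) -> None:
        self.state = state


# Hypothetical chain: prepare >> build_args >> add_step >> watch_step
upstream = {
    "prepare": set(),
    "build_args": {"prepare"},
    "add_step": {"build_args"},
    "watch_step": {"add_step"},
}
fake_dag = SimpleNamespace(
    get_task=lambda task_id: SimpleNamespace(upstream_task_ids=upstream[task_id])
)
tis = [
    FakeTI("prepare", "success"),
    FakeTI("build_args", "success"),
    FakeTI("add_step", "success"),
    FakeTI("watch_step", "up_for_retry"),
]
context = {
    "task": SimpleNamespace(dag=fake_dag, upstream_task_ids=upstream["watch_step"]),
    "dag_run": SimpleNamespace(get_task_instances=lambda: tis),
}

retry_group = make_group_state_callback(
    "up_for_retry", {"build_args", "add_step", "watch_step"}
)
retry_group(context)
states = {ti.task_id: ti.state for ti in tis}
print(states)
```

In a real DAG it would be attached with something like `on_retry_callback=make_group_state_callback(State.UP_FOR_RETRY, {...})`. Limiting the walk to `group` matters: without it, a retry would also reset Tasks that have nothing to do with the EMR Step, such as `prepare` in the demo.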
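For EMR, the Sensor's DescribeStep call can itself fail with "Rate exceeded" (a ThrottlingException). In that case the Step has not necessarily failed, so it is better to decide from the error whether the first Task should be re-executed at all.
```python
# A throttled DescribeStep says nothing about the Step itself,
# so only re-add the Step for other errors.
should_readd_task_to_emr = 'ThrottlingException' not in str(context['exception'])
```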
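One way to wire this check into the callbacks is a small wrapper that skips the upstream reset on throttling errors, so that only the Sensor retries. `is_throttled` and `skip_on_throttling` are hypothetical names, and the error messages in the demo are illustrative; real error wording may differ.

```python
from __future__ import annotations

from typing import Callable


def is_throttled(exc: BaseException | None) -> bool:
    """True if the error came from EMR API throttling rather than from the Step."""
    return exc is not None and "ThrottlingException" in str(exc)


def skip_on_throttling(reset_upstream: Callable[[dict], None]) -> Callable[[dict], None]:
    """Wrap an upstream-resetting callback so it does nothing on throttling errors."""

    def callback(context: dict) -> None:
        if is_throttled(context.get("exception")):
            # Only DescribeStep was rejected; the Step itself may still be
            # running, so leave the upstream task alone and let the sensor retry.
            return
        reset_upstream(context)

    return callback


# Demo: record which runs would have re-added the Step.
readded: list[str] = []
callback = skip_on_throttling(lambda ctx: readded.append(ctx["run_id"]))

callback({
    "run_id": "run-1",
    "exception": RuntimeError(
        "An error occurred (ThrottlingException) when calling the "
        "DescribeStep operation: Rate exceeded"
    ),
})
callback({"run_id": "run-2", "exception": RuntimeError("EMR Step failed")})
print(readded)
```

With the DAG above, this could wrap the existing callback as `on_retry_callback=skip_on_throttling(retry_upstream)`.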