Airflow には外部サービスの認証情報を格納するオブジェクトとして Connections がある。
UI から登録するほか AIRFLOW_CONN_(conn_id) 環境変数で与えたり、
# export AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:xoxb-*****@/'
Connection(
conn_id="slack_api_default",
conn_type="slack",
password="xoxb-*****",
)
# export AIRFLOW_CONN_SLACK_DEFAULT='slackwebhook://:T00000000%2FB00000000%2FXXXXXXXXXXXXXXXXXXXXXXXX@/'
Connection(
conn_id="slack_default",
conn_type="slackwebhook",
password="T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
)
AWS や Google Cloud の Secrets Manager をバックエンドとすることもできる。Google Cloud ではデフォルトで airflow-connections-(conn_id) が参照される。Cloud Composer の場合環境変数では設定できず airflow_config_overrides に (section)-(name) を渡して設定し、apache-airflow-providers-slack をインストールする必要があった。
GCP の マネージド Airflow サービスCloud Composer を Terraform で立ち上げてワークフローを動かす - sambaiz-net
resource "google_composer_environment" "composer_env" {
...
config {
...
software_config {
...
# env_variables = {
# AIRFLOW__SECRETS__BACKEND = "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
# AIRFLOW__SECRETS__BACKEND_KWARGS = "{\"project_id\": \"*****\"}"
# }
airflow_config_overrides = {
"secrets-backend" = "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
"secrets-backend_kwargs" = "{\"project_id\": \"*****\"}"
}
pypi_packages = {
apache-airflow-providers-slack = "~=8.9.2"
}
}
}
}
resource "google_project_iam_member" "composer_secret_access" {
project = "*****"
member = format("serviceAccount:%s", google_service_account.composer_service_account.email)
role = "roles/secretmanager.secretAccessor"
}
# echo '{
# "conn_type": "slack",
# "password": "xoxb-*****"
# }' | gcloud secrets create airflow-connections-slack_default --data-file=-
あとは SlackAPIPostOperator や SlackWebhookOperator に conn_id を渡すことで Slack に投稿できる。
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
def slack_failure_notification(context):
slack_msg = (
f":red_circle: Task Failed\n"
f"*DAG*: {context['task_instance'].dag_id}\n"
f"*Task*: {context['task_instance'].task_id}\n"
f"*Execution Time*: {context['execution_date']}\n"
)
SlackWebhookOperator(
task_id='slack_notification',
slack_webhook_conn_id='slack_default',
message=slack_msg
).execute(context=context)
with DAG(
dag_id='example_failure_notification',
default_args={
'on_failure_callback': slack_failure_notification,
'retries': 0,
}
start_date=datetime(2024, 12, 27),
schedule_interval=None,
catchup=False,
) as dag:
BashOperator(
task_id='failing_task',
bash_command='exit 1'
)