Airflow から Secrets Manager に格納されている Slack の認証情報を用いて通知を飛ばす

gcpairflowterraform

Airflow には外部サービスの認証情報を格納するオブジェクトとして Connections がある。

UI から登録するほか AIRFLOW_CONN_(conn_id) 環境変数で与えたり、

# export AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:xoxb-*****@/'
Connection(
    conn_id="slack_api_default",
    conn_type="slack",
    password="xoxb-*****",
)

# export AIRFLOW_CONN_SLACK_DEFAULT='slackwebhook://:T00000000%2FB00000000%2FXXXXXXXXXXXXXXXXXXXXXXXX@/'
Connection(
    conn_id="slack_default",
    conn_type="slackwebhook",
    password="T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
)

AWSGoogle Cloud の Secrets Manager をバックエンドとすることもできる。Google Cloud ではデフォルトで airflow-connections-(conn_id) が参照される。Cloud Composer の場合環境変数では設定できず airflow_config_overrides に (section)-(name) を渡して設定し、apache-airflow-providers-slack をインストールする必要があった。

GCP の マネージド Airflow サービスCloud Composer を Terraform で立ち上げてワークフローを動かす - sambaiz-net

resource "google_composer_environment" "composer_env" {
  ...
  config {
    ...
    software_config {
	    ...
	    # env_variables = {
      #  AIRFLOW__SECRETS__BACKEND        = "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
      #  AIRFLOW__SECRETS__BACKEND_KWARGS = "{\"project_id\": \"*****\"}"
      # }
	    airflow_config_overrides = {
        "secrets-backend"        = "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
        "secrets-backend_kwargs" = "{\"project_id\": \"*****\"}"
      }
      pypi_packages = {
	      apache-airflow-providers-slack = "~=8.9.2"
      }
    }
  }
}

resource "google_project_iam_member" "composer_secret_access" {
  project = "*****"
  member  = format("serviceAccount:%s", google_service_account.composer_service_account.email)
  role    = "roles/secretmanager.secretAccessor"
}

# echo '{
#  "conn_type": "slack",
#  "password": "xoxb-*****"
# }' | gcloud secrets create airflow-connections-slack_default --data-file=-

あとは SlackAPIPostOperatorSlackWebhookOperator に conn_id を渡すことで Slack に投稿できる。

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def slack_failure_notification(context):
    slack_msg = (
        f":red_circle: Task Failed\n"
        f"*DAG*: {context['task_instance'].dag_id}\n"
        f"*Task*: {context['task_instance'].task_id}\n"
        f"*Execution Time*: {context['execution_date']}\n"
    )

    SlackWebhookOperator(
        task_id='slack_notification',
        slack_webhook_conn_id='slack_default',
        message=slack_msg
    ).execute(context=context)

with DAG(
    dag_id='example_failure_notification',
    default_args={
		    'on_failure_callback': slack_failure_notification,
        'retries': 0,
		}
    start_date=datetime(2024, 12, 27),
    schedule_interval=None,
    catchup=False,
) as dag:
  BashOperator(
    task_id='failing_task',
    bash_command='exit 1'
  )