Sending notifications from Airflow using Slack credentials stored in Secrets Manager

gcp / airflow / terraform

Airflow has Connections, objects that store authentication credentials for external services.

You can register them through the UI or provide them as AIRFLOW_CONN_(conn_id) environment variables.

# export AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:xoxb-*****@/'
from airflow.models import Connection

Connection(
    conn_id="slack_api_default",
    conn_type="slack",
    password="xoxb-*****",
)

# export AIRFLOW_CONN_SLACK_DEFAULT='slackwebhook://:T00000000%2FB00000000%2FXXXXXXXXXXXXXXXXXXXXXXXX@/'
Connection(
    conn_id="slack_default",
    conn_type="slackwebhook",
    password="T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
)
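The environment variable value is the connection serialized as a URI, which is why the `/` characters in the webhook token above appear percent-encoded as %2F. A minimal sketch of building such a URI by hand (pure Python, no Airflow required; the URI shape mirrors the examples above):

```python
from urllib.parse import quote

def conn_uri(conn_type: str, password: str) -> str:
    # AIRFLOW_CONN_* values are parsed as URIs of the form
    # {conn_type}://{login}:{password}@{host}, so reserved characters
    # in the password (the "/" in the webhook token) must be percent-encoded.
    return f"{conn_type}://:{quote(password, safe='')}@/"

print(conn_uri("slackwebhook", "T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"))
# slackwebhook://:T00000000%2FB00000000%2FXXXXXXXXXXXXXXXXXXXXXXXX@/
```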

You can also use AWS Secrets Manager or Google Cloud Secret Manager as a secrets backend. On Google Cloud, a secret named airflow-connections-(conn_id) is referenced by default. In Cloud Composer, the backend cannot be set through environment variables; instead, pass (section)-(name) keys to airflow_config_overrides. The apache-airflow-providers-slack package also needs to be installed.

Setting up GCP’s managed Airflow service Cloud Composer using Terraform and running workflows - sambaiz-net

resource "google_composer_environment" "composer_env" {
  ...
  config {
    ...
    software_config {
      ...
      # env_variables = {
      #   AIRFLOW__SECRETS__BACKEND        = "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
      #   AIRFLOW__SECRETS__BACKEND_KWARGS = "{\"project_id\": \"*****\"}"
      # }
      airflow_config_overrides = {
        "secrets-backend"        = "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
        "secrets-backend_kwargs" = "{\"project_id\": \"*****\"}"
      }
      pypi_packages = {
        "apache-airflow-providers-slack" = "~=8.9.2"
      }
    }
  }
}

resource "google_project_iam_member" "composer_secret_access" {
  project = "*****"
  member  = format("serviceAccount:%s", google_service_account.composer_service_account.email)
  role    = "roles/secretmanager.secretAccessor"
}

# echo '{
#  "conn_type": "slackwebhook",
#  "password": "T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
# }' | gcloud secrets create airflow-connections-slack_default --data-file=-
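The backend resolves a conn_id by reading the secret named airflow-connections-(conn_id) and deserializing its JSON payload into a Connection. A minimal sketch of that naming convention and payload shape (the prefix is the backend's default; the parsing here is a stand-in for illustration, not the provider's actual code):

```python
import json

CONNECTIONS_PREFIX = "airflow-connections"  # the backend's default prefix

def secret_name(conn_id: str) -> str:
    # conn_id "slack_default" -> secret "airflow-connections-slack_default"
    return f"{CONNECTIONS_PREFIX}-{conn_id}"

def connection_fields(secret_payload: str) -> dict:
    # Secret values can be JSON objects whose keys are Connection fields.
    return json.loads(secret_payload)

print(secret_name("slack_default"))  # airflow-connections-slack_default
```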

Then you can post to Slack by passing the conn_id to SlackAPIPostOperator or SlackWebhookOperator.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def slack_failure_notification(context):
    slack_msg = (
        f":red_circle: Task Failed\n"
        f"*DAG*: {context['task_instance'].dag_id}\n"
        f"*Task*: {context['task_instance'].task_id}\n"
        f"*Execution Time*: {context['execution_date']}\n"
    )

    SlackWebhookOperator(
        task_id='slack_notification',
        slack_webhook_conn_id='slack_default',
        message=slack_msg
    ).execute(context=context)

with DAG(
    dag_id='example_failure_notification',
    default_args={
        'on_failure_callback': slack_failure_notification,
        'retries': 0,
    },
    start_date=datetime(2024, 12, 27),
    schedule_interval=None,
    catchup=False,
) as dag:
    BashOperator(
        task_id='failing_task',
        bash_command='exit 1'
    )