Post

[Airflow 101] Day 4: 외부 세계와의 연결, Provider & Connection (feat. Slack 알림)

[Airflow 101] Day 4: 외부 세계와의 연결, Provider & Connection (feat. Slack 알림)

들어가며: 비밀번호를 코드에 적지 마세요

개발 초보 시절, 우리는 종종 이런 실수를 하곤 합니다.

1
2
3
4
# 절대 하지 말아야 할 짓
aws_access_key = "AKIAIOSFODNN7EXAMPLE" 
slack_token = "xoxb-1234-5678-abcdef"

비밀번호나 API Key를 코드에 하드코딩하는 것은 보안상 최악의 패턴입니다. 깃허브(GitHub)에 올리는 순간 전 세계 해커들의 먹잇감이 되죠.

Airflow는 이 문제를 해결하기 위해 Connection(커넥션)이라는 안전한 금고를 제공합니다. 그리고 이 금고를 열쇠로 외부 서비스(AWS, GCP, Slack, MySQL 등)와 쉽게 연동할 수 있도록 Provider(프로바이더)라는 라이브러리 묶음을 제공합니다.

오늘은 “작업이 실패하면 내 Slack으로 비명(알림)을 지르는 파이프라인”을 만들어보겠습니다.


1. Provider 설치: Slack과 대화하기

Airflow는 기본 설치 시 핵심 기능만 들어있습니다. Slack과 대화하려면 Slack Provider를 추가로 설치해야 합니다.

Docker 환경에서 실행 중이므로, 컨테이너 내부에 진입해 라이브러리를 설치해 봅시다. (실제 운영 환경에서는 Dockerfile에 미리 명시해서 이미지를 빌드해야 합니다.)

1
2
3
4
5
6
# 1. 실행 중인 Scheduler 컨테이너 이름 확인 (보통 airflow-tutorial-airflow-scheduler-1 등의 이름)
docker ps 

# 2. 컨테이너 내부에서 pip install 실행 (컨테이너 이름은 본인 환경에 맞게 수정)
docker exec -it <컨테이너_ID_혹은_이름> pip install apache-airflow-providers-slack

설치 후에는 웹서버와 스케줄러를 재시작해야 안전하게 반영됩니다 (docker compose restart).


2. Connection 설정: 열쇠 보관하기

이제 Slack의 Webhook URL을 Airflow의 금고(Connection)에 저장할 차례입니다. (Slack Incoming Webhook URL 발급 방법은 이미 알고 있다고 가정합니다. 만약 없다면 Slack API에서 간단히 생성 가능합니다.)

  1. Airflow Web UI 접속
  2. 상단 메뉴 Admin -> Connections 클릭
  3. + (Create) 버튼 클릭
  4. 다음과 같이 입력합니다:
    • Connection Id: slack_conn (코드에서 이 이름을 부르게 됩니다)
    • Connection Type: HTTP
    • Host: https://hooks.slack.com/services/
    • Password: T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX (Webhook URL의 뒷부분)
    • 참고: Webhook URL 전체가 https://hooks.slack.com/services/T000... 라면, Host에는 앞부분을, Password에는 뒷부분을 넣습니다.

이제 코드는 slack_conn이라는 이름만 알면 되고, 실제 비밀 토큰은 몰라도 됩니다.


3. 실전 예제 1: 작업 완료 시 Slack 알림 보내기

가장 단순한 형태는 파이프라인의 마지막 단계로 알림 태스크를 넣는 것입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from airflow import DAG
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id='slack_notification_demo',
    start_date=datetime(2026, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # 1. 실제 작업을 수행하는 태스크
    work = BashOperator(
        task_id='do_heavy_work',
        bash_command='sleep 5 && echo "작업 완료!"',
    )

    # 2. Slack으로 알림을 보내는 태스크
    notify = SlackWebhookOperator(
        task_id='send_slack',
        http_conn_id='slack_conn',  # 아까 등록한 Connection ID
        message=':white_check_mark: 작업이 성공적으로 끝났습니다! ()', # 이모지 사용 가능
        channel='#general',
        username='Airflow Bot'
    )

    work >> notify

이 DAG를 실행하면 work가 끝나고 나서 Slack의 #general 채널에 메시지가 뜰 것입니다.


4. 실전 예제 2: “실패했을 때만” 알림 보내기 (Callbacks)

사실 성공 알림보다 더 중요한 것은 “실패 알림”입니다. 밤새 파이프라인이 돌다가 에러가 났을 때만 개발자를 깨워야 하니까요.

Airflow는 이를 위해 on_failure_callback이라는 강력한 기능을 제공합니다. 태스크가 실패하면 자동으로 실행되는 함수입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime

# 실패 시 실행될 콜백 함수 정의
def send_slack_alert(context):
    slack_msg = f"""
    :red_circle: Task Failed.
    *Task*: {context.get('task_instance').task_id}  
    *Dag*: {context.get('task_instance').dag_id} 
    *Execution Time*: {context.get('execution_date')}  
    """
    
    # Operator를 함수 안에서 실행
    alert = SlackWebhookOperator(
        task_id='slack_fail_alert',
        http_conn_id='slack_conn',
        message=slack_msg,
        channel='#alerts'
    )
    return alert.execute(context=context)

with DAG(
    dag_id='slack_alert_callback_demo',
    start_date=datetime(2026, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    # 모든 태스크에 기본적으로 적용될 설정
    default_args={
        'on_failure_callback': send_slack_alert  # 실패 시 이 함수를 호출하라!
    }
) as dag:

    # 성공하는 태스크
    t1 = BashOperator(
        task_id='success_task',
        bash_command='echo "난 성공해"',
    )

    # 무조건 실패하는 태스크 (테스트용)
    t2 = BashOperator(
        task_id='fail_task',
        bash_command='exit 1',  # 0이 아닌 종료 코드는 실패로 간주
    )

    t1 >> t2

이 DAG를 실행하면 t1은 성공해서 조용히 넘어가지만, t2가 실패하는 순간 send_slack_alert 함수가 트리거되어 Slack으로 빨간 경고 메시지가 날아옵니다.


Day 4 요약

오늘은 Airflow를 운영(Operations) 관점에서 바라보았습니다.

  • Provider: Airflow의 기능을 확장하는 플러그인 (예: apache-airflow-providers-slack).
  • Connection: 인증 정보를 코드와 분리하여 안전하게 관리하는 저장소.
  • Callback: 태스크의 성공/실패 여부에 따라 자동으로 후속 조치(알림 등)를 취하는 패턴.
This post is licensed under CC BY 4.0 by the author.