Post

[Airflow 101] Day 3: 태스크 간의 대화법, XCom과 Jinja Template

[Airflow 101] Day 3: 태스크 간의 대화법, XCom과 Jinja Template

들어가며: 고립된 태스크들을 연결하라

Airflow의 태스크(Task)들은 기본적으로 서로 완전히 격리(Isolated)되어 있습니다. Task A가 서울에서 실행되고, Task B가 부산에서 실행되는 것과 같습니다(실제로 분산 환경에서는 다른 서버에서 돕니다). 그래서 Task A 안에서 변수를 선언해도 Task B는 그 변수를 알 수 없습니다.

하지만 우리는 종종 이런 흐름이 필요합니다.

Task A: “오늘 주문 건수는 100건이야.” (데이터 생성) Task B: “그래? 그럼 100건 × 5000원 = 50만원이 매출이네.” (데이터 활용)

이 대화를 가능하게 해주는 것이 바로 XComJinja Template입니다.


1. Jinja Templating: 실행 시점에 결정되는 마법의 변수

Airflow는 파이썬의 템플릿 엔진인 Jinja2를 내장하고 있습니다. 이를 사용하면 코드 안에 변수 구멍을 뚫어놓고, 실행되는 시점에 값을 채워 넣을 수 있습니다.

가장 대표적인 것이 `` (Date String)입니다.

왜 필요한가? (Idempotency)

매일 자정에 도는 쿼리가 있다고 칩시다.

1
2
3
-- 나쁜 예 (오늘 날짜를 직접 구함)
SELECT * FROM orders WHERE date = NOW();

이 쿼리는 문제가 있습니다. 만약 어제(1월 6일) 데이터 처리가 실패해서 오늘(1월 7일) 다시 돌리면, NOW()는 1월 7일을 가리키기 때문에 어제 데이터를 영원히 잃어버리게 됩니다.

Airflow는 이 문제를 해결하기 위해 “이 DAG가 실행되어야 했던 기준 날짜(Logical Date)”를 변수로 제공합니다.

1
2
3
-- 좋은 예 (Airflow가 주입해주는 날짜 사용)
SELECT * FROM orders WHERE date = '';

이렇게 짜면, 1월 6일 자 작업을 1년 뒤에 재실행해도 ``는 2026-01-06으로 고정됩니다. 이것이 멱등성(Idempotency)의 핵심입니다.


2. XCom (Cross Communications): 태스크 간의 메신저

XCom은 “작은 데이터”를 태스크 간에 교환하기 위한 메커니즘입니다.

  • Push: 데이터를 보냄 (Airflow 메타데이터 DB에 저장됨)
  • Pull: 데이터를 받음

⚠️ 주의: XCom은 메타데이터 DB(Postgres/MySQL)에 저장됩니다. 따라서 수 기가바이트의 대용량 데이터(DataFrame 전체 등)를 XCom으로 넘기면 DB가 터집니다. 파일 경로나, 통계값 같은 작은 메타데이터만 넘겨야 합니다. 대용량 데이터는 S3나 HDFS 같은 외부 저장소에 저장하고 그 ‘경로(Path)’만 XCom으로 넘기세요.


3. 실전 예제: 스마트한 데이터 파이프라인 만들기

이제 이론을 코드로 옮겨봅시다. 이번에 만들 DAG는 다음과 같은 일을 합니다.

  1. Extract: 모의 주문 데이터를 생성하여 리턴합니다. (XCom Push)
  2. Transform: 주문 데이터를 받아 매출액을 계산합니다. (XCom Pull)
  3. Load: 계산된 결과를 파일로 저장하되, 파일명에 날짜를 넣습니다. (Jinja Template)

dags/xcom_jinja_demo.py 파일을 생성하세요.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def _get_order_stats(**context):
    """
    주문 건수를 생성하고 리턴하는 함수
    PythonOperator에서 return 값은 자동으로 XCom에 'return_value'라는 key로 Push 됩니다.
    """
    print("주문 데이터를 집계합니다...")
    order_count = 100
    print(f"오늘의 주문 건수: {order_count}")
    return order_count

def _calculate_revenue(**context):
    """
    이전 태스크의 결과를 받아서 매출을 계산하는 함수
    """
    # 1. XCom에서 데이터 꺼내기 (Pull)
    # ti = Task Instance
    ti = context['ti']
    order_count = ti.xcom_pull(task_ids='extract_order')
    
    if not order_count:
        raise ValueError("주문 데이터가 없습니다!")

    revenue = order_count * 5000 # 객단가 5000원 가정
    print(f"총 매출액: {revenue}")
    
    return revenue

with DAG(
    dag_id='xcom_jinja_demo',
    start_date=datetime(2026, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['tutorial', 'xcom', 'jinja']
) as dag:

    # 1. Extract: 값 리턴 (Auto XCom Push)
    extract = PythonOperator(
        task_id='extract_order',
        python_callable=_get_order_stats,
    )

    # 2. Transform: 값 받아서 처리
    transform = PythonOperator(
        task_id='calc_revenue',
        python_callable=_calculate_revenue,
        provide_context=True, # Airflow의 context 변수(ti 등)를 함수에 전달
    )

    # 3. Load: BashOperator + Jinja Template 사용
    # : 실행 기준 날짜 (YYYY-MM-DD)
    # : 템플릿 안에서도 XCom을 당겨올 수 있습니다!
    load = BashOperator(
        task_id='save_report',
        bash_command='echo "Date: , Revenue: " > /opt/airflow/logs/report_.txt'
    )

    extract >> transform >> load


4. 실행 및 결과 확인

(1) XCom 확인하기

DAG를 실행(Trigger) 시키고, 성공(Success)했다면 Admin -> XComs 메뉴로 들어가 보세요. extract_order 태스크가 100이라는 값을, calc_revenue 태스크가 500000이라는 값을 저장하고 있는 것을 볼 수 있습니다. 이것이 태스크 간에 데이터가 이동한 흔적입니다.

(2) Jinja Template 결과 확인하기

세 번째 태스크(save_report)는 Bash 명령어로 파일을 생성했습니다. 로그 탭을 열어보면 실제 실행된 명령어가 보일 것입니다.

1
2
3
# 템플릿이 렌더링 된 결과
echo "Date: 2026-01-07, Revenue: 500000" > /opt/airflow/logs/report_2026-01-07.txt

코드에는 ``라고 썼지만, Airflow가 실행 시점에 실제 날짜로 바꿔치기(Rendering) 해준 것입니다.


Day 3 요약

오늘은 Airflow를 더 똑똑하게 만드는 두 가지 무기를 장착했습니다.

  1. Jinja Template (``): 코드를 수정하지 않고도 날짜별로 다른 파일명, 다른 쿼리를 실행하게 해 줍니다. (멱등성의 핵심!)
  2. XCom: 고립된 태스크끼리 데이터를 주고받는 다리 역할을 합니다.
    • Python 함수에서 return 하면 자동 Push.
    • ti.xcom_pull로 데이터 수신.
This post is licensed under CC BY 4.0 by the author.