[Airflow 101] Day 3: 태스크 간의 대화법, XCom과 Jinja Template
들어가며: 고립된 태스크들을 연결하라
Airflow의 태스크(Task)들은 기본적으로 서로 완전히 격리(Isolated)되어 있습니다. Task A가 서울에서 실행되고, Task B가 부산에서 실행되는 것과 같습니다(실제로 분산 환경에서는 다른 서버에서 돕니다). 그래서 Task A 안에서 변수를 선언해도 Task B는 그 변수를 알 수 없습니다.
하지만 우리는 종종 이런 흐름이 필요합니다.
Task A: “오늘 주문 건수는 100건이야.” (데이터 생성) Task B: “그래? 그럼 100건 × 5000원 = 50만원이 매출이네.” (데이터 활용)
이 대화를 가능하게 해주는 것이 바로 XCom과 Jinja Template입니다.
1. Jinja Templating: 실행 시점에 결정되는 마법의 변수
Airflow는 파이썬의 템플릿 엔진인 Jinja2를 내장하고 있습니다. 이를 사용하면 코드 안에 변수 구멍을 뚫어놓고, 실행되는 시점에 값을 채워 넣을 수 있습니다.
가장 대표적인 것이 `` (Date String)입니다.
왜 필요한가? (Idempotency)
매일 자정에 도는 쿼리가 있다고 칩시다.
1
2
3
-- 나쁜 예 (오늘 날짜를 직접 구함)
SELECT * FROM orders WHERE date = NOW();
이 쿼리는 문제가 있습니다. 만약 어제(1월 6일) 데이터 처리가 실패해서 오늘(1월 7일) 다시 돌리면, NOW()는 1월 7일을 가리키기 때문에 어제 데이터를 영원히 잃어버리게 됩니다.
Airflow는 이 문제를 해결하기 위해 “이 DAG가 실행되어야 했던 기준 날짜(Logical Date)”를 변수로 제공합니다.
1
2
3
-- 좋은 예 (Airflow가 주입해주는 날짜 사용)
SELECT * FROM orders WHERE date = '';
이렇게 짜면, 1월 6일 자 작업을 1년 뒤에 재실행해도 ``는 2026-01-06으로 고정됩니다. 이것이 멱등성(Idempotency)의 핵심입니다.
2. XCom (Cross Communications): 태스크 간의 메신저
XCom은 “작은 데이터”를 태스크 간에 교환하기 위한 메커니즘입니다.
- Push: 데이터를 보냄 (Airflow 메타데이터 DB에 저장됨)
- Pull: 데이터를 받음
⚠️ 주의: XCom은 메타데이터 DB(Postgres/MySQL)에 저장됩니다. 따라서 수 기가바이트의 대용량 데이터(DataFrame 전체 등)를 XCom으로 넘기면 DB가 터집니다. 파일 경로나, 통계값 같은 작은 메타데이터만 넘겨야 합니다. 대용량 데이터는 S3나 HDFS 같은 외부 저장소에 저장하고 그 ‘경로(Path)’만 XCom으로 넘기세요.
3. 실전 예제: 스마트한 데이터 파이프라인 만들기
이제 이론을 코드로 옮겨봅시다. 이번에 만들 DAG는 다음과 같은 일을 합니다.
- Extract: 모의 주문 데이터를 생성하여 리턴합니다. (XCom Push)
- Transform: 주문 데이터를 받아 매출액을 계산합니다. (XCom Pull)
- Load: 계산된 결과를 파일로 저장하되, 파일명에 날짜를 넣습니다. (Jinja Template)
dags/xcom_jinja_demo.py 파일을 생성하세요.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def _get_order_stats(**context):
"""
주문 건수를 생성하고 리턴하는 함수
PythonOperator에서 return 값은 자동으로 XCom에 'return_value'라는 key로 Push 됩니다.
"""
print("주문 데이터를 집계합니다...")
order_count = 100
print(f"오늘의 주문 건수: {order_count}")
return order_count
def _calculate_revenue(**context):
"""
이전 태스크의 결과를 받아서 매출을 계산하는 함수
"""
# 1. XCom에서 데이터 꺼내기 (Pull)
# ti = Task Instance
ti = context['ti']
order_count = ti.xcom_pull(task_ids='extract_order')
if not order_count:
raise ValueError("주문 데이터가 없습니다!")
revenue = order_count * 5000 # 객단가 5000원 가정
print(f"총 매출액: {revenue}")
return revenue
with DAG(
dag_id='xcom_jinja_demo',
start_date=datetime(2026, 1, 1),
schedule_interval='@daily',
catchup=False,
tags=['tutorial', 'xcom', 'jinja']
) as dag:
# 1. Extract: 값 리턴 (Auto XCom Push)
extract = PythonOperator(
task_id='extract_order',
python_callable=_get_order_stats,
)
# 2. Transform: 값 받아서 처리
transform = PythonOperator(
task_id='calc_revenue',
python_callable=_calculate_revenue,
provide_context=True, # Airflow의 context 변수(ti 등)를 함수에 전달
)
# 3. Load: BashOperator + Jinja Template 사용
# : 실행 기준 날짜 (YYYY-MM-DD)
# : 템플릿 안에서도 XCom을 당겨올 수 있습니다!
load = BashOperator(
task_id='save_report',
bash_command='echo "Date: , Revenue: " > /opt/airflow/logs/report_.txt'
)
extract >> transform >> load
4. 실행 및 결과 확인
(1) XCom 확인하기
DAG를 실행(Trigger) 시키고, 성공(Success)했다면 Admin -> XComs 메뉴로 들어가 보세요. extract_order 태스크가 100이라는 값을, calc_revenue 태스크가 500000이라는 값을 저장하고 있는 것을 볼 수 있습니다. 이것이 태스크 간에 데이터가 이동한 흔적입니다.
(2) Jinja Template 결과 확인하기
세 번째 태스크(save_report)는 Bash 명령어로 파일을 생성했습니다. 로그 탭을 열어보면 실제 실행된 명령어가 보일 것입니다.
1
2
3
# 템플릿이 렌더링 된 결과
echo "Date: 2026-01-07, Revenue: 500000" > /opt/airflow/logs/report_2026-01-07.txt
코드에는 ``라고 썼지만, Airflow가 실행 시점에 실제 날짜로 바꿔치기(Rendering) 해준 것입니다.
Day 3 요약
오늘은 Airflow를 더 똑똑하게 만드는 두 가지 무기를 장착했습니다.
- Jinja Template (``): 코드를 수정하지 않고도 날짜별로 다른 파일명, 다른 쿼리를 실행하게 해 줍니다. (멱등성의 핵심!)
- XCom: 고립된 태스크끼리 데이터를 주고받는 다리 역할을 합니다.
- Python 함수에서
return하면 자동 Push. ti.xcom_pull로 데이터 수신.
- Python 함수에서