[LLMOps] Day 5: 프로덕션 배포와 모니터링 - 안정적인 RAG 시스템 운영하기
[LLMOps] Day 5: 프로덕션 배포와 모니터링 - 안정적인 RAG 시스템 운영하기
서론: 프로토타입에서 프로덕션으로
RAG 시스템을 개발하는 것과 운영하는 것은 완전히 다른 문제다.
- 로컬에서는 잘 되는데, 프로덕션에서는 느리다: 동시 요청이 10개만 넘어도 타임아웃이 발생한다.
- 비용이 예상의 10배: 임베딩 API 호출을 잘못 설계해서 불필요한 비용이 발생한다.
- 품질 저하를 모른다: 새로운 문서가 추가되면서 검색 정확도가 떨어지는데, 사용자 불만이 쌓이기 전까지 알 수 없다.
이번 글에서는 프로덕션 RAG 시스템의 아키텍처, 모니터링, 비용 최적화를 실전 관점에서 다룬다.
1. 프로덕션 아키텍처
1.1 동기 vs 비동기 처리
잘못된 구현 (동기 처리):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from fastapi import FastAPI
app = FastAPI()
@app.post("/search")
def search(query: str):
# 문제: 모든 작업이 순차적으로 실행됨
embedding = get_embedding(query) # 150ms
vector_results = vector_db.search(embedding) # 50ms
keyword_results = bm25.search(query) # 30ms
reranked = reranker.rerank(...) # 200ms
response = llm.generate(...) # 2000ms
return response # Total: 2430ms
올바른 구현 (비동기 + 병렬 처리):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
from fastapi import FastAPI
app = FastAPI()
@app.post("/search")
async def search(query: str):
# 1. 임베딩 생성 (필수)
embedding = await get_embedding_async(query) # 150ms
# 2. 벡터 + 키워드 검색 병렬 실행
vector_task = vector_db.search_async(embedding)
keyword_task = bm25.search_async(query)
vector_results, keyword_results = await asyncio.gather(
vector_task, # 50ms
keyword_task # 30ms
) # Total: 50ms (병렬)
# 3. 병합 및 리랭킹
candidates = merge_results(vector_results, keyword_results)
reranked = await reranker.rerank_async(query, candidates) # 200ms
# 4. LLM 생성
response = await llm.generate_async(query, reranked) # 2000ms
return response # Total: 150 + 50 + 200 + 2000 = 2400ms → 실제로는 2300ms
개선 효과: 130ms 단축 (약 5% 개선)
1.2 캐싱 전략
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import hashlib
import asyncio
from functools import lru_cache
import redis.asyncio as redis
redis_client = redis.from_url("redis://localhost")
class CachedRAG:
def __init__(self):
# 임베딩 캐시 (로컬 메모리)
self.embedding_cache = {}
@lru_cache(maxsize=1000)
def _embedding_cache_key(self, text: str) -> str:
return hashlib.md5(text.encode()).hexdigest()
async def get_embedding_cached(self, text: str):
cache_key = self._embedding_cache_key(text)
# 1. 메모리 캐시 확인
if cache_key in self.embedding_cache:
return self.embedding_cache[cache_key]
# 2. Redis 캐시 확인
cached = await redis_client.get(f"emb:{cache_key}")
if cached:
embedding = json.loads(cached)
self.embedding_cache[cache_key] = embedding
return embedding
# 3. API 호출
embedding = await get_embedding_async(text)
# 4. 캐시 저장
self.embedding_cache[cache_key] = embedding
await redis_client.setex(
f"emb:{cache_key}",
86400, # 24시간
json.dumps(embedding)
)
return embedding
async def search_cached(self, query: str):
# 검색 결과 캐시 (1시간)
cache_key = f"search:{hashlib.md5(query.encode()).hexdigest()}"
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 검색 수행
results = await self.search(query)
# 캐시 저장
await redis_client.setex(cache_key, 3600, json.dumps(results))
return results
효과:
- 임베딩 API 호출 40~60% 감소
- 응답 시간 200ms → 50ms (캐시 히트 시)
- 월 비용 $500 → $200 절감
1.3 Rate Limiting & Circuit Breaker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from circuitbreaker import circuit
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.post("/search")
@limiter.limit("10/minute") # 사용자당 분당 10회
async def search(request: Request, query: str):
return await rag.search(query)
# Circuit Breaker: API 장애 시 빠른 실패
@circuit(failure_threshold=5, recovery_timeout=60)
async def get_embedding_async(text: str):
try:
response = await openai_client.embeddings.create(...)
return response.data[0].embedding
except Exception as e:
# 5회 연속 실패 시 60초간 요청 차단
raise
2. 모니터링 및 관찰성 (Observability)
2.1 핵심 메트릭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from prometheus_client import Counter, Histogram, Gauge
import time
# 메트릭 정의
search_requests = Counter('rag_search_requests_total', 'Total search requests')
search_latency = Histogram('rag_search_latency_seconds', 'Search latency')
embedding_cache_hits = Counter('rag_embedding_cache_hits', 'Embedding cache hits')
rerank_score = Histogram('rag_rerank_top1_score', 'Top-1 reranking score')
llm_tokens = Counter('rag_llm_tokens_total', 'Total LLM tokens used')
async def search_with_metrics(query: str):
search_requests.inc()
start = time.time()
try:
# 검색 수행
results = await rag.search(query)
# 메트릭 기록
search_latency.observe(time.time() - start)
rerank_score.observe(results[0].score)
return results
except Exception as e:
search_errors.inc()
raise
Grafana 대시보드 필수 메트릭:
| 메트릭 | 목표 | 알람 조건 |
|---|---|---|
| p99 레이턴시 | < 500ms | > 1000ms |
| 에러율 | < 0.1% | > 1% |
| 캐시 히트율 | > 40% | < 20% |
| Top-1 리랭킹 점수 | > 0.7 | < 0.5 (품질 저하) |
| 시간당 임베딩 API 호출 | < 10,000 | > 50,000 (비용 폭증) |
2.2 LangSmith를 활용한 트레이싱
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from langsmith import Client
from langsmith.run_helpers import traceable
langsmith_client = Client()
@traceable(run_type="chain", name="RAG Search")
async def search_traced(query: str):
with langsmith_client.trace(name="embedding"):
embedding = await get_embedding_async(query)
with langsmith_client.trace(name="vector_search"):
vector_results = await vector_db.search(embedding)
with langsmith_client.trace(name="reranking"):
reranked = await reranker.rerank(query, vector_results)
with langsmith_client.trace(name="llm_generation"):
response = await llm.generate(query, reranked)
return response
LangSmith 대시보드에서 확인 가능:
- 각 단계별 소요 시간 (워터폴 차트)
- 실패한 요청의 입력/출력
- 비용 추적 (토큰 사용량)
- 사용자 피드백 연동
2.3 품질 모니터링
사용자가 “답변이 이상해요”라고 말하기 전에 먼저 감지해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import numpy as np
from datetime import datetime, timedelta
class QualityMonitor:
def __init__(self):
self.recent_scores = []
async def log_search(self, query: str, results: list, user_clicked: int = None):
"""검색 품질 로깅"""
top_score = results[0].relevance_score
# 메트릭 저장
await db.execute("""
INSERT INTO search_logs (query, top_score, clicked_rank, timestamp)
VALUES (?, ?, ?, ?)
""", (query, top_score, user_clicked, datetime.now()))
# 최근 100개 평균 점수 추적
self.recent_scores.append(top_score)
if len(self.recent_scores) > 100:
self.recent_scores.pop(0)
# 품질 저하 감지
avg_score = np.mean(self.recent_scores)
if avg_score < 0.5: # 임계값
await alert_slack(f"⚠️ 검색 품질 저하: 평균 점수 {avg_score:.2f}")
async def analyze_daily_quality(self):
"""일일 품질 리포트"""
yesterday = datetime.now() - timedelta(days=1)
stats = await db.fetch_one("""
SELECT
AVG(top_score) as avg_score,
AVG(CASE WHEN clicked_rank <= 3 THEN 1 ELSE 0 END) as top3_ctr,
COUNT(*) as total_searches
FROM search_logs
WHERE timestamp > ?
""", (yesterday,))
# Slack 알림
await alert_slack(f"""
📊 일일 검색 품질 리포트
- 평균 Top-1 점수: {stats['avg_score']:.2f}
- Top-3 클릭률: {stats['top3_ctr']:.1%}
- 총 검색 수: {stats['total_searches']:,}
""")
3. 비용 최적화
3.1 임베딩 비용 절감
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class EmbeddingOptimizer:
def __init__(self):
self.cache = {}
self.batch_queue = []
self.batch_size = 100
async def get_embedding_batched(self, text: str):
"""배치 처리로 API 호출 횟수 감소"""
# 캐시 확인
if text in self.cache:
return self.cache[text]
# 배치에 추가
future = asyncio.Future()
self.batch_queue.append((text, future))
# 배치가 차면 한 번에 처리
if len(self.batch_queue) >= self.batch_size:
await self._process_batch()
return await future
async def _process_batch(self):
if not self.batch_queue:
return
texts = [item[0] for item in self.batch_queue]
futures = [item[1] for item in self.batch_queue]
# 한 번의 API 호출로 여러 개 처리
response = await openai_client.embeddings.create(
model="text-embedding-3-small",
input=texts # 배치 입력
)
# 결과 분배
for i, embedding_data in enumerate(response.data):
self.cache[texts[i]] = embedding_data.embedding
futures[i].set_result(embedding_data.embedding)
self.batch_queue.clear()
효과:
- API 호출 횟수: 100회 → 1회
- 네트워크 오버헤드 감소
- 월 비용: $800 → $150 (약 80% 절감)
3.2 LLM 비용 절감
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class LLMOptimizer:
def __init__(self):
self.model_tier = {
"simple": "gpt-4o-mini", # $0.15 / 1M tokens
"normal": "gpt-4o", # $2.50 / 1M tokens
"complex": "o1" # $15.00 / 1M tokens
}
async def generate(self, query: str, context: str):
# 쿼리 복잡도 분류
complexity = self._classify_complexity(query)
model = self.model_tier[complexity]
# 컨텍스트 압축
compressed_context = self._compress_context(context, max_tokens=2000)
response = await openai_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "간결하게 답변하세요."},
{"role": "user", "content": f"컨텍스트:\n{compressed_context}\n\n질문: {query}"}
],
max_tokens=300 # 불필요하게 긴 답변 방지
)
return response.choices[0].message.content
def _classify_complexity(self, query: str) -> str:
"""쿼리 복잡도 분류"""
# 간단한 휴리스틱
if len(query.split()) < 5:
return "simple"
elif "비교" in query or "분석" in query:
return "complex"
else:
return "normal"
def _compress_context(self, context: str, max_tokens: int) -> str:
"""컨텍스트 압축"""
# 토큰 수 계산 (간략화)
current_tokens = len(context.split()) * 1.3
if current_tokens <= max_tokens:
return context
# 문단 단위로 잘라서 중요한 부분만 유지
paragraphs = context.split("\n\n")
compressed = []
token_count = 0
for para in paragraphs:
para_tokens = len(para.split()) * 1.3
if token_count + para_tokens <= max_tokens:
compressed.append(para)
token_count += para_tokens
else:
break
return "\n\n".join(compressed)
효과:
- 평균 비용: $0.05 per query → $0.02 per query (60% 절감)
- 응답 품질: 95% 유지
4. 장애 대응 및 복구
4.1 Fallback 전략
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ResilientRAG:
async def search(self, query: str):
try:
# 1차: 전체 파이프라인
return await self.full_pipeline(query)
except EmbeddingAPIError:
# 2차: 임베딩 실패 시 키워드 검색만
logger.warning("Embedding API failed, fallback to BM25")
return await self.keyword_only_search(query)
except VectorDBError:
# 3차: 벡터 DB 장애 시 캐시된 결과
logger.error("Vector DB failed, using cached results")
return await self.get_cached_results(query)
except Exception as e:
# 최후: 사전 정의된 답변
logger.critical(f"Complete failure: {e}")
return {"answer": "죄송합니다. 일시적인 오류가 발생했습니다.", "sources": []}
4.2 Health Check
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@app.get("/health")
async def health_check():
checks = {}
# 벡터 DB 연결 확인
try:
await vector_db.ping()
checks["vector_db"] = "ok"
except:
checks["vector_db"] = "failed"
# 임베딩 API 확인
try:
await get_embedding_async("test", timeout=2)
checks["embedding_api"] = "ok"
except:
checks["embedding_api"] = "failed"
# Redis 확인
try:
await redis_client.ping()
checks["redis"] = "ok"
except:
checks["redis"] = "failed"
is_healthy = all(v == "ok" for v in checks.values())
status_code = 200 if is_healthy else 503
return JSONResponse(checks, status_code=status_code)
5. 프로덕션 체크리스트
배포 전 반드시 확인할 항목:
- 성능 테스트: 동시 요청 100개에서 p99 < 1초
- 비용 예측: 월간 예상 요청 수 × 쿼리당 비용 계산
- 모니터링 설정: Prometheus + Grafana 대시보드
- 알람 설정: 에러율, 레이턴시, 비용 임계값
- 캐싱 전략: 임베딩, 검색 결과 캐싱 구현
- Fallback 구현: API 장애 시 대체 전략
- Rate Limiting: 사용자당/IP당 요청 제한
- 보안: API 키 환경 변수 관리, HTTPS
- 로깅: 모든 검색 쿼리 및 결과 기록
- A/B 테스트 준비: 실험 플래그 및 메트릭 수집
This post is licensed under CC BY 4.0 by the author.