Post

[LLMOps] Day 5: 프로덕션 배포와 모니터링 - 안정적인 RAG 시스템 운영하기

[LLMOps] Day 5: 프로덕션 배포와 모니터링 - 안정적인 RAG 시스템 운영하기

서론: 프로토타입에서 프로덕션으로

RAG 시스템을 개발하는 것과 운영하는 것은 완전히 다른 문제다.

  • 로컬에서는 잘 되는데, 프로덕션에서는 느리다: 동시 요청이 10개만 넘어도 타임아웃이 발생한다.
  • 비용이 예상의 10배: 임베딩 API 호출을 잘못 설계해서 불필요한 비용이 발생한다.
  • 품질 저하를 모른다: 새로운 문서가 추가되면서 검색 정확도가 떨어지는데, 사용자 불만이 쌓이기 전까지 알 수 없다.

이번 글에서는 프로덕션 RAG 시스템의 아키텍처, 모니터링, 비용 최적화를 실전 관점에서 다룬다.

1. 프로덕션 아키텍처

1.1 동기 vs 비동기 처리

잘못된 구현 (동기 처리):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from fastapi import FastAPI

app = FastAPI()

@app.post("/search")
def search(query: str):
    # 문제: 모든 작업이 순차적으로 실행됨
    embedding = get_embedding(query)           # 150ms
    vector_results = vector_db.search(embedding)  # 50ms
    keyword_results = bm25.search(query)        # 30ms
    reranked = reranker.rerank(...)             # 200ms
    response = llm.generate(...)                # 2000ms

    return response  # Total: 2430ms

올바른 구현 (비동기 + 병렬 처리):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
from fastapi import FastAPI

app = FastAPI()

@app.post("/search")
async def search(query: str):
    # 1. 임베딩 생성 (필수)
    embedding = await get_embedding_async(query)  # 150ms

    # 2. 벡터 + 키워드 검색 병렬 실행
    vector_task = vector_db.search_async(embedding)
    keyword_task = bm25.search_async(query)

    vector_results, keyword_results = await asyncio.gather(
        vector_task,    # 50ms
        keyword_task    # 30ms
    )  # Total: 50ms (병렬)

    # 3. 병합 및 리랭킹
    candidates = merge_results(vector_results, keyword_results)
    reranked = await reranker.rerank_async(query, candidates)  # 200ms

    # 4. LLM 생성
    response = await llm.generate_async(query, reranked)  # 2000ms

    return response  # Total: 150 + 50 + 200 + 2000 = 2400ms → 실제로는 2300ms

개선 효과: 130ms 단축 (약 5% 개선)

1.2 캐싱 전략

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import hashlib
import asyncio
from functools import lru_cache
import redis.asyncio as redis

redis_client = redis.from_url("redis://localhost")

class CachedRAG:
    def __init__(self):
        # 임베딩 캐시 (로컬 메모리)
        self.embedding_cache = {}

    @lru_cache(maxsize=1000)
    def _embedding_cache_key(self, text: str) -> str:
        return hashlib.md5(text.encode()).hexdigest()

    async def get_embedding_cached(self, text: str):
        cache_key = self._embedding_cache_key(text)

        # 1. 메모리 캐시 확인
        if cache_key in self.embedding_cache:
            return self.embedding_cache[cache_key]

        # 2. Redis 캐시 확인
        cached = await redis_client.get(f"emb:{cache_key}")
        if cached:
            embedding = json.loads(cached)
            self.embedding_cache[cache_key] = embedding
            return embedding

        # 3. API 호출
        embedding = await get_embedding_async(text)

        # 4. 캐시 저장
        self.embedding_cache[cache_key] = embedding
        await redis_client.setex(
            f"emb:{cache_key}",
            86400,  # 24시간
            json.dumps(embedding)
        )

        return embedding

    async def search_cached(self, query: str):
        # 검색 결과 캐시 (1시간)
        cache_key = f"search:{hashlib.md5(query.encode()).hexdigest()}"

        cached = await redis_client.get(cache_key)
        if cached:
            return json.loads(cached)

        # 검색 수행
        results = await self.search(query)

        # 캐시 저장
        await redis_client.setex(cache_key, 3600, json.dumps(results))

        return results

효과:

  • 임베딩 API 호출 40~60% 감소
  • 응답 시간 200ms → 50ms (캐시 히트 시)
  • 월 비용 $500 → $200 절감

1.3 Rate Limiting & Circuit Breaker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from circuitbreaker import circuit

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/search")
@limiter.limit("10/minute")  # 사용자당 분당 10회
async def search(request: Request, query: str):
    return await rag.search(query)

# Circuit Breaker: API 장애 시 빠른 실패
@circuit(failure_threshold=5, recovery_timeout=60)
async def get_embedding_async(text: str):
    try:
        response = await openai_client.embeddings.create(...)
        return response.data[0].embedding
    except Exception as e:
        # 5회 연속 실패 시 60초간 요청 차단
        raise

2. 모니터링 및 관찰성 (Observability)

2.1 핵심 메트릭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from prometheus_client import Counter, Histogram, Gauge
import time

# 메트릭 정의
search_requests = Counter('rag_search_requests_total', 'Total search requests')
search_latency = Histogram('rag_search_latency_seconds', 'Search latency')
embedding_cache_hits = Counter('rag_embedding_cache_hits', 'Embedding cache hits')
rerank_score = Histogram('rag_rerank_top1_score', 'Top-1 reranking score')
llm_tokens = Counter('rag_llm_tokens_total', 'Total LLM tokens used')

async def search_with_metrics(query: str):
    search_requests.inc()

    start = time.time()

    try:
        # 검색 수행
        results = await rag.search(query)

        # 메트릭 기록
        search_latency.observe(time.time() - start)
        rerank_score.observe(results[0].score)

        return results
    except Exception as e:
        search_errors.inc()
        raise

Grafana 대시보드 필수 메트릭:

메트릭목표알람 조건
p99 레이턴시< 500ms> 1000ms
에러율< 0.1%> 1%
캐시 히트율> 40%< 20%
Top-1 리랭킹 점수> 0.7< 0.5 (품질 저하)
시간당 임베딩 API 호출< 10,000> 50,000 (비용 폭증)

2.2 LangSmith를 활용한 트레이싱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from langsmith import Client
from langsmith.run_helpers import traceable

langsmith_client = Client()

@traceable(run_type="chain", name="RAG Search")
async def search_traced(query: str):
    with langsmith_client.trace(name="embedding"):
        embedding = await get_embedding_async(query)

    with langsmith_client.trace(name="vector_search"):
        vector_results = await vector_db.search(embedding)

    with langsmith_client.trace(name="reranking"):
        reranked = await reranker.rerank(query, vector_results)

    with langsmith_client.trace(name="llm_generation"):
        response = await llm.generate(query, reranked)

    return response

LangSmith 대시보드에서 확인 가능:

  • 각 단계별 소요 시간 (워터폴 차트)
  • 실패한 요청의 입력/출력
  • 비용 추적 (토큰 사용량)
  • 사용자 피드백 연동

2.3 품질 모니터링

사용자가 “답변이 이상해요”라고 말하기 전에 먼저 감지해야 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import numpy as np
from datetime import datetime, timedelta

class QualityMonitor:
    def __init__(self):
        self.recent_scores = []

    async def log_search(self, query: str, results: list, user_clicked: int = None):
        """검색 품질 로깅"""
        top_score = results[0].relevance_score

        # 메트릭 저장
        await db.execute("""
            INSERT INTO search_logs (query, top_score, clicked_rank, timestamp)
            VALUES (?, ?, ?, ?)
        """, (query, top_score, user_clicked, datetime.now()))

        # 최근 100개 평균 점수 추적
        self.recent_scores.append(top_score)
        if len(self.recent_scores) > 100:
            self.recent_scores.pop(0)

        # 품질 저하 감지
        avg_score = np.mean(self.recent_scores)
        if avg_score < 0.5:  # 임계값
            await alert_slack(f"⚠️ 검색 품질 저하: 평균 점수 {avg_score:.2f}")

    async def analyze_daily_quality(self):
        """일일 품질 리포트"""
        yesterday = datetime.now() - timedelta(days=1)

        stats = await db.fetch_one("""
            SELECT
                AVG(top_score) as avg_score,
                AVG(CASE WHEN clicked_rank <= 3 THEN 1 ELSE 0 END) as top3_ctr,
                COUNT(*) as total_searches
            FROM search_logs
            WHERE timestamp > ?
        """, (yesterday,))

        # Slack 알림
        await alert_slack(f"""
📊 일일 검색 품질 리포트
- 평균 Top-1 점수: {stats['avg_score']:.2f}
- Top-3 클릭률: {stats['top3_ctr']:.1%}
- 총 검색 수: {stats['total_searches']:,}
        """)

3. 비용 최적화

3.1 임베딩 비용 절감

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class EmbeddingOptimizer:
    def __init__(self):
        self.cache = {}
        self.batch_queue = []
        self.batch_size = 100

    async def get_embedding_batched(self, text: str):
        """배치 처리로 API 호출 횟수 감소"""
        # 캐시 확인
        if text in self.cache:
            return self.cache[text]

        # 배치에 추가
        future = asyncio.Future()
        self.batch_queue.append((text, future))

        # 배치가 차면 한 번에 처리
        if len(self.batch_queue) >= self.batch_size:
            await self._process_batch()

        return await future

    async def _process_batch(self):
        if not self.batch_queue:
            return

        texts = [item[0] for item in self.batch_queue]
        futures = [item[1] for item in self.batch_queue]

        # 한 번의 API 호출로 여러 개 처리
        response = await openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=texts  # 배치 입력
        )

        # 결과 분배
        for i, embedding_data in enumerate(response.data):
            self.cache[texts[i]] = embedding_data.embedding
            futures[i].set_result(embedding_data.embedding)

        self.batch_queue.clear()

효과:

  • API 호출 횟수: 100회 → 1회
  • 네트워크 오버헤드 감소
  • 월 비용: $800 → $150 (약 80% 절감)

3.2 LLM 비용 절감

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class LLMOptimizer:
    def __init__(self):
        self.model_tier = {
            "simple": "gpt-4o-mini",      # $0.15 / 1M tokens
            "normal": "gpt-4o",           # $2.50 / 1M tokens
            "complex": "o1"               # $15.00 / 1M tokens
        }

    async def generate(self, query: str, context: str):
        # 쿼리 복잡도 분류
        complexity = self._classify_complexity(query)
        model = self.model_tier[complexity]

        # 컨텍스트 압축
        compressed_context = self._compress_context(context, max_tokens=2000)

        response = await openai_client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "간결하게 답변하세요."},
                {"role": "user", "content": f"컨텍스트:\n{compressed_context}\n\n질문: {query}"}
            ],
            max_tokens=300  # 불필요하게 긴 답변 방지
        )

        return response.choices[0].message.content

    def _classify_complexity(self, query: str) -> str:
        """쿼리 복잡도 분류"""
        # 간단한 휴리스틱
        if len(query.split()) < 5:
            return "simple"
        elif "비교" in query or "분석" in query:
            return "complex"
        else:
            return "normal"

    def _compress_context(self, context: str, max_tokens: int) -> str:
        """컨텍스트 압축"""
        # 토큰 수 계산 (간략화)
        current_tokens = len(context.split()) * 1.3

        if current_tokens <= max_tokens:
            return context

        # 문단 단위로 잘라서 중요한 부분만 유지
        paragraphs = context.split("\n\n")
        compressed = []
        token_count = 0

        for para in paragraphs:
            para_tokens = len(para.split()) * 1.3
            if token_count + para_tokens <= max_tokens:
                compressed.append(para)
                token_count += para_tokens
            else:
                break

        return "\n\n".join(compressed)

효과:

  • 평균 비용: $0.05 per query → $0.02 per query (60% 절감)
  • 응답 품질: 95% 유지

4. 장애 대응 및 복구

4.1 Fallback 전략

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ResilientRAG:
    async def search(self, query: str):
        try:
            # 1차: 전체 파이프라인
            return await self.full_pipeline(query)
        except EmbeddingAPIError:
            # 2차: 임베딩 실패 시 키워드 검색만
            logger.warning("Embedding API failed, fallback to BM25")
            return await self.keyword_only_search(query)
        except VectorDBError:
            # 3차: 벡터 DB 장애 시 캐시된 결과
            logger.error("Vector DB failed, using cached results")
            return await self.get_cached_results(query)
        except Exception as e:
            # 최후: 사전 정의된 답변
            logger.critical(f"Complete failure: {e}")
            return {"answer": "죄송합니다. 일시적인 오류가 발생했습니다.", "sources": []}

4.2 Health Check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@app.get("/health")
async def health_check():
    checks = {}

    # 벡터 DB 연결 확인
    try:
        await vector_db.ping()
        checks["vector_db"] = "ok"
    except:
        checks["vector_db"] = "failed"

    # 임베딩 API 확인
    try:
        await get_embedding_async("test", timeout=2)
        checks["embedding_api"] = "ok"
    except:
        checks["embedding_api"] = "failed"

    # Redis 확인
    try:
        await redis_client.ping()
        checks["redis"] = "ok"
    except:
        checks["redis"] = "failed"

    is_healthy = all(v == "ok" for v in checks.values())
    status_code = 200 if is_healthy else 503

    return JSONResponse(checks, status_code=status_code)

5. 프로덕션 체크리스트

배포 전 반드시 확인할 항목:

  • 성능 테스트: 동시 요청 100개에서 p99 < 1초
  • 비용 예측: 월간 예상 요청 수 × 쿼리당 비용 계산
  • 모니터링 설정: Prometheus + Grafana 대시보드
  • 알람 설정: 에러율, 레이턴시, 비용 임계값
  • 캐싱 전략: 임베딩, 검색 결과 캐싱 구현
  • Fallback 구현: API 장애 시 대체 전략
  • Rate Limiting: 사용자당/IP당 요청 제한
  • 보안: API 키 환경 변수 관리, HTTPS
  • 로깅: 모든 검색 쿼리 및 결과 기록
  • A/B 테스트 준비: 실험 플래그 및 메트릭 수집

This post is licensed under CC BY 4.0 by the author.