LLMLLM API 개발 · 5중급

완성 프로젝트 — RAG 기반 AI 챗봇 서비스 만들기

LLMFastAPIRAG챗봇프로젝트Python

지금까지 배운 것을 하나로

이번 편은 시리즈의 최종 프로젝트입니다.

flowchart LR
    subgraph SKILLS["활용 기술"]
        S1["편 1\nOpenAI API 기초"]
        S2["편 2\n스트리밍 + 비동기"]
        S3["편 3\nFunction Calling"]
        S4["편 4\nRAG + ChromaDB"]
    end

    SKILLS -->|"통합"| PROJ["문서 기반\nAI 챗봇 서비스"]

프로젝트 구조

chatbot/
├── main.py          # FastAPI 앱
├── rag.py           # RAG 파이프라인
├── chat.py          # 챗봇 로직
├── documents/       # 인덱싱할 문서
│   ├── faq.txt
│   └── manual.txt
├── .env
└── requirements.txt
# requirements.txt
openai>=1.0.0
chromadb>=0.4.0
fastapi>=0.100.0
uvicorn>=0.23.0
python-dotenv>=1.0.0
pydantic>=2.0.0

RAG 모듈 (rag.py)

import os
from pathlib import Path
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI

client = OpenAI()
chroma_client = chromadb.PersistentClient(path="./chroma_db")  # 디스크 저장
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key=os.getenv("OPENAI_API_KEY"),
    model_name="text-embedding-3-small"
)

COLLECTION_NAME = "chatbot_docs"

def get_or_create_collection():
    return chroma_client.get_or_create_collection(
        name=COLLECTION_NAME,
        embedding_function=openai_ef
    )

def chunk_text(text: str, size: int = 400, overlap: int = 50) -> list[str]:
    chunks, start = [], 0
    while start < len(text):
        chunk = text[start:start + size]
        chunks.append(chunk.strip())
        start += size - overlap
    return [c for c in chunks if len(c) > 30]

def index_file(file_path: str):
    path = Path(file_path)
    text = path.read_text(encoding="utf-8")
    collection = get_or_create_collection()
    
    # 이미 인덱싱된 문서 확인
    existing = collection.get(where={"source": path.name})
    if existing["ids"]:
        print(f"이미 인덱싱됨: {path.name}")
        return
    
    chunks = chunk_text(text)
    collection.add(
        ids=[f"{path.stem}_chunk_{i}" for i in range(len(chunks))],
        documents=chunks,
        metadatas=[{"source": path.name, "chunk_index": i} for i in range(len(chunks))]
    )
    print(f"✅ {path.name}: {len(chunks)}개 청크 인덱싱")

def search(query: str, top_k: int = 4) -> list[dict]:
    collection = get_or_create_collection()
    results = collection.query(
        query_texts=[query],
        n_results=top_k,
        include=["documents", "metadatas", "distances"]
    )
    return [
        {
            "text": doc,
            "source": meta["source"],
            "similarity": round(1 - dist, 3)
        }
        for doc, meta, dist in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0]
        )
        if 1 - dist > 0.3  # 유사도 임계값
    ]

챗봇 로직 (chat.py)

flowchart TB
    MSG["사용자 메시지"] --> RAG["RAG 검색"]
    RAG -->|"관련 문서"| CTX["시스템 프롬프트\n+ 문서 컨텍스트"]
    CTX --> HIST["대화 기록\n(최근 10턴)"]
    HIST --> LLM["GPT-4o-mini\n(스트리밍)"]
    LLM --> RESP["스트리밍 응답"]
    RESP --> SAVE["대화 기록 저장"]
from openai import AsyncOpenAI
from typing import AsyncGenerator
import rag

async_client = AsyncOpenAI()

SYSTEM_PROMPT = """당신은 친절한 고객 지원 AI 어시스턴트입니다.

아래 제공된 문서를 기반으로만 답변하세요.
문서에 없는 내용은 솔직하게 "해당 정보가 없습니다"라고 말하세요.
답변은 간결하고 명확하게 유지하세요."""

# 세션별 대화 기록 (실제 서비스에서는 Redis 또는 DB 사용)
conversation_history: dict[str, list] = {}

def get_messages(session_id: str, user_message: str) -> list[dict]:
    if session_id not in conversation_history:
        conversation_history[session_id] = []
    
    # RAG 검색
    relevant_docs = rag.search(user_message)
    
    context = ""
    if relevant_docs:
        context = "\n\n[참고 문서]\n" + "\n\n".join([
            f"출처: {doc['source']}\n{doc['text']}"
            for doc in relevant_docs
        ])
    
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT + context}
    ]
    
    # 최근 10턴만 포함 (컨텍스트 윈도우 관리)
    recent_history = conversation_history[session_id][-10:]
    messages.extend(recent_history)
    messages.append({"role": "user", "content": user_message})
    
    return messages, relevant_docs

async def stream_response(session_id: str, user_message: str) -> AsyncGenerator[str, None]:
    messages, relevant_docs = get_messages(session_id, user_message)
    
    full_response = []
    
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.7,
        stream=True
    )
    
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            full_response.append(content)
            yield content
    
    # 대화 기록 저장
    assistant_reply = "".join(full_response)
    conversation_history[session_id].append(
        {"role": "user", "content": user_message}
    )
    conversation_history[session_id].append(
        {"role": "assistant", "content": assistant_reply}
    )
    
    # 출처 정보 전달
    if relevant_docs:
        sources = list(set(d["source"] for d in relevant_docs))
        yield f"\n\n---\n📄 출처: {', '.join(sources)}"

FastAPI 서버 (main.py)

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import rag
import chat
from pathlib import Path
import uuid

app = FastAPI(title="RAG 챗봇 API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

class ChatRequest(BaseModel):
    message: str
    session_id: str = None

@app.on_event("startup")
async def startup():
    # 시작 시 문서 자동 인덱싱
    docs_dir = Path("./documents")
    if docs_dir.exists():
        for file in docs_dir.glob("*.txt"):
            rag.index_file(str(file))

@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    session_id = request.session_id or str(uuid.uuid4())
    
    if not request.message.strip():
        raise HTTPException(400, "메시지를 입력해주세요")
    
    return StreamingResponse(
        chat.stream_response(session_id, request.message),
        media_type="text/plain",
        headers={"X-Session-Id": session_id}
    )

@app.delete("/chat/{session_id}")
async def clear_history(session_id: str):
    chat.conversation_history.pop(session_id, None)
    return {"message": "대화 기록 초기화 완료"}

@app.get("/health")
async def health():
    return {"status": "ok"}

실행 및 테스트

# 서버 실행
uvicorn main:app --reload --port 8000

# 테스트 (cURL)
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "연차는 몇 일인가요?"}' \
  --no-buffer
# Python 클라이언트 테스트
import httpx

async def test_chat():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat",
            json={"message": "재택근무 정책 알려줘", "session_id": "test-123"}
        ) as response:
            async for chunk in response.aiter_text():
                print(chunk, end="", flush=True)

개선 방향

flowchart TB
    BASIC["현재 구현"] --> NEXT

    subgraph NEXT["다음 단계 개선"]
        N1["대화 기록 → Redis 영속화"]
        N2["하이브리드 검색\n(벡터 + BM25)"]
        N3["리랭킹 모델\n(Cohere Rerank)"]
        N4["사용자 인증\n(JWT)"]
        N5["비용 추적\n(토큰 사용량 로깅)"]
    end

전체 시리즈 복습

flowchart LR
    E1["편 1\nAPI 기초\n첫 호출"] 
    E2["편 2\n스트리밍\n실시간 UX"]
    E3["편 3\nFunction Calling\n도구 연결"]
    E4["편 4\nRAG\n문서 검색"]
    E5["편 5\n통합 프로젝트\n챗봇 서비스"]

    E1 --> E2 --> E3 --> E4 --> E5
핵심 기술구현한 것
1OpenAI client감정 분류기
2stream, asyncio배치 처리 + FastAPI 스트리밍
3tools, tool_calls날씨 + 계산 에이전트
4ChromaDB, RAG문서 Q&A 시스템
5통합완성 챗봇 서비스

이것으로 LLM API 개발 시리즈를 마칩니다.

LLM 기초 → 프롬프트 엔지니어링 → LLM API 개발까지, 이제 여러분은 직접 AI 서비스를 기획하고 구현할 준비가 됐습니다.

궁금한 점이 있으신가요?

협업·의뢰는 아래로, 가벼운 소통은 인스타그램 @bluefox._.hi도 환영이에요.