지금까지 배운 것을 하나로
이번 편은 시리즈의 최종 프로젝트입니다.
프로젝트 구조
chatbot/
├── main.py # FastAPI 앱
├── rag.py # RAG 파이프라인
├── chat.py # 챗봇 로직
├── documents/ # 인덱싱할 문서
│ ├── faq.txt
│ └── manual.txt
├── .env
└── requirements.txt
# requirements.txt
openai>=1.0.0
chromadb>=0.4.0
fastapi>=0.100.0
uvicorn>=0.23.0
python-dotenv>=1.0.0
pydantic>=2.0.0
RAG 모듈 (rag.py)
import os
from pathlib import Path
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI
client = OpenAI()
chroma_client = chromadb.PersistentClient(path="./chroma_db") # 디스크 저장
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="text-embedding-3-small"
)
COLLECTION_NAME = "chatbot_docs"
def get_or_create_collection():
return chroma_client.get_or_create_collection(
name=COLLECTION_NAME,
embedding_function=openai_ef
)
def chunk_text(text: str, size: int = 400, overlap: int = 50) -> list[str]:
chunks, start = [], 0
while start < len(text):
chunk = text[start:start + size]
chunks.append(chunk.strip())
start += size - overlap
return [c for c in chunks if len(c) > 30]
def index_file(file_path: str):
path = Path(file_path)
text = path.read_text(encoding="utf-8")
collection = get_or_create_collection()
# 이미 인덱싱된 문서 확인
existing = collection.get(where={"source": path.name})
if existing["ids"]:
print(f"이미 인덱싱됨: {path.name}")
return
chunks = chunk_text(text)
collection.add(
ids=[f"{path.stem}_chunk_{i}" for i in range(len(chunks))],
documents=chunks,
metadatas=[{"source": path.name, "chunk_index": i} for i in range(len(chunks))]
)
print(f"✅ {path.name}: {len(chunks)}개 청크 인덱싱")
def search(query: str, top_k: int = 4) -> list[dict]:
collection = get_or_create_collection()
results = collection.query(
query_texts=[query],
n_results=top_k,
include=["documents", "metadatas", "distances"]
)
return [
{
"text": doc,
"source": meta["source"],
"similarity": round(1 - dist, 3)
}
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
)
if 1 - dist > 0.3 # 유사도 임계값
]
챗봇 로직 (chat.py)
from openai import AsyncOpenAI
from typing import AsyncGenerator
import rag
async_client = AsyncOpenAI()
SYSTEM_PROMPT = """당신은 친절한 고객 지원 AI 어시스턴트입니다.
아래 제공된 문서를 기반으로만 답변하세요.
문서에 없는 내용은 솔직하게 "해당 정보가 없습니다"라고 말하세요.
답변은 간결하고 명확하게 유지하세요."""
# 세션별 대화 기록 (실제 서비스에서는 Redis 또는 DB 사용)
conversation_history: dict[str, list] = {}
def get_messages(session_id: str, user_message: str) -> list[dict]:
if session_id not in conversation_history:
conversation_history[session_id] = []
# RAG 검색
relevant_docs = rag.search(user_message)
context = ""
if relevant_docs:
context = "\n\n[참고 문서]\n" + "\n\n".join([
f"출처: {doc['source']}\n{doc['text']}"
for doc in relevant_docs
])
messages = [
{"role": "system", "content": SYSTEM_PROMPT + context}
]
# 최근 10턴만 포함 (컨텍스트 윈도우 관리)
recent_history = conversation_history[session_id][-10:]
messages.extend(recent_history)
messages.append({"role": "user", "content": user_message})
return messages, relevant_docs
async def stream_response(session_id: str, user_message: str) -> AsyncGenerator[str, None]:
messages, relevant_docs = get_messages(session_id, user_message)
full_response = []
stream = await async_client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
temperature=0.7,
stream=True
)
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
full_response.append(content)
yield content
# 대화 기록 저장
assistant_reply = "".join(full_response)
conversation_history[session_id].append(
{"role": "user", "content": user_message}
)
conversation_history[session_id].append(
{"role": "assistant", "content": assistant_reply}
)
# 출처 정보 전달
if relevant_docs:
sources = list(set(d["source"] for d in relevant_docs))
yield f"\n\n---\n📄 출처: {', '.join(sources)}"
FastAPI 서버 (main.py)
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import rag
import chat
from pathlib import Path
import uuid
app = FastAPI(title="RAG 챗봇 API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
class ChatRequest(BaseModel):
message: str
session_id: str = None
@app.on_event("startup")
async def startup():
# 시작 시 문서 자동 인덱싱
docs_dir = Path("./documents")
if docs_dir.exists():
for file in docs_dir.glob("*.txt"):
rag.index_file(str(file))
@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
session_id = request.session_id or str(uuid.uuid4())
if not request.message.strip():
raise HTTPException(400, "메시지를 입력해주세요")
return StreamingResponse(
chat.stream_response(session_id, request.message),
media_type="text/plain",
headers={"X-Session-Id": session_id}
)
@app.delete("/chat/{session_id}")
async def clear_history(session_id: str):
chat.conversation_history.pop(session_id, None)
return {"message": "대화 기록 초기화 완료"}
@app.get("/health")
async def health():
return {"status": "ok"}
실행 및 테스트
# 서버 실행
uvicorn main:app --reload --port 8000
# 테스트 (cURL)
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "연차는 몇 일인가요?"}' \
--no-buffer
# Python 클라이언트 테스트
import httpx
async def test_chat():
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
"http://localhost:8000/chat",
json={"message": "재택근무 정책 알려줘", "session_id": "test-123"}
) as response:
async for chunk in response.aiter_text():
print(chunk, end="", flush=True)
개선 방향
전체 시리즈 복습
| 편 | 핵심 기술 | 구현한 것 |
|---|---|---|
| 1 | OpenAI client | 감정 분류기 |
| 2 | stream, asyncio | 배치 처리 + FastAPI 스트리밍 |
| 3 | tools, tool_calls | 날씨 + 계산 에이전트 |
| 4 | ChromaDB, RAG | 문서 Q&A 시스템 |
| 5 | 통합 | 완성 챗봇 서비스 |
이것으로 LLM API 개발 시리즈를 마칩니다.
LLM 기초 → 프롬프트 엔지니어링 → LLM API 개발까지, 이제 여러분은 직접 AI 서비스를 기획하고 구현할 준비가 됐습니다.