ChatGPT처럼 글자가 흘러나오게 만들기
일반 API 호출은 응답이 완성될 때까지 기다립니다. 긴 응답은 5~10초 이상 화면이 멈춥니다.
flowchart LR
subgraph BLOCKING["일반 호출 (차단)"]
B1["요청"] -->|"5~10초 대기"| B2["전체 응답 수신"]
B2 --> B3["화면에 한꺼번에 출력"]
end
subgraph STREAMING["스트리밍 호출"]
S1["요청"] -->|"즉시 시작"| S2["토큰 1"]
S2 --> S3["토큰 2"]
S3 --> S4["..."]
S4 --> S5["마지막 토큰"]
end
스트리밍을 쓰면 첫 토큰이 즉시 출력되어 사용자가 더 빠르다고 느낍니다.
스트리밍 기본 구현
from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "파이썬의 장점을 설명해줘"}],
stream=True # 스트리밍 활성화
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
print() # 마지막 줄바꿈
stream=True 하나로 전환됩니다. 응답은 작은 chunk 단위로 순서대로 도착합니다.
스트리밍 청크 구조
flowchart TB
subgraph CHUNK["스트리밍 청크"]
C1["chunk.id\n요청 ID"]
C2["chunk.choices[0].delta.content\n이번 청크의 텍스트 조각"]
C3["chunk.choices[0].finish_reason\nnull → 계속 / 'stop' → 완료"]
end
for chunk in stream:
delta = chunk.choices[0].delta
finish = chunk.choices[0].finish_reason
if delta.content:
print(delta.content, end="", flush=True)
if finish == "stop":
print("\n[스트리밍 완료]")
스트리밍 텍스트 수집
스트리밍하면서 전체 텍스트를 나중에 사용하고 싶을 때:
def stream_and_collect(messages: list) -> str:
full_response = []
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True
)
for chunk in stream:
content = chunk.choices[0].delta.content
if content:
full_response.append(content)
print(content, end="", flush=True)
print()
return "".join(full_response)
result = stream_and_collect([
{"role": "user", "content": "간단한 파이썬 정렬 코드 예시"}
])
# 이후 result를 DB에 저장하거나 후처리
비동기 처리: 여러 요청 동시에
flowchart LR
subgraph SYNC["동기 처리"]
R1["요청 1\n(3초)"]
R2["요청 2\n(3초)"]
R3["요청 3\n(3초)"]
R1 --> R2 --> R3
T1["총 9초"]
end
subgraph ASYNC["비동기 처리"]
A1["요청 1"]
A2["요청 2"]
A3["요청 3"]
A1 & A2 & A3 -->|"동시 실행"| T2["총 ~3초"]
end
여러 텍스트를 동시에 처리할 때 비동기를 쓰면 N배 빠릅니다.
비동기 API 호출
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def analyze_one(text: str, idx: int) -> dict:
response = await async_client.chat.completions.create(
model="gpt-4o-mini",
temperature=0,
messages=[
{"role": "system", "content": "감정을 positive/negative/neutral로 분류해줘. 단어 하나만 출력."},
{"role": "user", "content": text}
]
)
return {
"idx": idx,
"text": text[:30],
"sentiment": response.choices[0].message.content.strip()
}
async def analyze_batch(texts: list[str]) -> list[dict]:
tasks = [analyze_one(text, i) for i, text in enumerate(texts)]
results = await asyncio.gather(*tasks) # 동시 실행
return list(results)
# 실행
reviews = [
"정말 좋은 제품이에요!",
"배송이 너무 느렸습니다.",
"그냥 보통이에요.",
"다시는 구매 안 합니다.",
"최고예요, 추천합니다!"
]
results = asyncio.run(analyze_batch(reviews))
for r in sorted(results, key=lambda x: x["idx"]):
print(f"{r['idx']+1}. [{r['sentiment']}] {r['text']}")
비동기 스트리밍
async def stream_async(prompt: str):
stream = await async_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
print(content, end="", flush=True)
print()
asyncio.run(stream_async("FastAPI란 무엇인가요?"))
동시 요청 수 제한 (Rate Limit 관리)
OpenAI API는 분당 요청 수(RPM)와 분당 토큰 수(TPM)에 제한이 있습니다.
flowchart TD
subgraph SEMAPHORE["Semaphore로 동시 요청 제한"]
S["asyncio.Semaphore(10)\n최대 10개 동시"]
T1["Task 1"]
T2["Task 2"]
T3["..."]
T4["Task 100"]
end
T1 & T2 & T3 & T4 -->|"순서 대기"| S
S -->|"10개씩 처리"| API["OpenAI API"]
async def analyze_with_limit(texts: list[str], concurrency: int = 10):
semaphore = asyncio.Semaphore(concurrency)
async def limited_call(text, idx):
async with semaphore: # 최대 concurrency개만 동시 실행
return await analyze_one(text, idx)
tasks = [limited_call(t, i) for i, t in enumerate(texts)]
return await asyncio.gather(*tasks)
FastAPI + 스트리밍: 웹 API 만들기
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import asyncio
app = FastAPI()
client = AsyncOpenAI()
async def generate_stream(prompt: str):
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
yield content
@app.get("/chat")
async def chat(prompt: str):
return StreamingResponse(
generate_stream(prompt),
media_type="text/plain"
)
이 패턴으로 프론트엔드에서 실시간 타이핑 효과를 구현할 수 있습니다.
성능 비교 측정
import time
async def benchmark():
texts = ["리뷰 텍스트"] * 20
# 동기 처리
start = time.time()
for text in texts:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}]
)
sync_time = time.time() - start
# 비동기 처리
start = time.time()
await analyze_batch(texts)
async_time = time.time() - start
print(f"동기: {sync_time:.1f}초")
print(f"비동기: {async_time:.1f}초")
print(f"속도 향상: {sync_time/async_time:.1f}배")
정리
| 개념 | 내용 | 사용 상황 |
|---|---|---|
| 스트리밍 | 토큰 단위 실시간 출력 | 챗봇, 글쓰기 보조 |
| AsyncOpenAI | 비동기 API 클라이언트 | 대량 처리, 서버 |
| asyncio.gather | 여러 요청 동시 실행 | 배치 분류, 번역 |
| Semaphore | 동시 요청 수 제한 | Rate limit 관리 |
| StreamingResponse | FastAPI 스트리밍 응답 | 웹 서비스 구축 |
다음 편에서는 Function Calling / Tool Use — LLM이 외부 함수를 호출해 실시간 데이터를 처리하는 강력한 기능을 배웁니다.