LLMLLM API 개발 · 2기초

스트리밍과 비동기 — 실시간 응답으로 UX 개선하기

LLMOpenAI스트리밍비동기asyncioPython

ChatGPT처럼 글자가 흘러나오게 만들기

일반 API 호출은 응답이 완성될 때까지 기다립니다. 긴 응답은 5~10초 이상 화면이 멈춥니다.

flowchart LR
    subgraph BLOCKING["일반 호출 (차단)"]
        B1["요청"] -->|"5~10초 대기"| B2["전체 응답 수신"]
        B2 --> B3["화면에 한꺼번에 출력"]
    end

    subgraph STREAMING["스트리밍 호출"]
        S1["요청"] -->|"즉시 시작"| S2["토큰 1"]
        S2 --> S3["토큰 2"]
        S3 --> S4["..."]
        S4 --> S5["마지막 토큰"]
    end

스트리밍을 쓰면 첫 토큰이 즉시 출력되어 사용자가 더 빠르다고 느낍니다.


스트리밍 기본 구현

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "파이썬의 장점을 설명해줘"}],
    stream=True  # 스트리밍 활성화
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)

print()  # 마지막 줄바꿈

stream=True 하나로 전환됩니다. 응답은 작은 chunk 단위로 순서대로 도착합니다.


스트리밍 청크 구조

flowchart TB
    subgraph CHUNK["스트리밍 청크"]
        C1["chunk.id\n요청 ID"]
        C2["chunk.choices[0].delta.content\n이번 청크의 텍스트 조각"]
        C3["chunk.choices[0].finish_reason\nnull → 계속 / 'stop' → 완료"]
    end
for chunk in stream:
    delta = chunk.choices[0].delta
    finish = chunk.choices[0].finish_reason
    
    if delta.content:
        print(delta.content, end="", flush=True)
    
    if finish == "stop":
        print("\n[스트리밍 완료]")

스트리밍 텍스트 수집

스트리밍하면서 전체 텍스트를 나중에 사용하고 싶을 때:

def stream_and_collect(messages: list) -> str:
    full_response = []
    
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True
    )
    
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            full_response.append(content)
            print(content, end="", flush=True)
    
    print()
    return "".join(full_response)

result = stream_and_collect([
    {"role": "user", "content": "간단한 파이썬 정렬 코드 예시"}
])
# 이후 result를 DB에 저장하거나 후처리

비동기 처리: 여러 요청 동시에

flowchart LR
    subgraph SYNC["동기 처리"]
        R1["요청 1\n(3초)"]
        R2["요청 2\n(3초)"]
        R3["요청 3\n(3초)"]
        R1 --> R2 --> R3
        T1["총 9초"]
    end

    subgraph ASYNC["비동기 처리"]
        A1["요청 1"]
        A2["요청 2"]
        A3["요청 3"]
        A1 & A2 & A3 -->|"동시 실행"| T2["총 ~3초"]
    end

여러 텍스트를 동시에 처리할 때 비동기를 쓰면 N배 빠릅니다.


비동기 API 호출

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def analyze_one(text: str, idx: int) -> dict:
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        messages=[
            {"role": "system", "content": "감정을 positive/negative/neutral로 분류해줘. 단어 하나만 출력."},
            {"role": "user", "content": text}
        ]
    )
    return {
        "idx": idx,
        "text": text[:30],
        "sentiment": response.choices[0].message.content.strip()
    }

async def analyze_batch(texts: list[str]) -> list[dict]:
    tasks = [analyze_one(text, i) for i, text in enumerate(texts)]
    results = await asyncio.gather(*tasks)  # 동시 실행
    return list(results)

# 실행
reviews = [
    "정말 좋은 제품이에요!",
    "배송이 너무 느렸습니다.",
    "그냥 보통이에요.",
    "다시는 구매 안 합니다.",
    "최고예요, 추천합니다!"
]

results = asyncio.run(analyze_batch(reviews))
for r in sorted(results, key=lambda x: x["idx"]):
    print(f"{r['idx']+1}. [{r['sentiment']}] {r['text']}")

비동기 스트리밍

async def stream_async(prompt: str):
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)
    print()

asyncio.run(stream_async("FastAPI란 무엇인가요?"))

동시 요청 수 제한 (Rate Limit 관리)

OpenAI API는 분당 요청 수(RPM)와 분당 토큰 수(TPM)에 제한이 있습니다.

flowchart TD
    subgraph SEMAPHORE["Semaphore로 동시 요청 제한"]
        S["asyncio.Semaphore(10)\n최대 10개 동시"]
        T1["Task 1"]
        T2["Task 2"]
        T3["..."]
        T4["Task 100"]
    end

    T1 & T2 & T3 & T4 -->|"순서 대기"| S
    S -->|"10개씩 처리"| API["OpenAI API"]
async def analyze_with_limit(texts: list[str], concurrency: int = 10):
    semaphore = asyncio.Semaphore(concurrency)
    
    async def limited_call(text, idx):
        async with semaphore:  # 최대 concurrency개만 동시 실행
            return await analyze_one(text, idx)
    
    tasks = [limited_call(t, i) for i, t in enumerate(texts)]
    return await asyncio.gather(*tasks)

FastAPI + 스트리밍: 웹 API 만들기

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import asyncio

app = FastAPI()
client = AsyncOpenAI()

async def generate_stream(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            yield content

@app.get("/chat")
async def chat(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/plain"
    )

이 패턴으로 프론트엔드에서 실시간 타이핑 효과를 구현할 수 있습니다.


성능 비교 측정

import time

async def benchmark():
    texts = ["리뷰 텍스트"] * 20
    
    # 동기 처리
    start = time.time()
    for text in texts:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": text}]
        )
    sync_time = time.time() - start
    
    # 비동기 처리
    start = time.time()
    await analyze_batch(texts)
    async_time = time.time() - start
    
    print(f"동기: {sync_time:.1f}초")
    print(f"비동기: {async_time:.1f}초")
    print(f"속도 향상: {sync_time/async_time:.1f}배")

정리

개념내용사용 상황
스트리밍토큰 단위 실시간 출력챗봇, 글쓰기 보조
AsyncOpenAI비동기 API 클라이언트대량 처리, 서버
asyncio.gather여러 요청 동시 실행배치 분류, 번역
Semaphore동시 요청 수 제한Rate limit 관리
StreamingResponseFastAPI 스트리밍 응답웹 서비스 구축

다음 편에서는 Function Calling / Tool Use — LLM이 외부 함수를 호출해 실시간 데이터를 처리하는 강력한 기능을 배웁니다.

궁금한 점이 있으신가요?

협업·의뢰는 아래로, 가벼운 소통은 인스타그램 @bluefox._.hi도 환영이에요.