【FastAPI】SSEでAIの回答をリアルタイムストリーミングする

はじめに

PS-SLの佐々木です。
アドベントカレンダー20日目の記事になります。 今回の記事ではChatGPTやClaudeのようにAIの回答をリアルタイムで表示する方法について解説します。

なぜSSEが必要か

LLMを使ったチャットアプリでは、回答生成に数秒〜数十秒かかることがあります。

ユーザー: 「この資料を分析して」
     ↓
    ... 10秒待機 ...
     ↓
AI: 「分析結果は...(長文)」

全文が完成するまで何も表示されないのはUXが悪い。ChatGPTのように文字が順次表示される体験を実現するには、SSE(Server-Sent Events) が有効です。


SSEとWebSocketの違い

項目 SSE WebSocket
通信方向 サーバー → クライアント(単方向) 双方向
プロトコル HTTP 独自プロトコル
再接続 自動 手動実装が必要
実装難易度 低い 高い
ユースケース AI回答、通知、ログ チャット、ゲーム

AIの回答ストリーミングはサーバー→クライアントの単方向通信なので、SSEで十分です。


実装

1. 必要なライブラリ

pip install fastapi sse-starlette

sse-starletteはFastAPIでSSEを簡単に扱うためのライブラリです。

2. イベント型の定義

まず、ストリーミングするイベントの型を定義します。

from dataclasses import dataclass, field
from typing import Any
@dataclass
class StreamEvent:
    """SSEで送信するイベント"""

    type: str  # イベント種別
    content: str | None = None  # テキストコンテンツ
    tool: str | None = None  # ツール名
    result: dict[str, Any] | None = None  # ツール実行結果
    message: str | None = None  # エラーメッセージ等

    def to_dict(self) -> dict[str, Any]:
        """JSON変換用"""
        data: dict[str, Any] = {"type": self.type}
        if self.content is not None:
            data["content"] = self.content
        if self.tool is not None:
            data["tool"] = self.tool
        if self.result is not None:
            data["result"] = self.result
        if self.message is not None:
            data["message"] = self.message
        return data

3. イベント種別の設計

AIエージェントの処理状況を伝えるため、複数のイベント種別を用意します。

イベント 用途
thinking 処理中表示 「分析中…」
text_delta 回答の差分 「結果」「は」「以下」…
tool_call ツール呼び出し開始 検索ツール起動
tool_result ツール実行結果 5件ヒット
final_answer 最終回答 全文
error エラー タイムアウト等
done 完了 ストリーム終了

4. FastAPIエンドポイント

import json
from collections.abc import AsyncGenerator
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

@router.post("/chat")
async def chat(request: ChatRequest) -> EventSourceResponse:
    """SSEでAI回答をストリーミング"""

    async def event_generator() -> AsyncGenerator[dict[str, Any], None]:
        try:
            # 処理開始を通知
            yield {
                "event": "thinking",
                "data": json.dumps({"type": "thinking", "content": "処理中..."}),
            }

            # AIエージェントからイベントを受け取りながら送信
            async for event in agent.stream(request.message):
                yield {
                    "event": event.type,
                    "data": json.dumps(event.to_dict(), ensure_ascii=False),
                }

            # 完了を通知
            yield {
                "event": "done",
                "data": json.dumps({"type": "done"}),
            }

        except Exception as e:
            yield {
                "event": "error",
                "data": json.dumps({"type": "error", "message": str(e)}),
            }

    return EventSourceResponse(
        event_generator(),
        media_type="text/event-stream",
    )

ポイント:

  • EventSourceResponseAsyncGeneratorを渡す
  • yieldでイベントを送信(eventdataのdict)
  • ensure_ascii=Falseで日本語を正しく送信

5. LangGraphからのイベント変換

LangGraphのastream_eventsを使ってイベントを取得し、SSE用に変換します。

async def stream(self, message: str) -> AsyncGenerator[StreamEvent, None]:
    """LangGraphワークフローをストリーミング実行"""

    initial_state = {"messages": [HumanMessage(content=message)]}

    async for event in self._graph.astream_events(initial_state, version="v2"):
        event_type = event.get("event")
        event_name = event.get("name", "")
        event_data = event.get("data", {})

        # LLMのストリーミング出力
        if event_type == "on_chat_model_stream":
            chunk = event_data.get("chunk")
            if chunk and hasattr(chunk, "content") and chunk.content:
                yield StreamEvent(type="text_delta", content=chunk.content)

        # ツール呼び出し開始
        elif event_type == "on_tool_start":
            yield StreamEvent(
                type="tool_call",
                tool=event_name,
                input=event_data.get("input", {}),
            )

        # ツール実行完了
        elif event_type == "on_tool_end":
            output = event_data.get("output", "")
            yield StreamEvent(
                type="tool_result",
                tool=event_name,
                result=parse_tool_output(output),
            )

        # ワークフロー完了
        elif event_type == "on_chain_end" and event_name == "LangGraph":
            final_answer = extract_final_answer(event_data)
            if final_answer:
                yield StreamEvent(type="final_answer", content=final_answer)

    yield StreamEvent(type="done")

フロントエンド実装

JavaScript(fetch API)

async function streamChat(message) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || '';

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data === '[DONE]') continue;

        try {
          const event = JSON.parse(data);
          handleEvent(event);
        } catch {
          // パースエラーは無視
        }
      }
    }
  }
}

function handleEvent(event) {
  switch (event.type) {
    case 'thinking':
      showThinking(event.content);
      break;
    case 'text_delta':
      appendText(event.content);
      break;
    case 'tool_call':
      showToolCall(event.tool, event.input);
      break;
    case 'tool_result':
      showToolResult(event.tool, event.result);
      break;
    case 'final_answer':
      setFinalAnswer(event.content);
      break;
    case 'error':
      showError(event.message);
      break;
    case 'done':
      finishStream();
      break;
  }
}

React実装例

const [content, setContent] = useState('');
const [isStreaming, setIsStreaming] = useState(false);

// SSEイベントハンドラ
const handleEvent = (event: StreamEvent) => {
  switch (event.type) {
    case 'text_delta':
      setContent(prev => prev + event.content);
      break;
    case 'final_answer':
      setContent(event.content);
      break;
    case 'done':
      setIsStreaming(false);
      break;
  }
};

実用的なTips

1. エラーハンドリング

async def event_generator():
    try:
        async for event in agent.stream(message):
            yield format_event(event)
    except asyncio.TimeoutError:
        yield format_event(StreamEvent(type="error", message="タイムアウト"))
    except Exception as e:
        logger.error(f"Stream error: {e}")
        yield format_event(StreamEvent(type="error", message=str(e)))
    finally:
        yield format_event(StreamEvent(type="done"))

2. タイムアウト設定

from asyncio import timeout

async def event_generator():
    async with timeout(120):  # 2分でタイムアウト
        async for event in agent.stream(message):
            yield format_event(event)

3. ログ出力

本番環境ではイベントログが重要です。

elif event.type == "tool_call":
    logger.info(f"[TOOL] {event.tool} called with {event.input}")
elif event.type == "error":
    logger.error(f"[ERROR] {event.message}")
elif event.type == "done":
    logger.info(f"[DONE] Total events: {event_count}")

4. CORSの設定

フロントエンドが別オリジンの場合:

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
)

まとめ

項目 実装
ライブラリ sse-starlette
バックエンド EventSourceResponse + AsyncGenerator
フロントエンド fetch + reader.read()
イベント設計 処理状況を細かく通知

SSEを使うことで、LLMの回答がリアルタイムで表示され、ユーザーは処理状況を把握しながら待つことができます。


参考

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です