はじめに
PS-SLの佐々木です。
アドベントカレンダー20日目の記事になります。 今回の記事ではChatGPTやClaudeのようにAIの回答をリアルタイムで表示する方法について解説します。
なぜSSEが必要か
LLMを使ったチャットアプリでは、回答生成に数秒〜数十秒かかることがあります。
ユーザー: 「この資料を分析して」
↓
... 10秒待機 ...
↓
AI: 「分析結果は...(長文)」
全文が完成するまで何も表示されないのはUXが悪い。ChatGPTのように文字が順次表示される体験を実現するには、SSE(Server-Sent Events) が有効です。
SSEとWebSocketの違い
| 項目 | SSE | WebSocket |
|---|---|---|
| 通信方向 | サーバー → クライアント(単方向) | 双方向 |
| プロトコル | HTTP | 独自プロトコル |
| 再接続 | 自動 | 手動実装が必要 |
| 実装難易度 | 低い | 高い |
| ユースケース | AI回答、通知、ログ | チャット、ゲーム |
AIの回答ストリーミングはサーバー→クライアントの単方向通信なので、SSEで十分です。
実装
1. 必要なライブラリ
pip install fastapi sse-starlette
sse-starletteはFastAPIでSSEを簡単に扱うためのライブラリです。
2. イベント型の定義
まず、ストリーミングするイベントの型を定義します。
from dataclasses import dataclass, field
from typing import Any
@dataclass
class StreamEvent:
"""SSEで送信するイベント"""
type: str # イベント種別
content: str | None = None # テキストコンテンツ
tool: str | None = None # ツール名
result: dict[str, Any] | None = None # ツール実行結果
message: str | None = None # エラーメッセージ等
def to_dict(self) -> dict[str, Any]:
"""JSON変換用"""
data: dict[str, Any] = {"type": self.type}
if self.content is not None:
data["content"] = self.content
if self.tool is not None:
data["tool"] = self.tool
if self.result is not None:
data["result"] = self.result
if self.message is not None:
data["message"] = self.message
return data
3. イベント種別の設計
AIエージェントの処理状況を伝えるため、複数のイベント種別を用意します。
| イベント | 用途 | 例 |
|---|---|---|
thinking |
処理中表示 | 「分析中…」 |
text_delta |
回答の差分 | 「結果」「は」「以下」… |
tool_call |
ツール呼び出し開始 | 検索ツール起動 |
tool_result |
ツール実行結果 | 5件ヒット |
final_answer |
最終回答 | 全文 |
error |
エラー | タイムアウト等 |
done |
完了 | ストリーム終了 |
4. FastAPIエンドポイント
import json
from collections.abc import AsyncGenerator
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
router = APIRouter()
@router.post("/chat")
async def chat(request: ChatRequest) -> EventSourceResponse:
"""SSEでAI回答をストリーミング"""
async def event_generator() -> AsyncGenerator[dict[str, Any], None]:
try:
# 処理開始を通知
yield {
"event": "thinking",
"data": json.dumps({"type": "thinking", "content": "処理中..."}),
}
# AIエージェントからイベントを受け取りながら送信
async for event in agent.stream(request.message):
yield {
"event": event.type,
"data": json.dumps(event.to_dict(), ensure_ascii=False),
}
# 完了を通知
yield {
"event": "done",
"data": json.dumps({"type": "done"}),
}
except Exception as e:
yield {
"event": "error",
"data": json.dumps({"type": "error", "message": str(e)}),
}
return EventSourceResponse(
event_generator(),
media_type="text/event-stream",
)
ポイント:
EventSourceResponseにAsyncGeneratorを渡すyieldでイベントを送信(eventとdataのdict)ensure_ascii=Falseで日本語を正しく送信
5. LangGraphからのイベント変換
LangGraphのastream_eventsを使ってイベントを取得し、SSE用に変換します。
async def stream(self, message: str) -> AsyncGenerator[StreamEvent, None]:
"""LangGraphワークフローをストリーミング実行"""
initial_state = {"messages": [HumanMessage(content=message)]}
async for event in self._graph.astream_events(initial_state, version="v2"):
event_type = event.get("event")
event_name = event.get("name", "")
event_data = event.get("data", {})
# LLMのストリーミング出力
if event_type == "on_chat_model_stream":
chunk = event_data.get("chunk")
if chunk and hasattr(chunk, "content") and chunk.content:
yield StreamEvent(type="text_delta", content=chunk.content)
# ツール呼び出し開始
elif event_type == "on_tool_start":
yield StreamEvent(
type="tool_call",
tool=event_name,
input=event_data.get("input", {}),
)
# ツール実行完了
elif event_type == "on_tool_end":
output = event_data.get("output", "")
yield StreamEvent(
type="tool_result",
tool=event_name,
result=parse_tool_output(output),
)
# ワークフロー完了
elif event_type == "on_chain_end" and event_name == "LangGraph":
final_answer = extract_final_answer(event_data)
if final_answer:
yield StreamEvent(type="final_answer", content=final_answer)
yield StreamEvent(type="done")
フロントエンド実装
JavaScript(fetch API)
async function streamChat(message) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') continue;
try {
const event = JSON.parse(data);
handleEvent(event);
} catch {
// パースエラーは無視
}
}
}
}
}
function handleEvent(event) {
switch (event.type) {
case 'thinking':
showThinking(event.content);
break;
case 'text_delta':
appendText(event.content);
break;
case 'tool_call':
showToolCall(event.tool, event.input);
break;
case 'tool_result':
showToolResult(event.tool, event.result);
break;
case 'final_answer':
setFinalAnswer(event.content);
break;
case 'error':
showError(event.message);
break;
case 'done':
finishStream();
break;
}
}
React実装例
const [content, setContent] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
// SSEイベントハンドラ
const handleEvent = (event: StreamEvent) => {
switch (event.type) {
case 'text_delta':
setContent(prev => prev + event.content);
break;
case 'final_answer':
setContent(event.content);
break;
case 'done':
setIsStreaming(false);
break;
}
};
実用的なTips
1. エラーハンドリング
async def event_generator():
try:
async for event in agent.stream(message):
yield format_event(event)
except asyncio.TimeoutError:
yield format_event(StreamEvent(type="error", message="タイムアウト"))
except Exception as e:
logger.error(f"Stream error: {e}")
yield format_event(StreamEvent(type="error", message=str(e)))
finally:
yield format_event(StreamEvent(type="done"))
2. タイムアウト設定
from asyncio import timeout
async def event_generator():
async with timeout(120): # 2分でタイムアウト
async for event in agent.stream(message):
yield format_event(event)
3. ログ出力
本番環境ではイベントログが重要です。
elif event.type == "tool_call":
logger.info(f"[TOOL] {event.tool} called with {event.input}")
elif event.type == "error":
logger.error(f"[ERROR] {event.message}")
elif event.type == "done":
logger.info(f"[DONE] Total events: {event_count}")
4. CORSの設定
フロントエンドが別オリジンの場合:
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_methods=["*"],
allow_headers=["*"],
)
まとめ
| 項目 | 実装 |
|---|---|
| ライブラリ | sse-starlette |
| バックエンド | EventSourceResponse + AsyncGenerator |
| フロントエンド | fetch + reader.read() |
| イベント設計 | 処理状況を細かく通知 |
SSEを使うことで、LLMの回答がリアルタイムで表示され、ユーザーは処理状況を把握しながら待つことができます。
