A2Aプロトコルでマルチエージェントアプリを作ってみた|ライター×校閲者ワークフロー

こんにちは、サイオステクノロジーの遠藤です。

前回のブログ「A2A(Agent2Agent)プロトコル入門|MCPとの違いと活用メリット」では、A2Aプロトコルの概要を解説しました。

今回は、実際にA2Aプロトコルを使ってマルチエージェントアプリを作ってみたので、その実装を紹介します。


5秒でわかる:この記事の内容

項目内容
やったことA2Aプロトコルで「ライター×校閲者」マルチエージェントアプリを構築
得られるものA2Aエージェントの実装パターン、Python SDKの使い方
対象読者A2Aを実際に試したい人、マルチエージェント開発に興味がある人

関連記事:より深く理解するために

A2A自体について理解したい方は、前回のブログを先に読むことをおすすめします。

前回ブログ: A2A(Agent2Agent)プロトコル入門|MCPとの違いと活用メリット


TL;DR

  • 「ライターエージェント」と「校閲者エージェント」をA2Aで連携させるデモを実装
  • Python SDK(a2a-sdk)を使えば、数十行でA2Aサーバーが構築可能
  • Azure OpenAI(または他のLLM)と組み合わせて、実用的なワークフローを構築

こんな人に読んでほしい

  • A2Aプロトコルを実際に動かしてみたい人
  • マルチエージェントアプリの実装パターンを知りたい人
  • Python + Azure OpenAIでAIアプリを作っている人
  • 「エージェント間連携って実際どう実装するの?」と思っている人

作るもの:編集部ワークフロー

今回作るのは、ブログ記事の執筆→校閲を自動化するワークフローです。

  1. オーケストレーターがライターエージェントにテーマを送信
  2. ライターエージェントがAzure OpenAIを使って初稿を生成
  3. オーケストレーターが初稿を受け取り、校閲者エージェントに送信
  4. 校閲者エージェントが誤字脱字・技術的正確性をチェックし、修正案を返す

実装の全体像


Step 1: 環境セットアップ

前提条件

このデモではuv(Pythonパッケージマネージャー)を使用します。uvの導入がまだの方は、以下の記事を参考にセットアップしてください。

参考: 【Python】uv入門 – pipより高速なパッケージ管理ツール

依存関係のインストール

# pyproject.toml
[project]
name = "a2a-demo"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "a2a-sdk[http-server]>=0.3.0",
    "openai>=1.0.0",
    "httpx>=0.28.0",
    "uvicorn>=0.34.0",
    "python-dotenv>=1.0.0",
]

環境変数の設定

AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key
AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4o
AZURE_OPENAI_API_VERSION=2024-02-15-preview

Step 2: ライターエージェントの実装

agent.py – LLM呼び出しロジック

import os
from typing import Literal
from openai import AzureOpenAI
from pydantic import BaseModel


class WriterResponse(BaseModel):
    """ライターエージェントのレスポンス形式"""
    status: Literal["input_required", "completed", "error"] = "input_required"
    content: str


class WriterAgent:
    """ブログ記事の初稿を書くエージェント"""

    SYSTEM_PROMPT = """あなたは技術ブログのライターです。
指定されたテーマについて、読みやすく分かりやすいブログ記事の初稿を書いてください。

## 記事の構成
1. 導入(テーマの背景と記事の目的)
2. 本文(技術的な説明、具体例)
3. まとめ(要点の整理)

## 注意事項
- 技術的に正確な内容を心がける
- 初心者にも分かりやすい表現を使う
- 適切な見出しを付ける
- 500〜800文字程度で簡潔にまとめる"""

    def __init__(self):
        self.client = AzureOpenAI(
            azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            api_key=os.environ["AZURE_OPENAI_API_KEY"],
            api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-02-15-preview"),
        )
        self.deployment_name = os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"]

    async def invoke(self, theme: str) -> WriterResponse:
        """テーマに基づいてブログ記事の初稿を生成する"""
        if not theme or theme.strip() == "":
            return WriterResponse(
                status="input_required",
                content="記事のテーマを指定してください。",
            )

        response = self.client.chat.completions.create(
            model=self.deployment_name,
            messages=[
                {"role": "system", "content": self.SYSTEM_PROMPT},
                {"role": "user", "content": f"テーマ: {theme}"},
            ],
            temperature=0.7,
            max_tokens=2000,
        )

        return WriterResponse(
            status="completed",
            content=response.choices[0].message.content,
        )

agent_executor.py – A2Aエグゼキューター

ここがA2Aプロトコルの核心部分です。AgentExecutorを継承して、executeメソッドを実装します。

"""Writer Agent Executor - A2Aプロトコル用のエグゼキューター"""

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import Part, TaskState, TextPart
from a2a.utils import new_agent_text_message, new_task

from writer_agent.agent import WriterAgent


class WriterAgentExecutor(AgentExecutor):
    """ライターエージェントのA2Aエグゼキューター"""

    def __init__(self):
        self.agent = WriterAgent()

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        query = context.get_user_input()
        task = context.current_task

        if not task:
            task = new_task(context.message)
            await event_queue.enqueue_event(task)

        updater = TaskUpdater(event_queue, task.id, task.context_id)

        # 処理中ステータスを送信
        await updater.update_status(
            TaskState.working,
            new_agent_text_message("記事を執筆中です...", task.context_id, task.id),
        )

        # ライターエージェントを実行
        result = await self.agent.invoke(query)

        if result.status == "completed":
            # 成果物として記事を追加
            await updater.add_artifact(
                [Part(root=TextPart(text=result.content))],
                name="draft_article",
            )
            await updater.complete()
        else:
            await updater.update_status(
                TaskState.input_required,
                new_agent_text_message(result.content, task.context_id, task.id),
                final=True,
            )

ポイント:

  • RequestContextからユーザー入力を取得
  • TaskUpdaterでタスクの状態を更新
  • 結果はArtifactとして返す

main.py – サーバー起動

"""Writer Agent Server - ライターエージェントのA2Aサーバー"""

import uvicorn
from dotenv import load_dotenv
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill

from writer_agent.agent import WriterAgent
from writer_agent.agent_executor import WriterAgentExecutor

load_dotenv()


def main():
    skill = AgentSkill(
        id="write_article",
        name="ブログ記事ライター",
        description="指定されたテーマでブログ記事の初稿を執筆します",
        tags=["blog", "writing", "article"],
        examples=[
            "Pythonの非同期処理について記事を書いて",
            "Dockerの基本概念を解説する記事を書いて",
        ],
    )

    agent_card = AgentCard(
        name="Writer Agent",
        description="技術ブログの初稿を執筆するエージェント",
        url="http://localhost:10001/",
        version="1.0.0",
        default_input_modes=WriterAgent.SUPPORTED_CONTENT_TYPES,
        default_output_modes=WriterAgent.SUPPORTED_CONTENT_TYPES,
        capabilities=AgentCapabilities(streaming=False),
        skills=[skill],
    )

    request_handler = DefaultRequestHandler(
        agent_executor=WriterAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )

    server = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler,
    )

    print("Starting Writer Agent on http://localhost:10001")
    uvicorn.run(server.build(), host="0.0.0.0", port=10001)


if __name__ == "__main__":
    main()

ポイント:

  • AgentSkillでエージェントの能力を定義
  • AgentCardでメタデータを公開
  • A2AStarletteApplicationでHTTPサーバーを構築

Step 3: 校閲者エージェントの実装

校閲者エージェントも同様の構造です。違いはシステムプロンプトと出力形式です。

agent.py(抜粋)

class ReviewerAgent:
    """ブログ記事を校閲するエージェント"""

    SYSTEM_PROMPT = """あなたは技術ブログの校閲者です。
与えられた記事を以下の観点でチェックし、修正案を提示してください。

## チェック項目
1. **誤字脱字**: タイポや文法ミスがないか
2. **技術的正確性**: 技術的な説明が正確か
3. **読みやすさ**: 文章が分かりやすいか
4. **トーン&マナー**: 技術ブログとして適切なトーンか

## 出力形式(JSON)
{
    "issues": ["問題点1", "問題点2", ...],
    "suggestions": ["改善提案1", "改善提案2", ...],
    "corrected_article": "修正後の記事全文"
}"""

校閲者はJSONで構造化された結果を返し、問題点・改善提案・修正後の記事を明確に分離しています。


Step 4: オーケストレーターの実装

オーケストレーターは、A2Aクライアントとして両エージェントを呼び出します。

"""Orchestrator - ライターと校閲者エージェントを連携させるクライアント"""

import asyncio
from uuid import uuid4
import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import MessageSendParams, SendMessageRequest


async def get_agent_client(base_url: str) -> tuple[A2AClient, httpx.AsyncClient]:
    """エージェントのA2Aクライアントを取得する"""
    httpx_client = httpx.AsyncClient(timeout=httpx.Timeout(120.0))
    resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
    agent_card = await resolver.get_agent_card()
    client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)
    return client, httpx_client


async def send_message(client: A2AClient, message: str) -> str:
    """エージェントにメッセージを送信して結果を取得する"""
    payload = {
        "message": {
            "role": "user",
            "parts": [{"kind": "text", "text": message}],
            "messageId": uuid4().hex,
        },
    }
    request = SendMessageRequest(
        id=str(uuid4()),
        params=MessageSendParams(**payload)
    )
    response = await client.send_message(request)
    return extract_result_text(response)


async def run_editorial_workflow(theme: str):
    """編集ワークフローを実行する"""
    print(f"テーマ: {theme}")

    # Step 1: ライターエージェントに接続
    writer_client, writer_http = await get_agent_client("http://localhost:10001")

    # Step 2: ライターに記事を依頼
    draft_article = await send_message(writer_client, theme)
    print("--- 初稿 ---")
    print(draft_article)

    # Step 3: 校閲者エージェントに接続
    reviewer_client, reviewer_http = await get_agent_client("http://localhost:10002")

    # Step 4: 校閲者に記事をレビュー依頼
    review_result = await send_message(reviewer_client, draft_article)
    print("--- 校閲結果 ---")
    print(review_result)

    # クリーンアップ
    await writer_http.aclose()
    await reviewer_http.aclose()

ポイント:

  • A2ACardResolverでAgent Cardを取得
  • A2AClientでメッセージを送信
  • 結果はArtifactから抽出

Step 5: 実行してみる

エージェントを起動(ターミナル2つ)

# ターミナル1: ライターエージェント
cd demo
PYTHONPATH=. uv run python -m writer_agent

# ターミナル2: 校閲者エージェント
cd demo
PYTHONPATH=. uv run python -m reviewer_agent

オーケストレーターを実行

uv run python orchestrator/main.py "grpcとはなにか"

実行結果

============================================================
A2A Protocol Demo: ライター × 校閲者 ワークフロー
============================================================

テーマ: grpcとはなにか

[Step 1] ライターエージェントに接続中...
✓ ライターエージェントに接続しました

[Step 2] 記事の初稿を依頼中...
✓ 初稿を受け取りました

--- 初稿(抜粋) ---
# gRPCとはなにか?初心者でもわかる解説

## 導入:gRPCの背景とこの記事の目的

近年、マイクロサービスやクラウドアプリケーションの普及に伴い、
サービス間の通信技術がますます重要になっています。
そんな中で注目されているのが「gRPC」という通信プロトコルです。
gRPCはGoogleが開発した高速で効率的なRPCフレームワークで、
多くの企業や開発者に支持されています。

### 双方向ストリーミング

gRPCは一方向または双方向のストリーミング通信もサポート。
これにより、リアルタイムのデータ送受信や大容量データの分割送信が可能です。
--- 初稿ここまで ---

[Step 3] 校閲者エージェントに接続中...
✓ 校閲者エージェントに接続しました

[Step 4] 記事の校閲を依頼中...
✓ 校閲結果を受け取りました

--- 校閲結果 ---
【発見した問題点】
  1. gRPCはGoogleが開発したオープンソースであることを明記したほうが良い
  2. 「gRPCは一方向または双方向のストリーミング通信もサポート」の文が
     やや簡潔すぎて意味が取りづらい
  3. 対応言語をもう少し具体的に挙げると親切
  4. 全体的に文章は読みやすいが、技術用語の説明がもう少し丁寧だと初心者に親切

【改善提案】
  1. gRPCがオープンソースとして広く利用されていることを明示する
  2. ストリーミング通信の種類(クライアント/サーバー/双方向)を
     具体的に説明し、用途例を簡単に示す
  3. C#, Node.js, PHPなどの対応言語も補足する
  4. 専門用語には簡単な注釈を加え、初心者が理解しやすいようにする

【修正後の記事(抜粋)】
### ストリーミング通信

gRPCは以下の3種類のストリーミング通信をサポートしています。

- クライアントストリーミング:クライアントからサーバーへ連続したデータを送信
- サーバーストリーミング:サーバーからクライアントへ連続したデータを送信
- 双方向ストリーミング:クライアントとサーバーが同時にデータを送受信

これにより、リアルタイムのデータ送受信や大容量データの分割送信が可能になります。
--- 校閲結果ここまで ---

============================================================
ワークフロー完了!
============================================================

校閲者エージェントが初稿の曖昧な表現を検出し、より具体的な説明に改善してくれました。特に「ストリーミング通信」のセクションでは、3種類のストリーミングを箇条書きで明確に説明する形に修正されています。


Agent Cardを確認してみる

各エージェントのAgent Cardは、以下のURLで確認できます:

# ライターエージェント
curl http://localhost:10001/.well-known/agent.json | jq

# 校閲者エージェント
curl http://localhost:10002/.well-known/agent.json | jq
{
  "name": "Writer Agent",
  "description": "技術ブログの初稿を執筆するエージェント",
  "url": "http://localhost:10001/",
  "version": "1.0.0",
  "skills": [
    {
      "id": "write_article",
      "name": "ブログ記事ライター",
      "description": "指定されたテーマでブログ記事の初稿を執筆します"
    }
  ],
  "capabilities": {
    "streaming": false
  }
}

これがA2Aの「発見性」です。クライアントはこのAgent Cardを取得することで、エージェントの能力を動的に把握できます。


まとめ

この記事で作ったもの

コンポーネント役割
ライターエージェントテーマを受け取り、ブログ記事の初稿を生成
校閲者エージェント記事を受け取り、問題点・改善案・修正版を返す
オーケストレーター両エージェントを連携させてワークフローを実行

A2Aの実装パターン

  1. エージェントロジック(agent.py): LLM呼び出しなどのビジネスロジック
  2. エグゼキューター(agent_executor.py): A2Aプロトコルとの橋渡し
  3. サーバー(main.py): Agent CardとHTTPサーバーの定義

まとめ

A2Aプロトコルは「エージェントのマイクロサービス化」を実現する有力なアプローチです。MCPでツールを接続し、A2Aでエージェントを連携させる。この組み合わせで、より複雑で柔軟なAIシステムが構築できます。

ぜひ皆さんも試してみてください!


参考リンク

公式ドキュメント

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です