AIエージェントでロボットを制御する【langchain・Streamlit】

挨拶

ども!過去のデモを執筆する元気を奮い立たせている龍ちゃんです。今季は、【生成AI】という大きなキーワードで取り組みを行ってきました。その過程で【Streamlit】【Qumcum】【AIエージェント】の3つを掛け合わせて、ロボットをAIエージェントで操作するデモをいろいろなところで使用してきたので技術的な内容について解説していきます。

システムの概要については、こちらでラフに紹介しています。

システム概要

内容としては、「人間の入力をAIエージェントが解釈してロボット制御」を行います。

エージェントをロボット制御に活用することで、以下のようなメリットが存在します。

  • ユーザの入力がロボットの複合的な動作も判断して処理
  • ユーザのあいまいな表現に関してもロボットの制御として処理

それでは、作成の話に入っていこうと思います。

設計

システムはすべてPython上で動作しています。ロボットは、CRETARIA社が発売しているQumcumを使用しています。こちらもPythonで制御することができるAPIが出ています。

コア

使用しているサービスについては、以下の内容です。

サービス・製品名説明
Qumcum PROBluetoothで接続することで、API経由で遠隔でロボット制御を実現できる
モーターの角度を調整することができれば様々な表現をが可能
Azure OpenAI ServiceAzureが提供しているAIサービス

エージェントの作成には、langchainを使用しています。langchainはバージョンが非常に大切になるので、ライブラリはバージョンを含めて記載しておきます。

langchain==0.2.16
langchain-community==0.2.16
langchain-core==0.2.39
langchain-openai==0.1.23

streamlit==1.38.0

Streamlit上での処理フロー

Streamlit上でデモアプリを管理するために、Qumcumとの接続処理やLangchainを用いたエージェントの処理フローをしっかり構成しました。

流れとしては、大きく分けて二つです。

  1. Streamlit上でQumcumとの接続処理(エージェントページへのアクセス許可)
  2. StreamlitからLangchainを介してAzure OpenAI Serviceへのアクセス

LangchainからQumcumの動作を制御するため、エージェントへアクセスする際にはQumcumとの接続を確約している必要があります。そのため、Streamlit上の制御でQumcumとの接続処理が実行されるまでエージェントのページにはアクセスできないように構成します。

ページ構成としては、トップページとエージェント処理ページの二つ作成します。

ページ名説明
トップページ:/Qumcumとの接続を管理する
エージェントページ:/agentLangchain(エージェント)のアクセスを管理する
Qumcumとの接続が確約されるまでアクセスできない

実装

今回の実装は、ロボットの動作作成からエージェントの実装まで多岐にわたります。また、デモとしてまとめるためにStremlitからアクセスできるように作成しています。分割して解説を書いていきます。

Qumcum処理

ここでは、ロボットとの接続処理と立ち姿リセット処理をまとめています。QumcumとのAPI接続には、公式が提供しているAPIをダウンロードして構成する必要があります。

環境変数としてROBOT_IDを取得して接続を処理しています。

import os
from dotenv import load_dotenv
import qumcum_ble as qumcum

load_dotenv()

def robot_connection():
    ROBOT_ID = os.getenv("ROBOT_ID")
    [err, _] = qumcum.get_info()
    if err == -1:
        qumcum.connect(ROBOT_ID)
        [err, _] = qumcum.get_info()
        if err == -1:
            return False
        else:
            robot_reset()
            return True
    else:
        return True

def robot_disconnect():
    qumcum.motor_power_off()
    qumcum.end()

def robot_reset():
    qumcum.motor_power_on(500)
    qumcum.motor_angle_multi_time(0, 90, 90, 90, 90, 90, 180, 600)
    qumcum.motor_start(True)
    qumcum.motor_power_off()

Qumcumはサーボモータを調整すればいろんな動作を作成することができます。Qumcumをポージングさせるなんてこともできたりします。Qumcumの動作は結構長くなるので「こんな感じで書くよ~」ってイメージのために、気を付けの姿勢制御のみ記載しています。

こちらは、別の機会に「Qumcum動き集」として執筆していきたいと思います。

Langcahin:エージェント処理

ここでは、langchain v0.2でのtool callingの書き方について説明しています。デコレーターとして@toolを付けることでエージェントに受け渡すことができます。

import os
from langchain_openai import AzureChatOpenAI
from langchain.agents import tool
from langchain_core.prompts import MessagesPlaceholder, ChatPromptTemplate
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.memory import ConversationBufferWindowMemory
from dotenv import load_dotenv

load_dotenv(verbose=True)

# Azure OpenAI Serviceの設定
AOAI_ENDPOINT = os.environ.get("AOAI_ENDPOINT")  # Azure OpenAI Serviceのエンドポイント
AOAI_API_VERSION = os.environ.get("AOAI_API_VERSION")  # Azure OpenAI ServiceのAPIバージョン
AOAI_API_KEY = os.environ.get("AOAI_API_KEY")  # Azure OpenAI ServiceのAPIキー
AOAI_CHAT_MODEL_NAME = os.environ.get("AOAI_CHAT_MODEL_NAME")  # Azure OpenAI Serviceのデプロイモデル名

CUSTOM_SYSTEM_PROMPT = """
		あなたは、指定された指示に従って、クムクムを操作するアシスタントです。
		操作するためのツールは、以下の通りです。
		以下のツールを呼び出す回数は、最大20回までです。
			- banzai: クムクムが両腕を上げます。
			- all_up_dance_tool: クムクムが両手足を上げ下げするダンスを踊ります。引数には、踊る回数とスピード(デフォルトで1000)を入れてください。
			- walk_right: クムクムが右に曲がって一歩歩きます。
			- walk_left: クムクムが左に曲がって一歩歩きます。
			- walk_forward: クムクムがまっすぐ前に一歩歩きます。
			- denied: クムクムが拒否動作をします。
			- talk: クムクムが引数に入れた言葉を喋ります。
		上記のどれにも当てはまらない場合は、以下のツールを使ってその旨を伝えてください。
			- impossible_move: クムクムが不可能な動きを指示された時に断るためのツールです。引数にその旨を入れてください。			
	"""

# LLMの初期化
llm = AzureChatOpenAI(
    azure_endpoint=AOAI_ENDPOINT,
    api_key=AOAI_API_KEY,
    api_version=AOAI_API_VERSION,
    openai_api_type="azure",
    azure_deployment=AOAI_CHAT_MODEL_NAME,
)

@tool
def banzai():
    """
    クムクムが両腕を上げるためのツールです。
    """
    # クムクムに両腕を上げる指示を記述
    return "バンザイ"

@tool
def all_up_dance_tool(num, speed):
    """
    クムクムが両手足を上げ下げするダンスを踊るためのツールです。
    """
    # クムクムにダンスを踊る指示を記述
    return "ダンスを踊りました。"

@tool
def walk_right():
    """
    クムクムが右に90度曲がるためのツールです。
    """
    # クムクムの右に曲がる処理を記述
    return "一歩右に曲がりました。"

@tool
def walk_left():
    """
    クムクムが左に90度曲がるためのツールです。
    """
    # クムクムの左に曲がる処理を記述
    return "一歩左に曲がりました。"

@tool
def walk_forward():
    """
    クムクムがまっすぐ前に一歩進むためのツールです。
    """
    # クムクムが一歩進む処理を記述
    return "一歩進みました。"

@tool
def denied():
    """
    クムクムが動作拒否をするツールです。
    """
    # クムクムの拒否動作を記述
    return "首を振りました。"

@tool
def talk(word):
    """
    クムクムに喋らせるためのツールです。
    """
    # クムクムに喋らせる指示を記述
    return "「" + word + "」"

@tool
def impossible_move(message):
    """
    クムクムが不可能な動きを指示された時に断るためのツールです。
    """
    # クムクムの不可能であることを動作または音声で通知
    return message

tools = [
    banzai,
    all_up_dance_tool,
    walk_right,
    walk_left,
    walk_forward,
    denied,
    talk,
    impossible_move,
]

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", CUSTOM_SYSTEM_PROMPT),
        MessagesPlaceholder(variable_name="chat_history"),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

agent = create_tool_calling_agent(llm, tools, prompt)

exucutor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    memory=ConversationBufferWindowMemory(
        return_messages=True, memory_key="chat_history", k=0
    ),
)

all_up_dance_tooltalkimpossible_moveでは、引数を受け取っています。こちらに関しての説明もプロンプト内で行うことでエージェントが判断して代入した状態で実行をしてくれます。

Streamlit:ページ制御

まずはディレクトリ構成について記載しておきます。Streamlitでマルチページを作成する方法は二つあります。今回は状態によって利用制御を行いたいので、プログラムでページを管理する方法を採用しています。

./
├── contents             # Streamlit ページ用ディレクトリ
├── utils                # Python処理をまとめる
├── main.py              # Streamlit起点ページ
├── qumcum_ble.py        # Qumcum APIライブラリ
└── requirements.txt

contentsの内部には、top_page.py(ロボットの接続処理)とrobot_move_agent_page.py(ロボットのエージェント制御)の2つ用意します。main.pyでページとメニューバーの制御を行います。

import streamlit as st

def main():

    # 各Stateの初期化
    if "robot_connection" not in st.session_state:
        st.session_state.robot_connection = False

    # ページの設定
    top_page = st.Page(
        page="contents/top_page.py", 
        title="ロボットとの接続", 
        icon=":material/home:",
        default=True
    )

    robot_move_page = st.Page(
        page="contents/robot_move_agent_page.py",
        title="ロボットを動かす",
        icon=":material/apps:",
    )

    # 接続が成功している場合は全てのページを表示
    if st.session_state.robot_connection:
        pg = st.navigation(
            {
                "INFO": [top_page],
                "AIエージェント": [robot_move_page],
            }
        )

    # 接続が失敗している場合はTopページのみ表示
    else:
        pg = st.navigation(
            {
                "INFO": [top_page],
            }
        )
    pg.run()

if __name__ == "__main__":
    main()

StreamlitのSession Stateを利用して状態の管理を行います。ロボットの接続状態によって表示されるメニューが変更されます。引数としてdefaultを渡すことで、特定のページをトップページにすることができます。

top_page.py:Streamlit・Qumcum接続処理

ここでは、ボタンを押したらロボットとの接続を管理することができるページとなっています。ロボットの接続はsession staterobot_connectionで管理しています。接続に成功するとTrueを代入します。

import streamlit as st
from utils.robot_connection import robot_connection, robot_disconnect
from dotenv import load_dotenv

load_dotenv()

st.title("ロボットとの接続の確立")

if st.session_state.robot_connection:
    st.success("Connected to the robot")
    if st.button("Disconnect"):
        robot_disconnect()
        st.session_state.robot_connection = False
        st.success("Disconnected from the robot")
        st.rerun()
else:
    st.error("ロボットと接続されていません。")
    if st.button("Try to connect"):
        connection = robot_connection()
        if not connection:
            st.error("やはり接続が確立することができません。実機の確認をしてください。")
        else:
            st.success("接続が確立されました。")
            st.session_state.robot_connection = True
        st.rerun()

robot_move_agent_page.py:Streamlit・Langchain接続処理

こちらでは、単純な入力フォームとチャット形式で入出力を表示しています。ユーザーの入力を起点にlangchainのエージェントを実行しています。

# chatbot.py
import streamlit as st
from langchain.schema import HumanMessage, AIMessage
from dotenv import load_dotenv

import utils.aiagent_qumcum_control as agent

load_dotenv()

# ページの設定
st.set_page_config(page_title="ロボットを動かす", page_icon="😆")
st.header("ロボットを動かす")

# チャット履歴の初期化
robot_messages = []

# ユーザーの入力を監視
if user_input := st.chat_input("聞きたいことを入力してね!"):
    robot_messages.append(HumanMessage(content=user_input))
    with st.spinner("GPT is typing ..."):
        response = agent.exucutor.invoke({"input": user_input, "chat_history": []})
        output = response["output"]
    robot_messages.append(AIMessage(content=output))

# チャット履歴の表示
for message in robot_messages:
    if isinstance(message, AIMessage):
        with st.chat_message("assistant"):
            st.markdown(message.content)
    elif isinstance(message, HumanMessage):
        with st.chat_message("user"):
            st.markdown(message.content)

デモ

課題

今回の実装における問題点についてまとめていきます。

  • Streamlitでの動作が同期処理 Streamlitの表現はすべて同期処理になっています。そのため、Langchainの処理が完了するまで結果を取得することができていません。長い処理をエージェントが実行している場合は、事項中は画面がフリーズします。デモとしては気まずい時間が流れますね。
  • デモ画面がチャットだけだと味気ない 現行のデモの場合だと、実行結果のテキスト表示と入力フォームしかありません。口頭で一から説明するのもなかなか大変だと感じています。ここは見せ方をもっと工夫する必要がありそうです。

この問題を解決するために、LangchainとStremlitについてもっと深堀していく必要がありそうです。

終わり

今回は、LangchainとQumcumを組み合わせてAIエージェントでロボットを制御するデモの実装について解説しました。Streamlitを使用することで、ユーザーフレンドリーなインターフェースを実現することができました。

課題として挙げた同期処理の問題や、UIの改善点については今後も継続的に取り組んでいく予定です。このデモを通じて、AIエージェントとロボットの組み合わせの可能性を示すことができたと思います。

今後も新しい技術を積極的に取り入れながら、より良いデモの開発に取り組んでいきたいと思います。最後までお読みいただき、ありがとうございました!

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です