世界一わかりみの深いDapr

こんにちは、サイオステクノロジー武井です。今回は、イケてるOSSであるDaprについて一筆したためました。

Daprとは?

Daprとは、Distributed Application Runtimeの略であり、本当にざっくり一言で言えば、分散アプリケーションサービスを開発する際のインフラレイヤーを抽象化するためのものです。

ここは説明が非常に難しいので、この章では、ざっくりとした理解で構いません。順を追ってDaprの本質に迫っていきたいと思います。

ここでDaprを理解するために一つの例を上げてみましょう。

例えば、とあるアプリケーションで、ユーザーの情報を保存するために、データベースを使用しているとします。当然従来のアプリケーションでは、当然MySQLにはMySQL用の実装を、PostgreSQLにはPostgreSQL用の実装をする必要があります。これをDaprを使用することで、データベースの種類に関係なく、同じコードでデータベースにアクセスすることができます。つまり、Daprは、データベースの種類を抽象化してくれるのです。

他にも、Daprは、Publish and Subscribe、State Management、Bindingsなど分散アプリケーションを開発する際に必要な機能を提供しているのですが、これを実現するアーキテクチャとして最も重要なのが、Daprのサイドカーです。

このサイドカーの仕組みをわかりやすく伝えるために、データベースにアクセスする際の例を用いて、「Daprを使わない場合」と「Daprを使う場合」の違いを見てみましょう。

Daprを使わない場合

従来のアプリケーションでは、データベースにアクセスするためのコードは、アプリケーションの中に直接書かれています。例えば、MySQLにアクセスするためのコードは、MySQL用のライブラリを使用して書かれています。その構成は以下の通りとなります。

例えば、ここでMySQLをPostgreSQLに変更したい場合、MySQL用のコードを削除して、PostgreSQL用のコードを書き直す必要があります。書き直した後の構成は以下の通りとなります。

先ほどとは違い、アプリ内の「MySQL用のコード」が「PostgreSQL用のコード」に変わっていますね。このように、Daprを使わない場合、データベースの種類を変更するたびに、アプリケーションのコードを変更する必要があります。これが、Daprを使う場合と比べて、非常に面倒であることがわかります。

Daprを使う場合

Daprを使う場合、アプリケーションのコードは、DaprのAPIを使用してデータベースにアクセスします。Daprは、データベースの種類を抽象化してくれるため、アプリケーションのコードは、データベースの種類に関係なく同じコードでアクセスすることができます。

MySQLにアクセスするためのDaprを使う場合の構成は以下の通りとなります。

「Daprを使わない場合」と大きく異なるのは、サイドカーが存在していることです。アプリケーションは、DaprのAPIを使用してサイドカーにアクセスし、サイドカーがデータベースにアクセスするという構成になっています。この構成のメリットは、データベースの種類を変更する際に、アプリケーションのコードを変更する必要がないことです。例えば、MySQLからPostgreSQLに変更したい場合、サイドカーの設定を変更するだけで、アプリケーションのコードはそのままで済みます。変更後の構成は以下の通りとなります。

アプリ側は全く変更してないのがわかりますでしょうか?変更したのは、サイドカーの設定だけです。この設定では、サイドカーにてPostgreSQLにアクセスするように変更しています。

このように、Daprを使う場合、データベースの種類を変更する際に、アプリケーションのコードを変更する必要がないため、非常に便利であることがわかります。

一方で、このサイドカーが対応していないデータベースを使用したい場合は、サイドカーの設定を変更するだけでは対応できないため、アプリケーションのコードを変更する必要があります。

この例では、データベースへのアクセスを例に挙げましたが、Daprは、Publish and Subscribe、State Management、Bindingsなど、分散アプリケーションを開発する際に必要な機能を提供しているため、これらの機能も同様に抽象化されていることがわかります。つまり、Daprを使うことで、分散アプリケーションの開発が非常に楽になるということです。

サイドカーの実行形態について

Daprのサイドカーの実行形態は大きく分けると、以下の2つがあります。

  • サイドカーコンテナ
  • サイドカープロセス

サイドカーコンテナ

サイドカーコンテナは、Daprのサイドカーがコンテナとして実行される形態です。これは、Kubernetesなどのコンテナオーケストレーションツールを使用している場合に一般的に使用されます。Daprのサイドカーは、アプリケーションと同じPod内のコンテナとして実行されます。アプリケーションとDaprはlocalhostで通信するため、低レイテンシで連携できます。図中の「Scheduler」「Placement」については、Daprのコントロールプレーンコンポーネントであり、サイドカーコンテナとは別に実行されます。本記事では、サイドカーコンテナの説明に焦点を当てるため、これらのコントロールプレーンコンポーネントに関する説明は省略しています。

サイドカープロセス

Daprのサイドカーは、VMやローカル環境ではアプリケーションと同じホスト上の別プロセスとして実行されます。アプリケーションは、HTTPまたはgRPCを通じてローカルのdaprdプロセスに接続します。この形態は、Kubernetes以外の環境でDaprを利用する場合によく使われる実行方法です。

以下の図は、開発環境でDaprを利用する際のサイドカープロセスの構成例です。dapr CLIというDapr専用のコマンドラインツールを使用すると、アプリケーションとサイドカープロセスを同時に起動できます。

RedisとZipkinは、Daprのサイドカーが利用する外部コンポーネントの例です。Redisは、DaprのState ManagementやPublish and Subscribeなどの機能でよく利用されるデータストアです。またZipkinは、Daprの分散トレーシング機能で使用されるトレーシングシステムです。これらのコンポーネントは、dapr CLIによって起動されます。

本記事で紹介するサンプルアプリケーションは、ここで説明したように、dapr CLIによって起動されるサイドカープロセスの構成で動作することを前提としています。ただし、実行形態が異なる場合でも基本的な考え方は同じであり、他の実行形態にも応用できます。そのため、本記事ではサイドカープロセスの形態を前提として説明していきます。

Daprを使うための環境準備

先程説明しましたように、Daprを使うための環境は様々なです。例えば、Kubernetes環境でDaprを利用する場合は、Kubernetesクラスターを用意し、Daprをインストールする必要があります。一方で、ローカル環境でDaprを利用する場合は、dapr CLIをインストールするだけで済みます。

本記事の中で説明するサンプルアプリケーションは、ローカル環境でDaprを利用することを前提としています。そのため、dapr CLIをインストールする必要があります。dapr CLIとは、Daprをローカル環境で利用するためのコマンドラインツールです。dapr CLIを使用することで、Daprのサイドカープロセスを簡単に起動したり、Daprのコンポーネントを管理したりすることができます。

そのインストール手順は以下のURLに記載されていますので、それを見てさっくりとインストールしてみてください。

https://docs.dapr.io/getting-started/install-dapr-cli/

そのインストール方法はめっちゃ簡単です。一例としてLinuxにDapr CLIをインストールする手順は以下の通りです。

$ wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash

そして、次に、初期化が必要になります。これは、ローカル環境でDaprを使うための様々なコンポーネントを起動するためのコマンドです。初期化のコマンドは以下の通りです。

$ dapr init

このコマンドにより、Daprのサイドカープロセスであるdaprdがインストールされ、そしてDaprを動作させるために必要な基本コンポーネントであるRedisやZipkinなども起動されます。試しに以下のコマンドを実行してみてください。

$ docker ps
CONTAINER ID   IMAGE                COMMAND                  CREATED       STATUS                 PORTS                                                                                                                                                                                        NAMES
6822763b7ba6   redis:6              "docker-entrypoint.s…"   2 weeks ago   Up 13 days             0.0.0.0:6379->6379/tcp, [::]:6379->6379/tcp                                                                                                                                                  dapr_redis
d5bf3b79e44d   daprio/dapr:1.17.0   "./scheduler --etcd-…"   2 weeks ago   Up 13 days             0.0.0.0:2379->2379/tcp, [::]:2379->2379/tcp, 0.0.0.0:50006->50006/tcp, [::]:50006->50006/tcp, 0.0.0.0:58081->8080/tcp, [::]:58081->8080/tcp, 0.0.0.0:59091->9090/tcp, [::]:59091->9090/tcp   dapr_scheduler
c79f84b94e2f   daprio/dapr:1.17.0   "./placement"            2 weeks ago   Up 13 days             0.0.0.0:50005->50005/tcp, [::]:50005->50005/tcp, 0.0.0.0:58080->8080/tcp, [::]:58080->8080/tcp, 0.0.0.0:59090->9090/tcp, [::]:59090->9090/tcp                                                dapr_placement
dbf0fab9540b   openzipkin/zipkin    "start-zipkin"           2 weeks ago   Up 13 days (healthy)   0.0.0.0:9411->9411/tcp, [::]:9411->9411/tcp                                                                                                                                                  dapr_zipkin

様々なコンテナが起動していることがわかりますね。これらのコンテナは、DaprのコントロールプレーンコンポーネントであるSchedulerやPlacement、そしてDaprの分散トレーシング機能で使用されるZipkinなどです。それぞれのコンテナの説明は割愛します。ここでは、

これで、ローカル環境でDaprを利用するための環境が整いました。

Daprの様々なコンポーネント

Daprにはさまざまなコンポーネントが用意されています。これらのコンポーネントは、Daprの機能を実現するためのものであり、アプリケーションはDaprを介してそれらを利用できます。

例えば、Daprには状態を保存するための State Management コンポーネントがあります。このコンポーネントを利用するには、設定ファイルで対象のコンポーネントを定義します。すると、Daprサイドカー(daprd)がそのコンポーネントを読み込み、対応するインフラへ接続できるようになります。

その結果、アプリケーションがサイドカーに対して状態を保存するAPIを呼び出すと、サイドカーはState Managementコンポーネントを通じて、RedisやAzure Cosmos DBなどのバックエンドストレージに状態を保存できるようになります。

本章では、Daprの代表的なコンポーネントであるState Management、Publish and Subscribe、Service Invocation、Bindingsについて、簡単に説明していきます。これらのコンポーネントは、Daprを使用する際に非常に重要な役割を果たすため、理解しておくことが重要です。

そして、以降では、これらのコンポーネントを実際に利用するためのサンプルコードも紹介しながら、Daprの機能を説明していきます。サンプルコードはGitHubのリポジトリに公開していますので、ぜひ参考にしてみてください。

https://github.com/noriyukitakei/dapr-sample

State Management

まずは、State Managementコンポーネントについて説明します。State Managementコンポーネントは、Key/Value形式の状態を保存するためのコンポーネントです。Daprは、State Managementコンポーネントを通じて、RedisやAzure Cosmos DBなどのバックエンドストレージに状態を保存できるようになります。

Key/Value形式のデータの代表例で言えば、ユーザーのセッション情報や、IoTデバイスの状態(例: 温度センサーの最新の温度値)などが挙げられます。これらのデータは、アプリケーションの状態を管理するために頻繁に使用されます。

システム構成

State Managementコンポーネントを利用する際のシステム構成は以下の通りとなります。

サイドカーは、Redis用の設定ファイルを読み込むことで、Redisに接続できるようになります。アプリケーションは、DaprのAPIを使用してサイドカーに状態を保存するリクエストを送ります。サイドカーは、そのリクエストを受け取ると、State Managementコンポーネントを通じて、Redisに状態を保存します。

ファイル構成

State Managementコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ(https://github.com/noriyukitakei/dapr-sample)の中の、State Managementディレクトリの中に配置されています。

StateManagement
├── app.py
├── Infrastructure
│   └── components
│       ├── redis
│       │   └── statestore_redis.yaml
│       └── sqlite
│           └── statestore_sqlite.yaml
├── README.md
└── requirements.txt

ではこれらのファイルを一つずつ紐解くことで、State Managementコンポーネントを利用するための構成を理解していきましょう。

ソースコードの説明

■ Infrastructure/components/redis/statestore_redis.yaml
Infrastructure/components/redis/statestore_redis.yamlは、DaprのState ManagementコンポーネントをRedisに接続するための設定ファイルです。このファイルには、DaprがRedisに接続するための情報が記載されています。

apiVersion: dapr.io/v1alpha1  # Daprのリソース定義のバージョン
kind: Component              # 設定ファイルの種別
metadata:
  name: statestore           # アプリから参照するコンポーネント名
spec:
  type: state.redis          # コンポーネントの種別
  version: v1                # コンポーネントのバージョン
  metadata:
    - name: redisHost
      value: localhost:6379  # Redisの接続先
    - name: redisPassword
      value: ""              # パスワード(未設定)

このファイルは一言で言うと、「DaprのState ManagementでRedisを使う」ことを定義しているものです。アプリケーションはRedisを直接操作するのではなく、DaprのAPIを通じて状態を保存します。

例えばアプリケーションから次のように呼び出すと、

client.save_state("statestore", "user1", data)

Daprは内部的に先程のコンポーネント定義を参照して、Redisに接続し、状態を保存します。つまり以下のような流れです。

  1. アプリケーションがDaprのAPIを呼び出す。そのとき、コンポーネント名として「statestore」を指定する。
  2. Daprがコンポーネント定義の中から、metadata.nameが「statestore」であるコンポーネント定義を探す。
  3. Daprがコンポーネント定義を参照して、Redisに接続するための情報(ここでは接続先がlocalhost:6379、パスワードが空)を取得する。
  4. Daprが取得した情報を基に、Redisに接続する。
  5. Daprがアプリケーションから受け取った状態をRedisに保存する。
  6. Daprが保存の結果をアプリケーションに返す。

■ app.py
app.pyは、DaprのState Managementコンポーネントを利用するためのアプリケーションコードです。このコードは、DaprのPython SDKを使用して、状態を保存するためのAPIを呼び出しています。

では、コードの内容を見てみましょう。

for i in range(1, 10):
    json_data = [{"key": str(i), "value": {"orderId": i}}]
    requests.post("http://localhost:3611/v1.0/state/statestore", json=json_data)
    time.sleep(1)

このコードは、1から9までの数字をキーとし、その値としてorderIdを持つJSONオブジェクト(以下参照)を作成し、DaprのState Management APIにPOSTリクエストを送っています。

[
  {
    "key": "1",
    "value": {
      "orderId": 1
    }
  },
  {
    "key": "2",
    "value": {
      "orderId": 2
    }
  },
  ⋯以下略⋯
]

リクエストのURLには、先程のコンポーネント定義で指定した「statestore」が含まれています。これにより、Daprは「statestore」という名前のState Managementコンポーネントを参照して、Redisに状態を保存します。

リクエストのURLの構成は以下の通りです。

  • http://localhost:3611: Daprサイドカーのエンドポイント
    ※ このポート番号については、Dapr CLIを使用してサイドカープロセスを起動する際に指定したポート番号になります。後ほど説明しますが、Dapr CLIを使用してサイドカープロセスを起動する際に、–app-portオプションでアプリケーションのポート番号を指定することができます。例えば、アプリケーションのポート番号を3611に指定した場合、Daprサイドカーはlocalhost:3611で待ち受けるようになります。
  • /v1.0: Dapr APIのバージョン
  • /state: State Management APIを呼び出すことを示すパス
  • /statestore: 先程のコンポーネント定義で指定した「statestore」という名前のState Managementコンポーネントを参照するためのパス
    result = requests.get(f"http://localhost:3611/v1.0/state/statestore/{i}")
    print(result.json())
    time.sleep(1)

このコードは、先程保存した状態をDaprのState Management APIにGETリクエストを送って取得しています。リクエストのURLには、先程のコンポーネント定義で指定した「statestore」が含まれていることに加えて、最後に/{i}が追加されています。これにより、Daprは「statestore」という名前のState Managementコンポーネントを参照して、Redisからキーが{i}である状態を取得します。リクエストのURLの構成は以下の通りです。

  • http://localhost:3611: Daprサイドカーのエンドポイント
  • /v1.0: Dapr APIのバージョン
  • /state: State Management APIを呼び出すことを示すパス
  • /statestore: 先程のコンポーネント定義で指定した「statestore」という名前のState Managementコンポーネントを参照するためのパス
  • /{i}: 取得したい状態のキーを指定するためのパス
    requests.delete(f"http://localhost:3611/v1.0/state/statestore/{i}")
    time.sleep(1)

このコードは、先程保存した状態をDaprのState Management APIにDELETEリクエストを送って削除しています。リクエストのURLの構成は、先程のGETリクエストと同様であり、最後に/{i}が追加されていることがわかります。これにより、Daprは「statestore」という名前のState Managementコンポーネントを参照して、Redisからキーが{i}である状態を削除します。リクエストのURLの構成は以下の通りです。

  • http://localhost:3611: Daprサイドカーのエンドポイント
  • /v1.0: Dapr APIのバージョン
  • /state: State Management APIを呼び出すことを示すパス
  • /statestore: 先程のコンポーネント定義で指定した「statestore」という名前のState Managementコンポーネントを参照するためのパス
  • /{i}: 削除したい状態のキーを指定するためのパス

実行方法

では次に、State Managementコンポーネントを利用するためのアプリケーションコードを実行してみましょう。実行する前に、Dapr CLIを使用してサイドカープロセスを起動する必要があります。ここではその手順を説明します。

$ git clone https://github.com/noriyukitakei/dapr-sample
$ cd dapr-sample/StateManegement

まずは、先程紹介したGitHubリポジトリからサンプルコードをクローンして、StateManagementディレクトリに移動します。

$ pip install -r requirements.txt

次に、Pythonの依存関係をインストールします。requirements.txtには、このアプリケーションコードを実行するために必要なPythonパッケージが記載されています。

ちなみにRedisについては、dapr initコマンドを実行した際に、DaprのState ManagementやPublish and Subscribeなどの機能でよく利用されるデータストアであるRedisも起動されます。ですので、特にRedisを起動するためのコマンドを実行する必要はありません。

$ dapr run --app-id statemanagement --dapr-http-port 3611 --components-path Infrastructure/components/redis -- python app.py

dapr runコマンドを使用して、アプリケーションコードを実行します。

–app-idオプションでアプリケーションのIDを指定します。ここでは「statemanagement」というIDを指定しています。このIDは、Daprサイドカーがアプリケーションを識別するために使用されます。State Managementでは利用しませんが、Service Invocationなどの機能を利用する際に、このIDが重要になります。

–dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3611を指定しています。これにより、Daprサイドカーはlocalhost:3611で待ち受けるようになります。

–components-pathオプションで、Daprのコンポーネント定義ファイルが配置されているディレクトリを指定します。ここではInfrastructure/components/redisを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。

図解すると以下のような対応関係になります。

このコマンドを実行すると、Daprサイドカーが起動し、そしてアプリケーションコードも実行されます。アプリケーションコードは、DaprのState Management APIを呼び出して、状態を保存、取得、削除するリクエストを送ります。Daprサイドカーは、そのリクエストを受け取ると、State Managementコンポーネントを通じて、Redisに状態を保存、取得、削除します。

保存先をSQLiteに変更してみる

では、Daprのメリットを体感して頂くために、状態の保存先をRedisからSQLiteに変更してみましょう。Daprを使わない場合は、アプリケーションコードを変更する必要がありますが、Daprを使う場合は、コンポーネント定義ファイルを変更するだけで済みます。

つまりこんな感じです。

変更後のコンポーネント定義ファイルは以下の通りとなります。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.sqlite
  version: v1
  metadata:
    - name: connectionString
      value: "sqlite/data.db"

まず、Redis用のコンポーネント定義ファイルと比較すると、typeがstate.sqliteに変わっています。これにより、DaprはState ManagementコンポーネントとしてSQLiteを使用するようになります。そして、spec.metadataの内容も変わっています。SQLiteに接続するための情報を記載する必要があるため、connectionStringという名前のメタデータを追加しています。これにより、DaprはSQLiteに接続するための情報を取得できるようになります。

このファイルは、Infrastructure/components/sqlite/statestore_sqlite.yamlに配置されています。

先程のdapr runコマンドの–components-pathオプションで、このファイルが配置されているディレクトリを指定することで、Daprサイドカーはこのファイルを読み込むようになります。

$ dapr run --app-id statemanagement --dapr-http-port 3611 --components-path Infrastructure/components/sqlite -- python app.py

いかがでしょうか?アプリケーションコードを一切変更することなく、状態の保存先をRedisからSQLiteに変更することができましたね。これがDaprのメリットの一つである、インフラレイヤーの抽象化による柔軟性の高さです。

Publish and Subscribe

次は、Publish and Subscribeコンポーネントについて説明します。DaprのPublish and Subscribeコンポーネントについて説明する前に、まずはPublish and Subscribeの概念について簡単に説明します。

例えば、あるアプリケーションで、ユーザーが新しい注文を作成したとします。そのとき、注文が作成されたことを他のサービスに通知したい場合があります。例えば、在庫管理サービスや配送サービスなどです。このような場合に、Publish and Subscribeの仕組みが役立ちます。

Publish and Subscribeを使わない場合で考えてみましょう。ユーザーが新しい注文を作成したとき、注文サービスは在庫管理サービスや配送サービスに対して、HTTPリクエストを送って通知します。注文サービスは、配送サービス、在庫管理サービスの両方から正常にレスポンスが返ってきたら、注文が正常に処理されたと判断します。

もし、ここで配送サービスがダウンしている場合、注文サービスは配送サービスに通知することができません。そのため、注文サービスは失敗して、ユーザーは注文を作成することができません。

そこでPublish and Subscribeの仕組みを使うと、注文サービスは、注文が作成されたことを「メッセージブローカー」に対して通知します。そして、配送サービスや在庫管理サービスは、そのメッセージブローカーから注文が作成されたことを受け取ります。これにより、注文サービスは配送サービスや在庫管理サービスの状態に関係なく、注文が作成されたことを通知することができます。そして、配送サービスが仮にダウンしていたとしても、注文サービスは注文が作成されたことをメッセージブローカーに通知することができるため、ユーザーは注文を作成することができ、かつ在庫管理サービスは注文が作成されたことを受け取ることができます。配送サービスが復旧したときに、配送サービスも注文が作成されたことを受け取ることができます。

このように、Publish and Subscribeの仕組みを使うことで、サービス間の疎結合な連携が可能になります。DaprのPublish and Subscribeコンポーネントは、このようなPublish and Subscribeの仕組みを提供するためのコンポーネントです。Daprを使用することで、アプリケーションは、DaprのAPIを通じて、メッセージブローカーに対してメッセージを公開したり、メッセージブローカーからメッセージを受け取ったりすることができます。

システム構成

DaprでPublish and Subscribeコンポーネントを利用する際のシステム構成は以下の通りとなります。メッセージブローカーとしてRedisを使用する場合の構成例を示しています。

① まず、Publisherが、DaprのHTTP/gRPC APIを通じて、サイドカーに対してメッセージを公開するリクエストを送ります。
② サイドカーは、そのリクエストを受け取ると、Publish and Subscribeコンポーネントを通じて、Redisなどのメッセージブローカーに対して、メッセージを送信します。
③ サイドカーは、Redisに接続し、Redisからメッセージを受け取ります。
④ サイドカーは、そのリクエストを受け取ると、事前にコンポーネントで指定したSubscriberのエンドポイントに対して、メッセージを送信します。

もちろん、PublisherとSubscriberは、メッセージを格納するためのデータストアに何を使っているのかを知る必要はありません。RedisだろうがRabbitMQだろうが、Publisherは、DaprのAPIを通じてサイドカーに対してメッセージを公開するだけで済みますし、Subscriberは、DaprのAPIを通じてサイドカーからメッセージを受け取るだけで済みます。これが、DaprのPublish and Subscribeコンポーネントのメリットの一つである、インフラレイヤーの抽象化による柔軟性の高さです。

ファイル構成

Publish and Subscribeコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ(https://github.com/noriyukitakei/dapr-sample)の中の、PubSubディレクトリの中に配置されています。

PubSub
├── Infrastructure
│   └── components
│       └── pubsub.yaml
├── publisher
│   ├── app.py
│   └── requirements.txt
└── subscriber
    ├── app.py
    └── requirements.txt

ソースコードの説明

■ Infrastructure/components/pubsub.yaml
Infrastructure/components/pubsub.yamlは、DaprのPublish and SubscribeコンポーネントをRedisに接続するための設定ファイルです。このファイルには、DaprがRedisに接続するための情報が記載されています。

apiVersion: dapr.io/v1alpha1 # Daprのリソース定義のバージョン
kind: Component             # 設定ファイルの種別
metadata:
  name: orderpubsub         # アプリから参照するコンポーネント名
spec:
  type: pubsub.redis         # コンポーネントの種別
  version: v1               # コンポーネントのバージョン
  metadata:
    - name: redisHost         # Redisのホスト名とポート
      value: localhost:6379
    - name: redisPassword     # Redisのパスワード
      value: ""

StateManagementと同じところは説明を省略します。ここで新たに説明する必要があるのは、spec.typeがpubsub.redisになっていることです。これにより、DaprはPublish and SubscribeコンポーネントとしてRedisを使用するようになります。

順番が前後しますが、metadata.nameが「orderpubsub」であることも重要です。これにより、アプリケーションは「orderpubsub」という名前のPublish and Subscribeコンポーネントを参照して、メッセージを公開したり、メッセージを受け取ったりすることができます。

■ publisher/app.py
publisher/app.pyは、DaprのPublish and Subscribeコンポーネントを利用してメッセージを公開するためのアプリケーションコードです。このコードは、

 トピックにメッセージを発行する
for i in range(1, 10):
    order = {"orderId": i}
    requests.post("http://localhost:3613/v1.0/publish/orderpubsub/orders", json=order)
    logging.info("送信データ: " + json.dumps(order))
    time.sleep(1)

このコードは、1から9までの数字をorderIdとするJSONオブジェクト(以下参照)を作成し、DaprのPublish and Subscribe APIにPOSTリクエストを送っています。

{
  "orderId": 1
}

リクエストのURLには、先程のコンポーネント定義で指定した「orderpubsub」が含まれています。これにより、Daprは「orderpubsub」という名前のPublish and Subscribeコンポーネントを参照して、Redisにメッセージを公開します。リクエストのURLの構成は以下の通りです。

  • http://localhost:3613: Daprサイドカーのエンドポイント
  • /v1.0: Dapr APIのバージョン
  • /publish: Publish and Subscribe APIを呼び出すことを示すパス
  • /orderpubsub: 先程のコンポーネント定義で指定した「orderpubsub」という名前のPublish and Subscribeコンポーネントを参照するためのパス
  • /orders: メッセージのトピックを指定するためのパス

トピックとは、メッセージを分類するための名前のことです。Publisherは、メッセージを公開するときに、どのトピックに公開するかを指定します。そして、Subscriberは、どのトピックからメッセージを受け取るかを指定します。これにより、PublisherとSubscriberは、特定のトピックに対してメッセージを公開したり、受け取ったりすることができます。

■ subscriber/app.py
subscriber/app.pyは、DaprのPublish and Subscribeコンポーネントを利用してメッセージを受け取るためのアプリケーションコードです。

ソースコードの説明に入る前に、Subscriberがメッセージを受信するまでの流れを説明します。

  1. まず、Subscriberのサイドカーは、/dapr/subscribeエンドポイントを通じて、Subscriberがどのトピックからメッセージを受け取るかをDaprに通知します。
  2. Subscriberのサイドカーは、Redisからメッセージを受け取ると、Subscriberのエンドポイントである「/トピック名」(この例では/orders)に対して、メッセージを送信します。

では、上記を踏まえてソースコードの主要な部分を説明します。

from flask import Flask, request, jsonify
import json

app = Flask(__name__)

最初にFlaskをインポートして、Flaskアプリケーションのインスタンスを作成しています。Flaskは、PythonでWebアプリケーションを作成するためのフレームワークです。

先ほど説明したように、Subscriberのサイドカーは、Subscriberの特定のHTTPエンドポイントに対してアクセスしてくるので、SubscriberはHTTPサーバーを立てる必要があります。Flaskは、そのHTTPサーバーを簡単に立てることができるため、ここではFlaskを使用しています。Flask以外のHTTPサーバーフレームワークを使用しても問題ありません。

@app.route("/dapr/subscribe", methods=["GET"])
def subscribe():
    subscriptions = [
        {"pubsubname": "orderpubsub", "topic": "orders", "route": "orders"}
    ]
    return jsonify(subscriptions)

このコードは、Subscriberのサイドカーが/dapr/subscribeエンドポイントにアクセスしたときに呼び出される関数を定義しています。この関数は、Subscriberがどのトピックからメッセージを受け取るかをDaprに通知するためのものです。

つまり、この関数は、Daprに対して、Subscriberが「orderpubsub」という名前のPublish and Subscribeコンポーネントの「orders」というトピックからメッセージを受け取ることを通知しています。そして、Daprは、Subscriberのサイドカーが「orders」というエンドポイントに対して、メッセージを送信するようになります。

@app.route("/orders", methods=["POST"])
def orders_subscriber():
    event_orderid = request.json["data"]["orderId"]
    print("受信データ: " + json.dumps(event_orderid), flush=True)
    return json.dumps({"success": True}), 200, {"ContentType": "application/json"}

このコードは、Subscriber側のサイドカーから「/orders」エンドポイントにリクエストが送られたときに実行される関数を定義しています。

このエンドポイントは、/dapr/subscribe エンドポイントでDaprに登録した「orders」トピックに対応しており、そのトピックにメッセージが発行されると、Daprのサイドカーによってこの関数が呼び出されます。

つまり、この関数は「orders」トピックに発行されたメッセージを受信し、処理するためのエンドポイントとして動作します。

この関数の中では、リクエストのJSONボディからorderIdを抽出し、それをコンソールに出力しています。そして、最後に、HTTPレスポンスとして、成功を示すJSONオブジェクトを返しています。

app.run(port=6104)

このコードは、Flaskアプリケーションをポート6104で起動しています。これにより、Subscriberはポート番号6104でHTTPリクエストを受け付けるようになります。Publisherがメッセージを公開すると、Daprのサイドカーはこのポートに対してリクエストを送るため、Subscriberはこのポートでリクエストを受け取る必要があります。

実行方法

では次に、Publish and Subscribeコンポーネントを利用するためのアプリケーションコードを実行してみましょう。

まずは必要なライブラリをインストールします。

$ pip install -r publisher/requirements.txt
$ pip install -r subscriber/requirements.txt

次に、Dapr CLIを使用してSubscriber及びSubscriberのサイドカープロセスを起動します。

$ dapr run --app-id pubsub --app-port 6104 --dapr-http-port 3614 --components-path Infrastructure/components -- python subscriber/app.py

上記のコマンドの詳細を説明します。

–app-idオプションでアプリケーションのIDを指定します。ここでは「pubsub」というIDを指定しています。このIDは、Publish and Subscribeでは特に利用しませんが、Service Invocationなどの機能を利用する際に、このIDが重要になります。

–app-portオプションでSubscriberのアプリケーションのポート番号を指定します。ここでは6104を指定しています。これにより、Subscriberはlocalhost:6104で待ち受けるようになります。

–dapr-http-portオプションでSubscriberのDaprサイドカーのHTTPポートを指定します。ここでは3614を指定しています。これにより、Daprサイドカーはlocalhost:3614で待ち受けるようになります。

–components-pathオプションで、Daprのコンポーネント定義ファイルが配置されているディレクトリを指定します。ここではInfrastructure/componentsを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。

— python subscriber/app.pyは、SubscriberのDaprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、subscriber/app.pyを実行するように指定しています。つまり、Subscriberのアプリケーションを実行するように指定しています。

次に、Publisher及びPublisherのサイドカープロセスを起動します。

$ dapr run --app-id pubsub --dapr-http-port 3613 --components-path Infrastructure/components -- python publisher/app.py

上記のコマンドの詳細を説明します。

–app-idオプションでアプリケーションのIDを指定します。ここでは「pubsub」というIDを指定しています。このIDは、Publish and Subscribeでは特に利用しませんが、Service Invocationなどの機能を利用する際に、このIDが重要になります。

–dapr-http-portオプションでPublisherのDaprサイドカーのHTTPポートを指定します。ここでは3613を指定しています。これにより、Daprサイドカーはlocalhost:3613で待ち受けるようになります。

–components-pathオプションで、Daprのコンポーネント定義ファイルが配置されているディレクトリを指定します。ここではInfrastructure/componentsを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。

— python publisher/app.pyは、PublisherのDaprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、publisher/app.pyを実行するように指定しています。つまり、Publisherのアプリケーションを実行するように指定しています。

このコマンドを実行すると、Publisherがメッセージを公開し、Subscriberがそのメッセージを受信する様子を確認することができます。Subscriberのコンソールには、Publisherが公開したメッセージが以下のように表示されるはずです。

== APP == 受信データ: 1
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -
== APP == 受信データ: 2
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -
== APP == 受信データ: 3
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -
== APP == 受信データ: 4
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -
== APP == 受信データ: 5
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -
== APP == 受信データ: 6
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -
== APP == 受信データ: 7
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -
== APP == 受信データ: 8
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -
== APP == 受信データ: 9
== APP == 127.0.0.1 - - [28/Mar/2026 01:52:45] "POST /orders HTTP/1.1" 200 -

Bindings

Bindingsコンポーネントとは、アプリに入ってくるデータと、アプリから出ていくデータを処理するためのコンポーネントです。

システム構成

例えば、RabbitMQにデータが入ってきたときに、そのデータを処理して、処理した結果をPostgreSQLに保存したいとします。このような場合に、Bindingsコンポーネントが役立ちます。

前のコンポーネントと同じように、Bindingsコンポーネントも、アプリ側はRabbtiMQやPostgreSQLのことを知らなくても、DaprのAPIを通じて、RabbitMQからデータを受け取ったり、PostgreSQLにデータを保存したりすることができます。これも、Daprのメリットの一つである、インフラレイヤーの抽象化による柔軟性の高さです。

① Publisher(キューにメッセージを登録する側)が、RabbitMQに対して、メッセージを登録します。
② RabbitMQは、サイドカーにメッセージを登録します。
③ サイドカーは、RabbitMQからメッセージを受け取ると、アプリが待ち受けている特定のHTTPエンドポイントに対して、メッセージを送信します。
④ アプリは、そのHTTPエンドポイントでリクエストを受け取ると、処理を実行します。処理が完了したら、アプリは、DaprのAPIを通じて、サイドカーに対して、処理した結果を保存するリクエストを送ります。
⑤ サイドカーは、コンポーネント定義ファイルを読み取り、PostgreSQLに対して、処理した結果を保存するリクエストを送ります。

サイドカーにデータが入っていくほうをInput Binding、サイドカーからデータが出ていくほうをOutput Bindingと呼びます。

Input Bindingは、ここで紹介したRabbitMQを始め、Azure Event HubsやAWS Kinesisなどのメッセージングサービス、HTTPやgRPCなどのプロトコル、ファイルシステムやFTPなどのストレージサービスなど、様々なものが用意されています。Output Bindingも、ここで紹介したPostgreSQLを始め、Azure Cosmos DBやAWS DynamoDBなどのデータベースサービス、HTTPやgRPCなどのプロトコル、ファイルシステムやFTPなどのストレージサービスなど、様々なものが用意されています。詳細は以下の公式ドキュメントを参照してください。

https://docs.dapr.io/reference/components-reference/supported-bindings/

ファイル構成

Bindingsコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ(https://github.com/noriyukitakei/dapr-sample)の中の、Bindingsディレクトリの中に配置されています。

Bindings
├── app.py
├── Infrastructure
│   ├── components
│   │   ├── binding-mq.yaml
│   │   └── binding-sqldb.yaml
│   └── db
│       ├── docker-compose.yml
│       ├── Dockerfile
│       └── temperatures.sql
├── publish_temperatures.py
└── requirements.txt

ソースコードの説明

■ Infrastructure/components/binding-mq.yaml

apiVersion: dapr.io/v1alpha1 # Daprのリソース定義のバージョン
kind: Component            # 設定ファイルの種別
metadata:
  name: mq                # アプリから参照するコンポーネント名
spec:
  type: bindings.rabbitmq # コンポーネントの種別
  metadata:
    - name: host # RabbitMQのホスト名とポート
      value: "amqp://guest:guest@localhost:5672"
    - name: queueName # RabbitMQのキュー名
      value: "dapr-queue"
    - name: direction # データの流れを指定するためのメタデータ
      value: "input"

他のコンポーネントと同じところは説明を省略します。ここで新たに説明する必要があるのは、spec.typeがbindings.rabbitmqになっていることです。これにより、DaprはBindingsコンポーネントとしてRabbitMQを使用するようになります。

順番が前後しますが、metadata.nameが「mq」であることも重要です。これにより、アプリケーションは「mq」という名前のBindingsコンポーネントを参照して、メッセージを受信したり、メッセージを送信したりすることができます。

spec.metadataは、RabbitMQに接続するための情報を記載しています。RabbitMQのホスト名とポートを指定するためのhost、RabbitMQのキュー名を指定するためのqueueName、データの流れを指定するためのdirectionという3つのメタデータを定義しています。directionは、データの流れを指定するためのメタデータであり、inputを指定すると、サイドカーはRabbitMQからメッセージを受信するためのBindingsコンポーネントとして動作します。

■ Infrastructure/components/binding-sqldb.yaml

apiVersion: dapr.io/v1alpha1 # Daprのリソース定義のバージョン
kind: Component           # 設定ファイルの種別
metadata:
  name: sqldb             # アプリから参照するコンポーネント名
spec:
  type: bindings.postgres # コンポーネントの種別
  version: v1
  metadata:
    - name: url # PostgreSQLの接続情報
      value: "user=postgres password=docker host=localhost port=5432 dbname=temperatures"
    - name: direction # データの流れを指定するためのメタデータ
      value: "output"

このファイルは、DaprのBindingsコンポーネントをPostgreSQLに接続するための設定ファイルです。このファイルには、DaprがPostgreSQLに接続するための情報が記載されています。

spec.typeがbindings.postgresになっていることにより、DaprはBindingsコンポーネントとしてPostgreSQLを使用するようになります。

metadata.nameが「sqldb」であることも重要です。これにより、アプリケーションは「sqldb」という名前のBindingsコンポーネントを参照して、メッセージを受信したり、メッセージを送信したりすることができます。

spec.metadataは、PostgreSQLに接続するための情報を記載しています。PostgreSQLの接続情報を指定するためのurl、データの流れを指定するためのdirectionという2つのメタデータを定義しています。directionは、データの流れを指定するためのメタデータであり、outputを指定すると、サイドカーはPostgreSQLにメッセージを送信するためのBindingsコンポーネントとして動作します。

■ db/docker-compose.yml
このファイルは、PostgreSQLをDockerコンテナで起動するためのdocker-composeファイルです。このファイルを使用して、PostgreSQLを簡単に起動することができます。

■ db/Dockerfile
このファイルは、PostgreSQLのDockerイメージを作成するためのDockerfileです。このファイルを使用して、PostgreSQLのDockerイメージを作成することができます。

FROM postgres
COPY temperatures.sql /docker-entrypoint-initdb.d/

このDockerfileは、公式のPostgreSQLイメージをベースにしています。そして、temperatures.sqlというSQLファイルを、PostgreSQLの初期化スクリプトが配置されるディレクトリである/docker-entrypoint-initdb.d/にコピーしています。これにより、PostgreSQLが起動するときに、このSQLファイルが実行されて、temperaturesテーブルが作成されます。

■ db/temperatures.sql
このファイルは、PostgreSQLの初期化スクリプトです。このファイルには、PostgreSQLが起動するときに実行されるSQL文が記載されています。ここでは、temperaturesテーブルを作成するSQL文が記載されています

\c temperatures;
create table temperatures ( sensorid text, timestamp timestamptz, temperature float ); select * from temperatures;

このSQL文は、まずtemperaturesデータベースに接続するための\c temperatures;というコマンドを実行しています。そして、temperaturesテーブルを作成するためのcreate table文を実行しています。最後に、temperaturesテーブルの中身を確認するためのselect文を実行しています。

■ publish_temperatures.py
このファイルは、RabbitMQに対して、温度センサーのデータを送信するためのアプリケーションコードです。

def publish() -> None:

    data = {
        "sensorid": "sensor-1",
        "timestamp": "2026-03-07T10:00:00Z",
        "temperature": 22.5,
    }

まず、関数publishを定義しています。この関数は、RabbitMQに対して、温度センサーのデータを送信するためのものです。

このコードは、温度センサーのデータを表すJSONオブジェクトを作成しています。このJSONオブジェクトには、sensorid、timestamp、temperatureという3つのフィールドが含まれています。sensoridは、センサーのIDを表す文字列です。timestampは、センサーのデータが記録された日時を表す文字列です。temperatureは、センサーのデータである温度を表す数値です。

    params = pika.URLParameters("amqp://guest:guest@localhost:5672")
    connection = pika.BlockingConnection(params)
    channel = connection.channel()

このコードは、pikaライブラリを使用して、RabbitMQに接続しています。pika.URLParametersを使用して、RabbitMQの接続情報を指定しています。そして、pika.BlockingConnectionを使用して、RabbitMQに接続しています。最後に、connection.channel()を使用して、RabbitMQのチャネルを作成しています。

    channel.queue_declare(queue="dapr-queue")

このコードは、RabbitMQのチャネルを使用して、dapr-queueという名前のキューを宣言しています。これにより、dapr-queueという名前のキューがRabbitMQに作成されます。

    body = json.dumps(data)

    channel.basic_publish(
        exchange="",
        routing_key="dapr-queue",
        body=body,
        properties=pika.BasicProperties(delivery_mode=2),
    )

このコードは、RabbitMQのチャネルを使用して、dapr-queueという名前のキューに対して、温度センサーのデータを送信しています。exchangeには空文字列を指定することで、デフォルトのエクスチェンジを使用しています。routing_keyには、dapr-queueという名前のキューを指定しています。bodyには、温度センサーのデータをJSON形式で表した文字列を指定しています。そして、propertiesには、メッセージのプロパティを指定しています。ここでは、delivery_mode=2を指定することで、メッセージが永続化されるようにしています。

    connection.close()

このコードは、RabbitMQへの接続を閉じています。

■ app.py
このファイルは、DaprのBindingsコンポーネントを利用して、RabbitMQからデータを受け取って、PostgreSQLにデータを保存するためのアプリケーションコードです。

app = Flask(__name__)

 Triggered by Dapr input binding
@app.route("/mq", methods=["POST"])
def process_batch():

このコードは、Flaskを使用して、HTTPサーバーを立てています。そして、/mqというエンドポイントに対してPOSTリクエストが送られたときに呼び出される関数process_batchを定義しています。このエンドポイントの/mqというパスは、先程のコンポーネント定義で指定した「mq」という名前のBindingsコンポーネントを参照するためのパスです。これにより、Daprは「mq」という名前のBindingsコンポーネントを参照して、RabbitMQからメッセージを受信すると、このエンドポイントに対してリクエストを送るようになります。

    data = request.get_json(silent=True)

    sql_output(data)

    print("Finished processing batch", flush=True)

    return json.dumps({"success": True}), 200, {"ContentType": "application/json"}

関数process_batchの中身です。まず、リクエストのJSONボディを取得しています。そして、sql_output関数を呼び出して、RabbitMQから受け取ったデータをPostgreSQLに保存しています。最後に、HTTPレスポンスとして、成功を示すJSONオブジェクトを返しています。

def sql_output(reading):

    # expected keys: sensorid, timestamp, temperature
    sensorid = reading.get("sensorid")
    timestamp = reading.get("timestamp")
    temperature = reading.get("temperature")

先程の関数process_batchの中で呼び出されているsql_output関数です。この関数は、RabbitMQから受け取ったデータをPostgreSQLに保存するためのものです。まず、関数sql_outputを定義しています。この関数は、RabbitMQから受け取ったデータを表すJSONオブジェクトを引数として受け取ります。そして、そのJSONオブジェクトから、sensorid、timestamp、temperatureという3つのフィールドを取得しています。

    sqlCmd = (
        "insert into temperatures (sensorid, timestamp, temperature) values "
        + "('%s', '%s', %s)" % (sensorid, timestamp, temperature)
    )

このコードは、PostgreSQLに対して実行するSQL文を作成しています。ここでは、temperaturesテーブルに対して、sensorid、timestamp、temperatureの値を挿入するためのinsert文を作成しています。

    payload = {"operation": "exec", "metadata": {"sql": sqlCmd}}

このコードは、DaprのAPIに送るリクエストを定義しています。リクエストの形式は、利用するコンポーネントによって異なるのですが、PostgreSQLは以下の形式になります。

{
  "operation": "exec",
  "metadata": {
    "sql": "INSERT INTO foo (id, c1, ts) VALUES ($1, $2, $3)",
    "params": "[1, \"demo\", \"2020-09-24T11:45:05Z07:00\"]"
  }
}

このリクエストは、operationにexecを指定することで、SQL文を実行することを示しています。そして、metadataの中に、sqlというフィールドを定義して、その中に実行するSQL文を指定しています。ここでは、先程作成したsqlCmdという変数に格納されているSQL文を指定しています。

paramsというフィールドも定義されているのですが、今回は使用しません。paramsは、SQL文の中でプレースホルダを使用する場合に、そのプレースホルダに対応する値を指定するためのフィールドです。今回は、SQL文の中でプレースホルダを使用していないため、paramsは必要ありません。

よって今回送付するリクエストは以下のとおりとなります。

{
  "operation": "exec",
  "metadata": {
    "sql": "INSERT INTO temperatures (sensorid, timestamp, temperature) VALUES ('sensor-1', '2026-03-07T10:00:00Z', 22.5)"
  }
} 
    resp = requests.post("http://localhost:3617/v1.0/bindings/sqldb", json=payload)
    return resp

requestsライブラリを使用して、DaprのAPIに対してリクエストを送っています。リクエストのURLには、先程のコンポーネント定義で指定した「sqldb」が含まれています。これにより、Daprは「sqldb」という名前のBindingsコンポーネントを参照して、PostgreSQLに対してSQL文を実行するようになります。リクエストのURLの構成は以下の通りです。リクエストボディには、先程作成したpayloadという変数に格納されているJSONオブジェクトを指定しています。

  • http://localhost:3617: Daprサイドカーのエンドポイント
  • /v1.0: Dapr APIのバージョン
  • /bindings: Bindings APIを呼び出すことを示すパス
  • /sqldb: 先程のコンポーネント定義で指定した「sqldb」という名前のBindingsコンポーネントを参照するためのパス
app.run(port=6107)

Flaskアプリケーションをポート6107で起動しています。これにより、アプリはlocalhost:6107でHTTPリクエストを受け付けるようになります。RabbitMQからメッセージが送られてくると、Daprのサイドカーはこのポートに対してリクエストを送るため、アプリはこのポートでリクエストを受け取る必要があります。

実行方法

まずは必要なライブラリをインストールします。

$ pip install -r requirements.txt

次に、PostgreSQLとRabbbitMQのコンテナを起動します。

$ cd Infrastructure/db
$ docker-compose up -d

Dapr CLIを使用してアプリケーションコードとサイドカープロセスを起動します。

$ dapr run --app-id bindings --app-port 6107 --dapr-http-port 3617 --components-path Infrastructure/components -- python app.py

上記のコマンドの詳細を説明します。

–app-idオプションでアプリケーションのIDを指定します。ここでは「bindings」というIDを指定しています。このIDは、Bindingsコンポーネントでは特に利用しませんが、Service Invocationなどの機能を利用する際に、このIDが重要になります。

–app-portオプションでアプリケーションのポート番号を指定します。ここでは6107を指定しています。これにより、アプリはlocalhost:6107で待ち受けるようになります。

–dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3617を指定しています。これにより、Daprサイドカーはlocalhost:3617でHTTPリクエストを受け付けるようになります。

–components-pathオプションでDaprコンポーネントの定義ファイルが格納されているディレクトリを指定します。ここではInfrastructure/componentsを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。

— python app.pyは、Daprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、app.pyを実行するように指定しています。つまり、アプリケーションコードを実行するように指定しています。

このコマンドを実行した状態で、publish_temperatures.pyを実行してみましょう。

$ python publish_temperatures.py

このコードを実行すると、RabbitMQに温度センサーのデータが送信されます。そして、Daprのサイドカーがそのデータを受け取って、アプリケーションコードに送ります。アプリケーションコードは、そのデータをPostgreSQLに保存します。

本当にデータがPostgreSQLに保存されたかを確認してみましょう。以下のコマンドを実行して、PostgreSQLのコンテナに接続します。

$ docker exec -i postgres psql --username postgres --dbname temperatures -c "select * from temperatures;"
 sensorid |       timestamp        | temperature 
----------+------------------------+-------------
 sensor-1 | 2026-03-07 10:00:00+00 |        22.5
(1 row)

すると、temperaturesテーブルの中に、先程publish_temperatures.pyを実行したときに送信したデータが保存されていることが確認できます。

Service Invocation

Service Invocationは、Daprの機能の一つであり、アプリケーションが他のアプリケーションに対してリクエストを送るときに、便利な機能がたくさんあります。

  1. アプリケーションは、他のアプリケーションのこと(IPアドレスなど)を知らなくても、DaprのAPIを通じて、他のアプリケーションに対してリクエストを送ることができます。
  2. アプリケーション同士の通信が失敗しても、サイドカーが自動的にリトライしてくれるため、通信の信頼性が高まります。
  3. サイドカーが、アプリケーション同士の通信をmTLSによって暗号化してくれるため、通信のセキュリティが高まります。
  4. サイドカーが、アプリケーション同士の通信をモニターして、ログやメトリクスなどを収集してくれるため、通信の可観測性が高まります。

つまり、サイドカーがいろんなことをしてくれるので、アプリケーション側では何も考えずに、DaprのAPIを通じて、他のアプリケーションに対してリクエストを送ることができます。これが、Service Invocationの大きなメリットの一つです。

様々な利便性があるService Invocationですが、1の機能に焦点をあてて、具体的な事例(センサーデータを送るアプリケーションから、センサーのデータを受け取って処理するアプリケーションに対してリクエストを送る)を通じて、Service Invocationの使い方を説明していきます。

システム構成

ということで、センサーデータを送るアプリケーションから、センサーのデータを受け取って処理するアプリケーションに対してリクエストを送るシステムを例に挙げて説明します。構成図は以下のとおりです。

① アプリケーションIDが「telemetry-sender」のアプリケーションは、DaprのAPIを使って、アプリケーションIDが「telemetry-collector」のアプリケーションにリクエストを送ります。
このときのURLは http://localhost:3615/v1.0/invoke/telemetry-collector/method/telemetry です。
ここで「3615」は telemetry-sender 側のDaprサイドカーのHTTPポートを表しています。
また、このURLのパスは「telemetry-collector」というアプリケーションの「telemetry」というエンドポイントを呼び出すことを意味します。

② サイドカーはこのリクエストを受け取ると、mDNS(multicast DNS)を使って、アプリケーションID「telemetry-collector」がどこで動いているか(IPアドレス)を調べます。

③ すると、telemetry-collector 側のサイドカー内のmDNSリゾルバーが応答し、「自分が telemetry-collector である」と名乗って、自身のIPアドレスなどの情報を返します。

④ telemetry-sender 側のサイドカーは、その情報をもとに telemetry-collector のサイドカーへリクエストを送ります。

⑤ telemetry-collector 側のサイドカーは、受け取ったリクエストを実際のアプリケーション(telemetry-collector)に転送します。

このように、アプリケーションID「telemetry-sender」のアプリケーションは、アプリケーションID「telemetry-collector」のアプリケーションのIPアドレスやポート番号を知らなくても、DaprのAPIを通じて、アプリケーションID「telemetry-collector」のアプリケーションに対してリクエストを送ることができます。これが、Service Invocationの大きなメリットの一つです。

ちなみに本記事で紹介している構成(Self-hosted)では、mDNSを使っていますが、AWSのECSやAzureのAKSなどのコンテナオーケストレーションサービスを使用している場合は、mDNSの代わりに、高度なDNSリゾルバーが使用されます。

ファイル構成

Service Invocationコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ(https://github.com/noriyukitakei/dapr-sample)の中の、ServiceInvocationディレクトリの中に配置されています。

ServiceInvocation
├── telemetry-collector
│   ├── app.py
│   └── requirements.txt
└── telemetry-sender
    ├── app.py
    └── requirements.txt

ソースコードの説明

■ telemetry-collector/app.py

app = Flask(__name__)

このコードは、Flaskを使用して、HTTPサーバーを立てています。

@app.route("/telemetry", methods=["POST"])
def telemetry():
    data = request.json

    print("Collector received:", data, flush=True)

    return (
        json.dumps({"ok": True}),
        200,
        {"ContentType": "application/json"},
    )

このコードは、/telemetryというエンドポイントに対してPOSTリクエストが送られたときに呼び出される関数を定義しています。このエンドポイントは、先程の構成図の中で、アプリケーションID「telemetry-sender」のアプリケーションが呼び出すエンドポイントになります。

アプリケーションID「telemetry-sender」が呼び出すエンドポイントは、先ほど説明した通り、http://localhost:3615/v1.0/invoke/telemetry-collector/method/telemetry になります。このURLのパスの最後の部分である「telemetry」が、telemetry-collector 側のアプリケーションのエンドポイントになります。なので、telemetry-collector 側のアプリケーションは、このエンドポイントを定義する必要があります。

telemetry関数の中では、リクエストのJSONボディを取得して、コンソールに出力しています。そして、HTTPレスポンスとして、成功を示すJSONオブジェクトを返すという非常にシンプルな処理をしています。

app.run(port=6106)

Flaskアプリケーションをポート6106で起動しています。これにより、アプリはlocalhost:6106でHTTPリクエストを受け付けるようになります。telemetry-senderからリクエストが送られてくると、Daprのサイドカーはこのポートに対してリクエストを送るため、アプリはこのポートでリクエストを受け取る必要があります。

■ telemetry-sender/app.py

payload = {"sensorId": "sensor-1", "temperatureC": 23}

このコードは、telemetry-collectorに送るデータを表すJSONオブジェクトを作成しています。このJSONオブジェクトには、sensorIdとtemperatureCという2つのフィールドが含まれています。センサーのIDを表すsensorIdと、センサーのデータである温度を表すtemperatureCです。

response = requests.post(
    url="http://127.0.0.1:3615/v1.0/invoke/telemetry-collector/method/telemetry",
    data=json.dumps(payload),
    headers={
        "content-type": "application/json",
    },
)

このコードは、requestsライブラリを使用して、DaprのAPIに対してリクエストを送っています。リクエストのURLには、先程の構成図の中で、アプリケーションID「telemetry-sender」のアプリケーションが呼び出すURLである http://localhost:3615/v1.0/invoke/telemetry-collector/method/telemetry が指定されています。リクエストボディには、先程作成したpayloadという変数に格納されているJSONオブジェクトを指定しています。

実行方法

まずは必要なライブラリをインストールします。

$ pip install -r telemetry-sender/requirements.txt
$ pip install -r telemetry-collector/requirements.txt

次に、telemetry-collectorのアプリケーションコードとサイドカープロセスを起動します。

$ dapr run --app-id telemetry-collector --app-port 6106 --dapr-http-port 3616 -- python telemetry-collector/app.py

上記のコマンドの詳細を説明します。

–app-idオプションでアプリケーションのIDを指定します。ここでは「telemetry-collector」というIDを指定しています。このIDは、Service Invocationでは非常に重要になります。なぜなら、アプリケーションID「telemetry-sender」のアプリケーションが、アプリケーションID「telemetry-collector」のアプリケーションにリクエストを送るときに、このIDを使用して、どのアプリケーションにリクエストを送るかを指定するからです。このアプリケーションIDを元にして、Daprのサイドカーは、mDNSを使って、アプリケーションID「telemetry-collector」がどこで動いているか(IPアドレス)を調べます。

–app-portオプションでアプリケーションのポート番号を指定します。ここでは6106を指定しています。これにより、アプリはlocalhost:6106で待ち受けるようになります。

–dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3616を指定しています。これにより、Daprサイドカーはlocalhost:3616でHTTPリクエストを受け付けるようになります。

— python telemetry-collector/app.pyは、Daprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、app.pyを実行するように指定しています。つまり、アプリケーションコードを実行するように指定しています。

次に、telemetry-senderのアプリケーションコードとサイドカープロセスを起動します。

$ dapr run --app-id telemetry-sender --dapr-http-port 3615 -- python telemetry-sender/app.py

上記のコマンドの詳細を説明します。

–app-idオプションでアプリケーションのIDを指定します。ここでは「telemetry-sender」というIDを指定しています。

–dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3615を指定しています。これにより、Daprサイドカーはlocalhost:3615でHTTPリクエストを受け付けるようになります。

— python telemetry-sender/app.pyは、Daprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、app.pyを実行するように指定しています。つまり、アプリケーションコードを実行するように指定しています。

このコマンドを実行すると、telemetry-sender側のコンソールに以下のような出力がされます。

== APP == Sender sent: {'sensorId': 'sensor-1', 'temperatureC': 23}
== APP == Response status: 200

そして、telemetry-collector側のコンソールには以下のような出力がされます。

== APP == Collector received: {'sensorId': 'sensor-1', 'temperatureC': 23}
== APP == 127.0.0.1 - - [03/Apr/2026 13:53:42] "POST /telemetry HTTP/1.1" 200 -

上記のようなものが出力されていれば、telemetry-senderのアプリケーションが、telemetry-collectorのアプリケーションに対してリクエストを送ることができていることが確認できます。

Secrets Management

Secrets Managementは、Daprの機能の一つであり、アプリケーションがシークレットを安全に管理するための機能です。クラウドのサービスで、シークレットを扱うサービスはたくさんあります。AWSのSecrets ManagerやAzureのKey Vaultなどが有名です。これらのサービスを利用することで、シークレットを安全に管理することができます。

ただし、これらのサービスを使うためには、それぞれの専用のAPIを呼び出したり、SDKを使用したりする必要があります。これらのサービスを利用するためのコードを書くのは、面倒なことが多いです。DaprのSecrets Managementを利用することで、これらのサービスを利用するためのコードを書く必要がなくなります。DaprのAPIを通じて、シークレットを取得することができるようになります。

アプリからはDaprのサイドカーに、「シークレットを取得したい」というリクエストを送ります。すると、サイドカーは、あらかじめ設定されているシークレットストアに対して、シークレットを取得するためのリクエストを送ります。そして、シークレットストアからシークレットが返ってくると、サイドカーは、そのシークレットをアプリに返します。

システム構成

では、シークレットを取得するアプリケーションの例を通じて、Secrets Managementの使い方を説明していきます。構成図は以下のとおりです。

今回ご紹介する例では、AWS Secrets ManagerやAzure Key Vaultではなく、ローカルに配置したJSONファイルをシークレットストアとして使用します。Daprは、ローカルに配置したJSONファイルをシークレットストアとして使用することができます。これにより、AWS Secrets ManagerやAzure Key Vaultなどのクラウドのサービスを利用しなくても、Secrets Managementの機能を試すことができます。

ローカルに配置したJSONファイルをシークレットストアとして使用する方式は、Daprの公式ドキュメントにおいては本番環境では推奨されていません。今回の説明のために、ローカルに配置したJSONファイルをシークレットストアとして使用する方式を紹介していますが、実際のプロジェクトでSecrets Managementの機能を利用する際には、AWS Secrets ManagerやAzure Key Vaultなどのクラウドのサービスを利用することをおすすめします。

ということで先ほどの構成図に基づいて、シークレットを取得するアプリケーションの例を通じて、Secrets Managementの使い方を説明していきます。

① アプリケーションは、DaprのAPIを使って、シークレットを取得するためのリクエストをサイドカーに送ります。
このときのURLは http://127.0.0.1:3618/v1.0/secrets/local-secretstore/my-secret です。このURLの構成を説明します。

② サイドカーはこのリクエストを受け取ると、あらかじめ設定されているシークレットストアに対して、シークレットを取得するためのリクエストを送ります。

ファイル構成

Secrets Managementコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ(https://github.com/noriyukitakei/dapr-sample)の中の、SecretsManagementディレクトリの中に配置されています。

SecretsManagement
├── app.py
├── Infrastructure
│   └── components
│       ├── local-secretstore.yaml
│       └── secrets.json
├── README.md
└── requirements.txt

ソースコードの説明

では、Secrets Managementの機能を利用するためのソースコードを説明していきます。

■ Infrastructure/components/local-secretstore.yaml

apiVersion: dapr.io/v1alpha1 # DaprのAPIバージョン
kind: Component # コンポーネントの種類
metadata:
  name: local-secretstore # コンポーネントの名前
spec:
  type: secretstores.local.file # コンポーネントの種類を指定
  version: v1
  metadata:
    - name: secretsFile # シークレットストアとして使用するJSONファイルのパスを指定するためのメタデータ
      value: "./Infrastructure/components/secrets.json" # シークレットストアとして使用するJSONファイルのパス

このファイルは、DaprのSecrets ManagementコンポーネントをローカルのJSONファイルをシークレットストアとして使用するように設定するためのコンポーネント定義ファイルです。

metadata.nameは、アプリケーションがこのコンポーネントを参照するための名前になります。先ほどご紹介したシステム構成の中で、アプリケーションが送るリクエストのURLは http://127.0.0.1:3618/v1.0/secrets/local-secretstore/my-secret でした。このURLの中の「local-secretstore」という部分は、metadata.nameで指定した名前になります。つまり、metadata.nameが「local-secretstore」であることによって、アプリケーションは「local-secretstore」という名前のコンポーネント定義ファイルを参照して、シークレットストアとして使用するJSONファイルのパスを知ることができるようになります。

spec.typeは、コンポーネントの種類を指定するためのフィールドです。ここでは、Daprが提供しているローカルのJSONファイルをシークレットストアとして使用するためのコンポーネントであるsecretstores.local.fileを指定しています。Azure Key Vaultの場合はsecretstores.azure.keyvault、AWS Secrets Managerの場合はsecretstores.aws.secretsmanagerを指定します。

metadataの中には、spec.typeで指定したコンポーネントの種類に応じたメタデータを定義します。ローカルのJSONファイルをシークレットストアとして使用するためのコンポーネントであるsecretstores.local.fileの場合は、secretsFileという名前のメタデータを定義して、そのvalueにシークレットストアとして使用するJSONファイルのパスを指定します。

■ Infrastructure/components/secrets.json

{
  "my-secret": "hello-from-file"
}

このファイルは、ローカルに配置したJSONファイルをシークレットストアとして使用するためのJSONファイルです。このファイルには、シークレットの名前と値のペアが定義されています。ここでは、my-secretという名前のシークレットに対して、hello-from-fileという値が定義されています。

■ app.py

URL = "http://127.0.0.1:3618/v1.0/secrets/local-secretstore/my-secret"
print(requests.get(URL).text)

このコードは、DaprのAPIを使って、シークレットを取得するためのリクエストをサイドカーに送っています。リクエストのURLには、先程の構成図の中で、アプリケーションが送るリクエストのURLである http://127.0.0.1:3618/v1.0/secrets/local-secretstore/my-secret が指定されています。リクエストを送ると、サイドカーは、あらかじめ設定されているシークレットストアに対して、シークレットを取得するためのリクエストを送ります。そして、シークレットストアからシークレットが返ってくると、サイドカーは、そのシークレットをアプリに返します。最後に、取得したシークレットをコンソールに出力しています。

実行方法

まずは必要なライブラリをインストールします。

$ pip install -r requirements.txt

次に、Dapr CLIを使用してアプリケーションコードとサイドカープロセスを起動します。

$ dapr run --app-id secretsmanagement --dapr-http-port 3618 --components-path Infrastructure/components -- python app.py
...略...
== APP == {"my-secret":"hello-from-file"}

上記のコマンドの詳細を説明します。

–app-idオプションでアプリケーションのIDを指定します。ここでは「secretsmanagement」というIDを指定しています。

–dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3618を指定しています。これにより、Daprサイドカーはlocalhost:3618でHTTPリクエストを受け付けるようになります。

–components-pathオプションでDaprコンポーネントの定義ファイルが格納されているディレクトリを指定します。ここではInfrastructure/componentsを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。

— python app.pyは、Daprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、app.pyを実行するように指定しています。つまり、アプリケーションコードを実行するように指定しています。

このコマンドを実行すると、コンソールに{“my-secret”:”hello-from-file”}と出力されます。これは、DaprのAPIを通じて、シークレットを取得するためのリクエストをサイドカーに送った結果、サイドカーがあらかじめ設定されているシークレットストアに対して、シークレットを取得するためのリクエストを送って、そのシークレットをアプリに返した結果になります。つまり、DaprのSecrets Managementの機能が正常に動作していることが確認できます。

デモアプリ

ひと通り、Daprの基本的な機能であるPub/Sub、Bindings、Service Invocation、Secrets Managementについて説明してきました。これらの機能を組み合わせることで、ちょっぴり本格的なマイクロサービスを作ってみましょう。

このアプリのソースコードは以下のリポジトリで公開しております。

https://github.com/noriyukitakei/dapr-demo

機能概要

このデモアプリは、センサーから送られてくるデータをリアルタイムに処理し、可視化やアラート通知を行うシステムです。

まず、センサーのシミュレーターが温度データを生成します。このデータは一度メッセージング基盤に送られ、そこからシステム全体に配信されます。

データを受け取ったゲートウェイの役割を持つサービスは、その内容を保存するとともに、異常値かどうかを判断します。
もし異常な値であれば、別のサービスに通知され、メールなどでアラートが送信されます。

一方で、通常のデータは蓄積され、ダッシュボードサービスから参照できるようになります。ユーザーはブラウザなどからセンサーの状態を確認でき、現在の状況をリアルタイムに把握することができます。

つまりこのシステムは、

  • データの収集
  • データの保存
  • 異常検知
  • 通知
  • 可視化

といった一連の処理を、複数のサービスに分けて実現しています。

システム構成

デモアプリのシステム構成は以下のとおりです。

このシステムは3つのマイクロサービスで構成されています。デバイスからセンサーを受信するSensor Gateway Service、異常値を検知してアラートを送るAlert Service、センサーの状態を可視化するDashboard Serviceです。

このデモアプリでは、センサーのシミュレーターは、Sensor Gateway Serviceに対してセンサーデータを送ります。Sensor Gateway Serviceは、そのデータを保存するとともに、異常値かどうかを判断します。もし異常な値であれば、Alert Serviceに通知され、メールなどでアラートが送信されます。一方で、通常のデータは蓄積され、Dashboard Serviceから参照できるようになります。

では、システム構成図の中の①~⑰の流れに沿って、システム全体の動きを説明していきます。

① センサーのシミュレーターであるSensor Simulatorが、センサーデータを生成する。アラートが上がるようなある一定温度以上のデータを一定確率で生成して、MQTTに送信する。

② MQTTに送られたデータをSensor Gateway Serviceが受信する。Sensor Gateway Serviceは、DaprのInput Bindings機能を使って、MQTTからデータを受け取るように設定されている。

③ Sensor Gateway Serviceのサイドカーは、MQTTからデータを受け取ると、そのデータをSensor Gateway Serviceに送る。受け取るためのエンドポイントはInput Bindingsのコンポーネント定義で指定されている。

④ Sensor Gateway Serviceのサイドカーは、Secrets Managementの機能を使って、機密情報を格納しているデータストア(今回はローカルのJSONファイル)から、アラートのしきい値などを取得する。

⑤ Sensor Gateway Serviceは、④で受け取ったしきい値を元に、受け取ったセンサーデータが異常値かどうかを判断する。

⑥ Sensor Gateway Serviceは、受け取ったセンサーデータをRedisに保存する。これはState Managementの機能を使って実現している。

⑦ Sensor Gateway Serviceは、受け取ったセンサーデータが正常値であればtelemetryトピックに、異常値であればalertトピックに送信する。これはPub/Subの機能を使って実現している。

⑧ RabbitMQは、alertトピックに送られた異常値のデータをAlert Serviceに配信する。Alert Serviceは、DaprのPub/Sub機能を使って、alertトピックからデータを受け取るように設定されている。

⑨ Alert Serviceのサイドカーは、alertトピックからデータを受け取ると、そのデータをAlert Serviceに送る。/alertsというエンドポイントに対してPOSTリクエストを送るように設定されている。このエンドポイントは、Alert Serviceのアプリケーションコードの中で定義されている。

⑩ Alert Serviceは、受け取ったデータを元に、メールなどでアラートを送る。Output Bindingsの機能を使って、メールを送るように設定されている。

⑪ Alert Serviceのサイドカーは、Output Bindingsの機能を使って、指定されたSMTPサーバーに対してメールを送信する。このメールサーバーは、maildev(https://github.com/maildev/maildev)というローカルで動かすことができるメールサーバーを使用している。

⑫ 管理者は、DashBoard Serviceが提供しているエンドポイントに対して、センサーの状態を確認するためのリクエストを送る。

⑬ Dashboard Serviceは、Service Invocationの機能を使って、Sensor Gateway Serviceに対してリクエストを送る。リクエストの内容は、センサーの状態を確認するためのものである。リクエストのURLは、センサーのIDがsensor-001とした場合、 http://localhost:3610/v1.0/invoke/sensorgateway/method/state/sensor-001 である。このURLのパスのsensorgatewayという部分は、Sensor Gateway ServiceのアプリケーションIDである。つまり、Dashboard Serviceは、Service Invocationの機能を使って、Sensor Gateway Serviceに対してリクエストを送るときに、Sensor Gateway ServiceのアプリケーションIDを指定している。state/sensor-001という部分は、Sensor Gateway Serviceのアプリケーションコードの中で定義されているエンドポイントになる。つまり、Dashboard Serviceは、Service Invocationの機能を使って、Sensor Gateway Serviceに対してリクエストを送るときに、Sensor Gateway Serviceのアプリケーションコードの中で定義されているエンドポイントを指定している。

⑭ DashBoard Serviceのサイドカーは、mDNSを使って、Sensor Gateway Serviceのサイドカーがどこで動いているか(IPアドレス)を調べて、Sensor Gateway Serviceのサイドカーにリクエストを送る。

⑮ Sensor Gateway Serviceのサイドカーは、受け取ったリクエストをSensor Gateway Serviceの/state/{sensor_id}というエンドポイントに転送する。

⑯ Sensor Gateway Serviceは、受け取ったリクエストを元に、Sate Gateway ServiceのState Managementの機能を使って、Redisに保存されているセンサーの状態を取得する。

⑰ Sensor Gateway Serviceのサイドカーは、Redisにアクセスしてセンサーの状態を取得した後、その状態をDashboard Serviceに返す。

ファイルの構成

デモアプリのファイル構成は以下のとおりです。

.
├── AlertService
│   ├── alertservice
│   │   └── app.py
│   └── requirements.txt
├── DashboardService
│   ├── dashboard
│   │   └── app.py
│   └── requirements.txt
├── Infrastructure
│   ├── components
│   │   ├── mqtt-binding.yaml
│   │   ├── notify-binding.yaml
│   │   ├── pubsub.yaml
│   │   ├── secretstore.yaml
│   │   └── statestore.yaml
│   ├── docker-compose.yml
│   ├── mosquitto.conf
│   └── secrets.json
├── SensorGatewayService
│   ├── requirements.txt
│   └── sensorgateway
│       ├── app.py
│       └── settings.py
├── Simulation
│   ├── requirements.txt
│   └── simulate.py
└── common
    ├── pyproject.toml
    └── smartfarm_common
        └── models.py

それぞれのファイルの内容をサービスごとに説明します。

Sensor Gateway Service

■ sensorgateway/app.py
センサーのデータを受信して処理するサービスです。センサーデータを受信して、異常値かどうかを判断し、データを保存したり、異常値であればアラートを送ったりします。

  • Input Bindingsを使ってMQTTからデータ受信
  • State Managementを使ってデータ保存(Redis)
  • Pub/Subでイベント発行(RabbitMQに正常データ、異常データをそれぞれ別のトピックに送る)
  • DashBoard Serviceからのリクエストに応じて、センサーの状態を返すエンドポイントを提供

■ sensorgateway/settings.py
アラートのしきい値など各種設定を管理するファイルです。DaprのSecrets Management機能を使って、ローカルのJSONファイルから設定値を取得するようになっています。

Alert Service

■ alertservice/app.py
アラート通知を担当するサービスです。異常なセンサーデータを検知した際に、通知処理を行います。

  • メッセージキュー(RabbitMQ)から異常データ(alertトピックに送られたデータ)を受信
  • 異常時にOutput Bindingsの機能を使って、メールで通知

Dashboard Service

■ dashboard/app.py
ユーザー向けにデータを表示するサービスです。以下の機能を提供します。

  • /sensors/{sensor_id}というAPIエンドポイントを提供して、センサーの状態を返却

Infrastructure

■ components/mqtt-binding.yaml
MQTTからデータを受信するためのInput Bindingsのコンポーネント定義ファイルです。

■ components/notify-binding.yaml
メールを送るためのOutput Bindingsのコンポーネント定義ファイルです。maildevというローカルで動かすことができるメールサーバーに対して、SMTPプロトコルを使ってメールを送るように設定されています。

■ components/pubsub.yaml
RabbitMQをPub/Subのコンポーネントとして使用するためのコンポーネント定義ファイルです。

■ components/secretstore.yaml
ローカルのJSONファイルをSecrets Managementのシークレットストアとして使用するためのコンポーネント定義ファイルです。

■ components/statestore.yaml
RedisをState Managementのステートストアとして使用するためのコンポーネント定義ファイルです。

■ docker-compose.yml
RabbitMQ、mosquitto(MQTTブローカー)、maildev(SMTPサーバー)をDockerコンテナで起動するためのdocker-composeファイルです。

■ mosquitto.conf
mosquitto(MQTTブローカー)の設定ファイルです。MQTTのポート番号や、認証の設定などが記述されています。

■ secrets.json
DaprのSecrets Managementの機能を使って、ローカルのJSONファイルをシークレットストアとして使用するためのJSONファイルです。アラートのしきい値などの設定値が定義されています。

Simulation

■ simulate.py
センサーのシミュレーターです。センサーデータを生成して、MQTTに送る役割を持っています。

common

■ smartfarm_common/models.py
このファイルは、Sensor Gateway Service、Alert Service、Dashboard Serviceの3つのサービスで共通して使用するモデルクラスを定義しています。センサーデータの構造などを定義しています。

実行方法

このデモアプリを実行するための方法を説明します。以下の手順に従ってください。

事前準備

既にDapr CLIをインストールしている場合は、Dapr CLIのインストールはスキップしていただいて構いません。まだインストールしていない場合は、以下のコマンドを実行して、Dapr CLIをインストールしてください。

$ wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash
$ dapr init

リポジトリのクローンとディレクトリ移動

このデモアプリが公開されているGitHubリポジトリをクローンして、プロジェクトのディレクトリに移動します。

$ git clone https://github.com/noriyukitakei/dapr-demo.git
$ cd dapr-demo

必要なDockerコンテナの起動

デモアプリの稼働に必要な3つのコンテナ(RabbitMQ、mosquitto、maildev)をDocker Composeを使って起動します。

$ cd Infrastructure
$ docker-compose up -d

Sensor Gateway Serviceの起動

Pythonの仮想環境を作成して有効にします。

$ cd ../SensorGatewayService
$ python -m venv .venv
$ source .venv/bin/activate

必要なライブラリをインストールします。

$ pip install -r requirements.txt

Sensor Gateway Serviceのアプリケーションとサイドカープロセスを起動します。

$ dapr run --app-id sensorgateway --app-port 6100 --dapr-http-port 3610 --components-path ../Infrastructure/components -- python -m uvicorn sensorgateway.app:app --port 6100

Alert Serviceの起動

Sensor Gateway Serviceはフォアグラウンドで起動したままにしておいてください。新しいターミナルを開いて、Alert Serviceを起動します。

Pythonの仮想環境を作成して有効にします。

$ cd [GitHubリポジトリのクローン先ディレクトリ]/AlertService
$ python -m venv .venv
$ source .venv/bin/activate

必要なライブラリをインストールします。

$ pip install -r requirements.txt

Alert Serviceのアプリケーションとサイドカープロセスを起動します。

$ dapr run --app-id alertservice --app-port 6101 --dapr-http-port 3611 --components-path ../Infrastructure/components -- python -m uvicorn alertservice.app:app --port 6101

Dashboard Serviceの起動

Sensor Gateway ServiceとAlert Serviceはフォアグラウンドで起動したままにしておいてください。新しいターミナルを開いて、Dashboard Serviceを起動します。

Pythonの仮想環境を作成して有効にします。

$ cd [GitHubリポジトリのクローン先ディレクトリ]/DashboardService
$ python -m venv .venv
$ source .venv/bin/activate

必要なライブラリをインストールします。

$ pip install -r requirements.txt

Dashboard Serviceのアプリケーションとサイドカープロセスを起動します。

$ dapr run --app-id dashboardservice --app-port 6102 --dapr-http-port 3612 --components-path ../Infrastructure/components -- python -m uvicorn dashboard.app:app --port 6102

シミュレーターの起動と動作確認

Sensor Gateway ServiceとAlert ServiceとDashboard Serviceはフォアグラウンドで起動したままにしておいてください。新しいターミナルを開いて、センサーのシミュレーターを起動します。

Pythonの仮想環境を作成して有効にします。

$ cd [GitHubリポジトリのクローン先ディレクトリ]/Simulation
$ python -m venv .venv
$ source .venv/bin/activate

必要なライブラリをインストールします。

$ pip install -r requirements.txt

シミュレーターを起動します。すると、定期的にセンサーデータが生成されてMQTTに送られます。

$ python simulate.py
Publishing telemetry to mqtt://localhost:1883 topic=farm/telemetry
sent: {'sensorId': 'sensor-003', 'temperature': 29.89, 'humidity': 60.44, 'timestamp': '2026-04-07T00:44:04.142516+00:00'}
sent: {'sensorId': 'sensor-002', 'temperature': 24.09, 'humidity': 60.69, 'timestamp': '2026-04-07T00:44:05.145853+00:00'}
sent: {'sensorId': 'sensor-002', 'temperature': 33.83, 'humidity': 41.07, 'timestamp': '2026-04-07T00:44:06.147417+00:00'}
sent: {'sensorId': 'sensor-003', 'temperature': 22.22, 'humidity': 35.16, 'timestamp': '2026-04-07T00:44:07.149945+00:00'}

Alert Serviceのコンソールには、異常値が検知されたときに以下のような出力がされます。

== APP == WARNING:alertservice:ALERT received: Temperature 42.27C exceeds threshold 40.0C
== APP == INFO:     127.0.0.1:53585 - "POST /alerts HTTP/1.1" 200 OK
== APP == WARNING:alertservice:ALERT received: Temperature 44.84C exceeds threshold 40.0C
== APP == INFO:     127.0.0.1:53660 - "POST /alerts HTTP/1.1" 200 OK

メールが送付されているはずですので、見てみましょう。maildevは、http://localhost:1080でWeb UIが提供されているので、ブラウザでアクセスしてみてください。すると、Alert Serviceから送られたメールが届いていることが確認できます。

DashBoard Serviceにアクセスして、センサーの状態を確認してみましょう。センサーのIDがsensor-001とした場合、以下のURLにアクセスしてみてください。

$ curl http://localhost:6102/sensors/sensor-001
{"found":true,"state":{"sensorId":"sensor-001","temperature":31.88,"humidity":45.29,"timestamp":"2026-04-07T00:51:40.901649Z","updatedAt":"2026-04-07T00:51:40.906840Z"}}

上記のようなレスポンスが返ってくれば、Dashboard ServiceがSensor Gateway Serviceに対してリクエストを送って、センサーの状態を取得することができていることが確認できます。

まとめ

今回は、Daprの基本的な機能であるPub/Sub、Bindings、Service Invocation、Secrets Managementについて、わかりみ深く説明しました。これらの機能を組み合わせることで、ちょっぴり本格的なマイクロサービスを作ることができることも紹介しました。

Daprは、マイクロサービスを開発するための便利な機能を提供しているので、ぜひ活用してみてください。Daprを使うことで、マイクロサービスの開発がより簡単になり、開発者はビジネスロジックの実装に集中することができるようになります。Daprは、マイクロサービスの開発を加速させるための強力なツールであると言えるでしょう。

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。
エンジニア募集中!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です