Blob Storageに配置されたオブジェクトをAzure Functionsで処理する

こんにちは、サイオステクノロジーの森谷です。
今回はAzure Blob Storageに配置されたファイルオブジェクトに対して処理を行うAzure Functionsを作成する際に気を付けたいことを紹介したいと思います。

はじめに

Azure Functionsとは?

Azure Functions
Azure Functionsは、非常駐のプロセスをイベントによってトリガーして実行するサービスです。

  • 関数のコードのみを記述し、それが実行されるインフラはMicrosoft Azureが管理するサーバレスアーキテクチャ
    • サーバのスケーリング、運用等を考えることなく、関数のロジックに注力できる
    • 関数を呼出・処理した分だけの課金で済む (※従量課金プランの場合。料金体系については後述)
  • トリガーおよび入出力のバインドをサービスの設定レベルで管理可能
    • 外部サービスとの連携を、関数のコード内に記述することなく可能で、関数のコードが抽象化しやすい
  • Function Appsとして、Azure App Serviceと共通のインフラで動作する
    • 他のAzure App Service同様にデプロイ、環境変数、診断などの機能が利用できる
    • Web Apps、Mobile Apps、API Appsと実行環境を共有できコストを削減できる (※App Service プランの場合。料金体系については後述)

Azure Functionsの2つの料金体系

Azure Functionsには従量課金プランとApp Service プランという2つのホスティングプランがあり、料金体系が異なります。
従量課金プランは受信したイベントの数に基づいて動的にホストスケーリングが行われ、関数実行中のコンピューティングリソースに対して課金される、競合サービスであるAWS LambdaやGoogle Cloud Functionsと同様の課金体系となっています。一方App Service プランは1つ以上の専用VM(負荷に応じて自動スケールも可能。プランにより最大数は異なる)を割り当て、その中で関数が実行されます。関数の実行頻度などに関わらず、専用のVMの料金が時間単位で課金されます。また従量課金プランではメモリが最大1.5GB、処理時間が10分以内などの制限がありますが、App Serviceでは割り当てられたVMのリソースを最大限利用できるほか、時間の制限なく処理が可能です。
基本的には従量課金プランを利用し、関数を継続的に利用したい、従量課金プランよりも多くのリソースを利用したいなどの場合にApp Service プランを検討することになるかと思います。

Azure Functionsを利用する

まずは今回使用するAzure Functions環境の準備をします。事前に次のリソースを作成しておきます。

  • リソースグループ
  • Functions用ストレージアカウント
    • Standard (HDD)な汎用ストレージアカウント。BLOBストレージアカウントは利用できません。
  • 入出力用ストレージアカウント
    • 汎用ストレージアカウント。BLOBストレージアカウントはサポートされていません。

まずはAzureポータルの「新規」から「Function App」を探し、作成を選択します。
Function Appの新規作成

Function Appの設定が開くので、各項目を設定します。このとき、リソースグループとStorageについて、「既存のものを使用」を選び、事前に作成したリソースグループとFunctions用ストレージアカウントを選択しましょう。
Function Appの設定

「作成」を選択してしばらく待つと、デプロイが完了しFunction Appの画面が開きます。関数にマウスオーバすると現れる+を選択すると、関数が作成できます。
関数の新規作成

最初はクイックスタートの画面が表示されますが、「カスタム関数」をクリックした場合、すべてのテンプレートが確認可能です。HTTP Trigger、Timer Triggerなど様々なトリガー、言語のテンプレートが確認できますので、以降のサンプルコードを利用する場合に心に留めておいてください。
クイックスタート
関数のテンプレート

Azure FunctionsとBlob Storageの連携

Blob Triggerは非推奨

「Blob Storageにファイルオブジェクトを配置された際に、そのBlobの内容をもとに何らかの処理がしたい」場合に、すぐに考えが及ぶのはBlob Triggerです。これはAzure Functionsを実行させるトリガーの一種で、設定したBlob Storageに対してBlobが配置された際にFunctionsを実行させるものです。トリガーする対象のBlobのパスに変数を利用し、それを関数内で利用することもできます。
一見Blob StorageのBlobを処理するのに最適な方法に思えますが、Azure Functionsのドキュメントによると、Blob Storageには次のような問題があり、推奨されていません。

  1. 処理が遅延する場合がある (従量課金プランの場合)

従量課金プランで BLOB トリガーを使用していると、関数アプリがアイドル状態になったあと、新しい BLOB の処理が最大で 10 分遅延する場合があります。
関数アプリが実行されると、BLOB は直ちに処理されます。
この初期段階の遅延を避けるために、次のオプションのいずれかを検討してください。

  • 常時接続が有効な App Service プランを使用する。
  • BLOB 名を含むキュー メッセージなど、別のメカニズムを使用して BLOB 処理をトリガーする。
  1. すべてのイベントがキャプチャされるとは限らない (従量課金プラン、App Service プラン共通)

監視対象の BLOB コンテナーに 10,000 を超える BLOB が含まれる場合は、Functions ランタイムによりログ ファイルがスキャンされ、新規または変更された BLOB が監視されます。 このプロセスによって遅延が発生することがあります。 関数は、BLOB が作成されてから数分以上経過しないとトリガーされない可能性があります。 また、 ストレージ ログは “ベスト エフォート” ベースで作成されます。 すべてのイベントがキャプチャされる保証はありません。 ある条件下では、ログが欠落する可能性があります。 より高速で信頼性の高い BLOB 処理が必要な場合は、BLOB 作成時にキュー メッセージを作成することを検討してください。 次に、BLOB トリガーの代わりにキュー トリガーを使用して BLOB を処理します。 別のオプションは、Event Grid の使用です。「Event Grid を使用して、アップロードされたイメージのサイズ変更を自動化する」のチュートリアルをご覧ください。

Queue Trigger

Blob Triggerの代替メカニズムとして、Queue Storageをトリガーに利用するQueue Triggerが公式ドキュメントでは挙げられています。流れとしてはBlobの配置後にQueue Storageに処理対象のBlobのパスを含むメッセージをキューし、それをトリガーにBlob Storageのファイルを入力としてバインドすることになります。
図1

Blobのファイル内容をログ出力するサンプル関数のバインド構成、C#スクリプトコードを以下に示します。

{
  "bindings": [
    {
      "name": "myQueueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "<your-queue-name>",
      "connection": "<your-storage-account>_STORAGE"
    },
    {
      "name": "myInputBlob",
      "type": "blob",
      "direction": "in",
      "path": "<your-container-name>/{queueTrigger}",
      "connection": "<your-storage-account>_STORAGE"
    }
  ],
  "disabled": false
}
using System;

public static void Run(string myQueueItem, string myInputBlob, TraceWriter log)
{
    log.Info($"C# Queue trigger function processed: {myQueueItem}");
    log.Info(myInputBlob);
}

バインドではトリガーに”myQueueItem”、入力に”myInputBlob”という名前を与えています。また、入力パスに”queueTrigger”メタデータプロパティを含むことで、キューに与えられたメッセージと同じ名前のBLOBが入力にバインドされます。これらの設定により関数内にて変数”myQueueItem”でBlobのパス、変数”myInputBlob”でそのパスのファイルの内容をそれぞれ参照できるようになります。

Queue Triggerを使う場合、Blobを配置するクライアント側でそれを考慮し、Queue StorageにBlobのパスをキューする処理を追加しなければなりません。これはWebhook Triggerなど別のイベントをトリガーに利用した場合も同様で、何らかの方法でクライアント側から関数にBlobの配置を知らせる必要があります。
またQueue Storageの制限上、順序の保証がされないこと、同じキューが2度以上イベントをトリガーする場合があることには注意が必要です。これらの保証が必要な場合、別のAzureサービスをトリガーに利用することを検討しましょう。例として、先入れ先出し(FIFO)や配信が1回であることが保証されているキュー メッセージングが利用できるAzure Service Busなどが挙げられます。ただし、公式ドキュメントにもある通り、Queue StorageとService Busキューは順序、厳密性以外の特性も考慮して選択すべきです。
最後にQueueにBlobのパスが渡されたがそのパスにBlobが存在しない場合、バインドされた入力の内容は空になります。これはBlobが存在するが内容が空の場合と挙動が同じため、これらの区別が必要な場合、注意が必要です。

Timer Trigger

スケジュールを設定して定期的(最短で1秒ごと)に関数を実行するTimer Triggerを利用して、準リアルタイムにオブジェクトの配置を検知する方法も考えられます。この方法なら、Queue Triggerなどと異なりクライアント側からBlobの配置を関数に知らせる必要がありません。
ただし、関数の実行時点でBlobが存在するか、そのパスは何かなどの情報が不定のため、Azure Functionsの統合機能で入力をバインドすることができません。このため、次のように関数のコード内にMicrosoft Azure SDKを利用したBlob Storageアクセスのコードが含まれることになります。

{
  "bindings": [
    {
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 * * * * *"
    }
  ],
  "disabled": false
}
#r "Microsoft.WindowsAzure.Storage"

using System;
using System.IO;
using System.Text;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

public static void Run(TimerInfo myTimer, TraceWriter log)
{
    log.Info($"C# Timer trigger function executed at: {DateTime.Now}");

    string storageConnectionString = "DefaultEndpointsProtocol=https"
        + ";AccountName=<your-storage-account>"
        + ";AccountKey=<your-access-key>"
        + ";EndpointSuffix=core.windows.net";
    CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
    CloudBlobClient serviceClient = account.CreateCloudBlobClient();

    CloudBlobContainer inContainer = serviceClient.GetContainerReference("<your-container-name>");
    inContainer.CreateIfNotExistsAsync().Wait();

    foreach (IListBlobItem item in inContainer.ListBlobs(null, false))
    {
        CloudBlockBlob blob = (CloudBlockBlob)item;
        log.Info($"Block blob of length {blob.Properties.Length}: {blob.Uri}");
        log.Info(blob.DownloadTextAsync().Result);
        blob.DeleteAsync().Wait();
    }
}

例に示す関数では、変数”storageConnectionString”に記述した接続情報を用いてBLOBに接続し、その後CloudBlobContainer.ListBlobsメソッドを用いてコンテナー内の全ファイルを検索します。そして得られたそれぞれのBLOBについて内容をダウンロードしてログ出力し、その後BLOBを削除します。
(上記コードはログの内容が混ざらないよう、非同期処理を即座にWaitしています。ファイルを加工して別のコンテナーにアップロードするなどの場合はawaitを使って並列処理したほうが効率が良いでしょう)

Timerをトリガーに利用する場合のその他の注意点として、関数の処理中に次のスケジュールの関数実行が開始されてしまう可能性を考慮した関数設計にしなければなりません。同じBlobに対して処理を何度も行わないよう、Blobを削除や移動する処理を記述する必要もあります。

まとめ

今回はAzure FunctionsによってAzure Blob Storageに配置されたファイルに対して処理を行う際の注意点を紹介しました。

トリガー 利点 注意点
Blob Trigger クライアント側で追加の処理が不要
シンプルなコードで関数を実現可能
処理が遅延する場合がある (従量課金プランの場合)
すべてのイベントがキャプチャされるとは限らない (従量課金プラン、App Service プラン共通)
Queue Trigger キューメッセージ送信後、関数の実行が保証される
リアルタイムにファイル処理が可能
シンプルなコードで関数を実現可能
クライアント側から関数にBlobの配置を通知する処理が必要 (Timer以外の他のトリガーを用いた場合も同様)
順序の保証がされない (Queue Storageの特性)
同じキューが2度以上イベントをトリガーする場合がある (Queue Triggerの特性)
キューされたパスにBlobが存在しない場合と、Blobが存在するが内容が空の場合の挙動が同じ (Blob入力バインドの特性)
Timer Trigger クライアント側で追加の処理が不要
配置された全ファイルの処理が確実に可能
入力バインドの設定を利用できず、関数のコード内にMicrosoft Azure SDKを利用したBlob Storageアクセスのコードが含まれる
関数の二重実行や同じファイルが複数回処理されることを考慮した関数設計が必要
リアルタイム処理できない (最短で1秒ごと)

Azure Functionsを呼び出すためにBlob Triggerを用いると、すべてのイベントがキャプチャされるとは限らず、処理されないファイルが現れる可能性があります。これを回避するため、Queue Triggerなどを利用してファイルの配置者が配置したことを関数に通知するか、Timer Triggerなどを用いて関数にファイルを探してもらう必要があります。

サーバレスアーキテクチャを実現する競合サービス、例えばAWS Lambdaでは、オブジェクトストレージをトリガーに利用しても、ファイルが配置された際に少なくとも1回は関数がトリガーされることが保証されており、より素直な構成でファイルの処理を関数に任せることができます。Azure Functionsの、AWS LambdaやGoogle Cloud Functionsにはない特徴はやはりトリガーだけでなく入出力のバインドも統合的に管理可能なことですので、トリガーの高機能化、入出力レパートリーの拡充があるとより独自の強みが出るのではないかな、と思いました。
Microsoft Azureもまだまだ成長中ですので、今後のアップデートが見逃せませんね。

ここまでご覧いただきありがとうございました。

Be the first to comment

コメント投稿

Your email address will not be published.


*