短期的・長期的リトライによってAzureによるメッセージキューイング時のレジリエンスを高める

みなさん、こんにちは。サイオステクノロジー武井です。

今回は表題のとおりなのですが、短期的・長期的リトライによってAzureによるメッセージキューイング時のレジリエンスを高めるということについて考えてみましたので、一筆したためたいと思います。

自己復旧の重要性

昨今、安全性の高いローカル通信や閉じられたネットワークでの通信が減少し、インターネットなどのパブリックネットワークを通じた通信が増えています。実際、最近のシステムでは、多くの場合、SaaSサービスのAPIを呼び出して情報を取得しています。

このようなリモートネットワークを通じた通信は、ネットワークの遅延、障害、SaaSサービスのダウンタイム、名前解決の問題など、さまざまな要因で失敗する可能性があります。そのため、システムを設計する際は、障害が発生する可能性を前提とし、高い回復性を持たせる必要があります。このことはAzureアプリケーションの設計原則の一つ「自己修復機能の設計」にも書かれています。

https://learn.microsoft.com/ja-jp/azure/architecture/guide/design-principles/self-healing

そのようなシステム設計で特に重要かつ効果的な戦略が、「リトライ(再試行)」です。

リトライについて

「リトライ」は多くの方がご存知かとは思いますが、何らかの処理に失敗したら、その失敗を検知して、システムが自動的に処理を再試行する仕組みです。これにより、一時的なエラーや障害を乗り越えてシステムが自動的に回復する能力が高まります。マイクロソフトが公開している設計パターンにも、「再試行パターン」として定義されています。

https://learn.microsoft.com/ja-jp/azure/architecture/patterns/retry

そしてリトライには「短期的なリトライ」「長期的なリトライ」の2種類があります。

  • 短期的なリトライ

一時的なエラー(例えば、ネットワークの一瞬の断絶や一時的なサーバーダウン)を乗り越えるために、すぐに再試行する方法です。瞬時に問題が解消することが多いため、このアプローチをとります。さらに短期手なリトライにはいくつか手法があり、固定間隔での再試行や、失敗を繰り返す度に再試行間隔を指数的に伸ばすバックオフポテンシャルを用いる方法(初回は2秒後、次回は4秒後、さらに次回は8秒後)などがあり、これにより一時的なエラーや障害からシステムの自己回復性を高めることができます。

  • 長期的なリトライ

深刻な問題や長期的な障害(例えば、大規模なサービス障害やハードウェアの故障)が原因で操作が失敗した場合に考慮されます。すぐには解決しない問題に対処するため、再試行する間隔を長く設定して慎重にアプローチします。

また、短期的なリトライと長期的なリトライを適切に組み合わせることで、システムの自己回復力を向上させることができます。具体的には、まず短期的なリトライを試み、それが失敗した場合に長期的なリトライに移行し、さらにその後に再び短期的なリトライを試みるというサイクルを繰り返します。

今回は、この仕組みをAzure Service Busを用いたメッセージキューイングに活かしてみたいと思います。

Azure Service Busで高度なリトライ!!

ということで、Azure Service Busによるメッセージキューイング処理に、この短期的なリトライと長期的なリトライを適用してみたいと思います。

短期的なリトライはシンプルです。例外やエラーが発生した場合、一定の間隔ないしはバックオフポテンシャルによって再試行を行います。リトライ処理を助けるための多くのモジュールが存在しますが、今回はresilience4jを採用します。resilience4jは、Javaでの障害に強いアプリケーションを実装するためのライブラリで、タイムアウト、リトライ、サーキットブレーカーなどの機能を提供します。これを用いることで、リトライ処理の実装が容易になります。

長期的なリトライでは、Azure Service Busのスケジュール配信機能を活用します。この機能は、メッセージを即時に送信するのではなく、指定した未来の時点で配信することができる機能です。つまり、短期的なリトライが失敗した場合、例えば数時間後や翌日など特定の時刻にメッセージを再送するようスケジュールを設定することができます。スケジュールされたメッセージは、設定した時刻になると自動的にキューまたはトピックに配信され、その後の処理で再度、短期的なリトライが行われるという流れになります。

以降はソースコードを交えて実例をご紹介していきたいと思います。その前に、Azure Service Busの主要な2つの機能について触れておきます。それは、標準的なメッセージキューとPub/Sub(Publish/Subscribe)モデルです。これらは異なる目的と特性を持ち、それに応じて先程紹介したリトライの実装方法も変わります。従って、これらの機能を別々に紹介し、ソースコードを交えてその実装のイメージを共有します。

ちなみにPub/Sub(Publish/Subscribe)モデルとは、メッセージの発行者(Publisher)と受信者(Subscriber)が多対多の関係を持つ通信パターンです。発行者はトピックにメッセージを公開(Publish)し、それを購読している受信者(Subscriber)がそのメッセージを受け取ります。このモデルの利点は、発行者と受信者がお互いに独立していることで、スケーラビリティや柔軟性が高いという特徴があります。Azure Service BusのPub/Subモデルは、この原理を基にしており、大規模な分散システムやマイクロサービスアーキテクチャでのメッセージ通信に適しています。

標準的なメッセージキューの場合

まずは標準的なメッセージキューの場合について説明します。以下のような構成図になります。

 

処理の流れは以下のようになります。

  1. メッセージ送信: あるアプリケーションからAzure Service Busのキューにメッセージを送信します。このとき、リトライ回数を表すカスタムプロパティを付与し、値を0とします。

  2. メッセージ受信: 上記キューに送信されたメッセージは、Azure Functionsによってキュートリガーで受信されます。

  3. 業務処理: Azure Functionsが受け取ったメッセージを基に特定のサービスに対して処理を開始します。しかし、このケースではresilience4jを用いたバックオフポテンシャルのリトライ戦略が適用され、結果として短期的なリトライが失敗します。

  4. スケジュール配信: 上記での短期的なリトライ失敗を受け、長期的な障害の可能性を考慮して長期的なリトライ戦略に移行します。具体的には、キューにメッセージを30分後に配信するようスケジュールします。また、リトライ回数を表すカスタムプロパティに1を足します。これにより、メッセージは30分後に再びキューに現れ、メッセージ受信者によって再処理されることで、短期・長期の両リトライ戦略が実現されます。

  5. 配信不能キューにメッセージ送信: 2〜4を繰り返し、長期的なリトライ回数(=リトライ回数のカスタムプロパティの値)が既定値を超えた場合、配信不能キューにメッセージを送信します。

  6. 配信不能キューを監視: Azure Monitorが配信不能キューを監視します。

  7. システム管理者に通知: Azure Monitorが配信不能キューのメッセージを検知したら、その内容をシステム管理者に通知します。

以下は、上記を実現するためのメッセージ受信者(Azure Functions)のソースコードになります。

package com.functions;

import com.microsoft.azure.functions.annotation.*;

import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.vavr.control.Try;

import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.microsoft.azure.functions.*;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;

public class ServiceBusQueueTriggerFunction {

    @FunctionName("ServiceBusQueueTrigger")
    public void serviceBusQueueTrigger(
        @ServiceBusQueueTrigger(name = "message", queueName = "serivcea", connection = "AzureWebJobsMyServiceBus") String message,
        @BindingName("UserProperties") Map<String, Object> properties,
        final ExecutionContext context
    ) {

        try {
            // rejilliency4jで、バックオフポテンシャルによる短期的なリトライ処理を行う
            // 最初は1秒後にリトライ、2回目は2秒後にリトライ、3回目は4秒後にリトライをする。
            IntervalFunction intervalFunction = IntervalFunction
                    .ofExponentialBackoff(Duration.ofMillis(1000), 2.0); // 初回は1秒、倍率は2とする

            RetryConfig config = RetryConfig.custom()
                    .maxAttempts(3) // 最大試行回数は3回とする
                    .intervalFunction(intervalFunction) // バックオフを設定する
                    .build();

            Retry retry = Retry.of("myRetry", config);

            Try result = Try.ofSupplier(Retry.decorateSupplier(retry, () -> {
                System.out.println("なんかの処理");
                // 意図的に例外を発生させて短期的なリトライ処理を行う
                throw new RuntimeException("Exception");
            }));

            // 短期的なリトライ処理が失敗した場合は例外を発生させて、長期的なリトライ処理に移行する
            if (result.isFailure())
                throw new RuntimeException("Exception");

        } catch (Exception e) {
            // 短期的なリトライに失敗した場合は、長期的なリトライ処理を行う
            // 取得したメッセージのカスタムプロパティからリトライを行った回数を取得する
            int retryCount = ((Double) properties.get("retryCount")).intValue();

            // 長期的なリトライ回数が3回を超えた場合は、意図的に例外を発生させて
            // Dead Letter Queueにメッセージを送信する
            if (retryCount > 3) {
                throw new RuntimeException("Exception");
            }

            // 長期的なリトライを行うために、メッセージをスケジュール配信する
            ServiceBusMessage retryMessage = new ServiceBusMessage(message);

            // リトライ回数を増やしてカスタムプロパティに設定する
            retryCount++;
            retryMessage.getApplicationProperties().put("retryCount", retryCount);

            // 30分後にメッセージがスケジュール配信されるようにする
            OffsetDateTime scheduledEnqueueTime = OffsetDateTime.now(ZoneOffset.UTC).plus(Duration.ofMinutes(30));

            DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                    .build();

            ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                    .fullyQualifiedNamespace("sbq-accountprovisioning.servicebus.windows.net")
                    .credential(credential)
                    .sender()
                    .queueName("sbq-serivcea")
                    .buildClient();

            senderClient.scheduleMessage(retryMessage, scheduledEnqueueTime);
        }
    }
}

Pub/Sub(Publish/Subscribe)モデルの場合

pub/subモデルの場合は以下のような構成図になります。メッセージ受信者には、標準的なメッセージキューの場合と同様にAzure Functions(Java)を用いて、Azure Service Busのトピックトリガーを使います。

 

pub/subモデルの場合の処理の流れは以下のようになります。

  1. メッセージ送信: あるアプリケーションからAzure Service Busのトピックにメッセージを送信します。送信時にメッセージに以下のカスタムプロパティを設定します。

    • サービスA: true
    • サービスB: true
    • リトライ回数: 0
  2. メッセージ受信: 送信されたメッセージは各サブスクリプションに配信され、Azure Functionsがトピックトリガーを使用してこれらのメッセージを受信します。Azure Service Busのフィルター機能を活用し、サービスのカスタムプロパティがtrueの場合のみメッセージがサブスクリプションに配信されるように設定します。

    • サブスクリプションA: サービスAのプロパティがtrueの場合のみ受信
    • サブスクリプションB: サービスBのプロパティがtrueの場合のみ受信
  3. 業務処理: Azure Functionsが受け取ったメッセージに基づいてサービスで処理を実施します。この例では、サブスクリプションAのメッセージは正常に処理されますが、サービスBでは処理が失敗し、resilience4jを用いた短期的なリトライが行われた上、そのリトライにも失敗したとします。

  4. スケジュール配信: 短期的なリトライが失敗したため、長期的なリトライ戦略に移行され、30分後の配信を目指してメッセージをスケジュールします。この際、サービスAのカスタムプロパティをfalseに設定します。なぜならば、既にサブスクリプションAの処理は成功しており、再配信すると重複処理が発生する可能性があるためです。結果として、30分後にサブスクリプションBのみにメッセージが配信され、再処理が行われることとなります。また、リトライ回数を表すカスタムプロパティに1を足します。

  5. 配信不能キューにメッセージ送信: 2〜4を繰り返し、長期的なリトライ回数(=リトライ回数のカスタムプロパティの値)が既定値を超えた場合、配信不能キューにメッセージを送信します。

  6. 配信不能キューを監視: Azure Monitorが配信不能キューを監視します。

  7. システム管理者に通知: Azure Monitorが配信不能キューのメッセージを検知したら、その内容をシステム管理者に通知します。

以下は、上記を実現するためのメッセージ受信者B(Azure Functions)のソースコードになります。

package com.functions;

import com.microsoft.azure.functions.annotation.*;

import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.vavr.control.Try;

import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.microsoft.azure.functions.*;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;

public class ServiceBusTopicTriggerFunction {

    @FunctionName("ServiceBusTopicTrigger")
    public void serviceBusTopicTrigger(
            @ServiceBusTopicTrigger(name = "message", topicName = "sbt-service", subscriptionName = "sbts-serviceb", connection = "AzureWebJobsMyServiceBus") String message,
            @BindingName("UserProperties") Map<String, Object> properties,
            final ExecutionContext context) {

        try {

            // rejilliency4jで、バックオフポテンシャルによる短期的なリトライ処理を行う
            // 最初は1秒後にリトライ、2回目は2秒後にリトライ、3回目は4秒後にリトライをする。
            IntervalFunction intervalFunction = IntervalFunction
                    .ofExponentialBackoff(Duration.ofMillis(1000), 2.0); // 初回は1秒、倍率は2とする

            RetryConfig config = RetryConfig.custom()
                    .maxAttempts(3) // 最大試行回数は3回とする
                    .intervalFunction(intervalFunction) // バックオフを設定する
                    .build();

            Retry retry = Retry.of("myRetry", config);

            Try result = Try.ofSupplier(Retry.decorateSupplier(retry, () -> {
                System.out.println("なんかの処理");
                // 意図的に例外を発生させて短期的なリトライ処理を行う
                throw new RuntimeException("Exception");
            }));

            // 短期的なリトライ処理が失敗した場合は例外を発生させて、長期的なリトライ処理に移行する
            if (result.isFailure())
                throw new RuntimeException("Exception");

        } catch (Exception e) {
            // 短期的なリトライに失敗した場合は、長期的なリトライ処理を行う
            // 取得したメッセージのカスタムプロパティからリトライを行った回数を取得する
            int retryCount = ((Double) properties.get("retryCount")).intValue();

            // 長期的なリトライ回数が3回を超えた場合は、意図的に例外を発生させて
            // Dead Letter Queueにメッセージを送信する
            if (retryCount > 3) {
                throw new RuntimeException("Exception");
            }

            // 長期的なリトライを行うために、メッセージをスケジュール配信する
            ServiceBusMessage retryMessage = new ServiceBusMessage(message);

            // 特定のサブスクリプションにだけメッセージを配信させるためのカスタムプロパティを設定する
            // このカスタムプロパティが設定差れている場合は、そのサブスクリプションにだけメッセージが配信される
            retryMessage.getApplicationProperties().put("servicea", false);
            retryMessage.getApplicationProperties().put("serviceb", true);

            // リトライ回数を増やしてカスタムプロパティに設定する
            retryCount++;
            retryMessage.getApplicationProperties().put("retryCount", retryCount);

            // 30分後にメッセージがスケジュール配信されるようにする
            OffsetDateTime scheduledEnqueueTime = OffsetDateTime.now(ZoneOffset.UTC).plus(Duration.ofMinutes(30));

            DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                    .build();

            ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                    .fullyQualifiedNamespace("sbq-accountprovisioning.servicebus.windows.net")
                    .credential(credential)
                    .sender()
                    .topicName("sbt-accountprovisioning")
                    .buildClient();

            senderClient.scheduleMessage(retryMessage, scheduledEnqueueTime);
        }

    }

}

まとめ

いかがでしたでしょうか?システムの自己回復性の重要性、またそれを実現するために効果的なリトライ戦略が欠かせないことをお伝えしたく、一筆したためました。このブログが皆さんのお役に立てたら幸いです。

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です