Pluggableな非同期処理

◆ Live配信スケジュール ◆
サイオステクノロジーでは、Microsoft MVPの武井による「わかりみの深いシリーズ」など、定期的なLive配信を行っています。
⇒ 詳細スケジュールはこちらから
⇒ 見逃してしまった方はYoutubeチャンネルをご覧ください
【5/21開催】Azure OpenAI ServiceによるRAG実装ガイドを公開しました
生成AIを活用したユースケースで最も一番熱いと言われているRAGの実装ガイドを公開しました。そのガイドの紹介をおこなうイベントです!!
https://tech-lab.connpass.com/event/315703/

こんにちは、サイオステクノロジー技術部の武井です。今回は、弊社が提供するサービス「サイオスAzure課金管理サービス」(詳しくはこちら)に導入されている非同期処理の実装をご紹介したいと思います。(非同期処理の詳細については、こちらの記事「Azure Storageキューによる非同期処理」をご参考下さい)

サイオスAzure課金管理サービスの非同期処理

サイオスAzure課金 管理サービスでは、メール送信、ユーザー作成、仮想マシン作成など時間のかかるものは非同期で 行っております。メール送信、ユーザー作成、仮想マシン作成など、様々な種類の非同期処理を効率よく実装するための仕組みを今回作成しました。ポイントは以下になります。

  • 実装者に非同期処理のしくみを意識させない。
  • 後から追加した機能が、他の処理に与える影響を最小限にするために、プラガブルな実装にする。

Spring Boot及び非同期処理を実現するためのFutureフレームワークを使って実現しました。

詳しい仕組み

詳しい仕組みは以下のとおりです。

Screen Shot 2017-10-16 at 21.40.16

  1. Workerプロセスの中に複数のスレッドが起動しており、キューにメッセージが入ってきたら、取り出す。
  2. メッセージのJSONの中の「serviceId」の値より、あらかじめ「serviceA」という名前で Application ContextにDependency Injection(以降、DI)されているインスタンス(Serviceインターフェースを実装)を取り出す。
  3. 先程取得したインスタンスのexecメソッドを実行する。

実装者は、Serviceインターフェースを実装したクラスを作成し、Bean名をサービスIDとしてDIするだけで非同期処理に新しい機能を追加できます。つまり、非同期処理のメカニズム自体を意識する必要はありません。

実装方法

以下、実装方法になります。実際のコードをご紹介します。

まず、パッケージ構成は以下になります。

Screen Shot 2017-10-16 at 21.44.50

■ JsonBase.java

キューに入るJSONの中で、サービスIDの部分だけを定義しているクラスになります。メッセージのJSONの「serviceId」のフィールドは、このクラスのインスタンスにマッピングされます。他のフィールドは無視したいので@JsonIgnoreProperties(ignoreUnknown = true)のアノテーションを付与しています。

@JsonIgnoreProperties(ignoreUnknown = true)
public class JsonBase {
	
	private String serviceId;

	public String getServiceId() {
		return serviceId;
	}

	public void setServiceId(String serviceId) {
		this.serviceId = serviceId;
	}

}

■ MailJson.java

キューに入るJSONの全体を定義しているクラスになります。JsonBase.javaを継承しています。このクラスはSerivce(=キューを処理する機能)と1対1であり、実際にServiceが処理するJSONをマッピングするクラスを定義します。

public class MailJson extends JsonBase {

	private Data data;

	public Data getData() {
		return data;
	}

	public void setData(Data data) {
		this.data = data;
	}

	public class Data {
		private String subject; // メールのタイトル
		private String to; // メールの宛先
		private String from; // メールの送信元
		private String body; // メールの本文

		public String getSubject() {
			return subject;
		}

		public void setSubject(String subject) {
			this.subject = subject;
		}

		public String getTo() {
			return to;
		}

		public void setTo(String to) {
			this.to = to;
		}

		public String getFrom() {
			return from;
		}

		public void setFrom(String from) {
			this.from = from;
		}

		public String getBody() {
			return body;
		}

		public void setBody(String body) {
			this.body = body;
		}

	}
}

■ Service.java

キューを処理するクラスのインターフェースを定義します。execメソッドだけを定義しています。引数は、キューの中に入っているJSONになります。

public interface Service {
	
	/**
	 * メッセージを処理するメソッドで、引数にはメッセージの内容(JSON)を指定する
	 * @param message メッセージ
	 */
	public void exec(String message);
}

■ SendMailService.java

Service.javaを実装するクラスで、キューを処理するクラスになります。このクラスは、非同期で処理をさせたい機能単位(メール送信、仮想マシン作成、ユーザー作成)で定義します。ここでは例として、メール送信処理を例にします。

public class SendMailService implements Service {

	private static ObjectMapper objectMapper = new ObjectMapper();
	
	@Override
	public void exec(String message) {
		try {
			// ここでは、JSONの内容を標準出力に出しているだけだが、実際にはメール送信処理等、ロジックを記載する
			MailJson result = objectMapper.readValue(message,MailJson.class);
			System.out.println("ServiceId:" + result.getServiceId());
			System.out.println("Subject:" + result.getData().getSubject());
			System.out.println("From:" + result.getData().getFrom());
			System.out.println("To:" + result.getData().getTo());
			System.out.println("Body:" + result.getData().getBody());
		} catch (IOException e) {
			e.printStackTrace();
		}		
	}
}

■ WorkerServiceConfig.java

Serviceインターフェースの実装クラスをDIするためのJava Configを記載します。大切なのはBean名を定義する@Bean(name = “SendMailService”)の部分です。nameに定義する名前は、かならずJSONのserviceIdと同じ名前にします。これは、後に紹介するRetrieveQueue.javaが、JSONのserviceIdと同じ名前のBeanをApplication Contextから取得しようとするからです。

@Configuration
public class WorkerServiceConfig {

	// Bean名には、メッセージのJSONの「serviceId」にある値と同じものを指定する
	@Bean(name = "SendMailService")
	public Service getSendMailService() {

		return new SendMailService();

	}
}

■ AzureBillingManagerWorkerApplication.java

Workerプロセスのメインクラスになります。ここから先の処理はSpring Boot本家のホームページの以下のURLを参考にしています。詳しくは以下をご参照下さい。

https://spring.io/guides/gs/async-method/

AsyncConfigurerSupportを継承し、getAsyncExecutor()でスレッド処理の定義をしています。

@SpringBootApplication
@EnableAsync
@Import(WorkerServiceConfig.class)
public class AzureBillingManagerWorkerApplication extends AsyncConfigurerSupport {

	public static void main(String[] args) {
		SpringApplication.run(AzureBillingManagerWorkerApplication.class, args);
	}

	@Override
	public Executor getAsyncExecutor() {
		// スレッドプールの設定を記載する
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(2);
		executor.setMaxPoolSize(2);
		executor.setQueueCapacity(500);
		executor.setThreadNamePrefix("Thread-");
		executor.initialize();
		return executor;
	}
}

■ AppRunner.java

AzureBillingManagerWorkerApplication.javaから呼ばれて、スレッドを生成するクラスです。

@Component
public class AppRunner implements CommandLineRunner {

	private final RetrieveQueue retrieveQueue;

	public AppRunner(RetrieveQueue retrieveQueue) {
		this.retrieveQueue = retrieveQueue;
	}

	@Override
	public void run(String... arg0) throws Exception {
		// 2つのスレッドを生成している
		CompletableFuture<String> f1 = retrieveQueue.exec();
		CompletableFuture<String> f2 = retrieveQueue.exec();
		
		// スレッドが全て終了するまでメイン処理を終了しない
		while (!(f1.isDone() && f2.isDone())) {
			Thread.sleep(10);
		}
	}
}

■ RetrieveQueue.java

キューを監視し、キューにメッセージ(JSON)が入ってきたら、JSONのserviceIdに対応するBeanをApplication Contextから取得し、実行するクラスです。今回非同期で処理させたいのはこのクラスになります。非同期で処理したいメソッドに@Asyncをつけるのがポイントになります。

@Service
public class RetrieveQueue {

	private static final Logger logger = LoggerFactory.getLogger(RetrieveQueue.class);
	private static final String connection = "XXXXXX";
	private static final String queueName = "mail-sender";
	private static ObjectMapper objectMapper = new ObjectMapper();

	@Autowired
	private ApplicationContext applicationContext;

	@Async
	public CompletableFuture<String> exec() {

		try {
			// CONNECTION STRINGの情報をもとにキューの入れ物に接続する
			CloudStorageAccount storageAccount = CloudStorageAccount.parse(connection);

			// キューのクライアントを生成する
			CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

			// キュー名をセットする
			CloudQueue queue = queueClient.getQueueReference(queueName);

			// 無限ループによりキューを監視する
			while (true) {

				// キューからメッセージを取得する
				CloudQueueMessage retrievedMessage = queue.retrieveMessage();

				if (retrievedMessage != null) {
					// JSONからサービスIDを取得して、Javaオブジェクトにマップする
					JsonBase result = objectMapper.readValue(retrievedMessage.getMessageContentAsString(),
							JsonBase.class);
					
					// サービスIDに対応したBeanをApplication Contextから取得する
					com.sios.azure.billing.worker.service.Service service = (com.sios.azure.billing.worker.service.Service) applicationContext
							.getBean(result.getServiceId());
					
					// サービスに定義された処理を実行する
					service.exec(retrievedMessage.getMessageContentAsString());
					
					// キューを削除する。
					queue.deleteMessage(retrievedMessage);
				}
			}
		} catch (Exception e) {
			logger.error("Exception occurred while thread is running...", e);
		}

		return CompletableFuture.completedFuture("Thread is done!!");
	}
}

以上が、サイオスAzure課金管理サービスで実装されている非同期処理になります。今後、このサービスには様々な機能が追加されます。そういった変化に対応できるよう、柔軟な設計にすることを心がけました。

今回紹介したソースはGitHubに公開しております。

https://github.com/noriyukitakei/pluggablemq

アバター画像
About 武井 宜行 269 Articles
Microsoft MVP for Azure🌟「最新の技術を楽しくわかりやすく」をモットーにブログtech-lab.sios.jp)で情報を発信🎤得意分野はAzureによるクラウドネイティブな開発(Javaなど)💻「世界一わかりみの深いクラウドネイティブ on Azure」の動画を配信中📹 https://t.co/OMaJYb3pRN
ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。


ご覧いただきありがとうございます。
ブログの最新情報はSNSでも発信しております。
ぜひTwitterのフォロー&Facebookページにいいねをお願い致します!



>> 雑誌等の執筆依頼を受付しております。
   ご希望の方はお気軽にお問い合わせください!

Be the first to comment

Leave a Reply

Your email address will not be published.


*


質問はこちら 閉じる