こんにちは、サイオステクノロジー技術部の武井です。今回は、弊社が提供するサービス「サイオスAzure課金管理サービス」(詳しくはこちら)に導入されている非同期処理の実装をご紹介したいと思います。(非同期処理の詳細については、こちらの記事「Azure Storageキューによる非同期処理」をご参考下さい)
サイオスAzure課金管理サービスの非同期処理
サイオスAzure課金 管理サービスでは、メール送信、ユーザー作成、仮想マシン作成など時間のかかるものは非同期で 行っております。メール送信、ユーザー作成、仮想マシン作成など、様々な種類の非同期処理を効率よく実装するための仕組みを今回作成しました。ポイントは以下になります。
- 実装者に非同期処理のしくみを意識させない。
- 後から追加した機能が、他の処理に与える影響を最小限にするために、プラガブルな実装にする。
Spring Boot及び非同期処理を実現するためのFutureフレームワークを使って実現しました。
詳しい仕組み
詳しい仕組みは以下のとおりです。
- Workerプロセスの中に複数のスレッドが起動しており、キューにメッセージが入ってきたら、取り出す。
- メッセージのJSONの中の「serviceId」の値より、あらかじめ「serviceA」という名前で Application ContextにDependency Injection(以降、DI)されているインスタンス(Serviceインターフェースを実装)を取り出す。
- 先程取得したインスタンスのexecメソッドを実行する。
実装者は、Serviceインターフェースを実装したクラスを作成し、Bean名をサービスIDとしてDIするだけで非同期処理に新しい機能を追加できます。つまり、非同期処理のメカニズム自体を意識する必要はありません。
実装方法
以下、実装方法になります。実際のコードをご紹介します。
まず、パッケージ構成は以下になります。
■ JsonBase.java
キューに入るJSONの中で、サービスIDの部分だけを定義しているクラスになります。メッセージのJSONの「serviceId」のフィールドは、このクラスのインスタンスにマッピングされます。他のフィールドは無視したいので@JsonIgnoreProperties(ignoreUnknown = true)のアノテーションを付与しています。
@JsonIgnoreProperties(ignoreUnknown = true) public class JsonBase { private String serviceId; public String getServiceId() { return serviceId; } public void setServiceId(String serviceId) { this.serviceId = serviceId; } }
■ MailJson.java
キューに入るJSONの全体を定義しているクラスになります。JsonBase.javaを継承しています。このクラスはSerivce(=キューを処理する機能)と1対1であり、実際にServiceが処理するJSONをマッピングするクラスを定義します。
public class MailJson extends JsonBase { private Data data; public Data getData() { return data; } public void setData(Data data) { this.data = data; } public class Data { private String subject; // メールのタイトル private String to; // メールの宛先 private String from; // メールの送信元 private String body; // メールの本文 public String getSubject() { return subject; } public void setSubject(String subject) { this.subject = subject; } public String getTo() { return to; } public void setTo(String to) { this.to = to; } public String getFrom() { return from; } public void setFrom(String from) { this.from = from; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } }
■ Service.java
キューを処理するクラスのインターフェースを定義します。execメソッドだけを定義しています。引数は、キューの中に入っているJSONになります。
public interface Service { /** * メッセージを処理するメソッドで、引数にはメッセージの内容(JSON)を指定する * @param message メッセージ */ public void exec(String message); }
■ SendMailService.java
Service.javaを実装するクラスで、キューを処理するクラスになります。このクラスは、非同期で処理をさせたい機能単位(メール送信、仮想マシン作成、ユーザー作成)で定義します。ここでは例として、メール送信処理を例にします。
public class SendMailService implements Service { private static ObjectMapper objectMapper = new ObjectMapper(); @Override public void exec(String message) { try { // ここでは、JSONの内容を標準出力に出しているだけだが、実際にはメール送信処理等、ロジックを記載する MailJson result = objectMapper.readValue(message,MailJson.class); System.out.println("ServiceId:" + result.getServiceId()); System.out.println("Subject:" + result.getData().getSubject()); System.out.println("From:" + result.getData().getFrom()); System.out.println("To:" + result.getData().getTo()); System.out.println("Body:" + result.getData().getBody()); } catch (IOException e) { e.printStackTrace(); } } }
■ WorkerServiceConfig.java
Serviceインターフェースの実装クラスをDIするためのJava Configを記載します。大切なのはBean名を定義する@Bean(name = “SendMailService”)の部分です。nameに定義する名前は、かならずJSONのserviceIdと同じ名前にします。これは、後に紹介するRetrieveQueue.javaが、JSONのserviceIdと同じ名前のBeanをApplication Contextから取得しようとするからです。
@Configuration public class WorkerServiceConfig { // Bean名には、メッセージのJSONの「serviceId」にある値と同じものを指定する @Bean(name = "SendMailService") public Service getSendMailService() { return new SendMailService(); } }
■ AzureBillingManagerWorkerApplication.java
Workerプロセスのメインクラスになります。ここから先の処理はSpring Boot本家のホームページの以下のURLを参考にしています。詳しくは以下をご参照下さい。
https://spring.io/guides/gs/async-method/
AsyncConfigurerSupportを継承し、getAsyncExecutor()でスレッド処理の定義をしています。
@SpringBootApplication @EnableAsync @Import(WorkerServiceConfig.class) public class AzureBillingManagerWorkerApplication extends AsyncConfigurerSupport { public static void main(String[] args) { SpringApplication.run(AzureBillingManagerWorkerApplication.class, args); } @Override public Executor getAsyncExecutor() { // スレッドプールの設定を記載する ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(500); executor.setThreadNamePrefix("Thread-"); executor.initialize(); return executor; } }
■ AppRunner.java
AzureBillingManagerWorkerApplication.javaから呼ばれて、スレッドを生成するクラスです。
@Component public class AppRunner implements CommandLineRunner { private final RetrieveQueue retrieveQueue; public AppRunner(RetrieveQueue retrieveQueue) { this.retrieveQueue = retrieveQueue; } @Override public void run(String... arg0) throws Exception { // 2つのスレッドを生成している CompletableFuture<String> f1 = retrieveQueue.exec(); CompletableFuture<String> f2 = retrieveQueue.exec(); // スレッドが全て終了するまでメイン処理を終了しない while (!(f1.isDone() && f2.isDone())) { Thread.sleep(10); } } }
■ RetrieveQueue.java
キューを監視し、キューにメッセージ(JSON)が入ってきたら、JSONのserviceIdに対応するBeanをApplication Contextから取得し、実行するクラスです。今回非同期で処理させたいのはこのクラスになります。非同期で処理したいメソッドに@Asyncをつけるのがポイントになります。
@Service public class RetrieveQueue { private static final Logger logger = LoggerFactory.getLogger(RetrieveQueue.class); private static final String connection = "XXXXXX"; private static final String queueName = "mail-sender"; private static ObjectMapper objectMapper = new ObjectMapper(); @Autowired private ApplicationContext applicationContext; @Async public CompletableFuture<String> exec() { try { // CONNECTION STRINGの情報をもとにキューの入れ物に接続する CloudStorageAccount storageAccount = CloudStorageAccount.parse(connection); // キューのクライアントを生成する CloudQueueClient queueClient = storageAccount.createCloudQueueClient(); // キュー名をセットする CloudQueue queue = queueClient.getQueueReference(queueName); // 無限ループによりキューを監視する while (true) { // キューからメッセージを取得する CloudQueueMessage retrievedMessage = queue.retrieveMessage(); if (retrievedMessage != null) { // JSONからサービスIDを取得して、Javaオブジェクトにマップする JsonBase result = objectMapper.readValue(retrievedMessage.getMessageContentAsString(), JsonBase.class); // サービスIDに対応したBeanをApplication Contextから取得する com.sios.azure.billing.worker.service.Service service = (com.sios.azure.billing.worker.service.Service) applicationContext .getBean(result.getServiceId()); // サービスに定義された処理を実行する service.exec(retrievedMessage.getMessageContentAsString()); // キューを削除する。 queue.deleteMessage(retrievedMessage); } } } catch (Exception e) { logger.error("Exception occurred while thread is running...", e); } return CompletableFuture.completedFuture("Thread is done!!"); } }
以上が、サイオスAzure課金管理サービスで実装されている非同期処理になります。今後、このサービスには様々な機能が追加されます。そういった変化に対応できるよう、柔軟な設計にすることを心がけました。
今回紹介したソースはGitHubに公開しております。