RabbitMQによる非同期処理

こんにちは。サイオステクノロジー武井です。今回は、非同期処理のためのメッセージキューイングに使われる「RabbitMQ」というオープンソースソフトウェアについてお話したいと思います。

非同期処理とは?

まず非同期処理について、説明します。ユーザー登録やメール送信処理を実現するための一般的なフローは以下の通りと思います。

  1. ユーザーがブラウザから情報を入力する
  2. Webアプリケーションは、入力した情報に基づき、ユーザー登録やメール送信処理などのを行う。
  3. ユーザーのブラウザに処理完了画面が表示される。

ユーザーは2の処理が完了して、完了画面が表示されるまで、待つことになります。しかし、一般的にユーザー登録やメール送信処理などは、ユーザーがその処理結果をすぐに必要なものではありません。

ユーザー登録したからと言って、すぐにそのユーザーが利用することはないと思います。ユーザーへのIDやパスワードの通知などの業務処理がありますから、ある程度処理が遅延しても構いません。極端なことを言えば、ユーザー登録処理を開始してから完了するまで、数時間かかっても、業務上支障がないケースはあります。メールにしても同じと思います。

すぐに結果が必要のない、遅延しても良い処理のために、ユーザーがそのレスポンスをずっとブラウザの前で待ち続けるのは時間のムダです。

Screen Shot 2017-10-06 at 16.56.51

そこで、非同期処理の出番になります。

今までの非同期処理

今までの非同期処理は、ユーザーからリクエストがあったら、別プロセスを起動させ、そのプロセスに別の処理を行わせるものでした。下記のようなフローになります。

Screen Shot 2017-10-06 at 19.22.00

この方法であれば、ユーザーの入力をWebアプリケーションが受け取ると、別のプロセスを起動して、そのプロセスに処理を任せるので、ユーザーにはすぐに完了画面を返すことができます。

しかし、この方法には最大の欠点があります。それは、ユーザーの入力があるたびにプロセスが起動するので、サーバーが高負荷になるという点です。昨今、サーバーリソースは全てクラウド上でまかなう風潮がありますが、あまりに高負荷となると、リソース増強を余儀なくされ、利用料金にも跳ね返って来ます。

そこで、最近の非同期処理は別の形を取っています。

最近の非同期処理

最近の非同期処理は、メッセージ・キューというものを利用します。Webアプリケーションはユーザーからの入力を受け取ると、そのままその情報をメッセージキューに格納します。

Webアプリケーションとは別のプロセスで、Workerプロセスというものが起動しており、常時キューを監視し、キューに情報が入ってきたら、順次処理をして、ユーザーに結果を返します。

Screen Shot 2017-10-06 at 17.27.05

この方法であれば、起動しているプロセスは常にWorkerプロセス1つですので、サーバーのリソースを浪費しません。また、Webアプリケーションが、キューを登録するのも一瞬で終わるので、ユーザーの待ち時間は短くなります。

RabbitMQとは?

メッセージキューイング処理を行うことができるオープンソースソフトウェアです。AMQP(Advanced Message Queuing Protocol)という、メーッセージキューイングのためのオープンな仕様をベースに作成されているため、非常に多くのプラットフォームで扱うことが出来ます。

RabbitMQには下記の登場人物がいます。

Producer
キューを投げる人です。先の図で言えば、Webアプリケーションに相当します。

Consumer
キューを受け取る人です。先の図で言えば、Workerプロセスに相当します。

Message
メッセージです。ProducerやConsumerの間でやり取りされる本文になります。

Queue
キューです。メッセージのいれものです。このいれものにConsumerがアクセスして、Queueの中にあるMessageを取り出します。

Exchange
これは、RabbitMQ(AMQP?)独特の考え方なのですが、ProducerがMessageをRabbitMQ Serverに投げると、まずExchangeが受け取って、Messageに付与されているrouting keyと、Queueに付与されているbinding keyを照合して、適切なQueueに配送します。RabbitMQではProducerは、QueueにMessageを配送するのではなく、Exchangeに配送するのです。イメージは以下のような感じになります。

Screen Shot 2018-02-27 at 20.16.13

Queue1にはorange、Queue2にはappleというbinding keyが付与されています。

  1. Producerは、それぞれorange、appleというrouting keyが付与されているMessageをRabbitMQ Serverに投げます。
  2. Exchangeはキューを受け取り、Messageに付与されているrouting key、Queueに付与されているbinding keyを見て、routing keyに一致するbinding keyが付与されたQueueにMessageを配送します。つまり、Message1はQueue1に、Message2はQueue2に配送されます。
  3. Consumer1及びConsumer2はそれぞれQueue1及びQueue2からMessage1、Message2を受け取ります。

Exchangeの種類

Exchangeの種類(Queueへの配送ルール)にはいくつかあります。

direct
routing keyとbinding keyが一致したMessageだけQueueに配送します。先程、図に示したものが、まさにdirectになります。

fanout
binding keyを無視して全てのQueueにMessageを配送します。まさにブロードキャストです。

topic
routing keyとbinding keyを比較する際、部分一致を許可します。SQLのlike文みたいなものです。like文は%しか使えませんが、RabbitMQは2つの特殊文字が使えます。

*:1つの単語を置き換える

#:0個以上の単語を置き換える

また、部分一致する場合は、ピリオドで区切った単位で行います。言葉ではわかりにくいので、図にしてみました。

Screen Shot 2018-02-27 at 20.47.20

上記のような場合を想定します。

  • Queue1のbinding key:*.apple.*
  • Queue2のbinding key:fast.#

■ Message1の場合

Message1はQueue1とQueue2の両方に届きます。理由は以下のとおりです。

Screen Shot 2018-02-27 at 21.48.29


Screen Shot 2018-02-27 at 21.49.34

 

■ Message2の場合

Message2はQueue1のみにしか届きません。理由は以下のとおりです。

Screen Shot 2018-02-27 at 21.52.30


Screen Shot 2018-02-27 at 21.59.44

header

Messageのヘッダー情報をもとに、配送するQueueを選択します。

RabbitMQのインストール

前置き長くてすみません。やっとRabbitMQのインストールです。

epelリポジトリをインストールします。

# rpm -Uvh https://download.fedoraproject.org/pub/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpm

RabbitMQの記述言語erlangをインストールします。

# yum install erlang

RabbitMQをインストールします。

# yum install rabbitmq-server

RabbitMQを起動します。

# systemctl start rabbitmq-server

RabbitMQの管理画面にアクセスするためのプラグインを有効にします。

# rabbitmq-plugins enable rabbitmq_management

以下のURLでRabbitMQの管理画面にアクセスできます。
https://[RabbitMQ ServerのFQDN]:15672/

シンプルな実装例

ここからはRabbitMQを用いた実装例をいくつか示してみたいと思います。Queueが1つしかなく、Producer、Consumerもそれぞれ一つの場合です。
Screen Shot 2018-02-27 at 22.18.17

RabbitMQのライブラリをインストールします。Mavenを使っている前提とします。pom.xmlに以下の内容を追記して下さい。

    <dependency>
      <groupid>com.rabbitmq</groupid>
      <artifactid>amqp-client</artifactid>
      <version>5.1.2</version>
    </dependency>

まず、Producer側を実装します。

public class Producer {

	private final static String QUEUE_NAME = "Queue";

	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory(); // (1)
		connectionFactory.setUsername("guest"); // (2)
		connectionFactory.setPassword("guest"); // (3)
		connectionFactory.setHost("localhost"); // (4)

		Connection connection = connectionFactory.newConnection(); // (5)
		Channel channel = connection.createChannel(); // (6)
		channel.queueDeclare(QUEUE_NAME, true, false, false, null); // (7)
		
		String message1 = "Hello";
		channel.basicPublish("", QUEUE_NAME, null, message1.getBytes()); // (8)

		channel.close(); // (9)
		connection.close(); // (10)
	}

}

(1)は、RabbitMQ Serverへのコネクションを生成するためのFactory Instanceを生成しています。

(2)は、RabbitMQへ接続するためのユーザー名を定義しています。

(3)は、RabbitMQへ接続するためのパスワードを定義しています。

(4)は、RabbitMQの接続先ホスト名を定義しています。

(5)は、RabbitMQへのコネクションを生成しています。

(6)は、RabbitMQへ接続するためのチャンネルを生成しています。

(7)は、Queueを生成しています。主な引数としては、第1引数はQueueの名前、第2引数をtrueにすると、RabbitMQを再起動しても、Queueの中身が消えないようになります。falseだと消えてしまいます。

(8)は、MessageをRabbitMQへ送る処理です。第1引数はExchange名、第2引数はrouting keyとなります。あれ?Exchange名が空ですね。しかもrouting keyがQueueの名前になっています。これは、RabbitMQはDefault Exchangeというのが定義されており、Exchange名を空にすると、このDefault Exchangeが使われます。RabbitMQの管理画面には以下のような解説がありました。

The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It it not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.

Default Exchangeでは、routing keyにQueueの名前を指定すると、その名前のQueueにMessageが配送されるようです。

(9)は、チャンネルを閉じています。

(10)は、コネクションを閉じています。

 

次に、Consumer側を実装します。ほぼほぼProducerと似通っています。

public class ConsumerSimple {

	private final static String QUEUE_NAME = "Queue";

	public static void main(String[] argv) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory(); // (1)
		connectionFactory.setUsername("guest"); // (2)
		connectionFactory.setPassword("guest"); // (3)
		connectionFactory.setHost("localhost"); // (4)

		Connection connection = connectionFactory.newConnection(); // (5)

		Channel channel = connection.createChannel(); // (6)

		channel.queueDeclare(QUEUE_NAME, true, false, false, null); // (7)

		Consumer consumer = new DefaultConsumer(channel) { // (8)
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException { // (9)
				String message = new String(body, "UTF-8");
				System.out.println("Received '" + message + "'");
			}
		};
		channel.basicConsume(QUEUE_NAME, true, consumer); // (10)
	}
}

 

(1)は、RabbitMQ Serverへのコネクションを生成するためのFactory Instanceを生成しています。

(2)は、RabbitMQへ接続するためのユーザー名を定義しています。

(3)は、RabbitMQへ接続するためのパスワードを定義しています。

(4)は、RabbitMQの接続先ホスト名を定義しています。

(5)は、RabbitMQへのコネクションを生成しています。

(6)は、RabbitMQへ接続するためのチャンネルを生成しています。

(7)は、Queueを生成しています。主な引数としては、第1引数はQueueの名前、第2引数をtrueにすると、RabbitMQを再起動しても、Queueの中身が消えないようになります。falseだと消えてしまいます。

(8)は、Consumerのインスタンスを生成しています。

(9)は、ConsumerクラスのhandleDeliveryメソッドをオーバーライドしており、このメソッドはConsumerがMessageを受信した場合の処理を実装します。

(10)は、指定したQueueでMessageが受信するのを待ち受けます。

Consumerを起動してから、Producerを起動すると、Consoleに

Hello

と表示されると思います。

ちょっと複雑な実装例

先の例に上げた以下のパターンを実装してみたいと思います。

Screen Shot 2018-02-27 at 20.16.13

まず、Producer側を実装します。「シンプルな実装例」と同じ処理は割愛します。

public class Producer {

	private final static String QUEUE_NAME1 = "Queue1";
	private final static String QUEUE_NAME2 = "Queue2";
	private final static String EXCHANGE_NAME1 = "ex1";
	private final static String EXCHANGE_NAME2 = "ex2";

	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setUsername("guest");
		connectionFactory.setPassword("guest");
		connectionFactory.setHost("localhost");

		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.queueDeclare(QUEUE_NAME1, true, false, false, null); // (1)
		channel.queueDeclare(QUEUE_NAME2, true, false, false, null); // (2)

		channel.exchangeDeclare(EXCHANGE_NAME1, "direct"); // (3)
		channel.exchangeDeclare(EXCHANGE_NAME2, "direct"); // (4)
		
		channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME1, "orange"); // (5)
		channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME2, "apple"); // (6)

		String message1 = "Hello1";
		String message2 = "Hello2";
		channel.basicPublish(EXCHANGE_NAME1, "orange", null, message1.getBytes()); // (7)
		channel.basicPublish(EXCHANGE_NAME2, "apple", null, message2.getBytes()); // (8)

		channel.close();
		connection.close();
	}

}

(1)は、Queue1を作成しています。

(2)は、Queue2を作成しています。

(3)は、binding keyがorangeの場合のExchangeを作成しています。種別はdirectとしています。

(4)は、binding keyがappleの場合のExchangeを作成しています。種別はdirectとしています。

(5)は、Queueとbinding keyを紐付けています。Queue1とbinding keyのorangeを紐付けて、routing keyがorangeのメッセージが来たら、Queue1に配信されるようにしています。

(6)は、Queueとbinding keyを紐付けています。Queue12とbinding keyのappleを紐付けて、routing keyがappleのメッセージが来たら、Queue2に配信されるようにしています。

(7)は、ex1のExchangeに、routing keyがorangeのメッセージを送っています。

(8)は、ex2のExchangeに、routing keyがappleのメッセージを送っています。

 

次にCosumer1を実装します。

public class Consumer1 {

	private final static String QUEUE_NAME1 = "Queue1";
	private final static String EXCHANGE_NAME1 = "ex1";

	public static void main(String[] argv) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setUsername("guest");
		connectionFactory.setPassword("guest");
		connectionFactory.setHost("localhost");
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare(QUEUE_NAME1, true, false, false, null); // (1)

		channel.exchangeDeclare(EXCHANGE_NAME1, "direct"); // (2)
		
		channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME1, "orange"); // (3)

		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received '" + message + "'");
			}
		};
		channel.basicConsume(QUEUE_NAME1, true, consumer); // (4)
	}
}

(1)は、Queue1という名前のQueueを作成しています。これは、Producer側にも同様の処理がありましたが、Producer側、Consumer側どちらが先に起動しても、きちんとQueueに入るように、Producer側、Consumer側両方にQueueを作成する処理を入れています。以降、解説するExchangeやbindの処理についても同様です。

(2)は、ex1という名前のExchangeを作成しています。

(3)は、Queueとbinding keyを紐付けています。Queue1とbinding keyのorangeを紐付けて、routing keyがorangeのメッセージが来たら、Queue1に配信されるようにしています(Producer側の処理と同じです)。

(4)は、指定したQueue1という名前のQueueでMessageが受信するのを待ち受けます。

 

同様にConsumer2も実装します。Consumer1とほとんど同じです。

public class Consumer2 {

	private final static String QUEUE_NAME2 = "Queue2";
	private final static String EXCHANGE_NAME2 = "ex2";

	public static void main(String[] argv) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setUsername("guest");
		connectionFactory.setPassword("guest");
		connectionFactory.setHost("localhost");
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare(QUEUE_NAME2, true, false, false, null); // (1)

		channel.exchangeDeclare(EXCHANGE_NAME2, "direct"); // (2)
		
		channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME2, "apple"); // (3)

		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received '" + message + "'");
			}
		};
		channel.basicConsume(QUEUE_NAME2, true, consumer); // (4)
	}
}

(1)は、Queue2という名前のQueueを作成しています。

(2)は、ex2という名前のExchangeを作成しています。

(3)は、Queueとbinding keyを紐付けています。Queue2とbinding keyのappleを紐付けて、routing keyがorangeのメッセージが来たら、Queue2に配信されるようにしています(Producer側の処理と同じです)。

(4)は、指定したQueue2という名前のQueueでMessageが受信するのを待ち受けます。

 

以上で実装は終わりです。Consumer1とConsumer2を起動して、Producerを実行すると、Consumer1側のconsoleには、

Hello1

と表示され、Consumer2側のConsoleには、

Hello2

と表示されるはずです。

最後に

いかがでしたでしょうか?RabbitMQはオープンな仕様なので、色々な環境で使えるのが嬉しいです。この記事が皆さんのお役に立てれば幸いです。

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

2人がこの投稿は役に立ったと言っています。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です