.NETでRabbitMQ

こんにちは、サイオステクノロジー技術部 武井です。今回は、.NETでRabbitMQでアクセスしたいと思います。

準備

RabbitMQは.NETクライアントライブラリを提供しています。これを使えばC#やPowerShellからでもRabbitMQにアクセスできます。詳細は以下のページに記載しております。

https://www.rabbitmq.com/dotnet-api-guide.html

今回は、PowerShellで動かしてみましょう。簡単なProducerとConsumerを作ってみます。まずは、.NETのライブラリをゲットします。以下のページで配布されています。

https://www.nuget.org/packages/RabbitMQ.Client

.NETのパッケージ管理システムであるNuGetが必要です。基本的にNuGetはVisualStudioありきなのですが、今回はPowerShellで動かしたいので、dllだけほしいです。そんなときのためにNuGetを単独で動かす方法があります。

まず、以下からNuGetをゲットします。

http://nuget.org/nuget.exe

上記のファイルを適当なところにおいて実行すればOKなのですが、バージョンが古いためエラーになります。以下のコマンドで最新のバージョンにアップグレードして下さい。

PS C:\ > .\nuget.exe update -self

これで準備が整いましたので、現時点の最新のバージョンをダウンロードします。

PS C:\ > .\nuget.exe install RabbitMQ.Client -Version 5.0.1

上記のコマンドを実行したフォルダの配下に以下のdllがダウンロードされています。
RabbitMQ.Client.5.0.1\lib\net451\RabbitMQ.Client.dll

実装

Producer、Consumerを実装します。

まず、Producer側の実装です。

[System.Reflection.Assembly]::LoadFrom("C:\path\to\RabbitMQ.Client.5.0.1\lib\net451\RabbitMQ.Client.dll") # (1)

$factory = New-Object RabbitMQ.Client.ConnectionFactory # (2)

$factory.UserName = "guest" # (3)
$factory.Password = "guest" # (4)
$factory.VirtualHost = "/" # (5)
$factory.HostName = "localhost" # (6)

$conn = $factory.CreateConnection() # (7)

$channel = $conn.CreateModel() # (8)

$channel.QueueDeclare("test",$true,$false,$false,$null); # (9)

$messageBodyBytes = [System.Text.Encoding]::UTF8.GetBytes("Hello") # (10)

$channel.BasicPublish("","test",$true,$null,$messageBodyBytes) # (11)

$channel.close # (12)

$conn.close # (13)

(1)は、.NETライブラリのアセンブリの読み込みを行っています。先のDLLをダウンロードしたパスを指定して下さい。

(2)は、RabbitMQ Serverへのコネクションを生成するためのFactoryを生成しています。

(3)は、RabbitMQ Serverに接続するためのユーザー名を指定しています。

(4)は、RabbitMQ Serverに接続するためのパスワードを指定しています。

(5)は、RabbitMQ Serverにのホスト名を指定してます。

(6)は、RabbitMQ Serverに接続するためのユーザー名を指定しています。

(7)は、RabbitMQ Serverに接続しています。

(8)は、RabbitMQ Serverへのチャンネルを生成しています。

(9)は、キューを作成しています。

(10)は、RabbitMQ Serverへ配信するメッセージの内容をバイト配列に変換しています。

(11)は、実際にキューにメッセージを配信する処理です。引数は以下のとおりです。
第1引数:Exchange名
第2引数:Routing Key名
第3引数:メッセージをキューにルーティングできない時にエラーになるフラグ
第4引数:各種設定を格納した変数
第5引数:メッセージをバイト配列に変換したもの

(12)は、チャンネルを閉じています。

(13)は、コネクションを閉じています。

次に、Consumer側の実装です。

[System.Reflection.Assembly]::LoadFrom("C:\path\to\Downloads\RabbitMQ.Client.5.0.1\lib\net451\RabbitMQ.Client.dll")

$factory = New-Object RabbitMQ.Client.ConnectionFactory

$factory.UserName = "guest"
$factory.Password = "guest"
$factory.VirtualHost = "/"
$factory.HostName = "localhost"

$conn = $factory.CreateConnection()

$channel = $conn.CreateModel()

$channel.QueueDeclare("test",$true,$false,$false,$null)

$Consumer = New-Object RabbitMQ.Client.QueueingBasicConsumer($channel) # (1)
$channel.BasicConsume("test",$true,"",$false,$true,$null,$Consumer) # (2)

$Delivery = New-Object RabbitMQ.Client.Events.BasicDeliverEventArgs # (3)

while($true) { # (4)
  if($Consumer.Queue.Dequeue(10000, [ref]$Delivery)) { # (5)
    $Payload = [Text.Encoding]::UTF8.GetString($Delivery.Body) # (6)
    Write-Output $Payload
  }
}

$channel.close

$conn.close

以下、ソースコードの説明になります。Producer側と共通しているところは省略します。

(1)は、Consumerオブジェクトを生成しています。

(2)は、キューを待ち受けするための設定です。
第1引数:待ち受けするキュー名
第2引数:trueにしておくと自動でAckしてキューから削除します。
第3引数:Consumerにつけるタグです。空にすると、自動で命名されます。
第4引数:クラスタ環境において、ローカルに配信するかどうかを決めるフラグです。trueでとローカルに配信しません。
第5引数:キューへの排他アクセスを定義します。Trueだと、あるコネクションがキューにアクセスしているときに他のコネクションからアクセスするとエラーになります。
第6引数:Consumerオブジェクト

(3)は、キューに配信されたメッセージの中身になります。

(4)は、whileループでキューにメッセージが配信されるまで待機します。

(5)は、キューからメッセージを取り出しています。

(6)は、メッセージからペイロード(メッセージのボディ)を取り出しています。

Consumerを実行した後で、Producerを動かしてみます。

PS C:\ > powershell.exe .\consumer.ps1

GAC    Version        Location
---    -------        --------
False  v4.0.30319     C:\path\to\RabbitMQ.Client.5.0.1\lib\net451\RabbitMQ.Client.dll

ConsumerCount : 0
MessageCount  : 0
QueueName     : test

amq.ctag-uaYqGu_D8o21nMQW-WD9tA
Hello

おお(`・ω・´)シャキーン

できました!!

Be the first to comment

コメント投稿

Your email address will not be published.


*