こんにちは、サイオステクノロジー技術部 武井です。今回は、.NETでRabbitMQでアクセスしたいと思います。
準備
RabbitMQは.NETクライアントライブラリを提供しています。これを使えばC#やPowerShellからでもRabbitMQにアクセスできます。詳細は以下のページに記載しております。
https://www.rabbitmq.com/dotnet-api-guide.html
今回は、PowerShellで動かしてみましょう。簡単なProducerとConsumerを作ってみます。まずは、.NETのライブラリをゲットします。以下のページで配布されています。
https://www.nuget.org/packages/RabbitMQ.Client
.NETのパッケージ管理システムであるNuGetが必要です。基本的にNuGetはVisualStudioありきなのですが、今回はPowerShellで動かしたいので、dllだけほしいです。そんなときのためにNuGetを単独で動かす方法があります。
まず、以下からNuGetをゲットします。
上記のファイルを適当なところにおいて実行すればOKなのですが、バージョンが古いためエラーになります。以下のコマンドで最新のバージョンにアップグレードして下さい。
PS C:\ > .\nuget.exe update -self
これで準備が整いましたので、現時点の最新のバージョンをダウンロードします。
PS C:\ > .\nuget.exe install RabbitMQ.Client -Version 5.0.1
上記のコマンドを実行したフォルダの配下に以下のdllがダウンロードされています。
RabbitMQ.Client.5.0.1\lib\net451\RabbitMQ.Client.dll
実装
Producer、Consumerを実装します。
まず、Producer側の実装です。
[System.Reflection.Assembly]::LoadFrom("C:\path\to\RabbitMQ.Client.5.0.1\lib\net451\RabbitMQ.Client.dll") # (1)
$factory = New-Object RabbitMQ.Client.ConnectionFactory # (2)
$factory.UserName = "guest" # (3)
$factory.Password = "guest" # (4)
$factory.VirtualHost = "/" # (5)
$factory.HostName = "localhost" # (6)
$conn = $factory.CreateConnection() # (7)
$channel = $conn.CreateModel() # (8)
$channel.QueueDeclare("test",$true,$false,$false,$null); # (9)
$messageBodyBytes = [System.Text.Encoding]::UTF8.GetBytes("Hello") # (10)
$channel.BasicPublish("","test",$true,$null,$messageBodyBytes) # (11)
$channel.close # (12)
$conn.close # (13)
(1)は、.NETライブラリのアセンブリの読み込みを行っています。先のDLLをダウンロードしたパスを指定して下さい。
(2)は、RabbitMQ Serverへのコネクションを生成するためのFactoryを生成しています。
(3)は、RabbitMQ Serverに接続するためのユーザー名を指定しています。
(4)は、RabbitMQ Serverに接続するためのパスワードを指定しています。
(5)は、RabbitMQ Serverにのホスト名を指定してます。
(6)は、RabbitMQ Serverに接続するためのユーザー名を指定しています。
(7)は、RabbitMQ Serverに接続しています。
(8)は、RabbitMQ Serverへのチャンネルを生成しています。
(9)は、キューを作成しています。
(10)は、RabbitMQ Serverへ配信するメッセージの内容をバイト配列に変換しています。
(11)は、実際にキューにメッセージを配信する処理です。引数は以下のとおりです。
第1引数:Exchange名
第2引数:Routing Key名
第3引数:メッセージをキューにルーティングできない時にエラーになるフラグ
第4引数:各種設定を格納した変数
第5引数:メッセージをバイト配列に変換したもの
(12)は、チャンネルを閉じています。
(13)は、コネクションを閉じています。
次に、Consumer側の実装です。
[System.Reflection.Assembly]::LoadFrom("C:\path\to\Downloads\RabbitMQ.Client.5.0.1\lib\net451\RabbitMQ.Client.dll")
$factory = New-Object RabbitMQ.Client.ConnectionFactory
$factory.UserName = "guest"
$factory.Password = "guest"
$factory.VirtualHost = "/"
$factory.HostName = "localhost"
$conn = $factory.CreateConnection()
$channel = $conn.CreateModel()
$channel.QueueDeclare("test",$true,$false,$false,$null)
$Consumer = New-Object RabbitMQ.Client.QueueingBasicConsumer($channel) # (1)
$channel.BasicConsume("test",$true,"",$false,$true,$null,$Consumer) # (2)
$Delivery = New-Object RabbitMQ.Client.Events.BasicDeliverEventArgs # (3)
while($true) { # (4)
if($Consumer.Queue.Dequeue(10000, [ref]$Delivery)) { # (5)
$Payload = [Text.Encoding]::UTF8.GetString($Delivery.Body) # (6)
Write-Output $Payload
}
}
$channel.close
$conn.close
以下、ソースコードの説明になります。Producer側と共通しているところは省略します。
(1)は、Consumerオブジェクトを生成しています。
(2)は、キューを待ち受けするための設定です。
第1引数:待ち受けするキュー名
第2引数:trueにしておくと自動でAckしてキューから削除します。
第3引数:Consumerにつけるタグです。空にすると、自動で命名されます。
第4引数:クラスタ環境において、ローカルに配信するかどうかを決めるフラグです。trueでとローカルに配信しません。
第5引数:キューへの排他アクセスを定義します。Trueだと、あるコネクションがキューにアクセスしているときに他のコネクションからアクセスするとエラーになります。
第6引数:Consumerオブジェクト
(3)は、キューに配信されたメッセージの中身になります。
(4)は、whileループでキューにメッセージが配信されるまで待機します。
(5)は、キューからメッセージを取り出しています。
(6)は、メッセージからペイロード(メッセージのボディ)を取り出しています。
Consumerを実行した後で、Producerを動かしてみます。
PS C:\ > powershell.exe .\consumer.ps1 GAC Version Location --- ------- -------- False v4.0.30319 C:\path\to\RabbitMQ.Client.5.0.1\lib\net451\RabbitMQ.Client.dll ConsumerCount : 0 MessageCount : 0 QueueName : test amq.ctag-uaYqGu_D8o21nMQW-WD9tA Hello
おお(`・ω・´)シャキーン
できました!!

